Build data pipelines using the visual drag-and-drop editor.
The editor has three main areas:

- Pipeline Management - View and manage all your data pipelines
- Simple Mode - Step-by-step list view for building pipelines
- Workflow Mode - Visual drag-and-drop canvas with node palette
Every pipeline needs a trigger to define how it starts.
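For example, a scheduled trigger might be declared like the sketch below. The `.trigger()` builder shape and its field names are assumptions for illustration only; see Reference for the actual trigger options.

```ts
// Hypothetical sketch -- the builder and field names are assumptions
.trigger({
  type: 'SCHEDULE',
  cron: '0 2 * * *', // run daily at 02:00
})
```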
Extract steps pull data from sources.
- httpApi - REST API endpoints with pagination, authentication, and retry support
- file - Parse CSV, JSON, XML, XLSX, NDJSON, TSV files
- graphql - External GraphQL endpoints
- vendureQuery - Vendure entity data

Transform steps modify records.
Load steps create or update Vendure entities or send data externally.
- productUpsert - Create/update products
- variantUpsert - Create/update product variants
- customerUpsert - Create/update customers
- collectionUpsert - Create/update collections
- promotionUpsert - Create/update promotions
- stockAdjust - Adjust inventory levels
- orderNote - Add notes to orders
- orderTransition - Change order states
- restPost - Send data to external APIs

Connections show the data flow direction. A record must pass through connected steps in order.
Click a step to open its configuration panel:
Each adapter has specific settings. See Reference for details.
Route steps split data flow based on conditions:
Example: Route products by category:
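The configuration is not shown in this guide, so the following is a hypothetical sketch of a category route; the `.route()` shape, condition fields, and step names are assumptions:

```ts
// Hypothetical sketch -- the route step's exact schema is an assumption
.route({
  name: 'By category',
  branches: [
    { when: { field: 'category', equals: 'electronics' }, to: 'electronics-load' },
    { when: { field: 'category', equals: 'apparel' }, to: 'apparel-load' },
  ],
  default: 'generic-load',
})
```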
Before running, validate your pipeline:
Fix all issues before saving.
Some pipelines accept input parameters:
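Parameters are supplied when a run is started. The parameter names below are purely illustrative, not part of the plugin's API:

```ts
// Hypothetical example -- parameter names are illustrative only
{
  since: '2024-01-01', // only process records updated after this date
  limit: 500,          // cap the number of records per run
}
```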
| State | Description |
|---|---|
| Draft | Not yet saved or validated |
| Enabled | Ready to run, schedules active |
| Disabled | Cannot be run, schedules inactive |
| Running | Currently executing |
Note: Deleting a pipeline removes all run history.
Hooks allow you to execute custom code at specific stages of pipeline execution. Use hooks to modify data, send notifications, trigger other pipelines, or integrate with external systems.
Data Processing Stages:
| Stage | When It Runs | Can Modify Records |
|---|---|---|
| BEFORE_EXTRACT | Before data extraction | Yes (seed records) |
| AFTER_EXTRACT | After data is extracted | Yes |
| BEFORE_TRANSFORM | Before transformation | Yes |
| AFTER_TRANSFORM | After transformation | Yes |
| BEFORE_VALIDATE | Before validation | Yes |
| AFTER_VALIDATE | After validation | Yes |
| BEFORE_ENRICH | Before enrichment | Yes |
| AFTER_ENRICH | After enrichment | Yes |
| BEFORE_ROUTE | Before routing | Yes |
| AFTER_ROUTE | After routing | Yes |
| BEFORE_LOAD | Before loading to Vendure | Yes |
| AFTER_LOAD | After loading | Yes |
| BEFORE_EXPORT | Before file export | Yes |
| AFTER_EXPORT | After file export | Yes |
| BEFORE_FEED | Before feed generation | Yes |
| AFTER_FEED | After feed generation | Yes |
| BEFORE_SINK | Before search indexing | Yes |
| AFTER_SINK | After search indexing | Yes |
Lifecycle Stages (observe-only: these stages support WEBHOOK, EMIT, LOG, and TRIGGER_PIPELINE hooks, but not INTERCEPTOR or SCRIPT):
| Stage | When It Runs | Supported Hook Types |
|---|---|---|
| PIPELINE_STARTED | Pipeline execution begins | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
| PIPELINE_COMPLETED | Pipeline finishes successfully | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
| PIPELINE_FAILED | Pipeline fails | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
| ON_ERROR | When an error occurs | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
| ON_RETRY | When a record is retried | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
| ON_DEAD_LETTER | When a record is sent to the dead letter queue | WEBHOOK, EMIT, LOG, TRIGGER_PIPELINE |
Interceptors run JavaScript code that can modify the records array:
    .hooks({
      AFTER_EXTRACT: [{
        type: 'INTERCEPTOR',
        name: 'Add metadata',
        code: `
          return records.map(r => ({
            ...r,
            extractedAt: new Date().toISOString(),
            source: 'supplier-api',
          }));
        `,
        failOnError: false, // Don't fail pipeline if hook fails
        timeout: 5000, // 5 second timeout
      }],
      BEFORE_LOAD: [{
        type: 'INTERCEPTOR',
        name: 'Filter invalid',
        code: `
          return records.filter(r => r.sku && r.name);
        `,
      }],
    })
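An interceptor body is plain JavaScript over the `records` array, so its logic can be sanity-checked outside the pipeline. A minimal standalone check of the 'Filter invalid' body above:

```typescript
// Standalone check of the same filter logic used in the 'Filter invalid' hook
const records = [
  { sku: 'A1', name: 'Widget' },
  { sku: '', name: 'Missing SKU' },  // dropped: empty sku
  { sku: 'B2', name: '' },           // dropped: empty name
];
const kept = records.filter(r => r.sku && r.name);
console.log(kept.length);  // 1
console.log(kept[0].sku);  // 'A1'
```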
Modify records before search indexing (Meilisearch, Elasticsearch, etc.):
    .hooks({
      BEFORE_SINK: [{
        type: 'INTERCEPTOR',
        name: 'Enrich for search',
        code: `
          return records.map(r => ({
            ...r,
            searchText: [r.name, r.sku, r.description].filter(Boolean).join(' ').toLowerCase(),
            facetTags: (r.tags || '').split(',').map(t => t.trim()).filter(Boolean),
            boostScore: r.featured ? 1.5 : 1.0,
          }));
        `,
      }],
    })
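To see what the enrichment above produces, the same expressions can be evaluated on a sample record outside the pipeline:

```typescript
// Standalone check of the searchText/facetTags/boostScore logic from the hook above
const r = { name: 'Blue Shirt', sku: 'TS-01', description: undefined, tags: 'sale, summer', featured: true };
const searchText = [r.name, r.sku, r.description].filter(Boolean).join(' ').toLowerCase();
const facetTags = (r.tags || '').split(',').map(t => t.trim()).filter(Boolean);
const boostScore = r.featured ? 1.5 : 1.0;
console.log(searchText);  // 'blue shirt ts-01'
console.log(facetTags);   // [ 'sale', 'summer' ]
console.log(boostScore);  // 1.5
```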
Transform records before CSV/JSON export:
    .hooks({
      BEFORE_EXPORT: [{
        type: 'INTERCEPTOR',
        name: 'Format for export',
        code: `
          return records.map(r => ({
            ...r,
            price: (r.price / 100).toFixed(2),
            createdAt: new Date(r.createdAt).toISOString(),
          }));
        `,
      }],
    })
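The formatting above assumes prices are stored in minor units (cents) and timestamps as epoch milliseconds; a quick standalone check:

```typescript
// Standalone check of the export formatting: minor units -> decimal string
const price = 1999;                    // price in minor units (cents)
console.log((price / 100).toFixed(2)); // '19.99'

const createdAt = 1700000000000;       // epoch milliseconds
console.log(new Date(createdAt).toISOString()); // '2023-11-14T22:13:20.000Z'
```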
Available in interceptor code:
- records - The current records array
- context - Hook context with pipelineId, runId, stage
- Array, Object, String, Number, Date, JSON, Math
- console.log/warn/error - Logged to pipeline logs

Script hooks reference pre-registered TypeScript functions. Register scripts via plugin options (recommended) or imperatively:
Via Plugin Options (Recommended):
    DataHubPlugin.init({
      scripts: {
        'addSegment': async (records, context, args) => {
          const threshold = args?.threshold || 1000;
          return records.map(r => ({
            ...r,
            segment: r.totalSpent > threshold ? 'vip' : 'standard',
          }));
        },
        'validateRequired': async (records, context) => {
          return records.filter(r => r.sku && r.name && r.price > 0);
        },
        'enrichWithTimestamp': async (records, context) => {
          return records.map(r => ({
            ...r,
            importedAt: Date.now(),
            pipelineRun: context.runId,
          }));
        },
      },
    })
Via Service Injection:
    import { HookService } from '@oronts/vendure-data-hub-plugin';

    @VendurePlugin({ imports: [DataHubPlugin] })
    export class MyPlugin implements OnModuleInit {
      constructor(private hookService: HookService) {}

      onModuleInit() {
        this.hookService.registerScript('addSegment', async (records, context, args) => {
          const threshold = args?.threshold || 1000;
          return records.map(r => ({
            ...r,
            segment: r.totalSpent > threshold ? 'vip' : 'standard',
          }));
        });
      }
    }
Tip: When registering scripts via HookService.registerScript() in a NestJS service, your scripts can access any injected service (database, external APIs, Vendure services) through JavaScript closures. See the Developer Guide for examples.
Use in pipeline:
    .hooks({
      AFTER_TRANSFORM: [{
        type: 'SCRIPT',
        scriptName: 'addSegment',
        args: { threshold: 5000 },
      }],
      AFTER_EXTRACT: [{
        type: 'SCRIPT',
        scriptName: 'validateRequired',
      }],
    })
Register a script for search index enrichment:
    DataHubPlugin.init({
      scripts: {
        'buildSearchAttributes': async (records, context) => {
          return records.map(r => ({
            ...r,
            searchText: [r.name, r.sku, r.description]
              .filter(Boolean).join(' ').toLowerCase(),
            facetCategories: r.categories?.map(c => c.name) || [],
          }));
        },
      },
    })

    // Use in pipeline:
    .hooks({
      BEFORE_SINK: [{ type: 'SCRIPT', scriptName: 'buildSearchAttributes' }],
    })
Send HTTP notifications to external systems:
    .hooks({
      PIPELINE_COMPLETED: [{
        type: 'WEBHOOK',
        url: 'https://slack.example.com/webhook',
        headers: { 'Content-Type': 'application/json' },
      }],
      PIPELINE_FAILED: [{
        type: 'WEBHOOK',
        url: 'https://pagerduty.example.com/alert',
        secret: 'webhook-signing-secret', // HMAC signature
        signatureHeader: 'X-Signature',
        retryConfig: {
          maxAttempts: 5,
          initialDelayMs: 1000,
          backoffMultiplier: 2,
        },
      }],
    })
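On the receiving end, the HMAC signature can be verified by recomputing it over the raw request body. The exact algorithm and encoding the plugin uses are assumptions here (SHA-256 with hex encoding is common); check the Reference before relying on this sketch:

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical receiver-side verification -- sha256/hex is an assumption,
// not confirmed by this guide.
function verifySignature(rawBody: string, secret: string, signature: string): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest('hex');
  // Constant-time comparison to avoid timing attacks
  if (expected.length !== signature.length) return false;
  return timingSafeEqual(Buffer.from(expected), Buffer.from(signature));
}

const body = '{"pipeline":"daily-import","status":"FAILED"}';
const sig = createHmac('sha256', 'webhook-signing-secret').update(body).digest('hex');
console.log(verifySignature(body, 'webhook-signing-secret', sig)); // true
```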
Emit Vendure domain events:
    .hooks({
      PIPELINE_COMPLETED: [{
        type: 'EMIT',
        event: 'ProductSyncCompleted',
      }],
    })
Start another pipeline with the current records:
    .hooks({
      AFTER_LOAD: [{
        type: 'TRIGGER_PIPELINE',
        pipelineCode: 'reindex-search',
      }],
    })
Test hooks without running the full pipeline:
- Use failOnError: false for non-critical hooks

The Data Hub provides guided wizards for creating import and export pipelines:
Wizards offer pre-configured templates for common scenarios:
Import Templates: REST API Sync, JSON Import, Magento CSV, XML Feed, ERP Inventory, CRM Customer Sync

Export Templates: Google Shopping, Meta Catalog, Amazon Feed, Product CSV/JSON, Order Analytics, Customer GDPR, Inventory Report
Custom templates registered via plugin options or connectors appear alongside built-in templates.
Test pipelines without persisting changes:
Dry run executes the full pipeline logic but doesn’t commit changes to the database.
- Use descriptive pipeline codes: product-import-daily, not pipeline-1
- Good examples: inventory-sync-hourly, erp-product-sync
- Choose how errors are handled: continue, stop, or dead-letter
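As a sketch, a per-step error policy might look like the fragment below; the `onError` and `maxRetries` field names are assumptions for illustration, not the plugin's confirmed schema:

```ts
// Hypothetical sketch -- field names are assumptions
{
  onError: 'dead-letter', // or 'continue' | 'stop'
  maxRetries: 3,
}
```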