The createPipeline() function returns a builder for constructing pipelines.

```ts
import { createPipeline } from '@oronts/vendure-data-hub-plugin';

createPipeline()
  .name('Pipeline Name')
  .description('Pipeline description')
  .version(1)
```
| Method | Description |
|---|---|
| `name(name: string)` | Human-readable name |
| `description(desc: string)` | Description |
| `version(v: number)` | Definition version |
```ts
.context({
  channel: 'default',
  contentLanguage: 'en',
  channelStrategy: 'EXPLICIT', // 'EXPLICIT' | 'INHERIT' | 'MULTI'
  validationMode: 'STRICT',    // 'STRICT' | 'LENIENT'
  runMode: 'BATCH',            // 'SYNC' | 'ASYNC' | 'BATCH' | 'STREAM'
})
```

```ts
.capabilities({
  writes: ['CATALOG'], // 'CATALOG' | 'CUSTOMERS' | 'ORDERS' | 'PROMOTIONS' | 'INVENTORY' | 'CUSTOM'
  requires: [],        // Required permissions
  streamSafe: true,    // Safe for streaming mode
})
```
Enables parallel step execution for the pipeline. When enabled, independent steps (those without data dependencies) run concurrently.
| Parameter | Type | Required | Description |
|---|---|---|---|
| maxConcurrentSteps | number | No | Maximum steps to run concurrently (default: 4, range: 2-16) |
| errorPolicy | string | No | 'FAIL_FAST' \| 'CONTINUE' \| 'BEST_EFFORT'. Default: 'FAIL_FAST' |
Error policies:

- FAIL_FAST – Stop all steps on first error (default)
- CONTINUE – Continue other parallel steps, fail at end
- BEST_EFFORT – Continue all steps, collect all errors

```ts
createPipeline()
  .name('Parallel Import')
  .parallel({ maxConcurrentSteps: 4, errorPolicy: 'CONTINUE' })
  .extract('fetch-products', { /* ... */ })
  .extract('fetch-prices', { /* ... */ })
  .transform('merge', { /* ... */ })
  .load('upsert', { /* ... */ })
  .edge('fetch-products', 'merge')
  .edge('fetch-prices', 'merge')
  .edge('merge', 'upsert')
  .build();
```
```ts
.dependsOn('other-pipeline-code', 'another-pipeline')
```

Pipelines that must complete before this one can run.
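For example, a pipeline that should run only after two upstream imports have completed might declare (the pipeline codes here are illustrative):

```ts
createPipeline()
  .name('Downstream Sync')
  .dependsOn('inventory-import', 'price-import') // illustrative upstream pipeline codes
  // ... triggers, steps, and edges ...
  .build();
```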
Pipeline lifecycle hooks using SCREAMING_SNAKE_CASE stage names. Each stage maps to an
array of HookAction objects. Six action types are supported: INTERCEPTOR, SCRIPT,
WEBHOOK, EMIT, TRIGGER_PIPELINE, and LOG.
Interceptor Hooks (modify records inline):

```ts
.hooks({
  AFTER_EXTRACT: [{
    type: 'INTERCEPTOR',
    name: 'Add metadata',
    code: `return records.map(r => ({ ...r, source: 'api' }));`,
  }],
  BEFORE_LOAD: [{
    type: 'INTERCEPTOR',
    name: 'Filter invalid',
    code: `return records.filter(r => r.sku && r.name);`,
    failOnError: true,
  }],
})
```
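Conceptually, an interceptor's `code` string behaves like the body of a function that receives the current batch as `records` and returns the replacement array. A minimal sketch of that evaluation model (the plugin's actual sandboxing may differ; `runInterceptor` is a hypothetical helper, not part of the API):

```typescript
// Hypothetical sketch: how an INTERCEPTOR code string can be evaluated.
// The real plugin may sandbox this differently; names here are illustrative.
type PipelineRecord = Record<string, unknown>;

function runInterceptor(code: string, records: PipelineRecord[]): PipelineRecord[] {
  // Treat the code string as a function body with `records` in scope.
  const fn = new Function('records', code) as (r: PipelineRecord[]) => PipelineRecord[];
  return fn(records);
}

const out = runInterceptor(
  `return records.map(r => ({ ...r, source: 'api' }));`,
  [{ sku: 'A1' }, { sku: 'B2' }],
);
// each record now carries source: 'api'
```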
Script Hooks (reference pre-registered functions):

```ts
.hooks({
  AFTER_TRANSFORM: [{
    type: 'SCRIPT',
    scriptName: 'addCustomerSegment',
    args: { spendThreshold: 5000 },
  }],
})
```
Modify records before search engine indexing:

```ts
.hooks({
  BEFORE_SINK: [{
    type: 'SCRIPT',
    scriptName: 'enrichForSearch',
    args: { boostFeatured: true },
  }],
})
```
Webhook Hooks (notify external systems):

```ts
.hooks({
  PIPELINE_COMPLETED: [{
    type: 'WEBHOOK',
    url: 'https://api.example.com/notify',
    headers: { 'Content-Type': 'application/json' },
    retryConfig: {
      maxAttempts: 3,
      initialDelayMs: 1000,
      maxDelayMs: 60000,
      backoffMultiplier: 2,
    },
  }],
  PIPELINE_FAILED: [{
    type: 'WEBHOOK',
    url: 'https://pagerduty.example.com/alert',
  }],
})
```
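The `retryConfig` fields suggest an exponential backoff schedule: each retry delay grows by `backoffMultiplier` and is capped at `maxDelayMs`. The formula below is an assumption derived from the field names, not a guarantee about the plugin's internals:

```typescript
// Sketch of the webhook retry schedule implied by retryConfig:
// delay = initialDelayMs * backoffMultiplier^(attempt-1), capped at maxDelayMs.
interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

function retrySchedule(cfg: RetryConfig): number[] {
  const delays: number[] = [];
  // One delay before each attempt after the first.
  for (let attempt = 1; attempt < cfg.maxAttempts; attempt++) {
    delays.push(Math.min(cfg.initialDelayMs * cfg.backoffMultiplier ** (attempt - 1), cfg.maxDelayMs));
  }
  return delays;
}

retrySchedule({ maxAttempts: 3, initialDelayMs: 1000, maxDelayMs: 60000, backoffMultiplier: 2 });
// => [1000, 2000]  (waits before the 2nd and 3rd attempts)
```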
Other Hook Types:

```ts
.hooks({
  ON_ERROR: [{
    type: 'EMIT',
    event: 'pipeline.error',
  }],
  AFTER_LOAD: [{
    type: 'TRIGGER_PIPELINE',
    pipelineCode: 'post-import-sync',
  }],
  PIPELINE_STARTED: [{
    type: 'LOG',
    level: 'INFO',
    message: 'Pipeline execution started',
  }],
})
```
Define how the pipeline starts:

```ts
.trigger('start', {
  type: 'MANUAL' | 'SCHEDULE' | 'WEBHOOK' | 'EVENT' | 'FILE' | 'MESSAGE',
  // Type-specific options...
})
```
Manual Trigger:

```ts
.trigger('start', { type: 'MANUAL' })
```

Schedule Trigger:

```ts
.trigger('schedule', {
  type: 'SCHEDULE',
  cron: '0 2 * * *', // daily at 02:00
  timezone: 'UTC',
})
```

Webhook Trigger:

```ts
.trigger('webhook', {
  type: 'WEBHOOK',
  path: '/product-sync',
  signature: 'hmac-sha256',
  idempotencyKey: 'X-Request-ID',
})
```

Event Trigger:

```ts
.trigger('on-order', {
  type: 'EVENT',
  event: 'OrderPlacedEvent',
  filter: { state: 'ArrangingPayment' },
})
```
Pull data from external sources:

```ts
.extract('step-key', {
  adapterCode: string,
  // Adapter-specific options...
  throughput?: Throughput,
  async?: boolean,
})
```
HTTP API:

```ts
.extract('fetch-api', {
  adapterCode: 'httpApi',
  url: 'https://api.example.com/products',
  method: 'GET',
  headers: { 'Accept': 'application/json' },
  dataPath: 'data.items',
  pagination: {
    type: 'PAGE',
    limit: 100,
    maxPages: 10,
  },
  bearerTokenSecretCode: 'api-key',
})
```
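A `PAGE`-style pagination loop can be pictured like this: fetch up to `limit` records per page until `maxPages` is reached or a short page signals the end. This is an illustrative model, not the adapter's actual code; `fetchPage` stands in for the HTTP call:

```typescript
// Illustrative PAGE pagination: request pages of `limit` records until
// maxPages is hit or a short page indicates no more data remains.
async function extractPaged(
  fetchPage: (page: number, limit: number) => Promise<unknown[]>,
  limit: number,
  maxPages: number,
): Promise<unknown[]> {
  const all: unknown[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const items = await fetchPage(page, limit);
    all.push(...items);
    if (items.length < limit) break; // short page => last page
  }
  return all;
}
```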
GraphQL:

```ts
.extract('query-graphql', {
  adapterCode: 'graphql',
  url: 'https://api.example.com/graphql',
  query: `query { products { id name } }`,
  dataPath: 'data.products',
})
```

File:

```ts
.extract('parse-file', {
  adapterCode: 'file',
  path: '/uploads/products.csv',
  format: 'CSV',
  delimiter: ',',
  hasHeader: true,
})
```

Vendure Query:

```ts
.extract('query-vendure', {
  adapterCode: 'vendureQuery',
  entity: 'PRODUCT', // UPPERCASE: PRODUCT, COLLECTION, FACET, CUSTOMER, ORDER, etc.
  relations: 'variants,featuredAsset,translations',
  languageCode: 'en',
  batchSize: 500,
})
```
Modify records:

```ts
.transform('step-key', {
  operators: OperatorConfig[],
  throughput?: Throughput,
  async?: boolean,
  retryPerRecord?: RetryPerRecordConfig,
})
```
Example:

```ts
.transform('map-fields', {
  operators: [
    { op: 'rename', args: { from: 'title', to: 'name' } },
    { op: 'set', args: { path: 'enabled', value: true } },
    { op: 'slugify', args: { source: 'name', target: 'slug' } },
  ],
})
```
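The rough semantics of the three operators above can be sketched as plain functions over a record. This mirrors only the documented behavior; the plugin's real operators (nested paths, locale-aware slugs, and so on) may be richer:

```typescript
// Illustrative per-record semantics of rename, set, and slugify.
type Rec = Record<string, unknown>;

const rename = (r: Rec, from: string, to: string): Rec => {
  const { [from]: value, ...rest } = r;
  return { ...rest, [to]: value };
};
const set = (r: Rec, path: string, value: unknown): Rec => ({ ...r, [path]: value });
const slugify = (r: Rec, source: string, target: string): Rec => ({
  ...r,
  [target]: String(r[source] ?? '')
    .toLowerCase()
    .trim()
    .replace(/[^a-z0-9]+/g, '-') // collapse non-alphanumeric runs into '-'
    .replace(/^-+|-+$/g, ''),
});

let rec: Rec = { title: 'Blue T-Shirt' };
rec = rename(rec, 'title', 'name');
rec = set(rec, 'enabled', true);
rec = slugify(rec, 'name', 'slug');
// rec => { name: 'Blue T-Shirt', enabled: true, slug: 'blue-t-shirt' }
```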
Per-record retry configuration for transform operators. When set, individual records that fail during transformation are retried independently rather than failing the entire batch.
| Field | Type | Required | Description |
|---|---|---|---|
| maxRetries | number | Yes | Max retries per record (default: 0) |
| retryDelayMs | number | No | Delay between retries in ms (default: 100) |
| backoff | string | No | 'FIXED' \| 'EXPONENTIAL' (default: 'FIXED') |
| retryableErrors | string[] | No | Error message patterns to retry (optional; retries all errors if omitted) |
```ts
.transform('enrich-products', {
  operators: [
    { op: 'httpLookup', args: { url: 'https://api.example.com/', target: 'extra' } },
  ],
  retryPerRecord: {
    maxRetries: 3,
    retryDelayMs: 500,
    backoff: 'EXPONENTIAL',
    retryableErrors: ['ETIMEDOUT', 'ECONNRESET'],
  },
})
```
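With `backoff: 'EXPONENTIAL'`, the delay between retries of a failing record grows from `retryDelayMs`. A doubling schedule is assumed below (the multiplier is not specified in the config, so treat this as illustrative):

```typescript
// Illustrative delay schedule for retryPerRecord. A factor of 2 is assumed
// for 'EXPONENTIAL'; the plugin's exact factor is not documented here.
function retryDelay(
  attempt: number, // 1-based retry attempt
  retryDelayMs: number,
  backoff: 'FIXED' | 'EXPONENTIAL',
): number {
  return backoff === 'EXPONENTIAL' ? retryDelayMs * 2 ** (attempt - 1) : retryDelayMs;
}

// retryDelayMs: 500, backoff: 'EXPONENTIAL', maxRetries: 3:
const delays = [1, 2, 3].map(a => retryDelay(a, 500, 'EXPONENTIAL'));
// delays => [500, 1000, 2000]
```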
Validate records:

```ts
.validate('step-key', {
  errorHandlingMode: 'FAIL_FAST' | 'ACCUMULATE',
  rules: ValidationRuleConfig[],
  schemaRef?: SchemaRefConfig,
  throughput?: Throughput,
})
```

Example:

```ts
.validate('check-data', {
  errorHandlingMode: 'ACCUMULATE',
  rules: [
    { type: 'business', spec: { field: 'sku', required: true } },
    { type: 'business', spec: { field: 'price', min: 0 } },
  ],
})
```
Add data from external lookups or static enrichment:

```ts
.enrich('step-key', {
  adapterCode?: string,                 // Custom enricher adapter (optional if using built-in)
  defaults?: Record<string, JsonValue>, // Set fields only if missing
  set?: Record<string, JsonValue>,      // Always overwrite these fields
  computed?: Record<string, string>,    // Template expressions: '${field1} ${field2}'
  sourceType?: 'STATIC' | 'HTTP' | 'VENDURE',
  endpoint?: string,                    // HTTP endpoint URL (for HTTP source type)
  matchField?: string,                  // Field to match for lookups
  entity?: string,                      // Vendure entity type (for VENDURE source type)
  config?: JsonObject,                  // Additional adapter config
})
```

Static Enrichment (no adapter needed):

```ts
.enrich('add-defaults', {
  defaults: { currency: 'USD', enabled: false },
  set: { importSource: 'api-sync' },
  computed: { fullTitle: '${brand} - ${name}' },
})
```
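The `computed` values use `${field}` template expressions. A minimal sketch of that substitution, assuming simple top-level field lookup with missing fields rendered as empty strings:

```typescript
// Illustrative model of the '${field}' template syntax used by `computed`.
type Rec = Record<string, unknown>;

function interpolate(template: string, record: Rec): string {
  // Replace each ${name} with the record's top-level field of that name.
  return template.replace(/\$\{(\w+)\}/g, (_, field: string) => String(record[field] ?? ''));
}

const fullTitle = interpolate('${brand} - ${name}', { brand: 'Acme', name: 'Widget' });
// fullTitle => 'Acme - Widget'
```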
Split data flow based on conditions:

```ts
.route('step-key', {
  branches: RouteBranchConfig[],
  defaultTo?: string,
})
```

Example:

```ts
.route('by-category', {
  branches: [
    {
      name: 'electronics',
      when: [{ field: 'category', cmp: 'eq', value: 'electronics' }],
    },
    {
      name: 'clothing',
      when: [{ field: 'category', cmp: 'eq', value: 'clothing' }],
    },
  ],
  defaultTo: 'other-products',
})
```
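Per record, routing can be pictured as: pick the first branch whose `when` conditions all hold, otherwise fall through to `defaultTo`. The sketch below covers only the `eq` comparator from the example; the real step supports more comparators:

```typescript
// Illustrative routing: first branch whose conditions all match wins.
type Rec = Record<string, unknown>;
interface Condition { field: string; cmp: 'eq'; value: unknown; }
interface Branch { name: string; when: Condition[]; }

function routeRecord(record: Rec, branches: Branch[], defaultTo?: string): string | undefined {
  for (const b of branches) {
    if (b.when.every(c => record[c.field] === c.value)) return b.name;
  }
  return defaultTo;
}

const branches: Branch[] = [
  { name: 'electronics', when: [{ field: 'category', cmp: 'eq', value: 'electronics' }] },
  { name: 'clothing', when: [{ field: 'category', cmp: 'eq', value: 'clothing' }] },
];
routeRecord({ category: 'clothing' }, branches, 'other-products'); // => 'clothing'
routeRecord({ category: 'toys' }, branches, 'other-products');     // => 'other-products'
```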
Create or update Vendure entities:

```ts
.load('step-key', {
  adapterCode: string,
  strategy?: 'CREATE' | 'UPDATE' | 'UPSERT' | 'MERGE' | 'SOFT_DELETE' | 'HARD_DELETE',
  channel?: string,
  channelStrategy?: 'EXPLICIT' | 'INHERIT' | 'MULTI',
  validationMode?: ValidationMode,
  matchField?: string,
  nameField?: string,
  slugField?: string,
  descriptionField?: string,
  skuField?: string,
  priceField?: string,
  emailField?: string,
  firstNameField?: string,
  lastNameField?: string,
  phoneNumberField?: string,
  codeField?: string,
  parentField?: string,
  positionField?: string,
  stockOnHandField?: string,
  stockLocationField?: string,
  urlField?: string,
  enabledField?: string,
  config?: JsonObject,
  throughput?: Throughput,
  async?: boolean,
})
```

Product Loader:

```ts
.load('import-products', {
  adapterCode: 'productUpsert',
  strategy: 'UPSERT',
  matchField: 'slug',
  conflictStrategy: 'SOURCE_WINS',
})
```

Variant Loader:

```ts
.load('import-variants', {
  adapterCode: 'variantUpsert',
  strategy: 'UPDATE',
  matchField: 'sku',
})
```
Send data to external destinations:

```ts
.export('step-key', {
  adapterCode: string,
  target?: 'FILE' | 'API' | 'WEBHOOK' | 'S3' | 'SFTP' | 'EMAIL',
  format?: 'CSV' | 'JSON' | 'XML' | 'XLSX' | 'NDJSON',
  // Target-specific options...
})
```

File Export:

```ts
.export('write-file', {
  adapterCode: 'file-export',
  target: 'FILE',
  format: 'CSV',
  path: '/exports',
  filename: 'products.csv',
})
```

S3 Export:

```ts
.export('upload-s3', {
  adapterCode: 's3-export',
  target: 'S3',
  bucket: 'my-bucket',
  prefix: 'exports/',
  format: 'JSON',
  connectionCode: 'aws-s3',
})
```
Generate product feeds:

```ts
.feed('step-key', {
  adapterCode: 'googleMerchant' | 'metaCatalog' | 'amazonFeed' | 'customFeed',
  feedType?: 'GOOGLE_SHOPPING' | 'META_CATALOG' | 'AMAZON' | 'CUSTOM',
  format?: 'XML' | 'CSV' | 'TSV' | 'JSON' | 'NDJSON',
  // Feed-specific options...
})
```

Google Feed:

```ts
.feed('google-shopping', {
  adapterCode: 'googleMerchant',
  feedType: 'GOOGLE_SHOPPING',
  format: 'XML',
  outputPath: '/feeds/google.xml',
  targetCountry: 'US',
  contentLanguage: 'en',
  currency: 'USD',
  titleField: 'name',
  descriptionField: 'description',
  priceField: 'price',
  imageField: 'image',
})
```
Index data to search engines:

```ts
.sink('step-key', {
  adapterCode: 'elasticsearch' | 'meilisearch' | 'algolia' | 'typesense',
  sinkType?: 'ELASTICSEARCH' | 'OPENSEARCH' | 'MEILISEARCH' | 'ALGOLIA' | 'TYPESENSE' | 'CUSTOM',
  indexName: string,
  // Sink-specific options...
})
```

Elasticsearch:

```ts
.sink('index-products', {
  adapterCode: 'elasticsearch',
  sinkType: 'ELASTICSEARCH',
  host: 'localhost',
  port: 9200,
  indexName: 'products',
  idField: 'id',
  bulkSize: 500,
})
```
Add a human-in-the-loop approval gate step. Gates pause pipeline execution until approval is granted – either manually, automatically when errors fall below a threshold, or after a timeout.
```ts
.gate('step-key', {
  approvalType: 'MANUAL' | 'THRESHOLD' | 'TIMEOUT',
  timeoutSeconds?: number,
  errorThresholdPercent?: number,
  notifyWebhook?: string,
  notifyEmail?: string,
  previewCount?: number,
})
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| approvalType | string | Yes | 'MANUAL' \| 'THRESHOLD' \| 'TIMEOUT' |
| timeoutSeconds | number | No | Auto-approve after N seconds (TIMEOUT mode) |
| errorThresholdPercent | number | No | Auto-approve if error rate below threshold, 0-100 (THRESHOLD mode) |
| notifyWebhook | string | No | Webhook URL for gate notifications |
| notifyEmail | string | No | Email address for gate notifications |
| previewCount | number | No | Number of records to preview (default: 10) |
Manual Gate:

```ts
.gate('review-import', {
  approvalType: 'MANUAL',
  notifyEmail: 'admin@example.com',
  previewCount: 25,
})
```

Threshold Gate:

```ts
.gate('error-check', {
  approvalType: 'THRESHOLD',
  errorThresholdPercent: 5,
  notifyWebhook: 'https://hooks.example.com/gate',
})
```

Timeout Gate:

```ts
.gate('timed-review', {
  approvalType: 'TIMEOUT',
  timeoutSeconds: 3600,
  notifyEmail: 'team@example.com',
})
```
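The `THRESHOLD` gate's decision reduces to comparing the run's error rate against `errorThresholdPercent`. A sketch of that check (the behavior at exactly the threshold is an assumption):

```typescript
// Illustrative THRESHOLD gate decision: auto-approve when the error rate
// stays below errorThresholdPercent. Field names mirror the gate config.
function thresholdGateApproves(
  errorCount: number,
  totalCount: number,
  errorThresholdPercent: number,
): boolean {
  if (totalCount === 0) return true; // nothing processed, nothing failed
  const errorRatePercent = (errorCount / totalCount) * 100;
  return errorRatePercent < errorThresholdPercent;
}

thresholdGateApproves(3, 100, 5);  // => true  (3% < 5%)
thresholdGateApproves(10, 100, 5); // => false (10% >= 5%)
```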
Connect steps:

```ts
.edge(from: string, to: string, branch?: string)
```

Basic Connection:

```ts
.edge('extract', 'transform')
.edge('transform', 'load')
```

Branching:

```ts
.edge('route', 'process-electronics', 'electronics')
.edge('route', 'process-clothing', 'clothing')
.edge('route', 'process-other') // Default branch
```
Finalize the pipeline:

```ts
const definition = createPipeline()
  // ... configuration ...
  .build();
// Returns PipelineDefinition
```
Control execution performance:

```ts
{
  throughput: {
    batchSize: 100,           // Records per batch
    concurrency: 4,           // Parallel batches
    rateLimitRps: 10,         // Max requests per second
    drainStrategy: 'BACKOFF', // 'BACKOFF' | 'SHED' | 'QUEUE'
    pauseOnErrorRate: {
      threshold: 0.5,  // Pause if error rate exceeds 50%
      intervalSec: 60, // Check interval
    },
  },
  // Retry configuration (step-level, not in throughput)
  retries: 3,
  retryDelayMs: 1000,
  timeoutMs: 30000,
}
```
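The `pauseOnErrorRate` setting can be read as: every `intervalSec`, compare the observed error rate with `threshold` and pause the pipeline when it is exceeded. A sketch of the core check (the windowing details are assumptions, not the plugin's internals):

```typescript
// Illustrative check behind pauseOnErrorRate: pause when the error rate
// observed in the current interval exceeds the configured threshold.
interface WindowStats {
  errors: number;    // failed records in this interval
  processed: number; // total records in this interval
}

function shouldPause(stats: WindowStats, threshold: number): boolean {
  if (stats.processed === 0) return false; // nothing to measure yet
  return stats.errors / stats.processed > threshold;
}

shouldPause({ errors: 30, processed: 50 }, 0.5); // => true  (60% > 50%)
shouldPause({ errors: 10, processed: 50 }, 0.5); // => false (20%)
```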
```ts
import { createPipeline, DataHubPlugin } from '@oronts/vendure-data-hub-plugin';

const productSync = createPipeline()
  .name('Daily Product Sync')
  .description('Sync products from ERP every day')
  .version(1)
  .capabilities({ writes: ['CATALOG'], streamSafe: true })
  .trigger('schedule', {
    type: 'SCHEDULE',
    cron: '0 2 * * *', // daily at 02:00
    timezone: 'UTC',
  })
  .extract('fetch-erp', {
    adapterCode: 'httpApi',
    connectionCode: 'erp-api',
    url: '/products',
    dataPath: 'data',
    throughput: { batchSize: 500 },
  })
  .transform('map-fields', {
    operators: [
      { op: 'rename', args: { from: 'product_name', to: 'name' } },
      { op: 'rename', args: { from: 'product_sku', to: 'sku' } },
      { op: 'slugify', args: { source: 'sku', target: 'slug' } },
      // Multiply price by 100 (e.g. to convert to minor currency units)
      { op: 'math', args: { operation: 'multiply', source: 'price', operand: '100', target: 'price' } },
      { op: 'set', args: { path: 'enabled', value: true } },
    ],
  })
  .validate('check-data', {
    errorHandlingMode: 'ACCUMULATE',
    rules: [
      { type: 'business', spec: { field: 'name', required: true } },
      { type: 'business', spec: { field: 'sku', required: true } },
      { type: 'business', spec: { field: 'price', min: 0 } },
    ],
  })
  .load('upsert-products', {
    adapterCode: 'productUpsert',
    strategy: 'UPSERT',
    matchField: 'slug',
    conflictStrategy: 'SOURCE_WINS',
    throughput: { batchSize: 50, concurrency: 2 },
  })
  .edge('schedule', 'fetch-erp')
  .edge('fetch-erp', 'map-fields')
  .edge('map-fields', 'check-data')
  .edge('check-data', 'upsert-products')
  .build();

export const config = {
  plugins: [
    DataHubPlugin.init({
      pipelines: [{
        code: 'product-sync',
        name: 'Daily Product Sync',
        enabled: true,
        definition: productSync,
      }],
    }),
  ],
};
```