Complete reference for all pipeline step types.
Every pipeline is composed of steps connected by edges. Each step has a type
that determines its role in the data flow. The StepType enum defines the
following 11 step types:
| Step Type | Purpose |
|---|---|
TRIGGER |
Starts pipeline execution |
EXTRACT |
Pulls data from external sources |
TRANSFORM |
Modifies records with operators |
VALIDATE |
Validates records against rules |
ENRICH |
Adds data from external lookups |
ROUTE |
Splits data flow by conditions |
LOAD |
Creates or updates Vendure entities |
EXPORT |
Sends data to external destinations |
FEED |
Generates product feeds |
SINK |
Indexes data to search engines and queues |
GATE |
Human-in-the-loop approval gate |
Defines how a pipeline starts. Every pipeline must have exactly one trigger step as its root node.
Trigger types: MANUAL, SCHEDULE, WEBHOOK, EVENT, FILE, MESSAGE
See Pipeline Builder - trigger for full configuration.
Pulls data from external sources into the pipeline. Supports HTTP APIs, GraphQL endpoints, files, databases, S3, FTP, CDC, and Vendure entity queries.
See Extractors Reference for all extractor adapters and their configuration.
Modifies records using one or more operators. Supports 61 built-in operators across 11 categories (data, string, numeric, date, logic, JSON, enrichment, aggregation, file, validation, and scripting).
Optionally supports per-record retry via retryPerRecord configuration.
See Operators Reference for all operators and their arguments.
Validates records against business rules and schemas. Supports FAIL_FAST
(stop on first error) and ACCUMULATE (collect all errors) modes.
See Pipeline Builder - validate for configuration.
Adds data from external lookups to records. Uses adapter-based enrichment with configurable lookup sources.
See Pipeline Builder - enrich for configuration.
Splits data flow based on field conditions. Each branch defines a set of
conditions using comparison operators (eq, ne, gt, lt, in,
contains, regex, etc.).
Unmatched records: In graph execution mode, records that don’t match any branch condition are collected into a
defaultbranch. In linear execution mode, if no branch matches any records, the step returns an empty result and unmatched records are silently dropped. A warning is logged when records are dropped. To ensure all records are handled, add a catch-all branch with{ name: 'fallback', when: [] }(no conditions = always matches) as the last branch.
See Pipeline Builder - route for configuration.
Creates, updates, or deletes Vendure entities. Supports 24 loader codes including products, variants, customers, collections, facets, orders (upsert, notes, transitions, coupons), promotions, assets, inventory, entity deletion, and more.
Strategies: CREATE, UPDATE, UPSERT, MERGE, SOFT_DELETE, HARD_DELETE
See Loaders Reference for all loader adapters and their configuration.
Sends data to external destinations including files, S3, SFTP, HTTP, and email.
Formats: CSV, JSON, XML, XLSX, NDJSON, PARQUET
Targets: FILE, API, WEBHOOK, S3, SFTP, EMAIL
See Pipeline Builder - export for configuration.
Generates product feeds for marketing platforms.
Feed types: GOOGLE_SHOPPING, META_CATALOG, AMAZON, CUSTOM
See Feed Generators Reference for all feed adapters and their configuration.
Indexes data to search engines and publishes to message queues.
Sink types: ELASTICSEARCH, OPENSEARCH, MEILISEARCH, ALGOLIA, TYPESENSE, WEBHOOK, CUSTOM
See Sinks Reference for all sink adapters and their configuration.
Adds a human-in-the-loop approval gate that pauses pipeline execution until approval is granted. Gates are useful for reviewing data before it is loaded into Vendure, especially for large or high-risk imports.
.gate('step-key', {
approvalType: 'MANUAL' | 'THRESHOLD' | 'TIMEOUT',
timeoutSeconds?: number,
errorThresholdPercent?: number,
notifyWebhook?: string,
notifyEmail?: string,
previewCount?: number,
})
| Parameter | Type | Required | Description |
|---|---|---|---|
approvalType |
string | Yes | Approval mode (see below) |
timeoutSeconds |
number | No | Auto-approve after N seconds (TIMEOUT mode) |
errorThresholdPercent |
number | No | Auto-approve if error rate below threshold, 0-100 (THRESHOLD mode) |
notifyWebhook |
string | No | Webhook URL to call when gate is reached |
notifyEmail |
string | No | Email address to notify when gate is reached |
previewCount |
number | No | Number of records to include in the gate preview (default: 10) |
| Type | Behavior |
|---|---|
MANUAL |
Pipeline pauses until a user explicitly approves or rejects via the dashboard or API |
THRESHOLD |
Auto-approves if the upstream error rate is below errorThresholdPercent; otherwise pauses for manual review |
TIMEOUT |
Auto-approves after timeoutSeconds if no manual action is taken |
Manual approval with email notification:
.gate('review-before-load', {
approvalType: 'MANUAL',
notifyEmail: 'admin@example.com',
previewCount: 25,
})
Threshold-based auto-approval:
.gate('error-check', {
approvalType: 'THRESHOLD',
errorThresholdPercent: 5,
notifyWebhook: 'https://hooks.example.com/gate-reached',
})
Timeout-based auto-approval:
.gate('timed-review', {
approvalType: 'TIMEOUT',
timeoutSeconds: 3600,
notifyEmail: 'team@example.com',
previewCount: 50,
})
createPipeline()
.name('Reviewed Product Import')
.trigger('start', { type: 'MANUAL' })
.extract('fetch-erp', { adapterCode: 'httpApi', /* ... */ })
.transform('map-fields', { operators: [ /* ... */ ] })
.validate('check-data', { errorHandlingMode: 'ACCUMULATE', rules: [ /* ... */ ] })
.gate('review-before-load', {
approvalType: 'MANUAL',
notifyEmail: 'data-team@example.com',
previewCount: 20,
})
.load('upsert-products', { adapterCode: 'productUpsert', strategy: 'UPSERT', matchField: 'slug' })
.edge('start', 'fetch-erp')
.edge('fetch-erp', 'map-fields')
.edge('map-fields', 'check-data')
.edge('check-data', 'review-before-load')
.edge('review-before-load', 'upsert-products')
.build();