This guide covers the code-first DSL, the plugin architecture, and how to extend the Data Hub plugin.
Pipeline Hooks - Execute custom code at every stage of pipeline execution
Use the code-first DSL when:
Use the visual builder when:
import { createPipeline } from '@oronts/vendure-data-hub-plugin';

const pipeline = createPipeline()
    .name('Product Sync')
    .description('Sync products from ERP system')
    .trigger('schedule', { type: 'SCHEDULE', cron: '0 2 * * *' })
    .extract('fetch-erp', {
        adapterCode: 'httpApi',
        connectionCode: 'erp-api',
        url: '/products',
        dataPath: 'data.products',
    })
    .transform('map-fields', {
        operators: [
            { op: 'rename', args: { from: 'product_name', to: 'name' } },
            { op: 'rename', args: { from: 'product_sku', to: 'sku' } },
            { op: 'slugify', args: { source: 'sku', target: 'slug' } },
        ],
    })
    .load('upsert-products', {
        adapterCode: 'productUpsert',
        strategy: 'UPSERT',
        matchField: 'slug',
    })
    .edge('schedule', 'fetch-erp')
    .edge('fetch-erp', 'map-fields')
    .edge('map-fields', 'upsert-products')
    .build();
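To make the `map-fields` step concrete, the sketch below applies the same three operators to one sample record. The `rename` and `slugify` functions here are hypothetical stand-ins written for illustration, and the slug rule (lower-case, non-alphanumerics collapsed to `-`) is an assumption; the plugin's own operators may behave differently.

```typescript
// Hypothetical stand-ins for the operators used above; not the plugin's code.
type DataRecord = Record<string, unknown>;

// rename: move the value stored under `from` to `to`, leaving other fields untouched.
function rename(record: DataRecord, from: string, to: string): DataRecord {
    if (!(from in record)) return record;
    const { [from]: value, ...rest } = record;
    return { ...rest, [to]: value };
}

// slugify (assumed rule): lower-case, collapse runs of non-alphanumerics
// into '-', and trim leading/trailing dashes.
function slugify(record: DataRecord, source: string, target: string): DataRecord {
    const slug = String(record[source] ?? '')
        .toLowerCase()
        .replace(/[^a-z0-9]+/g, '-')
        .replace(/^-+|-+$/g, '');
    return { ...record, [target]: slug };
}

// A sample record shaped like the ERP payload assumed by the pipeline above.
const input: DataRecord = { product_name: 'Blue Mug', product_sku: 'MUG 001/B' };

// Apply the operators in the same order as the 'map-fields' step.
const renamedName = rename(input, 'product_name', 'name');
const renamedSku = rename(renamedName, 'product_sku', 'sku');
const output = slugify(renamedSku, 'sku', 'slug');

console.log(output); // { name: 'Blue Mug', sku: 'MUG 001/B', slug: 'mug-001-b' }
```

Because the load step matches on `slug`, whatever slug rule is in effect has to be stable across runs, otherwise the upsert would create duplicates instead of updating existing products.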
Register functions that can modify records at any pipeline stage:
DataHubPlugin.init({
    scripts: {
        'validate-sku': async (records, context) => {
            return records.filter(r => r.sku && String(r.sku).length > 0);
        },
    },
})

// Then in pipeline:
const pipeline = createPipeline()
    .name('Product Sync')
    // ... steps ...
    .hooks({
        AFTER_EXTRACT: [{ type: 'SCRIPT', scriptName: 'validate-sku' }],
    })
    .build();
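The effect of the `validate-sku` script on a batch can be checked in isolation. The sketch below is a minimal, hypothetical runner: `runHooks`, `HookContext`, and the lookup by `scriptName` are assumptions made for illustration and do not describe the plugin's internals. Only the script body and the hook configuration are taken from the example above.

```typescript
// Illustrative only: a hypothetical hook runner, not the plugin's implementation.
type DataRecord = Record<string, unknown>;
type HookContext = { stage: string }; // assumed shape of the context argument
type ScriptFn = (records: DataRecord[], context: HookContext) => Promise<DataRecord[]>;

// Same body as the 'validate-sku' script registered above.
const scripts: Record<string, ScriptFn> = {
    'validate-sku': async (records) =>
        records.filter(r => r.sku && String(r.sku).length > 0),
};

// Same hook configuration as passed to .hooks() above.
const hooks = {
    AFTER_EXTRACT: [{ type: 'SCRIPT', scriptName: 'validate-sku' }],
};

// Run every script registered for a stage in order, feeding each one
// the records returned by the previous one.
async function runHooks(stage: keyof typeof hooks, records: DataRecord[]): Promise<DataRecord[]> {
    let current = records;
    for (const hook of hooks[stage]) {
        const script = scripts[hook.scriptName];
        if (!script) throw new Error(`Unknown script: ${hook.scriptName}`);
        current = await script(current, { stage });
    }
    return current;
}

// Three extracted records: one valid, one with an empty sku, one without a sku.
const extracted: DataRecord[] = [{ sku: 'A-1' }, { sku: '' }, { name: 'no sku' }];
const resultPromise = runHooks('AFTER_EXTRACT', extracted);
resultPromise.then(kept => console.log(kept)); // [ { sku: 'A-1' } ]
```

Returning a new array rather than mutating the input keeps each script easy to test on its own, which is the main reason to register scripts as plain functions.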
24 hook stages are available: 18 step-level stages (BEFORE and AFTER for each of the 9 step types: EXTRACT, TRANSFORM, VALIDATE, ENRICH, ROUTE, LOAD, EXPORT, FEED, SINK) and 6 global stages (PIPELINE_STARTED, PIPELINE_COMPLETED, PIPELINE_FAILED, ON_ERROR, ON_RETRY, ON_DEAD_LETTER).
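The count can be reproduced by enumerating the stage names. This sketch assumes step-level stages follow the `BEFORE_<STEP>` / `AFTER_<STEP>` naming seen in `AFTER_EXTRACT`; only `AFTER_EXTRACT` is confirmed by the example above, so treat the other generated names as an inference.

```typescript
// Step types and global stages as listed in the text.
const STEP_TYPES = [
    'EXTRACT', 'TRANSFORM', 'VALIDATE', 'ENRICH', 'ROUTE',
    'LOAD', 'EXPORT', 'FEED', 'SINK',
] as const;

const GLOBAL_STAGES = [
    'PIPELINE_STARTED', 'PIPELINE_COMPLETED', 'PIPELINE_FAILED',
    'ON_ERROR', 'ON_RETRY', 'ON_DEAD_LETTER',
] as const;

// Assumption: each step type contributes BEFORE_<STEP> and AFTER_<STEP>.
const stepStages: string[] = [];
for (const step of STEP_TYPES) {
    stepStages.push(`BEFORE_${step}`, `AFTER_${step}`);
}

const allStages: string[] = [...stepStages, ...GLOBAL_STAGES];

console.log(stepStages.length, GLOBAL_STAGES.length, allStages.length); // 18 6 24
```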
All 24 hook stages support record modification via interceptor and script hooks. This includes the terminal steps SINK (search indexing), EXPORT (file export), and FEED (feed generation), so you can programmatically modify records right before they reach Meilisearch, Elasticsearch, a CSV export, an XML feed, and so on.
The DSL is fully typed. TypeScript will catch errors like:
// TypeScript error: Property 'invalidOption' does not exist
.extract('fetch', {
    adapterCode: 'httpApi',
    invalidOption: true, // Error!
})
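The mechanism behind this kind of error is TypeScript's excess property check on object literals. The sketch below reproduces it with a hypothetical `ExtractConfig` interface and a standalone `extract` function; both are simplified assumptions, and the plugin's real option types are richer.

```typescript
// Hypothetical, simplified config type; not the plugin's actual definition.
interface ExtractConfig {
    adapterCode: string;
    connectionCode?: string;
    url?: string;
    dataPath?: string;
}

// Stand-in for a builder method that accepts a typed config object.
function extract(key: string, config: ExtractConfig): { key: string; config: ExtractConfig } {
    return { key, config };
}

// Accepted: every property is declared on ExtractConfig.
const ok = extract('fetch', { adapterCode: 'httpApi', url: '/products' });

// Rejected at compile time: an object literal may not carry unknown properties.
// The directive below documents the expected error so this file still compiles.
const rejected = extract('fetch', {
    adapterCode: 'httpApi',
    // @ts-expect-error 'invalidOption' does not exist in type 'ExtractConfig'
    invalidOption: true,
});

console.log(ok.key, rejected.key);
```

Note that the check applies to object literals passed directly. A config built in a separate variable with a wider type can slip past it, so passing options inline gives the strictest checking.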