Create your first data pipeline in 5 minutes.
We’ll build a pipeline that:
The pipeline (code: product-import, name: "Product Import") has four nodes: a MANUAL start trigger; an HTTP API extract step (fetch-products) that GETs https://fakestoreapi.com/products; a transform step (map-fields) that renames title to name, derives a slug from the name, and sets enabled to true; and a Product UPSERT load step (create-products) that matches existing products by slug.

Create the same pipeline using TypeScript:
// vendure-config.ts
import { VendureConfig } from '@vendure/core';
import { DataHubPlugin, createPipeline } from '@oronts/vendure-data-hub-plugin';
// Pipeline: fetch products from the Fake Store API, reshape each record,
// and upsert the results into the catalog, matching existing products by slug.
const productImportPipeline = createPipeline()
.name('Product Import')
.description('Import products from Fake Store API')
// Declares required permissions/capabilities for this pipeline.
.capabilities({ requires: ['UpdateCatalog'] })
// MANUAL trigger: the run is started explicitly (UI or API), not on a schedule.
.trigger('start', { type: 'MANUAL' })
// Extract step: GET the full product list from the public demo API.
.extract('fetch-products', {
adapterCode: 'httpApi',
url: 'https://fakestoreapi.com/products',
method: 'GET',
})
// Transform step: operators are applied in order to each record.
.transform('map-fields', {
operators: [
{ op: 'rename', args: { from: 'title', to: 'name' } },   // API "title" -> Vendure "name"
{ op: 'rename', args: { from: 'id', to: 'sku' } },        // API numeric id becomes the SKU
{ op: 'slugify', args: { source: 'name', target: 'slug' } }, // derive URL slug from the name
{ op: 'set', args: { path: 'enabled', value: true } },    // import products as enabled
],
})
// Load step: UPSERT into products; 'slug' identifies existing records,
// and on conflict the incoming (source) values win.
.load('create-products', {
adapterCode: 'productUpsert',
strategy: 'UPSERT',
matchField: 'slug',
conflictStrategy: 'SOURCE_WINS',
})
// Wire the nodes into a linear DAG: start -> fetch -> transform -> load.
.edge('start', 'fetch-products')
.edge('fetch-products', 'map-fields')
.edge('map-fields', 'create-products')
.build();
// Register the Data Hub plugin with Vendure and declare the pipeline.
// Each pipeline entry pairs a unique 'code' with the built definition.
export const config: VendureConfig = {
plugins: [
DataHubPlugin.init({
enabled: true,
pipelines: [{
code: 'product-import',          // stable identifier (also used for webhook URLs)
name: 'Product Import',          // display name shown in the admin UI
definition: productImportPipeline,
}],
}),
],
};
# Start a run of the pipeline by its ID; returns the new run's id and status.
mutation {
startDataHubPipelineRun(pipelineId: "1") {
id
status
}
}
If configured with a webhook trigger, POST to:
POST /data-hub/webhook/product-import
Go to Data Hub > Runs to see the run's status and per-step progress.
Check Catalog > Products to see the imported products.
// Example: import products from a CSV file on disk instead of an HTTP API.
const csvImport = createPipeline()
.name('CSV Import')
.capabilities({ requires: ['UpdateCatalog'] })
.trigger('start', { type: 'MANUAL' })
// Extract step: parse a comma-delimited CSV whose first row is the header.
// NOTE(review): assumes the CSV columns already match the load adapter's
// expected fields (there is no transform step) — verify against the file.
.extract('parse-csv', {
adapterCode: 'file',
path: '/uploads/products.csv',
format: 'CSV',
delimiter: ',',
hasHeader: true,
})
// Load step: upsert products, matching existing records by 'slug'.
.load('import', {
adapterCode: 'productUpsert',
strategy: 'UPSERT',
matchField: 'slug',
})
.edge('start', 'parse-csv')
.edge('parse-csv', 'import')
.build();
// Example: run the same fetch-and-upsert flow on a schedule instead of manually.
const scheduledSync = createPipeline()
.name('Scheduled Sync')
.capabilities({ requires: ['UpdateCatalog'] })
// SCHEDULE trigger with a standard 5-field cron expression, evaluated in UTC.
.trigger('schedule', {
type: 'SCHEDULE',
cron: '0 2 * * *', // Daily at 2 AM
timezone: 'UTC',
})
// No explicit method here — presumably the httpApi adapter defaults to GET; verify.
.extract('fetch', { adapterCode: 'httpApi', url: 'https://api.example.com/products' })
.load('sync', {
adapterCode: 'productUpsert',
strategy: 'UPSERT',
matchField: 'slug',
})
.edge('schedule', 'fetch')
.edge('fetch', 'sync')
.build();
// Example: the reverse direction — read products out of Vendure and push
// them to an external REST endpoint. Only needs read access to the catalog.
const productExport = createPipeline()
.name('Product Export')
.capabilities({ requires: ['ReadCatalog'] })
.trigger('start', { type: 'MANUAL' })
// Extract step: query PRODUCT entities (with variants and featuredAsset
// relations loaded), paging through them 100 at a time.
.extract('query', {
adapterCode: 'vendureQuery',
entity: 'PRODUCT',
relations: 'variants,featuredAsset',
batchSize: 100,
})
// Transform step: flatten the nested 'variants' collection before export.
.transform('prepare', {
operators: [
{ op: 'flatten', args: { source: 'variants' } },
],
})
// Load step: POST each record to the external API.
.load('export', {
adapterCode: 'restPost',
endpoint: 'https://api.example.com/products',
method: 'POST',
})
.edge('start', 'query')
.edge('query', 'prepare')
.edge('prepare', 'export')
.build();