Enterprise ETL & Data Integration for Vendure E-commerce
Features • Installation • Quick Start • Extractors • Operators • Loaders • Hooks • Docs • License
License: Commercial plugin — free for personal, learning, and non-commercial use. Commercial use requires a license. Contact office@oronts.com for details. See License.
A full-featured ETL (Extract, Transform, Load) plugin for Vendure e-commerce. Build data pipelines to import products, sync inventory, generate product feeds, index to search engines, and integrate with external systems.
Visual Pipeline Editor - Drag-and-drop workflow builder
Pipeline Management - Overview of all data pipelines
Adapters Catalog - Extractors, Operators, and Loaders
Logs & Analytics - Real-time monitoring and pipeline health
Hooks & Events - Test hooks and view pipeline events
Connections - Manage external system credentials
Queues - Monitor pipeline execution and dead letters
Import Wizard - Step-by-step guided data import with templates
Export Wizard - Generate product feeds for Google, Facebook, Amazon
npm install @oronts/vendure-data-hub-plugin
// vendure-config.ts
import { VendureConfig } from '@vendure/core';
import { DataHubPlugin } from '@oronts/vendure-data-hub-plugin';
export const config: VendureConfig = {
plugins: [
DataHubPlugin.init({
enabled: true,
}),
],
};
The plugin adds a “Data Hub” section to your admin dashboard for creating and managing pipelines.
Define pipelines in TypeScript:
import { DataHubPlugin, createPipeline } from '@oronts/vendure-data-hub-plugin';
const productImport = createPipeline()
.name('Product Import')
.description('Import products from supplier API')
.capabilities({ requires: ['UpdateCatalog'] })
.trigger('start', { type: 'MANUAL' })
.extract('fetch-products', {
adapterCode: 'httpApi',
url: 'https://api.supplier.com/products',
method: 'GET',
dataPath: 'data.products',
pagination: {
type: 'page',
limit: 100,
maxPages: 100,
},
})
.transform('prepare', {
operators: [
{ op: 'validateRequired', args: { fields: ['sku', 'name', 'price'] } },
{ op: 'trim', args: { path: 'name' } },
{ op: 'slugify', args: { source: 'name', target: 'slug' } },
{ op: 'currency', args: { source: 'price', target: 'priceInCents', decimals: 2 } },
{ op: 'set', args: { path: 'enabled', value: true } },
],
})
.load('upsert', {
adapterCode: 'productUpsert',
channel: '__default_channel__',
strategy: 'UPSERT',
conflictStrategy: 'SOURCE_WINS',
slugField: 'slug',
})
.edge('start', 'fetch-products')
.edge('fetch-products', 'prepare')
.edge('prepare', 'upsert')
.build();
export const config: VendureConfig = {
plugins: [
DataHubPlugin.init({
pipelines: [{
code: 'product-import',
name: 'Product Import',
definition: productImport,
}],
}),
],
};
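The `.edge()` calls in the pipeline above describe a directed graph of steps. As a rough illustration of how such a graph could be turned into a run order, here is a minimal topological sort (Kahn's algorithm). `executionOrder` and the `Edge` type are hypothetical names for this sketch; the plugin's actual scheduler is not described here and may work differently.

```typescript
// Hypothetical sketch: derive a run order from pipeline edges (Kahn's algorithm).
type Edge = [string, string]; // [from, to]

function executionOrder(edges: Edge[]): string[] {
  const inDegree = new Map<string, number>();
  const next = new Map<string, string[]>();

  for (const [from, to] of edges) {
    if (!inDegree.has(from)) inDegree.set(from, 0);
    inDegree.set(to, (inDegree.get(to) ?? 0) + 1);
    next.set(from, [...(next.get(from) ?? []), to]);
  }

  // Steps with no incoming edges (such as the trigger) can run first.
  const ready = Array.from(inDegree.keys()).filter((k) => inDegree.get(k) === 0);
  const order: string[] = [];

  while (ready.length > 0) {
    const step = ready.shift() as string;
    order.push(step);
    for (const target of next.get(step) ?? []) {
      const remaining = (inDegree.get(target) ?? 0) - 1;
      inDegree.set(target, remaining);
      if (remaining === 0) ready.push(target);
    }
  }

  // Any step left unscheduled means the edges form a cycle.
  if (order.length !== inDegree.size) {
    throw new Error('Pipeline graph contains a cycle');
  }
  return order;
}

const order = executionOrder([
  ['start', 'fetch-products'],
  ['fetch-products', 'prepare'],
  ['prepare', 'upsert'],
]);
console.log(order.join(' -> ')); // start -> fetch-products -> prepare -> upsert
```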
| Option | Type | Default | Description |
|---|---|---|---|
| enabled | boolean | true | Enable or disable the plugin |
| registerBuiltinAdapters | boolean | true | Register built-in extractors, operators, loaders |
| retentionDaysRuns | number | 30 | Days to keep pipeline run history |
| retentionDaysErrors | number | 90 | Days to keep error records |
| pipelines | CodeFirstPipeline[] | [] | Define pipelines in code |
| secrets | CodeFirstSecret[] | [] | Define secrets in code |
| connections | CodeFirstConnection[] | [] | Define connections in code |
| adapters | AdapterDefinition[] | [] | Register custom adapters |
| feedGenerators | CustomFeedGenerator[] | [] | Register custom feed generators |
| configPath | string | - | Path to external configuration file |
| enableDashboard | boolean | true | Enable the admin dashboard UI |
| runtime | RuntimeLimitsConfig | - | Runtime limits (batch size, timeouts, etc.) |
| security | SecurityConfig | - | Security settings (SSRF protection, script sandboxing) |
| debug | boolean | false | Enable debug logging |
| Extractor | Code | Description |
|---|---|---|
| HTTP/REST API | httpApi | Fetch from REST APIs with pagination, auth (Bearer/Basic/HMAC), field mapping |
| GraphQL | graphql | Query GraphQL endpoints with cursor/offset/relay pagination, variables, auth |
| Vendure Query | vendureQuery | Query Vendure entities (Product, ProductVariant, Customer, Order, Collection, Facet, FacetValue, Promotion, Asset) |
| File | file | Parse CSV, JSON, XML, XLSX, NDJSON, TSV files with custom delimiters and encoding |
| Database | database | Query PostgreSQL, MySQL, MSSQL, SQLite with parameterized queries |
| S3 | s3 | Read files from AWS S3 and S3-compatible storage (MinIO, DigitalOcean Spaces) |
| FTP/SFTP | ftp | Download files from FTP/SFTP servers with SSH key support |
| Webhook | webhook | Receive incoming webhook data with HMAC signature verification |
| CDC | cdc | Polling-based change data capture with checkpoint tracking |
.extract('fetch', {
adapterCode: 'httpApi',
url: 'https://api.example.com/products',
method: 'GET',
headers: { 'Accept': 'application/json' },
dataPath: 'data.items', // JSON path to records array
connectionCode: 'my-api', // Optional: use saved connection
pagination: {
type: 'page',
limit: 100,
maxPages: 10,
},
})
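To make the `pagination` options concrete, the sketch below shows one way page-based pagination with `limit` and `maxPages` could behave: request pages in sequence and stop when a page comes back short or the page cap is reached. `fetchAllPages` and `PageFetcher` are hypothetical names, the fetcher is synchronous only to keep the example self-contained, and the stop conditions are assumptions rather than the plugin's documented behavior.

```typescript
// Hypothetical sketch of page-based pagination with a page cap.
type PageFetcher<T> = (page: number, limit: number) => T[];

function fetchAllPages<T>(fetchPage: PageFetcher<T>, limit: number, maxPages: number): T[] {
  const records: T[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const batch = fetchPage(page, limit);
    records.push(...batch);
    // A short (or empty) page is taken to mean there is no more data.
    if (batch.length < limit) break;
  }
  return records;
}

// Fake data source with 250 items, standing in for a remote API.
const allItems = Array.from({ length: 250 }, (_, i) => ({ sku: `SKU-${i + 1}` }));
const fakeApi: PageFetcher<{ sku: string }> = (page, limit) =>
  allItems.slice((page - 1) * limit, page * limit);

const fetched = fetchAllPages(fakeApi, 100, 10);
console.log(fetched.length); // 250 (pages of 100, 100, and 50)

const capped = fetchAllPages(fakeApi, 100, 2);
console.log(capped.length); // 200 (stopped by maxPages)
```

The `maxPages` cap acts as a safety net against APIs that never return a short page.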
.extract('products', {
adapterCode: 'vendureQuery',
entity: 'PRODUCT',
relations: 'variants,featuredAsset,facetValues',
batchSize: 100,
})
Transform operators are organized by category. Every operator takes an args object containing its configuration.
| Operator | Description | Example |
|---|---|---|
| set | Set field to static value | { op: 'set', args: { path: 'enabled', value: true } } |
| copy | Copy field value | { op: 'copy', args: { source: 'id', target: 'externalId' } } |
| rename | Rename field | { op: 'rename', args: { from: 'product_name', to: 'name' } } |
| remove | Delete field | { op: 'remove', args: { path: 'tempField' } } |
| map | Remap multiple fields | { op: 'map', args: { mapping: { name: 'title', desc: 'body' } } } |
| template | String templates | { op: 'template', args: { template: '${firstName} ${lastName}', target: 'fullName' } } |
| hash | Generate hash | { op: 'hash', args: { source: 'data', target: 'checksum', algorithm: 'sha256' } } |
| uuid | Generate UUID | { op: 'uuid', args: { target: 'id', version: 'v4' } } |
| Operator | Description | Example |
|---|---|---|
| trim | Remove whitespace | { op: 'trim', args: { path: 'name' } } |
| uppercase | Convert to uppercase | { op: 'uppercase', args: { path: 'sku' } } |
| lowercase | Convert to lowercase | { op: 'lowercase', args: { path: 'email' } } |
| slugify | URL-safe slug | { op: 'slugify', args: { source: 'name', target: 'slug' } } |
| split | Split to array | { op: 'split', args: { source: 'tags', delimiter: ',', target: 'tagArray' } } |
| join | Join array to string | { op: 'join', args: { source: 'parts', delimiter: '-', target: 'code' } } |
| concat | Concatenate fields | { op: 'concat', args: { sources: ['first', 'last'], separator: ' ', target: 'name' } } |
| replace | Replace text | { op: 'replace', args: { path: 'desc', search: '\n', replacement: '<br>', all: true } } |
| extractRegex | Extract with regex | { op: 'extractRegex', args: { source: 'sku', pattern: '([A-Z]+)', target: 'prefix' } } |
| replaceRegex | Regex replace | { op: 'replaceRegex', args: { path: 'text', pattern: '\\s+', replacement: ' ' } } |
| stripHtml | Remove HTML tags | { op: 'stripHtml', args: { source: 'htmlContent', target: 'plainText' } } |
| truncate | Truncate to length | { op: 'truncate', args: { source: 'description', length: 100, suffix: '...' } } |
| Operator | Description | Example |
|---|---|---|
| math | Math operations | { op: 'math', args: { operation: 'multiply', source: 'price', operand: 100, target: 'cents' } } |
| toNumber | Parse to number | { op: 'toNumber', args: { source: 'priceStr', target: 'price', default: 0 } } |
| toString | Convert to string | { op: 'toString', args: { source: 'id', target: 'idStr' } } |
| currency | To minor units | { op: 'currency', args: { source: 'price', target: 'priceInCents', decimals: 2 } } |
| toCents | Decimal to cents | { op: 'toCents', args: { source: 'price', target: 'priceInCents' } } |
| round | Round number | { op: 'round', args: { source: 'value', decimals: 2 } } |
| unit | Unit conversion | { op: 'unit', args: { source: 'weightKg', target: 'weightG', from: 'kg', to: 'g' } } |
| parseNumber | Locale-aware parse | { op: 'parseNumber', args: { source: 'euro', target: 'num', locale: 'de-DE' } } |
| formatNumber | Format number | { op: 'formatNumber', args: { source: 'price', target: 'display', style: 'currency', currency: 'USD' } } |
Math operations: add, subtract, multiply, divide, modulo, power, round, floor, ceil, abs
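Vendure stores monetary values as integers in minor units, which is what `currency` and `toCents` prepare a record for. The helper below is a minimal sketch of that conversion, not the plugin's implementation; `toMinorUnits` is a hypothetical name. The rounding step matters because multiplying a decimal price by a power of ten can land just below the intended integer in floating-point arithmetic.

```typescript
// Hypothetical helper: convert a decimal amount to integer minor units.
function toMinorUnits(amount: number, decimals: number): number {
  // Round to the nearest integer to absorb floating-point error from the multiplication.
  return Math.round(amount * Math.pow(10, decimals));
}

console.log(toMinorUnits(19.99, 2)); // 1999
console.log(toMinorUnits(12.5, 2));  // 1250
console.log(toMinorUnits(1200, 0));  // 1200 (zero-decimal currencies)
```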
| Operator | Description | Example |
|---|---|---|
| dateParse | Parse date string | { op: 'dateParse', args: { source: 'dateStr', target: 'date', format: 'YYYY-MM-DD' } } |
| dateFormat | Format to string | { op: 'dateFormat', args: { source: 'createdAt', target: 'display', format: 'DD/MM/YYYY HH:mm' } } |
| dateAdd | Add/subtract time | { op: 'dateAdd', args: { source: 'orderDate', target: 'dueDate', amount: 7, unit: 'days' } } |
| dateDiff | Calculate difference | { op: 'dateDiff', args: { startDate: 'orderDate', endDate: 'deliveredAt', unit: 'days', target: 'duration' } } |
| now | Current timestamp | { op: 'now', args: { target: 'processedAt', format: 'ISO' } } |
| Operator | Description | Example |
|---|---|---|
| pick | Keep only fields | { op: 'pick', args: { fields: ['id', 'name', 'sku'] } } |
| omit | Remove fields | { op: 'omit', args: { fields: ['_internal', 'tempId'] } } |
| parseJson | Parse JSON string | { op: 'parseJson', args: { source: 'metaJson', target: 'meta' } } |
| stringifyJson | Stringify object | { op: 'stringifyJson', args: { source: 'data', target: 'dataJson' } } |
| Operator | Description | Example |
|---|---|---|
| when | Filter records | { op: 'when', args: { conditions: [{ field: 'stock', cmp: 'gt', value: 0 }], action: 'keep' } } |
| ifThenElse | Conditional value | { op: 'ifThenElse', args: { condition: { field: 'type', cmp: 'eq', value: 'digital' }, thenValue: true, elseValue: false, target: 'isDigital' } } |
| switch | Multi-case mapping | { op: 'switch', args: { source: 'code', cases: [{ value: 'A', result: 'Active' }], default: 'Unknown', target: 'status' } } |
Comparison operators (19): eq, ne, gt, gte, lt, lte, in, notIn, contains, notContains, startsWith, endsWith, regex, exists, notExists, isNull, isEmpty, isNotEmpty, matches (glob)
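As an illustration of how `{ field, cmp, value }` conditions can be evaluated, the sketch below implements a subset of the comparison operators and a `when`-style filter with `action: 'keep'`. It is not the plugin's evaluator: `evaluate` and `keepWhen` are hypothetical names, only top-level fields are read (no dotted paths), and combining multiple conditions with AND is an assumption for this operator.

```typescript
// Hypothetical sketch of condition evaluation for a subset of comparison operators.
type Cmp = 'eq' | 'ne' | 'gt' | 'gte' | 'lt' | 'lte' | 'in' | 'contains' | 'exists';

interface Condition {
  field: string;
  cmp: Cmp;
  value?: unknown;
}

type DataRecord = Record<string, unknown>;

function evaluate(record: DataRecord, cond: Condition): boolean {
  const actual = record[cond.field];
  const expected = cond.value;
  // Numeric comparisons only succeed when both sides are numbers.
  const num = (op: (a: number, b: number) => boolean): boolean =>
    typeof actual === 'number' && typeof expected === 'number' && op(actual, expected);

  switch (cond.cmp) {
    case 'eq': return actual === expected;
    case 'ne': return actual !== expected;
    case 'gt': return num((a, b) => a > b);
    case 'gte': return num((a, b) => a >= b);
    case 'lt': return num((a, b) => a < b);
    case 'lte': return num((a, b) => a <= b);
    case 'in': return Array.isArray(expected) && expected.indexOf(actual) !== -1;
    case 'contains':
      return typeof actual === 'string' && typeof expected === 'string' && actual.indexOf(expected) !== -1;
    case 'exists': return actual !== undefined;
    default: return false;
  }
}

// 'keep' semantics: a record passes only if every condition holds (assumed AND).
function keepWhen(records: DataRecord[], conditions: Condition[]): DataRecord[] {
  return records.filter((r) => conditions.every((c) => evaluate(r, c)));
}

const inStock = keepWhen(
  [{ sku: 'A', stock: 5 }, { sku: 'B', stock: 0 }],
  [{ field: 'stock', cmp: 'gt', value: 0 }],
);
console.log(inStock.map((r) => r.sku)); // [ 'A' ]
```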
| Operator | Description | Example |
|---|---|---|
| validateRequired | Check required fields | { op: 'validateRequired', args: { fields: ['sku', 'name', 'price'] } } |
| validateFormat | Regex validation | { op: 'validateFormat', args: { field: 'email', pattern: '^[^@]+@[^@]+\\.[^@]+$' } } |
| Operator | Description | Example |
|---|---|---|
| lookup | Map value from dictionary | { op: 'lookup', args: { source: 'code', map: { 'A': 'Active' }, target: 'status' } } |
| enrich | Add/default fields | { op: 'enrich', args: { defaults: { currency: 'USD' } } } |
| coalesce | First non-null | { op: 'coalesce', args: { paths: ['name', 'title', 'label'], target: 'displayName' } } |
| default | Default if null | { op: 'default', args: { path: 'stock', value: 0 } } |
| httpLookup | Enrich from HTTP API | { op: 'httpLookup', args: { url: 'https://api.example.com/', target: 'externalData' } } |
Aggregation operators include array manipulation, grouping, and data joining (8 operators).
| Operator | Description | Example |
|---|---|---|
| aggregate | Aggregate values | { op: 'aggregate', args: { op: 'sum', source: 'amount', target: 'total' } } |
| count | Count elements | { op: 'count', args: { source: 'items', target: 'itemCount' } } |
| unique | Remove duplicates | { op: 'unique', args: { source: 'items', by: 'id', target: 'uniqueItems' } } |
| flatten | Flatten nested arrays | { op: 'flatten', args: { source: 'nested', target: 'flat', depth: 1 } } |
| first | Get first element | { op: 'first', args: { source: 'items', target: 'firstItem' } } |
| last | Get last element | { op: 'last', args: { source: 'items', target: 'lastItem' } } |
| expand | Explode to records | { op: 'expand', args: { path: 'variants' } } |
| multiJoin | Join datasets by key | { op: 'multiJoin', args: { leftKey: 'customerId', rightKey: 'id', rightDataPath: 'orders', type: 'LEFT' } } |
| Operator | Description | Example |
|---|---|---|
| deltaFilter | Change detection | { op: 'deltaFilter', args: { idPath: 'sku', includePaths: ['price', 'stock'] } } |
| script | Custom JavaScript | See Script Operator section below |
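The idea behind change detection with `idPath` and `includePaths` can be sketched as follows: compute a fingerprint from the watched fields of each record and drop records whose fingerprint matches the one stored from a previous run. This is an illustrative assumption about the technique, not the plugin's code. `deltaFilter` here is a standalone hypothetical function, the fingerprint is a plain JSON string (a real implementation would more likely store a hash), and the `seen` map stands in for persisted checkpoint state.

```typescript
// Hypothetical sketch of delta filtering by fingerprinting watched fields.
type DataRecord = Record<string, unknown>;

// Only the watched fields contribute, so changes elsewhere are ignored.
function fingerprint(record: DataRecord, includePaths: string[]): string {
  return JSON.stringify(includePaths.map((p) => record[p]));
}

function deltaFilter(
  records: DataRecord[],
  idPath: string,
  includePaths: string[],
  seen: Map<string, string>,
): DataRecord[] {
  return records.filter((record) => {
    const id = String(record[idPath]);
    const current = fingerprint(record, includePaths);
    if (seen.get(id) === current) return false; // unchanged since the last run
    seen.set(id, current);
    return true;
  });
}

const seen = new Map<string, string>();
const watched = ['price', 'stock'];

const run1 = deltaFilter([{ sku: 'A', price: 10, stock: 3, name: 'Old' }], 'sku', watched, seen);
// Only 'name' changed, and it is not a watched field.
const run2 = deltaFilter([{ sku: 'A', price: 10, stock: 3, name: 'New' }], 'sku', watched, seen);
// 'price' changed, so the record passes again.
const run3 = deltaFilter([{ sku: 'A', price: 12, stock: 3, name: 'New' }], 'sku', watched, seen);

console.log(run1.length, run2.length, run3.length); // 1 0 1
```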
Execute custom JavaScript for complex transformations:
// Single record mode
.transform('enrich', {
operators: [{
op: 'script',
args: {
code: `
const margin = (record.price - record.cost) / record.price * 100;
return { ...record, margin: Math.round(margin * 100) / 100 };
`,
},
}],
})
// Batch mode - access all records
.transform('rank', {
operators: [{
op: 'script',
args: {
batch: true,
code: `
const sorted = [...records].sort((a, b) => b.sales - a.sales); // copy first so the input array is not mutated
return sorted.map((r, i) => ({ ...r, rank: i + 1 }));
`,
},
}],
})
// Filter mode - return null to exclude
.transform('filter', {
operators: [{
op: 'script',
args: {
code: `return record.stock > 0 ? record : null;`,
},
}],
})
| Loader | Adapter Code | Description |
|---|---|---|
| Product Loader | productUpsert | Create/update products with variants, prices, tax, and stock |
| Variant Loader | variantUpsert | Update product variants by SKU with multi-currency prices and auto-create option groups |
| Customer Loader | customerUpsert | Create/update customers with addresses and group memberships |
| Customer Group Loader | customerGroupUpsert | Create/update customer groups by name; assign customers by email |
| Collection Loader | collectionUpsert | Create/update collections with parent relationships |
| Promotion Loader | promotionUpsert | Create/update promotions with conditions and actions |
| Order Upsert Loader | orderUpsert | Order create/update for migrations with state transitions and line management |
| Order Note Loader | orderNote | Attach notes to orders by code or id |
| Order Transition Loader | orderTransition | Transition orders to new states |
| Stock Adjust Loader | stockAdjust | Adjust inventory levels by SKU and stock location map |
| Inventory Adjust Loader | inventoryAdjust | Adjust stock levels for product variants by SKU with location targeting |
| Asset Attach Loader | assetAttach | Attach existing assets to products/collections |
| Apply Coupon Loader | applyCoupon | Apply coupon codes to orders |
| Tax Rate Loader | taxRateUpsert | Create/update tax rates by name with category and zone |
| Payment Method Loader | paymentMethodUpsert | Create/update payment methods with handler and checker |
| Channel Loader | channelUpsert | Create/update channels with currencies, languages, and zones |
| Shipping Method Loader | shippingMethodUpsert | Create/update shipping methods with calculator and checker |
| Stock Location Loader | stockLocationUpsert | Create/update stock locations and warehouses |
| Facet Loader | facetUpsert | Create/update facets with translations |
| Facet Value Loader | facetValueUpsert | Create/update facet values with translations |
| Entity Deletion Loader | entityDeletion | Soft-delete any of 13 entity types (Products, Variants, Collections, Facets, FacetValues, Customers, CustomerGroups, Promotions, ShippingMethods, PaymentMethods, TaxRates, Assets, StockLocations) by slug, SKU, ID, code, email, or name |
| GraphQL Mutation Loader | graphqlMutation | Execute arbitrary GraphQL mutations against Vendure |
| Asset Import Loader | assetImport | Import assets from URLs or file paths |
| REST POST Loader | restPost | POST/PUT records to external REST endpoints |
.load('import-products', {
adapterCode: 'productUpsert',
channel: '__default_channel__',
strategy: 'UPSERT', // CREATE, UPDATE, UPSERT
conflictStrategy: 'SOURCE_WINS', // SOURCE_WINS, VENDURE_WINS, MERGE, MANUAL_QUEUE
nameField: 'name',
slugField: 'slug',
skuField: 'sku',
priceField: 'price',
})
.load('update-stock', {
adapterCode: 'stockAdjust',
skuField: 'sku',
stockByLocationField: 'stockByLocation', // Map of location code -> quantity
absolute: true, // Set absolute value (false = delta)
})
.load('import-customers', {
adapterCode: 'customerUpsert',
emailField: 'email',
firstNameField: 'firstName',
lastNameField: 'lastName',
phoneNumberField: 'phone',
addressesField: 'addresses',
groupsField: 'groupCodes',
})
.load('import-assets', {
adapterCode: 'assetAttach',
entity: 'PRODUCT',
slugField: 'productSlug',
assetIdField: 'assetId',
channel: '__default_channel__',
})
Route records to different branches based on field conditions using 19 comparison operators. Supports AND logic (multiple conditions per branch), automatic default branch for unmatched records, and dependencyOnly edges for execution ordering without data flow.
.route('split-by-type', {
branches: [
{ name: 'physical', when: [{ field: 'type', cmp: 'eq', value: 'physical' }] },
{ name: 'digital', when: [{ field: 'type', cmp: 'eq', value: 'digital' }] },
],
})
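The routing behavior described above can be pictured with a small sketch: each record goes to a branch whose conditions all hold, and anything unmatched falls into a default branch. `routeRecords` is a hypothetical function, only the `eq` comparison is implemented, and picking the first matching branch is an assumption; the plugin may resolve overlapping branches differently.

```typescript
// Hypothetical sketch of branch routing with an automatic default branch.
type DataRecord = Record<string, unknown>;

interface Branch {
  name: string;
  when: { field: string; cmp: 'eq'; value: unknown }[];
}

function routeRecords(records: DataRecord[], branches: Branch[]): Map<string, DataRecord[]> {
  const out = new Map<string, DataRecord[]>();
  for (const record of records) {
    // All conditions of a branch must hold (AND); the first such branch wins (assumption).
    const hit = branches.find((b) => b.when.every((c) => record[c.field] === c.value));
    const key = hit ? hit.name : 'default';
    out.set(key, [...(out.get(key) ?? []), record]);
  }
  return out;
}

const routed = routeRecords(
  [
    { sku: 'A', type: 'physical' },
    { sku: 'B', type: 'digital' },
    { sku: 'C', type: 'service' },
  ],
  [
    { name: 'physical', when: [{ field: 'type', cmp: 'eq', value: 'physical' }] },
    { name: 'digital', when: [{ field: 'type', cmp: 'eq', value: 'digital' }] },
  ],
);
console.log(Array.from(routed.keys())); // [ 'physical', 'digital', 'default' ]
```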
Hooks let you run code at 24 pipeline stages (18 step-level + 6 global). The stages fall into two groups:
Data Processing (18 step-level):
BEFORE_EXTRACT, AFTER_EXTRACT
BEFORE_TRANSFORM, AFTER_TRANSFORM
BEFORE_VALIDATE, AFTER_VALIDATE
BEFORE_ENRICH, AFTER_ENRICH
BEFORE_ROUTE, AFTER_ROUTE
BEFORE_LOAD, AFTER_LOAD
BEFORE_EXPORT, AFTER_EXPORT
BEFORE_FEED, AFTER_FEED
BEFORE_SINK, AFTER_SINK
Pipeline Lifecycle (6 global):
PIPELINE_STARTED, PIPELINE_COMPLETED, PIPELINE_FAILED
ON_ERROR, ON_RETRY, ON_DEAD_LETTER

| Type | Purpose | Can Modify Records |
|---|---|---|
| INTERCEPTOR | Inline JavaScript code | Yes |
| SCRIPT | Pre-registered functions | Yes |
| WEBHOOK | HTTP POST notification | No |
| EMIT | Vendure domain event | No |
| TRIGGER_PIPELINE | Start another pipeline | No |
| LOG | Log message to pipeline logs | No |
Inline JavaScript that can modify records:
const pipeline = createPipeline()
.name('With Interceptors')
.hooks({
AFTER_EXTRACT: [{
type: 'INTERCEPTOR',
name: 'Add metadata',
code: `
return records.map(r => ({
...r,
extractedAt: new Date().toISOString(),
source: 'api',
}));
`,
}],
BEFORE_TRANSFORM: [{
type: 'INTERCEPTOR',
name: 'Filter low stock',
code: `return records.filter(r => r.stock > 0);`,
failOnError: true,
}],
BEFORE_LOAD: [{
type: 'INTERCEPTOR',
name: 'Final validation',
code: `
return records.filter(r => {
if (!r.sku || !r.name) {
console.warn('Skipping invalid record:', r.id);
return false;
}
return true;
});
`,
}],
})
// ... steps
.build();
Reference pre-registered functions (type-safe, reusable):
// Register scripts at startup
hookService.registerScript('addCustomerSegment', async (records, context, args) => {
const threshold = args?.spendThreshold || 1000;
return records.map(r => ({
...r,
segment: r.totalSpent > threshold ? 'premium' : 'standard',
}));
});
// Use in pipeline
const pipeline = createPipeline()
.hooks({
AFTER_TRANSFORM: [{
type: 'SCRIPT',
scriptName: 'addCustomerSegment',
args: { spendThreshold: 5000 },
}],
})
.build();
Notify external systems:
.hooks({
PIPELINE_COMPLETED: [{
type: 'WEBHOOK',
url: 'https://slack.webhook.example.com/notify',
headers: { 'Content-Type': 'application/json' },
secret: 'webhook-signing-key',
signatureHeader: 'X-Signature',
retryConfig: {
maxAttempts: 5,
initialDelayMs: 1000,
maxDelayMs: 60000,
backoffMultiplier: 2,
},
}],
PIPELINE_FAILED: [{
type: 'WEBHOOK',
url: 'https://pagerduty.example.com/alert',
}],
})
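The `retryConfig` fields above suggest exponential backoff with a cap. As a sketch of what that usually means, the function below computes the wait before each retry. The exact formula is an assumption (the plugin may add jitter or count attempts differently), and `retryDelays` is a hypothetical name.

```typescript
// Hypothetical sketch: capped exponential backoff derived from a retryConfig.
interface RetryConfig {
  maxAttempts: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
}

// The first attempt runs immediately, so there are maxAttempts - 1 waits.
function retryDelays(cfg: RetryConfig): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < cfg.maxAttempts - 1; retry++) {
    const delay = cfg.initialDelayMs * Math.pow(cfg.backoffMultiplier, retry);
    delays.push(Math.min(delay, cfg.maxDelayMs));
  }
  return delays;
}

const delays = retryDelays({
  maxAttempts: 5,
  initialDelayMs: 1000,
  maxDelayMs: 60000,
  backoffMultiplier: 2,
});
console.log(delays); // [ 1000, 2000, 4000, 8000 ]
```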
Chain pipelines together:
.hooks({
AFTER_LOAD: [{
type: 'TRIGGER_PIPELINE',
pipelineCode: 'reindex-search', // Runs with loaded records as seed
}],
})
Generate feeds for advertising platforms.
.feed('google-feed', {
adapterCode: 'googleMerchant',
format: 'xml', // xml or tsv
targetCountry: 'US',
contentLanguage: 'en',
currency: 'USD',
storeUrl: 'https://mystore.com',
storeName: 'My Store',
includeOutOfStock: false,
outputPath: '/feeds/google-shopping.xml',
})
.feed('meta-catalog', {
adapterCode: 'metaCatalog',
format: 'csv',
currency: 'USD',
brandField: 'customFields.brand',
categoryField: 'customFields.googleCategory',
includeVariants: true,
outputPath: '/feeds/facebook-catalog.csv',
})
.feed('custom-feed', {
adapterCode: 'customFeed',
format: 'json', // xml, csv, json, tsv
rootElement: 'products',
itemElement: 'product',
fieldMapping: {
product_id: 'id',
product_name: 'name',
product_price: 'priceFormatted',
},
outputPath: '/feeds/custom-products.json',
})
Index products to search engines.
.sink('elasticsearch', {
adapterCode: 'elasticsearch',
node: 'http://localhost:9200',
indexName: 'products',
idField: 'id',
batchSize: 500,
refresh: true,
})
.sink('meilisearch', {
adapterCode: 'meilisearch',
host: 'http://localhost:7700',
apiKeySecretCode: 'meilisearch-key',
indexName: 'products',
primaryKey: 'id',
searchableFields: ['name', 'description', 'sku'],
filterableFields: ['category', 'brand', 'price'],
sortableFields: ['price', 'createdAt'],
})
.sink('algolia', {
adapterCode: 'algolia',
appId: 'your-app-id',
apiKeySecretCode: 'algolia-admin-key',
indexName: 'products',
idField: 'objectID',
})
.sink('typesense', {
adapterCode: 'typesense',
host: 'localhost',
port: 8108,
protocol: 'http',
apiKeySecretCode: 'typesense-key',
collectionName: 'products',
idField: 'id',
})
.trigger('start', { type: 'MANUAL' })
.trigger('schedule', {
type: 'SCHEDULE',
cron: '0 2 * * *', // Daily at 2 AM
timezone: 'America/New_York',
})
Common patterns:
0 * * * * - Every hour
0 0 * * * - Daily at midnight
0 2 * * * - Daily at 2 AM
0 0 * * 0 - Weekly on Sunday
0 0 1 * * - Monthly on the 1st
.trigger('webhook', {
type: 'WEBHOOK',
authentication: 'API_KEY', // 'NONE' | 'API_KEY' | 'HMAC' | 'BASIC' | 'JWT'
apiKeySecretCode: 'my-api-key', // Secret code storing the API key
apiKeyHeaderName: 'x-api-key', // Header name for API key (default: x-api-key)
rateLimit: 100, // Requests per minute per IP (0 = unlimited)
requireIdempotencyKey: true, // Require X-Idempotency-Key header
})
Authentication Types:
| Type | Description | Configuration |
|---|---|---|
| NONE | No authentication (not recommended) | - |
| API_KEY | API key in header | apiKeySecretCode, apiKeyHeaderName, apiKeyPrefix |
| HMAC | HMAC-SHA256 signature | secretCode, hmacHeaderName, hmacAlgorithm |
| BASIC | HTTP Basic Auth | basicSecretCode (stores username:password) |
| JWT | JWT Bearer token | jwtSecretCode, jwtHeaderName |
Example - HMAC Authentication:
.trigger('webhook', {
type: 'WEBHOOK',
authentication: 'HMAC',
secretCode: 'hmac-secret', // Secret code storing HMAC key
hmacHeaderName: 'x-signature', // Header name (default: x-datahub-signature)
hmacAlgorithm: 'SHA256', // SHA256 or SHA512
})
Endpoint: POST /data-hub/webhook/{pipeline-code}
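For callers of an HMAC-protected webhook, the sketch below shows the general shape of signing a request body and verifying it on the receiving side with Node's `crypto` module. The details are assumptions, since this document does not specify them: the signature is taken to be a hex-encoded HMAC-SHA256 of the raw request body, and `sign` and `verifySignature` are hypothetical helper names. Check the plugin documentation for the exact encoding before relying on this.

```typescript
import { Buffer } from 'node:buffer';
import { createHmac, timingSafeEqual } from 'node:crypto';

// Sender side: compute the signature to place in the configured header (for example x-signature).
function sign(body: string, secret: string): string {
  return createHmac('sha256', secret).update(body).digest('hex');
}

// Receiver side: recompute the signature and compare in constant time.
function verifySignature(body: string, signatureHex: string, secret: string): boolean {
  const expected = Buffer.from(sign(body, secret), 'hex');
  const provided = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on length mismatch, so check lengths first.
  return expected.length === provided.length && timingSafeEqual(expected, provided);
}

const body = JSON.stringify({ sku: 'ABC-1', stock: 12 });
const signature = sign(body, 'hmac-secret-value');

console.log(verifySignature(body, signature, 'hmac-secret-value')); // true
console.log(verifySignature(body + ' ', signature, 'hmac-secret-value')); // false (body was altered)
```

Signing the raw body string, rather than a re-serialized object, avoids mismatches caused by key ordering or whitespace.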
Security Features:
.trigger('on-order', {
type: 'EVENT',
event: 'OrderPlacedEvent',
filter: { state: 'ArrangingPayment' },
})
The plugin includes a full-featured admin dashboard:
dependencyOnly edges for execution ordering without data flow in graph mode
completedAt and errorMessage fields on pipeline runs for precise tracking
DataHubPlugin.init({
secrets: [
{ code: 'api-key', provider: 'ENV', value: 'SUPPLIER_API_KEY' },
{ code: 'ftp-pass', provider: 'INLINE', value: 'secret123' },
],
})
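The two providers in the example above differ in what `value` holds: for `INLINE` it appears to be the secret itself, while for `ENV` it appears to name an environment variable. The sketch below illustrates that reading. It is an assumption based on the example, `resolveSecret` is a hypothetical function, and the environment is passed in as a plain object so the example stays self-contained.

```typescript
// Hypothetical sketch of resolving code-first secrets by provider.
type SecretProvider = 'ENV' | 'INLINE';

interface CodeSecret {
  code: string;
  provider: SecretProvider;
  value: string;
}

function resolveSecret(secret: CodeSecret, env: Record<string, string | undefined>): string {
  if (secret.provider === 'INLINE') {
    return secret.value; // the value is the secret itself
  }
  // For ENV, the value is treated as the name of an environment variable.
  const fromEnv = env[secret.value];
  if (fromEnv === undefined) {
    throw new Error(`Environment variable ${secret.value} is not set (secret "${secret.code}")`);
  }
  return fromEnv;
}

const fakeEnv = { SUPPLIER_API_KEY: 'key-from-environment' };

const apiKey = resolveSecret({ code: 'api-key', provider: 'ENV', value: 'SUPPLIER_API_KEY' }, fakeEnv);
const ftpPass = resolveSecret({ code: 'ftp-pass', provider: 'INLINE', value: 'secret123' }, fakeEnv);
console.log(apiKey, ftpPass); // key-from-environment secret123
```

Keeping production secrets in the environment rather than inline keeps them out of source control.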
Supported connection types: http, postgres, mysql, s3, ftp, sftp, rabbitmq, custom
DataHubPlugin.init({
connections: [
{
code: 'supplier-api',
type: 'http',
name: 'Supplier REST API',
settings: {
baseUrl: 'https://api.supplier.com',
timeout: 30000,
auth: {
type: 'bearer',
secretCode: 'supplier-api-key',
},
},
},
{
code: 'supplier-db',
type: 'postgres',
name: 'Supplier Database',
settings: {
host: '${DB_HOST}',
port: 5432,
database: 'supplier',
username: '${DB_USER}',
passwordSecretCode: 'db-password',
ssl: true,
},
},
{
code: 'product-bucket',
type: 's3',
name: 'Product Feed Bucket',
settings: {
bucket: 'product-feeds',
region: 'us-east-1',
accessKeyIdSecretCode: 'aws-access-key',
secretAccessKeySecretCode: 'aws-secret-key',
},
},
{
code: 'sftp-server',
type: 'sftp',
name: 'Supplier SFTP',
settings: {
host: 'sftp.supplier.com',
port: 22,
username: '${SFTP_USER}',
privateKeySecretCode: 'sftp-key',
},
},
],
})
import { SingleRecordOperator, JsonObject, AdapterOperatorHelpers } from '@oronts/vendure-data-hub-plugin';
interface CurrencyConvertConfig {
field: string;
from: string;
to: string;
targetField?: string;
}
const currencyConvert: SingleRecordOperator<CurrencyConvertConfig> = {
code: 'currencyConvert',
type: 'OPERATOR',
name: 'Currency Convert',
description: 'Convert between currencies',
category: 'CONVERSION',
pure: true,
schema: {
fields: [
{ key: 'field', type: 'string', label: 'Price Field', required: true },
{ key: 'from', type: 'string', label: 'From Currency', required: true },
{ key: 'to', type: 'string', label: 'To Currency', required: true },
{ key: 'targetField', type: 'string', label: 'Target Field', required: false },
],
},
applyOne(record: JsonObject, config: CurrencyConvertConfig, helpers: AdapterOperatorHelpers): JsonObject | null {
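// getExchangeRate is assumed to be your own helper returning the from -> to rate; it is not defined in this example
const rate = getExchangeRate(config.from, config.to);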
const value = helpers.get(record, config.field) as number;
const converted = value * rate;
helpers.set(record, config.targetField || config.field, converted);
return record;
},
};
DataHubPlugin.init({
adapters: [currencyConvert],
})
import { ExtractorAdapter, ExtractContext, RecordEnvelope } from '@oronts/vendure-data-hub-plugin';
interface MyExtractorConfig {
endpoint: string;
}
const myExtractor: ExtractorAdapter<MyExtractorConfig> = {
code: 'myExtractor',
type: 'EXTRACTOR',
name: 'My Custom Source',
description: 'Fetch data from custom API',
schema: {
fields: [
{ key: 'endpoint', type: 'string', label: 'API Endpoint', required: true },
],
},
async *extract(context: ExtractContext, config: MyExtractorConfig): AsyncGenerator<RecordEnvelope, void, undefined> {
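const response = await fetch(config.endpoint);
// Fail fast on non-2xx responses instead of trying to parse an error body
if (!response.ok) {
throw new Error(`Extractor request failed with HTTP ${response.status}`);
}
const data = await response.json();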
for (const item of data.items) {
yield { data: item };
}
},
};
import { LoaderAdapter, LoadContext, JsonObject, LoadResult } from '@oronts/vendure-data-hub-plugin';
interface WebhookNotifyConfig {
endpoint: string;
batchSize?: number;
}
const webhookNotify: LoaderAdapter<WebhookNotifyConfig> = {
code: 'webhookNotify',
type: 'LOADER',
name: 'Webhook Notify',
description: 'Send records to webhook endpoint',
schema: {
fields: [
{ key: 'endpoint', type: 'string', label: 'Webhook URL', required: true },
{ key: 'batchSize', type: 'number', label: 'Batch Size', required: false },
],
},
async load(context: LoadContext, config: WebhookNotifyConfig, records: readonly JsonObject[]): Promise<LoadResult> {
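const response = await fetch(config.endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(records),
});
// Do not report success when the endpoint rejected the batch
if (!response.ok) {
throw new Error(`Webhook responded with HTTP ${response.status}`);
}
return { succeeded: records.length, failed: 0, errors: [] };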
},
};
| Permission | Description |
|---|---|
| CreateDataHubPipeline | Create pipelines |
| ReadDataHubPipeline | View pipelines |
| UpdateDataHubPipeline | Modify pipelines |
| DeleteDataHubPipeline | Delete pipelines |
| RunDataHubPipeline | Execute pipelines |
| PublishDataHubPipeline | Publish pipeline versions |
| ReviewDataHubPipeline | Review/approve pipelines |
| CreateDataHubSecret | Create secrets |
| ReadDataHubSecret | View secrets (values masked) |
| UpdateDataHubSecret | Modify secrets |
| DeleteDataHubSecret | Delete secrets |
| ManageDataHubConnections | Manage connections |
| ManageDataHubAdapters | Configure adapters |
| ViewDataHubRuns | View execution history |
| RetryDataHubRecord | Retry failed records |
| ViewDataHubQuarantine | View dead letter queue |
| EditDataHubQuarantine | Manage quarantined records |
| ReplayDataHubRecord | Replay processed records |
| UpdateDataHubSettings | Modify plugin settings |
| ViewDataHubAnalytics | View analytics dashboard |
| ManageDataHubWebhooks | Configure webhook endpoints |
| ManageDataHubDestinations | Manage export destinations |
| ManageDataHubFeeds | Manage product feeds |
| ViewDataHubEntitySchemas | View entity schemas |
| SubscribeDataHubEvents | Subscribe to pipeline events |
| ManageDataHubFiles | Upload and manage files |
| ReadDataHubFiles | Read uploaded files |
Require specific Vendure permissions to run a pipeline:
const importPipeline = createPipeline()
.capabilities({ requires: ['UpdateCatalog', 'UpdateStock'] })
// ...
const exportPipeline = createPipeline()
.capabilities({ requires: ['ReadCustomer', 'ReadOrder'] })
// ...
const pipeline = createPipeline()
.context({
errorHandling: {
strategy: 'continue', // continue, stop, dead-letter
maxRetries: 3,
retryDelayMs: 1000,
},
})
.build();
.load('import', {
adapterCode: 'productUpsert',
errorHandling: {
mode: 'queue', // stop, continue, queue, dead-letter
retryAttempts: 3,
retryDelayMs: 1000,
},
})
Failed records automatically capture JavaScript stack traces when errors originate from exceptions. Stack traces are stored on the error record and visible in the dashboard error viewer and dead letter queue, aiding production debugging.
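To illustrate how retries, dead letters, and captured stack traces fit together, here is a minimal sketch. It is not the plugin's error handler: `processWithRetries` and `DeadLetter` are hypothetical names, retries happen immediately (no `retryDelayMs` wait), and the handler is synchronous to keep the example short.

```typescript
// Hypothetical sketch: retry each record, then dead-letter it with the error's stack trace.
interface DeadLetter<T> {
  record: T;
  errorMessage: string;
  stack?: string;
  attempts: number;
}

function processWithRetries<T>(
  records: T[],
  handler: (record: T) => void,
  maxRetries: number,
): { succeeded: number; deadLetters: DeadLetter<T>[] } {
  let succeeded = 0;
  const deadLetters: DeadLetter<T>[] = [];

  for (const record of records) {
    let attempts = 0;
    let done = false;
    let lastError: unknown;

    // One initial attempt plus up to maxRetries retries.
    while (!done && attempts <= maxRetries) {
      attempts++;
      try {
        handler(record);
        done = true;
      } catch (err) {
        lastError = err;
      }
    }

    if (done) {
      succeeded++;
    } else {
      deadLetters.push({
        record,
        errorMessage: lastError instanceof Error ? lastError.message : String(lastError),
        // Only real Error objects carry a stack trace.
        stack: lastError instanceof Error ? lastError.stack : undefined,
        attempts,
      });
    }
  }
  return { succeeded, deadLetters };
}

const result = processWithRetries(
  [{ sku: 'OK-1' }, { sku: 'BAD' }],
  (r) => {
    if (r.sku === 'BAD') throw new Error('Invalid SKU');
  },
  3,
);
console.log(result.succeeded, result.deadLetters[0].attempts); // 1 4
```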
| Requirement | Version |
|---|---|
| Vendure | ^3.0.0 |
| Node.js | >=18.0.0 |
Commercial plugin - Free for non-commercial use.
Contact office@oronts.com for licensing.
Oronts provides custom development and integration services:
Contact: office@oronts.com | oronts.com
Author: Oronts - AI-powered automation, e-commerce platforms, cloud infrastructure.
Contributors: Refaat Al Ktifan (Refaat@alktifan.com)