Create operators to add new transform operations.
/**
 * Contract for an operator that processes one record per call.
 *
 * @typeParam TConfig - Shape of the configuration object handed to
 *   `applyOne`; defaults to `JsonObject`.
 */
interface SingleRecordOperator<TConfig = JsonObject> {
  /** Discriminator literal; always `'OPERATOR'`. */
  type: 'OPERATOR';
  /** Operator code; the pipeline example in this guide refers to an operator by this value (`op: '<code>'`). */
  code: string;
  /** Display name, e.g. `'Currency Convert'`. */
  name: string;
  /** Optional free-text description. */
  description?: string;
  /** Category label, e.g. `'string'`, `'numeric'`, `'enrichment'`. */
  category: string;
  // NOTE(review): presumably marks the operator as side-effect free; the HTTP
  // lookup example in this guide sets it to false — confirm exact semantics.
  pure?: boolean;
  /** Describes the configuration fields of the operator. */
  schema: StepConfigSchema;
  /** Optional icon identifier, e.g. `'currency-exchange'`. */
  icon?: string;
  /** Optional version string, e.g. `'1.0.0'`. */
  version?: string;
  /**
   * Processes a single record.
   *
   * @param record - The record to process.
   * @param config - Operator configuration.
   * @param helpers - Path-based field accessors.
   * @returns The record that continues to the next step, or `null` to skip
   *   the record. Throwing marks the record as failed (quarantined).
   *
   * NOTE(review): the HTTP lookup example in this guide declares
   * `async applyOne`, which returns a Promise and does not match this
   * signature as written — confirm whether the real type also admits a Promise.
   */
  applyOne(
    record: JsonObject,
    config: TConfig,
    helpers: AdapterOperatorHelpers,
  ): JsonObject | null;
}
/** Field-access utilities passed to every operator call. */
interface AdapterOperatorHelpers {
  /**
   * Reads the value at `path` from `record`. The examples in this guide use
   * dot-separated paths such as `'nested.deep.field'`.
   */
  get(record: JsonObject, path: string): unknown;
  /** Writes `value` at `path` on `record`; nothing is returned. */
  set(record: JsonObject, path: string, value: unknown): void;
}
import { JsonObject, SingleRecordOperator, AdapterOperatorHelpers, StepConfigSchema } from '@oronts/vendure-data-hub-plugin';
/**
 * Produces the currency choices offered by both the "from" and the "to"
 * selector. A fresh array is returned on every call so the two fields never
 * share one options instance.
 */
const buildCurrencyOptions = () => [
  { value: 'USD', label: 'US Dollar' },
  { value: 'EUR', label: 'Euro' },
];

/** Configuration schema of the currency-convert operator. */
export const currencyConvertSchema: StepConfigSchema = {
  fields: [
    // Path of the field holding the amount to convert.
    { key: 'field', type: 'string', label: 'Price Field', required: true },
    {
      key: 'from',
      type: 'select',
      label: 'From Currency',
      required: true,
      options: buildCurrencyOptions(),
    },
    {
      key: 'to',
      type: 'select',
      label: 'To Currency',
      required: true,
      options: buildCurrencyOptions(),
    },
    // Optional destination path for the converted amount.
    { key: 'targetField', type: 'string', label: 'Target Field', required: false },
  ],
};
/** Configuration consumed by the currency-convert operator. */
interface CurrencyConvertConfig {
  /** Path of the field that holds the amount to convert. */
  field: string;
  /** Source currency code, used as the first key into the exchange-rate table. */
  from: string;
  /** Target currency code, used as the second key into the exchange-rate table. */
  to: string;
  /** Optional path for the converted amount; the operator falls back to `field` when it is not set. */
  targetField?: string;
}
/** Multipliers for converting an amount out of US dollars. */
const USD_RATES: Record<string, number> = { EUR: 0.92, GBP: 0.79 };

/** Multipliers for converting an amount out of euros. */
const EUR_RATES: Record<string, number> = { USD: 1.09, GBP: 0.86 };

/**
 * Static exchange-rate table, indexed as `EXCHANGE_RATES[from][to]`.
 * The numbers are fixed sample values hard-coded for the example.
 */
const EXCHANGE_RATES: Record<string, Record<string, number>> = {
  USD: USD_RATES,
  EUR: EUR_RATES,
};
/**
 * Operator that multiplies a numeric field by a fixed exchange rate and
 * stores the result rounded to two decimals.
 */
export const currencyConvertOperator: SingleRecordOperator<CurrencyConvertConfig> = {
  type: 'OPERATOR',
  code: 'currencyConvert',
  name: 'Currency Convert',
  description: 'Convert currency values using exchange rates',
  category: 'conversion',
  pure: true,
  schema: currencyConvertSchema,
  icon: 'currency-exchange',
  version: '1.0.0',
  /**
   * Converts the amount found at `config.field` from `config.from` to
   * `config.to`.
   *
   * @param record - Record to convert; it is shallow-copied before writing.
   * @param config - Field paths and currency codes.
   * @param helpers - Path-based field accessors.
   * @returns A copy of the record carrying the converted amount, or the
   *   original record unchanged when the value is missing, is not a finite
   *   number, the currencies are identical, or no rate is known for the pair.
   */
  applyOne(record: JsonObject, config: CurrencyConvertConfig, helpers: AdapterOperatorHelpers): JsonObject | null {
    const { field, from, to, targetField } = config;
    const value = helpers.get(record, field);
    if (value === undefined || value === null) return record;

    const amount = typeof value === 'number' ? value : parseFloat(String(value));
    // Number.isFinite rejects NaN *and* ±Infinity; an infinite amount would
    // otherwise be written into the record although JSON cannot represent it.
    if (!Number.isFinite(amount) || from === to) return record;

    const rate = EXCHANGE_RATES[from]?.[to];
    // The typeof check also guards against inherited, non-numeric properties
    // being picked up when `from`/`to` are arbitrary strings.
    if (typeof rate !== 'number' || !rate) return record;

    // Round to two decimal places.
    const converted = Math.round(amount * rate * 100) / 100;
    // NOTE(review): this is a shallow copy — if the target path is nested and
    // helpers.set writes in place, nested objects stay shared with the input.
    const result = { ...record };
    helpers.set(result, targetField || field, converted);
    return result;
  },
};
Register operators via the plugin’s adapters option:
import { DataHubPlugin } from '@oronts/vendure-data-hub-plugin';
import { currencyConvertOperator } from './currency-convert.operator';
// Hand the custom operator to the plugin through the `adapters` option.
DataHubPlugin.init({
  adapters: [currencyConvertOperator],
})
Or register programmatically:
import { VendurePlugin, OnModuleInit } from '@vendure/core';
import { DataHubPlugin, DataHubRegistryService } from '@oronts/vendure-data-hub-plugin';
import { currencyConvertOperator } from './currency-convert.operator';
/**
 * Plugin that registers the currency-convert operator with the Data Hub
 * registry once the module has been initialised.
 */
@VendurePlugin({ imports: [DataHubPlugin] })
export class MyOperatorPlugin implements OnModuleInit {
  constructor(private readonly registry: DataHubRegistryService) {}

  /** Lifecycle hook: passes the operator to the registry. */
  onModuleInit(): void {
    this.registry.registerRuntime(currencyConvertOperator);
  }
}
Instead of importing a predefined operator object, you can also pass an operator definition inline to DataHubRegistryService.registerRuntime(). This approach is useful for dynamically registering operators from other plugins or modules. The example below registers an operator whose apply() method receives the whole array of records:
import { VendurePlugin, OnModuleInit } from '@vendure/core';
import { DataHubPlugin, DataHubRegistryService } from '@oronts/vendure-data-hub-plugin';
/**
 * Plugin that registers an inline operator definition with the Data Hub
 * registry during module initialisation.
 */
@VendurePlugin({ imports: [DataHubPlugin] })
export class MyCustomOperatorPlugin implements OnModuleInit {
  constructor(private readonly registry: DataHubRegistryService) {}

  /** Lifecycle hook: registers the `myCustomOp` operator. */
  onModuleInit(): void {
    this.registry.registerRuntime({
      type: 'OPERATOR',
      code: 'myCustomOp',
      name: 'My Custom Operator',
      description: 'Custom processing logic',
      category: 'CUSTOM',
      pure: true,
      schema: { fields: [] },
      // Receives the full array of records and returns copies of them,
      // each flagged with `processed: true`, wrapped in a `{ records }` object.
      async apply(records, config, helpers) {
        const flagged = records.map(record => ({ ...record, processed: true }));
        return { records: flagged };
      },
    });
  }
}
Custom operators registered via registerRuntime() are discovered automatically by the transform executor. When the executor encounters an operator code that is not found in the built-in OPERATOR_REGISTRY, it falls back to the DataHubRegistryService runtime registry where custom operators are stored. This two-tier lookup (built-in first, then runtime) ensures that built-in operators always take priority while custom operators extend the available set without conflicts.
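This means a custom operator can be referenced by its code in a pipeline's transform step, exactly like a built-in operator: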
// Example pipeline that references the custom operator by its code.
// NOTE(review): `createPipeline` is not imported in this snippet; presumably
// it is exported by the plugin package — confirm.
export const customPipeline = createPipeline()
  .name('Pipeline with Custom Operator')
  // Step 'start': manual trigger.
  .trigger('start', { type: 'MANUAL' })
  // Step 'fetch': extract using the 'httpApi' adapter.
  .extract('fetch', { adapterCode: 'httpApi', url: 'https://api.example.com/data' })
  // Step 'custom-step': transform that runs the operator with code 'myCustomOp'.
  .transform('custom-step', {
    operators: [
      { op: 'myCustomOp', args: { /* operator-specific config */ } },
    ],
  })
  // Step 'save': load using the 'productUpsert' adapter, matching on 'slug'.
  .load('save', { adapterCode: 'productUpsert', strategy: 'UPSERT', matchField: 'slug' })
  // Wire the steps together: start -> fetch -> custom-step -> save.
  .edge('start', 'fetch')
  .edge('fetch', 'custom-step')
  .edge('custom-step', 'save')
  .build();
Use standard categories for consistency:
| Category | Purpose |
|---|---|
| `data` | Basic data operations (set, copy, rename) |
| `string` | String manipulation |
| `numeric` | Number operations |
| `date` | Date/time operations |
| `logic` | Conditional logic |
| `json` | JSON operations |
| `validation` | Data validation |
| `aggregation` | Aggregation operations |
| `enrichment` | External data enrichment |
| `file` | File and media processing |
| `custom` | Custom operator category for runtime-registered operators |
| Return | Effect |
|---|---|
| `record` | Modified record continues to next step |
| `null` | Record is skipped (filtered out) |
| `throw` | Record fails and is quarantined |
Return null to skip records:
applyOne(record, config, helpers) {
  // Records without a truthy `enabled` flag are skipped by returning null;
  // every other record passes through untouched.
  return record.enabled ? record : null;
}
Throw to quarantine records:
applyOne(record, config, helpers) {
  const sku = helpers.get(record, 'sku');
  if (sku) {
    // SKU present: the record passes through unchanged.
    return record;
  }
  // Throwing marks the record as failed.
  throw new Error('Missing SKU');
}
Operators can be async for external lookups:
/** Configuration consumed by the HTTP lookup operator. */
interface HttpLookupConfig {
  /** Path of the record field whose value is used for the lookup. */
  source: string;
  /** URL template of the lookup request. */
  url: string;
  /** Dot-separated path into the JSON response that selects the result. */
  resultPath: string;
  /** Path of the record field that receives the result. */
  target: string;
}
/**
 * Operator that enriches a record with data fetched from an HTTP endpoint.
 *
 * NOTE(review): the original snippet called `url.replace('', …)`, i.e. its
 * placeholder was an empty string, which prepends the value to the URL. The
 * placeholder is assumed to have been `{{value}}` — confirm against the
 * plugin's documentation.
 */
export const httpLookupOperator: SingleRecordOperator<HttpLookupConfig> = {
  type: 'OPERATOR',
  code: 'httpLookup',
  name: 'HTTP Lookup',
  category: 'enrichment',
  pure: false,
  schema: {
    fields: [
      { key: 'source', type: 'string', required: true, label: 'Source Field' },
      { key: 'url', type: 'string', required: true, label: 'URL Template' },
      { key: 'resultPath', type: 'string', required: true, label: 'Result Path' },
      { key: 'target', type: 'string', required: true, label: 'Target Field' },
    ],
  },
  /**
   * Substitutes the source field's value into the URL template, fetches the
   * URL and stores the value found at `resultPath` in the response JSON.
   *
   * @param record - Record to enrich; it is shallow-copied before writing.
   * @param config - Source/target paths, URL template and result path.
   * @param helpers - Path-based field accessors.
   * @returns A copy of the record with the looked-up value at `target`, or
   *   the original record unchanged when the source value is null/undefined.
   * @throws Error when the response status is not OK; `fetch` and JSON
   *   parsing failures propagate as well.
   */
  async applyOne(record, config, helpers) {
    const { source, url, resultPath, target } = config;
    const value = helpers.get(record, source);
    // Without a value the request would be sent for the literal text
    // "undefined"/"null", so the lookup is skipped instead.
    if (value === undefined || value === null) return record;

    // Fill every placeholder occurrence; the value is percent-encoded so it
    // cannot alter the structure of the URL.
    const lookupUrl = url.split('{{value}}').join(encodeURIComponent(String(value)));

    const response = await fetch(lookupUrl);
    if (!response.ok) {
      // Fail loudly rather than storing an error payload as the result.
      throw new Error(`HTTP lookup failed: ${response.status} ${response.statusText}`);
    }
    const data: unknown = await response.json();

    // Walk the dot-separated path; a null/undefined intermediate yields undefined.
    const result = resultPath.split('.').reduce(
      (node: unknown, key: string): unknown =>
        node === null || node === undefined ? undefined : (node as Record<string, unknown>)[key],
      data,
    );

    const clone = { ...record };
    helpers.set(clone, target, result);
    return clone;
  },
};
The helpers object provides utilities for field access:
/** Utilities for reading and writing record fields by path. */
interface AdapterOperatorHelpers {
  /** Returns the value found at `path` in `record`. */
  get(record: JsonObject, path: string): unknown;
  /** Stores `value` at `path` in `record`; returns nothing. */
  set(record: JsonObject, path: string, value: unknown): void;
}
applyOne(record, config, helpers) {
  // Read a value through a dot-separated path
  const nestedValue = helpers.get(record, 'nested.deep.field');
  // Write it to another path on a shallow copy of the record
  const output = { ...record };
  helpers.set(output, 'output.field', nestedValue);
  return output;
}
/** Configuration consumed by the slug generator operator. */
interface SlugConfig {
  /** Path of the field the slug is generated from. */
  source: string;
  /** Path that receives the slug; the operator defaults it to `'slug'`. */
  target?: string;
  /** Whether the slug is lower-cased; the operator defaults it to `true`. */
  lowercase?: boolean;
  /** String placed between words; the operator defaults it to `'-'`. */
  separator?: string;
}
/** Operator that derives a URL-friendly slug from a record field. */
export const slugGeneratorOperator: SingleRecordOperator<SlugConfig> = {
  type: 'OPERATOR',
  code: 'customSlug',
  name: 'Custom Slug Generator',
  category: 'string',
  description: 'Generate URL-friendly slug from field',
  pure: true,
  schema: {
    fields: [
      { key: 'source', type: 'string', required: true, label: 'Source Field' },
      { key: 'target', type: 'string', label: 'Target Field' },
      { key: 'lowercase', type: 'boolean' },
      { key: 'separator', type: 'string' },
    ],
  },
  /**
   * Builds the slug and writes it to the target field of a shallow copy.
   *
   * Diacritics are stripped, characters other than ASCII letters, digits,
   * whitespace and hyphens are removed, and every run of whitespace and/or
   * hyphens becomes exactly one separator. Leading and trailing separators
   * are dropped.
   *
   * @param record - Source record; not modified at the top level.
   * @param config - Source/target paths and formatting options
   *   (`target` = `'slug'`, `lowercase` = `true`, `separator` = `'-'` by default).
   * @param helpers - Path-based field accessors.
   * @returns A shallow copy of the record carrying the slug.
   */
  applyOne(record, config, helpers) {
    const { source, target = 'slug', lowercase = true, separator = '-' } = config;
    // `??` only replaces null/undefined, so a value such as 0 still yields "0".
    const raw = String(helpers.get(record, source) ?? '');

    const words = raw
      .normalize('NFD')
      // Drop the combining marks left behind by NFD decomposition.
      .replace(/[\u0300-\u036f]/g, '')
      .replace(/[^a-zA-Z0-9\s-]/g, '')
      // Splitting on runs of whitespace/hyphens prevents results like
      // "hello---world" and makes existing hyphens honour `separator`.
      .split(/[\s-]+/)
      // Empty pieces come from leading/trailing delimiters.
      .filter((word: string) => word.length > 0);

    const joined = words.join(separator);
    const slug = lowercase ? joined.toLowerCase() : joined;

    const result = { ...record };
    helpers.set(result, target, slug);
    return result;
  },
};
/** Configuration consumed by the price formatter operator. */
interface PriceFormatConfig {
  /** Path of the field holding the numeric price. */
  source: string;
  /** Path that receives the formatted text; the operator falls back to `<source>_formatted`. */
  target?: string;
  /** Currency code passed to `Intl.NumberFormat`; the operator defaults it to `'USD'`. */
  currency?: string;
  /** Locale passed to `Intl.NumberFormat`; the operator defaults it to `'en-US'`. */
  locale?: string;
}
/** Operator that renders a numeric price as localized currency text. */
export const priceFormatterOperator: SingleRecordOperator<PriceFormatConfig> = {
  type: 'OPERATOR',
  code: 'formatPrice',
  name: 'Format Price',
  category: 'numeric',
  description: 'Format price with currency symbol',
  pure: true,
  schema: {
    fields: [
      { key: 'source', type: 'string', required: true, label: 'Source Field' },
      { key: 'target', type: 'string', label: 'Target Field' },
      { key: 'currency', type: 'string' },
      { key: 'locale', type: 'string' },
    ],
  },
  /**
   * Formats the price at `config.source` and stores the text on a shallow
   * copy of the record.
   *
   * @param record - Source record; not modified at the top level.
   * @param config - Source/target paths plus currency (`'USD'` by default)
   *   and locale (`'en-US'` by default).
   * @param helpers - Path-based field accessors.
   * @returns A copy carrying the formatted price, or the original record
   *   unchanged when the source value is not a finite number or numeric string.
   * @throws RangeError from `Intl.NumberFormat` for an invalid currency or locale.
   */
  applyOne(record, config, helpers) {
    const { source, target, currency = 'USD', locale = 'en-US' } = config;
    const raw = helpers.get(record, source);

    // Number(null), Number('') and Number(true) coerce to 0/0/1, which would
    // turn a missing price into "$0.00"; only numbers and non-blank strings
    // are treated as prices.
    let value = NaN;
    if (typeof raw === 'number') {
      value = raw;
    } else if (typeof raw === 'string' && raw.trim() !== '') {
      value = Number(raw);
    }
    if (!Number.isFinite(value)) return record;

    // The stored amount is divided by 100 before formatting
    // (presumably it is held in minor units such as cents — confirm).
    const formatted = new Intl.NumberFormat(locale, {
      style: 'currency',
      currency,
    }).format(value / 100);

    const result = { ...record };
    helpers.set(result, target || `${source}_formatted`, formatted);
    return result;
  },
};
/** Configuration consumed by the array filter operator. */
interface ArrayFilterConfig {
  /** Path of the array field; the filtered array is written back to the same path. */
  source: string;
  /** Property name looked up on each array element. */
  filterField: string;
  /** Value an element's property must strictly equal (`===`) for the element to be kept. */
  filterValue: unknown;
}
/** Operator that keeps only the array elements matching a property value. */
export const arrayFilterOperator: SingleRecordOperator<ArrayFilterConfig> = {
  type: 'OPERATOR',
  code: 'arrayFilter',
  name: 'Array Filter',
  category: 'json',
  description: 'Filter array elements by condition',
  pure: true,
  schema: {
    fields: [
      { key: 'source', type: 'string', required: true, label: 'Array Field' },
      { key: 'filterField', type: 'string', required: true, label: 'Filter Field' },
      { key: 'filterValue', type: 'string', required: true, label: 'Filter Value' },
    ],
  },
  /**
   * Filters the array at `config.source`, keeping elements whose
   * `filterField` property strictly equals `filterValue`.
   *
   * @param record - Source record; not modified at the top level.
   * @param config - Array path, property name and expected value.
   * @param helpers - Path-based field accessors.
   * @returns A shallow copy with the filtered array, or the original record
   *   unchanged when the source field is not an array.
   */
  applyOne(record, config, helpers) {
    const { source, filterField, filterValue } = config;
    const array = helpers.get(record, source);
    if (!Array.isArray(array)) return record;

    const filtered = array.filter(
      (item: unknown) =>
        // null/undefined elements cannot be indexed; they never match
        // instead of crashing the whole record with a TypeError.
        item !== null &&
        item !== undefined &&
        (item as Record<string, unknown>)[filterField] === filterValue,
    );

    const result = { ...record };
    helpers.set(result, source, filtered);
    return result;
  },
};
import { describe, it, expect } from 'vitest';
import { slugGeneratorOperator } from './slug-generator.operator';
/** Minimal stand-in for the operator helpers, resolving dot-separated paths. */
const mockHelpers = {
  // Walks the path one key at a time; a null/undefined step yields undefined.
  get: (record: any, path: string) => {
    let cursor = record;
    for (const key of path.split('.')) {
      cursor = cursor?.[key];
    }
    return cursor;
  },
  // Creates missing intermediate objects, then assigns the final key in place.
  set: (record: any, path: string, value: any) => {
    const segments = path.split('.');
    const leaf = segments.pop()!;
    let parent = record;
    for (const segment of segments) {
      parent = parent[segment] ??= {};
    }
    parent[leaf] = value;
  },
};
// Unit tests for the slug generator operator, run against the mock helpers.
describe('Slug Generator Operator', () => {
  it('should generate slug from name', () => {
    const record = { name: 'Hello World' };
    const config = { source: 'name', target: 'slug' };
    const result = slugGeneratorOperator.applyOne(record, config, mockHelpers);
    expect(result?.slug).toBe('hello-world');
  });

  it('should handle special characters', () => {
    const record = { name: 'Café & Résumé' };
    const config = { source: 'name', target: 'slug' };
    const result = slugGeneratorOperator.applyOne(record, config, mockHelpers);
    // Removing "&" leaves two adjacent spaces, but the operator replaces each
    // whole run of whitespace with ONE separator, so the slug contains a
    // single hyphen (the previous expectation 'cafe--resume' could not pass).
    expect(result?.slug).toBe('cafe-resume');
  });
});