Create custom trigger types to start pipelines from new event sources.
Data Hub supports several built-in trigger types:
| Type | Description | Status |
|---|---|---|
| MANUAL | Triggered via UI or API | ✅ Implemented |
| SCHEDULE | Cron-based scheduling | ✅ Implemented |
| WEBHOOK | HTTP webhook endpoint | ✅ Implemented |
| EVENT | Vendure event subscription | ✅ Implemented |
| FILE | File watch (FTP/S3/SFTP) | ✅ Implemented |
| MESSAGE | Queue/messaging | ✅ Implemented |
This guide covers how to implement custom trigger handlers. For production use, the built-in FILE and MESSAGE triggers cover most event-driven scenarios.
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Trigger Source │────▶│ Trigger Handler │────▶│ Pipeline Engine │
│ (Queue/Event) │ │ (Your Code) │ │ (Execution) │
└─────────────────┘ └──────────────────┘ └─────────────────┘
// src/triggers/message-trigger.types.ts
import { TriggerType } from '@oronts/vendure-data-hub-plugin';
/**
 * Configuration for a MESSAGE trigger: identifies the queue to consume from
 * and controls batching, acknowledgment, and retry/dead-letter behavior.
 */
export interface MessageTriggerConfig {
/** Connection code for queue system */
connectionCode: string;
/** Queue or topic name */
queue: string;
/** Consumer group (for Kafka) */
consumerGroup?: string;
/** Batch size for consuming messages */
batchSize?: number;
/** Acknowledgment mode */
ackMode?: 'AUTO' | 'MANUAL';
/** Dead letter queue for failed messages */
deadLetterQueue?: string;
/** Max retries before DLQ */
maxRetries?: number;
}
/**
 * Normalized message shape handed to trigger handlers, independent of the
 * underlying queue technology (BullMQ job, AMQP delivery, etc.).
 */
export interface MessagePayload {
// Broker-assigned or correlation id; may be '' when the broker provides none
id: string;
// Parsed message body; producer-defined shape — validate downstream
body: unknown;
headers?: Record<string, string>;
// ISO-8601 timestamp of receipt/creation
timestamp: string;
// Number of prior delivery attempts, when the broker tracks it
retryCount?: number;
}
// src/triggers/message-trigger.handler.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { RequestContext, TransactionalConnection } from '@vendure/core';
import {
PipelineExecutionService,
ConnectionService,
DataHubLogger,
DataHubLoggerFactory,
} from '@oronts/vendure-data-hub-plugin';
import { MessageTriggerConfig, MessagePayload } from './message-trigger.types';
@Injectable()
export class MessageTriggerHandler implements OnModuleInit, OnModuleDestroy {
  private readonly logger: DataHubLogger;
  /** Active consumers, keyed by `${pipelineCode}:${queue}`. */
  private consumers: Map<string, QueueConsumer> = new Map();
  private isRunning = false;

  constructor(
    private pipelineService: PipelineExecutionService,
    private connectionService: ConnectionService,
    private connection: TransactionalConnection,
    loggerFactory: DataHubLoggerFactory,
  ) {
    this.logger = loggerFactory.createLogger('MessageTriggerHandler');
  }

  /** Load all pipelines with message triggers and start their consumers. */
  async onModuleInit() {
    await this.initializeConsumers();
    this.isRunning = true;
  }

  /** Stop every consumer before the process shuts down. */
  async onModuleDestroy() {
    this.isRunning = false;
    await this.stopAllConsumers();
  }

  /**
   * Initialize consumers for all pipelines with message triggers.
   */
  private async initializeConsumers() {
    const ctx = RequestContext.empty();
    const pipelines = await this.pipelineService.findPipelinesWithTriggerType(ctx, 'message');
    for (const pipeline of pipelines) {
      // Find message triggers by TYPE in the steps array.
      // NOTE(review): findEnabledTriggersByType is not imported in this file —
      // presumably exported by @oronts/vendure-data-hub-plugin; confirm and import it.
      const messageTriggers = findEnabledTriggersByType(pipeline.definition, 'message');
      for (const trigger of messageTriggers) {
        const config = trigger.config as MessageTriggerConfig;
        await this.startConsumer(pipeline.id, pipeline.code, config);
      }
    }
    this.logger.info(`Initialized ${this.consumers.size} message consumers`);
  }

  /**
   * Start a consumer for a specific pipeline/queue pair.
   *
   * Errors are logged rather than rethrown so one broken connection cannot
   * prevent the remaining consumers from starting.
   */
  async startConsumer(pipelineId: string, pipelineCode: string, config: MessageTriggerConfig) {
    const key = `${pipelineCode}:${config.queue}`;
    if (this.consumers.has(key)) {
      this.logger.warn(`Consumer already exists for ${key}`);
      return;
    }
    try {
      // Resolve the connection configuration by code.
      const ctx = RequestContext.empty();
      const connection = await this.connectionService.findByCode(ctx, config.connectionCode);
      if (!connection) {
        throw new Error(`Connection not found: ${config.connectionCode}`);
      }
      // Create the transport-specific consumer (redis/rabbitmq/kafka/sqs).
      const consumer = await this.createConsumer(connection.type, connection.config, config);
      // Return handleMessage's result so consumers can honor ack/retry
      // decisions. (The original discarded it, so RedisConsumer's
      // `result.success` check would read a property of `undefined`.)
      consumer.onMessage((message: MessagePayload) =>
        this.handleMessage(pipelineId, pipelineCode, config, message),
      );
      await consumer.start();
      this.consumers.set(key, consumer);
      this.logger.info(`Started consumer for ${key}`);
    } catch (error) {
      this.logger.error(`Failed to start consumer for ${key}`, error);
    }
  }

  /**
   * Stop and deregister the consumer for the given pipeline/queue, if any.
   */
  async stopConsumer(pipelineCode: string, queue: string) {
    const key = `${pipelineCode}:${queue}`;
    const consumer = this.consumers.get(key);
    if (consumer) {
      await consumer.stop();
      this.consumers.delete(key);
      this.logger.info(`Stopped consumer for ${key}`);
    }
  }

  /**
   * Stop all consumers (shutdown path). Consumers are independent of each
   * other, so they are stopped in parallel.
   */
  private async stopAllConsumers() {
    await Promise.all(
      [...this.consumers].map(async ([key, consumer]) => {
        await consumer.stop();
        this.logger.info(`Stopped consumer for ${key}`);
      }),
    );
    this.consumers.clear();
  }

  /**
   * Execute the pipeline for one incoming message and report back whether
   * the consumer should ack, retry, or give up (dead-letter).
   */
  private async handleMessage(
    pipelineId: string,
    pipelineCode: string,
    config: MessageTriggerConfig,
    message: MessagePayload,
  ): Promise<{ success: boolean; retry?: boolean; retryCount?: number }> {
    const ctx = RequestContext.empty();
    try {
      this.logger.debug(`Received message for ${pipelineCode}`, {
        messageId: message.id,
        queue: config.queue,
      });
      // Wrap the raw message in the standard trigger payload envelope.
      const triggerPayload = {
        type: 'MESSAGE' as const,
        timestamp: new Date().toISOString(),
        data: message.body,
        meta: {
          messageId: message.id,
          queue: config.queue,
          headers: message.headers,
          retryCount: message.retryCount ?? 0,
        },
      };
      // Execute pipeline.
      const result = await this.pipelineService.executePipeline(
        ctx,
        pipelineCode,
        { triggerPayload },
      );
      this.logger.info(`Pipeline ${pipelineCode} completed`, {
        messageId: message.id,
        status: result.status,
        recordsProcessed: result.metrics?.totalRecords,
      });
      return { success: true };
    } catch (error) {
      // Catch variables are `unknown` under strict mode — narrow before use.
      this.logger.error(`Pipeline ${pipelineCode} failed`, {
        messageId: message.id,
        error: error instanceof Error ? error.message : String(error),
      });
      // `??` (not `||`) so an explicit 0 is respected.
      const retryCount = (message.retryCount ?? 0) + 1;
      if (retryCount < (config.maxRetries ?? 3)) {
        return { success: false, retry: true, retryCount };
      }
      // Retries exhausted: route to the dead-letter queue if configured.
      if (config.deadLetterQueue) {
        // TODO: publish the failed message to config.deadLetterQueue
      }
      return { success: false, retry: false };
    }
  }

  /**
   * Create a consumer implementation matching the connection type.
   * @throws Error for queue types with no consumer implementation.
   */
  private async createConsumer(
    type: string,
    connectionConfig: any,
    triggerConfig: MessageTriggerConfig,
  ): Promise<QueueConsumer> {
    switch (type) {
      case 'redis':
        return new RedisConsumer(connectionConfig, triggerConfig);
      case 'rabbitmq':
        return new RabbitMQConsumer(connectionConfig, triggerConfig);
      case 'kafka':
        return new KafkaConsumer(connectionConfig, triggerConfig);
      case 'sqs':
        return new SQSConsumer(connectionConfig, triggerConfig);
      default:
        throw new Error(`Unsupported queue type: ${type}`);
    }
  }
}
// Consumer interface
/**
 * Minimal contract every transport-specific consumer implements.
 * NOTE(review): RedisConsumer/RabbitMQConsumer in sibling files declare
 * `implements QueueConsumer`, but this interface is not exported — consider
 * moving it to a shared, exported types module.
 */
interface QueueConsumer {
// Register the handler invoked per message; its result drives ack/retry.
onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>): void;
start(): Promise<void>;
stop(): Promise<void>;
}
// src/triggers/consumers/redis-consumer.ts
import { Queue, Worker } from 'bullmq';
import { MessagePayload, MessageTriggerConfig } from '../message-trigger.types';
/** Connection settings for the Redis/BullMQ transport. */
interface RedisConnectionConfig {
host: string;
port: number;
password?: string;
// Redis logical database index; defaults to 0 when omitted
db?: number;
}
/**
 * Queue consumer backed by BullMQ. Each job on the configured queue is
 * translated into a MessagePayload and handed to the registered handler;
 * a retry request is signalled to BullMQ by throwing from the processor.
 *
 * NOTE(review): QueueConsumer is declared (non-exported) in
 * message-trigger.handler.ts — it must be exported/imported for this
 * `implements` clause to compile. Confirm.
 */
export class RedisConsumer implements QueueConsumer {
  private worker: Worker | null = null;
  // Typed to the QueueConsumer contract so a missing/malformed result is a
  // compile error instead of a runtime TypeError on `result.success`.
  private messageHandler:
    | ((message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>)
    | null = null;

  constructor(
    private connectionConfig: RedisConnectionConfig,
    private triggerConfig: MessageTriggerConfig,
  ) {}

  /** Register the callback invoked for every consumed job. */
  onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>) {
    this.messageHandler = handler;
  }

  /** Create the BullMQ worker and begin processing jobs. */
  async start() {
    const connection = {
      host: this.connectionConfig.host,
      port: this.connectionConfig.port,
      password: this.connectionConfig.password,
      db: this.connectionConfig.db ?? 0,
    };
    this.worker = new Worker(
      this.triggerConfig.queue,
      async (job) => {
        // No handler registered yet: complete the job without processing.
        if (!this.messageHandler) return;
        const message: MessagePayload = {
          id: job.id ?? '',
          body: job.data,
          timestamp: new Date(job.timestamp).toISOString(),
          retryCount: job.attemptsMade,
        };
        const result = await this.messageHandler(message);
        // Throwing makes BullMQ re-deliver the job via its retry mechanism.
        if (!result.success && result.retry) {
          throw new Error('Retry requested');
        }
      },
      {
        connection,
        concurrency: this.triggerConfig.batchSize ?? 1,
      },
    );
    this.worker.on('error', (err) => {
      // NOTE(review): consider routing through DataHubLogger instead of console.
      console.error('Worker error:', err);
    });
  }

  /** Close the worker; safe to call when never started. */
  async stop() {
    if (this.worker) {
      await this.worker.close();
      this.worker = null;
    }
  }
}
// src/triggers/consumers/rabbitmq-consumer.ts
import * as amqp from 'amqplib';
import { MessagePayload, MessageTriggerConfig } from '../message-trigger.types';
/**
 * Connection settings for the RabbitMQ transport. Either supply a full AMQP
 * `url`, or the individual fields (buildUrl assembles them; see consumer).
 */
interface RabbitMQConnectionConfig {
url: string;
// or individual fields
host?: string;
port?: number;
username?: string;
password?: string;
vhost?: string;
}
/**
 * Queue consumer backed by amqplib. Consumes from a durable queue, maps
 * deliveries to MessagePayload, and translates the handler's result into
 * ack / retry-with-counter / nack decisions.
 *
 * NOTE(review): QueueConsumer is declared (non-exported) in
 * message-trigger.handler.ts — it must be exported/imported for this
 * `implements` clause to compile. Confirm.
 */
export class RabbitMQConsumer implements QueueConsumer {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;
  // Typed to the QueueConsumer contract so the result can be checked safely.
  private messageHandler:
    | ((message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>)
    | null = null;

  constructor(
    private connectionConfig: RabbitMQConnectionConfig,
    private triggerConfig: MessageTriggerConfig,
  ) {}

  /** Register the callback invoked for every consumed message. */
  onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>) {
    this.messageHandler = handler;
  }

  /** Connect, assert the queue, and start consuming. */
  async start() {
    const url = this.connectionConfig.url || this.buildUrl();
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();
    // Ensure queue exists before consuming.
    await this.channel.assertQueue(this.triggerConfig.queue, { durable: true });
    // Prefetch bounds the number of unacked messages in flight.
    await this.channel.prefetch(this.triggerConfig.batchSize ?? 1);
    // In AUTO mode the broker removes messages on delivery (noAck: true);
    // ack/nack on such deliveries is a channel error, so manual acking is
    // skipped entirely in that mode.
    const autoAck = this.triggerConfig.ackMode === 'AUTO';
    await this.channel.consume(
      this.triggerConfig.queue,
      async (msg) => {
        if (!msg || !this.messageHandler) return;
        // Parse defensively: a malformed body must not crash the consumer
        // with an unhandled rejection (the original parsed outside try/catch,
        // leaving the delivery unacked forever on bad JSON).
        let body: unknown;
        try {
          body = JSON.parse(msg.content.toString());
        } catch {
          // Poison message: reject without requeue.
          if (!autoAck) this.channel?.nack(msg, false, false);
          return;
        }
        const retryCount = Number(msg.properties.headers?.['x-retry-count'] ?? 0);
        const message: MessagePayload = {
          id: msg.properties.messageId || msg.properties.correlationId || '',
          body,
          headers: msg.properties.headers as Record<string, string>,
          timestamp: new Date().toISOString(),
          retryCount,
        };
        try {
          const result = await this.messageHandler(message);
          if (autoAck) return;
          if (result.success || !result.retry) {
            this.channel?.ack(msg);
          } else {
            // A plain nack requeues WITHOUT bumping x-retry-count, so
            // maxRetries could never take effect (infinite retry loop).
            // Republish with the counter incremented instead.
            this.requeueWithRetryCount(msg, retryCount + 1);
          }
        } catch {
          if (!autoAck) this.channel?.nack(msg, false, true);
        }
      },
      { noAck: autoAck },
    );
  }

  /** Close channel then connection; safe to call when never started. */
  async stop() {
    if (this.channel) {
      await this.channel.close();
      this.channel = null;
    }
    if (this.connection) {
      await this.connection.close();
      this.connection = null;
    }
  }

  /** Republish with an incremented retry counter, then ack the original. */
  private requeueWithRetryCount(msg: amqp.ConsumeMessage, retryCount: number) {
    this.channel?.sendToQueue(this.triggerConfig.queue, msg.content, {
      ...msg.properties,
      headers: { ...msg.properties.headers, 'x-retry-count': retryCount },
    });
    this.channel?.ack(msg);
  }

  /** Build an AMQP URL from individual fields. */
  private buildUrl(): string {
    const { host, port, username, password, vhost } = this.connectionConfig;
    // URL-encode so special characters in credentials don't break the URL.
    const auth = username
      ? `${encodeURIComponent(username)}:${encodeURIComponent(password ?? '')}@`
      : '';
    return `amqp://${auth}${host || 'localhost'}:${port || 5672}/${vhost || ''}`;
  }
}
// src/triggers/message-trigger.module.ts
import { Module } from '@nestjs/common';
import { MessageTriggerHandler } from './message-trigger.handler';
// NestJS module that registers the handler and exports it for other modules.
@Module({
providers: [MessageTriggerHandler],
exports: [MessageTriggerHandler],
})
export class MessageTriggerModule {}
// In your plugin
import { VendurePlugin } from '@vendure/core';
import { DataHubPlugin } from '@oronts/vendure-data-hub-plugin';
import { MessageTriggerModule } from './triggers/message-trigger.module';
// Vendure plugin combining the Data Hub plugin with the custom trigger module.
@VendurePlugin({
imports: [DataHubPlugin, MessageTriggerModule],
})
export class MyQueueTriggersPlugin {}
import { createPipeline } from '@oronts/vendure-data-hub-plugin';
// Example: pipeline driven by a MESSAGE trigger — each queue message becomes
// one pipeline execution whose payload feeds the extract step.
const orderSyncPipeline = createPipeline()
.name('order-queue-sync')
.description('Process orders from message queue')
.trigger('queue-trigger', {
type: 'MESSAGE',
message: {
connectionCode: 'rabbitmq-main',
queue: 'orders.created',
batchSize: 10,
// MANUAL: message is only acked after the pipeline run finishes
ackMode: 'MANUAL',
deadLetterQueue: 'orders.dead-letter',
maxRetries: 3,
},
})
.extract('from-payload', {
adapterCode: 'inMemory',
// Data comes from trigger payload
})
.transform('validate', {
adapterCode: 'validateRequired',
fields: ['orderId', 'customerId', 'items'],
})
.load('create-order', {
adapterCode: 'orderLoader',
})
.build();
// Register queue connections referenced by connectionCode in trigger configs.
// NOTE: credentials are inlined for illustration only — load them from
// environment variables or a secrets manager in production.
DataHubPlugin.init({
connections: [
{
code: 'rabbitmq-main',
type: 'rabbitmq',
config: {
host: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
vhost: '/',
},
},
{
code: 'redis-queue',
type: 'redis',
config: {
host: 'localhost',
port: 6379,
password: 'secret',
},
},
{
code: 'kafka-cluster',
type: 'kafka',
config: {
brokers: ['kafka1:9092', 'kafka2:9092'],
clientId: 'vendure-datahub',
ssl: true,
sasl: {
mechanism: 'plain',
username: 'user',
password: 'pass',
},
},
},
],
});
// Example: queue-to-queue flow — consume, transform, then publish the result
// to a second queue via a custom sink adapter.
const fullQueuePipeline = createPipeline()
.name('queue-to-queue')
.description('Consume from one queue, process, produce to another')
// Consume from input queue
.trigger('input-queue', {
type: 'MESSAGE',
message: {
connectionCode: 'rabbitmq-main',
queue: 'orders.pending',
},
})
.extract('from-payload', { adapterCode: 'inMemory' })
.transform('process', {
adapterCode: 'map',
mapping: {
'orderId': 'id',
'status': '"processed"',
'processedAt': 'new Date().toISOString()',
},
})
// Produce to output queue
.sink('output-queue', {
adapterCode: 'queue-producer', // Custom sink
connectionCode: 'rabbitmq-main',
queue: 'orders.processed',
})
.build();
import { SinkAdapter, SinkContext, SinkResult } from '@oronts/vendure-data-hub-plugin';
/**
 * Custom SINK adapter that publishes processed records to a message queue.
 * The transport-specific producer logic is intentionally left as a stub.
 */
export const queueProducerSink: SinkAdapter = {
type: 'SINK',
code: 'queue-producer',
name: 'Queue Producer',
sinkType: 'CUSTOM',
schema: {
fields: [
{ key: 'connectionCode', type: 'string', required: true, label: 'Connection' },
{ key: 'queue', type: 'string', required: true, label: 'Queue/Topic' },
{ key: 'routingKey', type: 'string', label: 'Routing Key' },
],
},
async index(context, config, records): Promise<SinkResult> {
// Resolve the queue connection; used by the (stubbed) producer below.
const connection = await context.connections.get(config.connectionCode);
// Implement queue-specific producer logic
// ...
return { indexed: records.length, deleted: 0, failed: 0 };
},
};
// Retry/DLQ configuration: after maxRetries failed attempts the message is
// routed to the configured dead-letter queue instead of being requeued.
trigger('queue-trigger', {
type: 'MESSAGE',
message: {
connectionCode: 'rabbitmq-main',
queue: 'orders',
maxRetries: 5,
deadLetterQueue: 'orders.dlq',
},
})
// A DLQ is itself just a queue — a second pipeline can consume it, e.g. to
// raise an alert in an external monitoring system.
const dlqProcessingPipeline = createPipeline()
.name('dlq-processor')
.trigger('dlq-trigger', {
type: 'MESSAGE',
message: {
connectionCode: 'rabbitmq-main',
queue: 'orders.dlq',
},
})
.extract('from-payload', { adapterCode: 'inMemory' })
.load('log-error', {
adapterCode: 'webhook',
url: 'https://monitoring.example.com/dlq-alert',
})
.build();
FILE triggers automatically monitor remote file systems (FTP, SFTP, S3) and trigger pipelines when new files are detected.
Glob patterns are supported (e.g. *.csv, orders-*.json).
import { createPipeline } from '@oronts/vendure-data-hub-plugin';
// Example: FILE trigger — polls a remote path and starts the pipeline when a
// matching file appears; the extract step then reads the detected file.
const autoImportPipeline = createPipeline()
.name('auto-import-orders')
.description('Automatically import orders from FTP when files arrive')
.trigger('file-watcher', {
type: 'FILE',
fileWatch: {
connectionCode: 'sftp-main', // FTP/SFTP/S3 connection
path: '/incoming/orders', // Remote path to watch
pattern: 'orders-*.csv', // Glob pattern (optional)
pollIntervalMs: 60000, // Poll every 60 seconds
minFileAge: 30, // Wait 30 sec after modification
recursive: true, // Watch subdirectories
},
})
.extract('read-file', {
adapterCode: 'ftp',
connectionCode: 'sftp-main',
remotePath: '', // Use detected file path
format: 'csv',
})
.transform('validate', {
adapterCode: 'validateRequired',
fields: ['orderId', 'customerId'],
})
.load('create-orders', {
adapterCode: 'orderLoader',
})
.build();
// SFTP connection used by the FILE trigger above.
// NOTE: credentials inlined for illustration — use env vars in production.
DataHubPlugin.init({
connections: [
{
code: 'sftp-main',
type: 'sftp',
name: 'SFTP Server',
config: {
host: 'ftp.example.com',
port: 22,
username: 'ftpuser',
password: 'secret',
// Or use private key
// privateKey: fs.readFileSync('/path/to/key'),
},
},
],
});
// S3 connection — equivalent setup for watching an S3 bucket prefix.
// NOTE: credentials inlined for illustration — prefer IAM roles or env vars.
DataHubPlugin.init({
connections: [
{
code: 's3-bucket',
type: 's3',
name: 'AWS S3',
config: {
region: 'us-east-1',
bucket: 'my-data-bucket',
accessKeyId: 'AKIA...',
secretAccessKey: 'secret',
},
},
],
});
When a file is detected, the pipeline receives:
{
type: 'FILE',
timestamp: '2026-02-23T10:30:00.000Z',
data: {
path: '/incoming/orders/orders-2026-02-23.csv',
name: 'orders-2026-02-23.csv',
modifiedAt: '2026-02-23T10:29:45.000Z',
size: 1024000, // bytes
connectionCode: 'sftp-main',
},
meta: {
triggerKey: 'file-watcher',
watchPath: '/incoming/orders',
pattern: 'orders-*.csv',
},
}
Use template expressions to access trigger data:
// NOTE(review): the template expressions appear to have been stripped during
// extraction — connectionCode/remotePath were presumably placeholders such as
// {{ trigger.data.connectionCode }} / {{ trigger.data.path }}; confirm the
// exact template syntax against the plugin documentation.
.extract('read-file', {
adapterCode: 'ftp',
connectionCode: '',
remotePath: '', // Detected file path
format: 'csv',
})
FILE triggers support powerful glob patterns:
| Pattern | Matches | Example |
|---|---|---|
| `*.csv` | All CSV files | orders.csv, products.csv |
| `orders-*.json` | Files starting with "orders-" | orders-2026-02-23.json |
| `**/*.xml` | XML files in any subdirectory | 2026/02/data.xml |
| `{orders,products}-*.csv` | Multiple prefixes | orders-*.csv, products-*.csv |
Create separate triggers for different paths:
// Example: one pipeline with multiple FILE triggers — each watches its own
// path/pattern and can poll at a different interval.
const pipeline = createPipeline()
.name('multi-source-import')
.trigger('watch-orders', {
type: 'FILE',
fileWatch: {
connectionCode: 'sftp-main',
path: '/incoming/orders',
pattern: '*.csv',
pollIntervalMs: 60000,
},
})
.trigger('watch-products', {
type: 'FILE',
fileWatch: {
connectionCode: 'sftp-main',
path: '/incoming/products',
pattern: '*.json',
pollIntervalMs: 120000, // Poll less frequently
},
})
// ... extraction and loading steps
.build();
Process different file types differently:
// Example: route files to different transform branches based on file type.
const pipeline = createPipeline()
.name('smart-file-processor')
.trigger('watch-all', {
type: 'FILE',
fileWatch: {
connectionCode: 's3-bucket',
path: '/incoming',
pattern: '*', // Watch all files
},
})
.extract('read-file', {
adapterCode: 's3',
// ... config
})
// NOTE(review): the route conditions appear to have been stripped during
// extraction — each `condition` was presumably a template expression testing
// the detected file's extension; confirm against the plugin docs.
.route('by-file-type', {
routes: [
{
condition: '',
stepKey: 'csv-transform',
},
{
condition: '',
stepKey: 'json-transform',
},
],
})
// ... different transform steps
.build();
minFileAge prevents processing files that are still being written:
fileWatch: {
minFileAge: 60, // Wait 60 seconds after last modification
}
Why This Matters:
Choose based on your requirements:
| Interval | Use Case | Trade-off |
|---|---|---|
| 30s | Real-time processing | Higher load on file system |
| 5 min | Standard batch imports | Balanced |
| 1 hour | Large file processing | Lower overhead |
Minimum: 30 seconds (enforced)
FILE triggers automatically track processed files:
FILE triggers use distributed locks:
If file processing fails:
Monitor file trigger activity:
Filter log entries by triggerType: FILE.
pattern: 'orders-YYYY-MM-DD-*.csv' // Good — specific pattern
pattern: '*' // Too broad
minFileAge: 60 // Good for large files
minFileAge: 0 // Risk processing incomplete files
pollIntervalMs: 300000 // 5 min - good default
// After successful load, use webhook or custom step to archive
.sink('archive-file', {
adapterCode: 'ftp',
operation: 'move',
sourcePath: '',
targetPath: '/archive/',
})
pattern: 'orders--*.csv' // Prevents overwrites
import { describe, it, expect, vi } from 'vitest';
import { MessageTriggerHandler } from './message-trigger.handler';

describe('MessageTriggerHandler', () => {
  // Stub logger — the original example referenced `mockLogger` without
  // ever defining it, so the test could not run.
  const mockLogger = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  };

  it('should handle message and execute pipeline', async () => {
    const mockPipelineService = {
      executePipeline: vi.fn().mockResolvedValue({
        status: 'COMPLETED',
        metrics: { totalRecords: 5 },
      }),
    };
    const handler = new MessageTriggerHandler(
      mockPipelineService as any,
      {} as any,
      {} as any,
      { createLogger: () => mockLogger } as any,
    );
    // handleMessage is private — cast for white-box testing, otherwise the
    // call is a compile error.
    await (handler as any).handleMessage(
      'pipeline-1',
      'test-pipeline',
      { connectionCode: 'test', queue: 'test' },
      { id: 'msg-1', body: { orderId: '123' }, timestamp: new Date().toISOString() },
    );
    expect(mockPipelineService.executePipeline).toHaveBeenCalledWith(
      expect.anything(),
      'test-pipeline',
      expect.objectContaining({
        triggerPayload: expect.objectContaining({
          type: 'MESSAGE',
          data: { orderId: '123' },
        }),
      }),
    );
  });
});