vendure-data-hub-plugin

Custom Triggers

Create custom trigger types to start pipelines from new event sources.

Overview

Data Hub supports several built-in trigger types:

Type Description Status
MANUAL Triggered via UI or API ✅ Implemented
SCHEDULE Cron-based scheduling ✅ Implemented
WEBHOOK HTTP webhook endpoint ✅ Implemented
EVENT Vendure event subscription ✅ Implemented
FILE File watch (FTP/S3/SFTP) ✅ Implemented
MESSAGE Queue/messaging ✅ Implemented

This guide covers how to implement custom trigger handlers. For production use, the built-in FILE and MESSAGE triggers cover most event-driven scenarios.

Trigger Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  Trigger Source │────▶│  Trigger Handler │────▶│ Pipeline Engine │
│  (Queue/Event)  │     │  (Your Code)     │     │  (Execution)    │
└─────────────────┘     └──────────────────┘     └─────────────────┘

Implementing a Trigger Handler

Step 1: Define Trigger Configuration

// src/triggers/message-trigger.types.ts

import { TriggerType } from '@oronts/vendure-data-hub-plugin';

/**
 * Per-trigger settings for consuming messages from a queue.
 *
 * One consumer is started per pipeline/queue pair; this object is handed to
 * the queue-specific consumer together with the resolved connection config.
 */
export interface MessageTriggerConfig {
    /** Code of the configured connection to consume from (looked up via ConnectionService.findByCode) */
    connectionCode: string;
    /** Queue or topic name */
    queue: string;
    /** Consumer group (for Kafka) */
    consumerGroup?: string;
    /**
     * Batch size for consuming messages. The Redis consumer uses it as worker
     * concurrency and the RabbitMQ consumer as channel prefetch; both fall back to 1.
     */
    batchSize?: number;
    /** Acknowledgment mode; 'AUTO' makes the RabbitMQ consumer subscribe with `noAck` */
    ackMode?: 'AUTO' | 'MANUAL';
    /** Dead letter queue for failed messages */
    deadLetterQueue?: string;
    /** Max retries before DLQ; the handler falls back to 3 when unset */
    maxRetries?: number;
}

/**
 * Broker-independent representation of one received message, produced by the
 * queue-specific consumers and passed to the trigger handler.
 */
export interface MessagePayload {
    /** Broker-assigned message/job id; consumers fall back to '' when none is available */
    id: string;
    /** Message body (job data for Redis, JSON-parsed content for RabbitMQ) */
    body: unknown;
    /** Transport headers, when the broker provides them */
    headers?: Record<string, string>;
    /** ISO-8601 timestamp string */
    timestamp: string;
    /** Number of previous delivery attempts; treated as 0 when absent */
    retryCount?: number;
}

Step 2: Create the Trigger Handler

// src/triggers/message-trigger.handler.ts

import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { RequestContext, TransactionalConnection } from '@vendure/core';
import {
    PipelineExecutionService,
    ConnectionService,
    DataHubLogger,
    DataHubLoggerFactory,
} from '@oronts/vendure-data-hub-plugin';
import { MessageTriggerConfig, MessagePayload } from './message-trigger.types';

/**
 * Starts one queue consumer per enabled MESSAGE trigger and executes the
 * owning pipeline for every message received.
 *
 * Consumers are created in `onModuleInit` and stopped in `onModuleDestroy`.
 * They are tracked in a map keyed by `<pipelineCode>:<queue>`.
 */
@Injectable()
export class MessageTriggerHandler implements OnModuleInit, OnModuleDestroy {
    private readonly logger: DataHubLogger;
    private consumers: Map<string, QueueConsumer> = new Map();
    private isRunning = false;

    constructor(
        private pipelineService: PipelineExecutionService,
        private connectionService: ConnectionService,
        private connection: TransactionalConnection,
        loggerFactory: DataHubLoggerFactory,
    ) {
        this.logger = loggerFactory.createLogger('MessageTriggerHandler');
    }

    async onModuleInit() {
        // Load all pipelines with message triggers and start consumers
        await this.initializeConsumers();
        this.isRunning = true;
    }

    async onModuleDestroy() {
        this.isRunning = false;
        await this.stopAllConsumers();
    }

    /**
     * Initialize consumers for all pipelines with message triggers.
     *
     * The trigger type literal is 'MESSAGE', matching the `type: 'MESSAGE'`
     * used in pipeline definitions and in the trigger payload built below.
     */
    private async initializeConsumers() {
        const ctx = RequestContext.empty();
        const pipelines = await this.pipelineService.findPipelinesWithTriggerType(ctx, 'MESSAGE');

        for (const pipeline of pipelines) {
            // Find message trigger by TYPE in steps array
            const messageTriggers = findEnabledTriggersByType(pipeline.definition, 'MESSAGE');
            for (const trigger of messageTriggers) {
                const config = trigger.config as MessageTriggerConfig;
                await this.startConsumer(pipeline.id, pipeline.code, config);
            }
        }

        this.logger.info(`Initialized ${this.consumers.size} message consumers`);
    }

    /**
     * Start a consumer for a specific pipeline.
     *
     * Failures are logged, not thrown, so one broken trigger does not prevent
     * the remaining consumers from starting. A consumer that was created but
     * failed to start is stopped again so it cannot leak a broker connection.
     *
     * @param pipelineId   Id of the pipeline owning the trigger
     * @param pipelineCode Code of the pipeline to execute per message
     * @param config       Message trigger configuration
     */
    async startConsumer(pipelineId: string, pipelineCode: string, config: MessageTriggerConfig) {
        const key = `${pipelineCode}:${config.queue}`;

        if (this.consumers.has(key)) {
            this.logger.warn(`Consumer already exists for ${key}`);
            return;
        }

        let consumer: QueueConsumer | undefined;
        try {
            // Get connection configuration
            const ctx = RequestContext.empty();
            const connection = await this.connectionService.findByCode(ctx, config.connectionCode);

            if (!connection) {
                throw new Error(`Connection not found: ${config.connectionCode}`);
            }

            // Create appropriate consumer based on connection type
            consumer = await this.createConsumer(connection.type, connection.config, config);

            // Set up message handler
            consumer.onMessage(async (message: MessagePayload) => {
                return this.handleMessage(pipelineId, pipelineCode, config, message);
            });

            // Start consuming
            await consumer.start();

            this.consumers.set(key, consumer);
            this.logger.info(`Started consumer for ${key}`);
        } catch (error) {
            this.logger.error(`Failed to start consumer for ${key}`, error);

            if (consumer) {
                // Best-effort cleanup of a half-started consumer.
                try {
                    await consumer.stop();
                } catch (stopError) {
                    this.logger.error(`Failed to clean up consumer for ${key}`, stopError);
                }
            }
        }
    }

    /**
     * Stop a consumer.
     *
     * The consumer is removed from the registry even when `stop()` rejects,
     * so a later `startConsumer` for the same key is not blocked.
     */
    async stopConsumer(pipelineCode: string, queue: string) {
        const key = `${pipelineCode}:${queue}`;
        const consumer = this.consumers.get(key);

        if (consumer) {
            try {
                await consumer.stop();
                this.logger.info(`Stopped consumer for ${key}`);
            } finally {
                this.consumers.delete(key);
            }
        }
    }

    /**
     * Stop all consumers.
     *
     * Each consumer is stopped independently: a failure is logged and the
     * remaining consumers are still stopped.
     */
    private async stopAllConsumers() {
        for (const [key, consumer] of this.consumers) {
            try {
                await consumer.stop();
                this.logger.info(`Stopped consumer for ${key}`);
            } catch (error) {
                this.logger.error(`Failed to stop consumer for ${key}`, error);
            }
        }
        this.consumers.clear();
    }

    /**
     * Handle incoming message: build the trigger payload, execute the
     * pipeline and translate the outcome into an ack/retry decision.
     *
     * @returns `{ success: true }` when the pipeline ran;
     *          `{ success: false, retry: true, retryCount }` while the retry
     *          budget is not exhausted; `{ success: false, retry: false }` otherwise.
     */
    private async handleMessage(
        pipelineId: string,
        pipelineCode: string,
        config: MessageTriggerConfig,
        message: MessagePayload,
    ): Promise<{ success: boolean; retry?: boolean; retryCount?: number }> {
        const ctx = RequestContext.empty();
        const previousAttempts = message.retryCount ?? 0;

        try {
            this.logger.debug(`Received message for ${pipelineCode}`, {
                messageId: message.id,
                queue: config.queue,
            });

            // Create trigger payload
            const triggerPayload = {
                type: 'MESSAGE' as const,
                timestamp: new Date().toISOString(),
                data: message.body,
                meta: {
                    messageId: message.id,
                    queue: config.queue,
                    headers: message.headers,
                    retryCount: previousAttempts,
                },
            };

            // Execute pipeline
            const result = await this.pipelineService.executePipeline(
                ctx,
                pipelineCode,
                { triggerPayload },
            );

            this.logger.info(`Pipeline ${pipelineCode} completed`, {
                messageId: message.id,
                status: result.status,
                recordsProcessed: result.metrics?.totalRecords,
            });

            return { success: true };
        } catch (error: unknown) {
            this.logger.error(`Pipeline ${pipelineCode} failed`, {
                messageId: message.id,
                // Anything can be thrown; only Error instances carry `.message`.
                error: error instanceof Error ? error.message : String(error),
            });

            // Handle retry logic. `??` (not `||`) so an explicit maxRetries of 0
            // disables retries instead of silently becoming 3.
            const retryCount = previousAttempts + 1;
            const maxRetries = config.maxRetries ?? 3;
            if (retryCount < maxRetries) {
                return { success: false, retry: true, retryCount };
            }

            // Send to DLQ if configured
            if (config.deadLetterQueue) {
                // Implement DLQ sending
                this.logger.warn(
                    `Message ${message.id} exhausted retries; forwarding to ${config.deadLetterQueue} is not implemented`,
                );
            }

            return { success: false, retry: false };
        }
    }

    /**
     * Create consumer based on connection type.
     *
     * @throws Error when the connection type has no consumer implementation
     */
    private async createConsumer(
        type: string,
        // Shape depends on the connection type; each consumer declares its own config interface.
        connectionConfig: any,
        triggerConfig: MessageTriggerConfig,
    ): Promise<QueueConsumer> {
        switch (type) {
            case 'redis':
                return new RedisConsumer(connectionConfig, triggerConfig);
            case 'rabbitmq':
                return new RabbitMQConsumer(connectionConfig, triggerConfig);
            case 'kafka':
                return new KafkaConsumer(connectionConfig, triggerConfig);
            case 'sqs':
                return new SQSConsumer(connectionConfig, triggerConfig);
            default:
                throw new Error(`Unsupported queue type: ${type}`);
        }
    }
}

// Consumer interface
/**
 * Contract implemented by every queue-specific consumer.
 *
 * Exported so the consumer classes, which live in their own files, can
 * import and implement it.
 */
export interface QueueConsumer {
    /**
     * Registers the callback invoked for each received message. The resolved
     * value tells the consumer whether processing succeeded and, if not,
     * whether the message should be retried.
     */
    onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>): void;
    /** Connects to the broker and begins delivering messages to the handler. */
    start(): Promise<void>;
    /** Stops consuming and releases broker resources. */
    stop(): Promise<void>;
}

Step 3: Implement Queue-Specific Consumers

Redis Consumer (using BullMQ)

// src/triggers/consumers/redis-consumer.ts

import { Queue, Worker } from 'bullmq';
import { MessagePayload, MessageTriggerConfig } from '../message-trigger.types';

/** Connection settings the Redis consumer passes to the BullMQ worker. */
interface RedisConnectionConfig {
    /** Redis server host name */
    host: string;
    /** Redis server port */
    port: number;
    /** Optional password */
    password?: string;
    /** Database index; the consumer uses 0 when unset */
    db?: number;
}

/**
 * Queue consumer backed by a BullMQ `Worker`.
 *
 * Each job is converted to a `MessagePayload` and handed to the registered
 * handler. When the handler asks for a retry the job processor throws, so
 * the job is marked as failed and the queue's own retry settings apply.
 */
export class RedisConsumer implements QueueConsumer {
    private worker: Worker | null = null;
    private messageHandler:
        | ((message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>)
        | null = null;

    constructor(
        private connectionConfig: RedisConnectionConfig,
        private triggerConfig: MessageTriggerConfig,
    ) {}

    /** Registers the callback invoked for every job. Call before `start()`. */
    onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>): void {
        this.messageHandler = handler;
    }

    /**
     * Creates the worker and begins processing jobs.
     *
     * Idempotent: calling it while a worker exists is a no-op. Previously a
     * second call replaced the reference and the first worker was never closed.
     */
    async start(): Promise<void> {
        if (this.worker) {
            return;
        }

        const connection = {
            host: this.connectionConfig.host,
            port: this.connectionConfig.port,
            password: this.connectionConfig.password,
            db: this.connectionConfig.db ?? 0,
        };

        this.worker = new Worker(
            this.triggerConfig.queue,
            async (job) => {
                const handler = this.messageHandler;
                if (!handler) return;

                const message: MessagePayload = {
                    id: job.id || '',
                    body: job.data,
                    timestamp: new Date(job.timestamp).toISOString(),
                    retryCount: job.attemptsMade,
                };

                const result = await handler(message);

                // Throwing fails the job; a non-retry failure completes it normally.
                if (!result.success && result.retry) {
                    throw new Error('Retry requested');
                }
            },
            {
                connection,
                // `||` on purpose: a batch size of 0 must still yield a concurrency of 1.
                concurrency: this.triggerConfig.batchSize || 1,
            },
        );

        this.worker.on('error', (err) => {
            console.error('Worker error:', err);
        });
    }

    /** Closes the worker, if any, so `start()` can be called again. */
    async stop(): Promise<void> {
        const worker = this.worker;
        if (worker) {
            // Clear the reference first so a failing close() cannot block a restart.
            this.worker = null;
            await worker.close();
        }
    }
}

RabbitMQ Consumer

// src/triggers/consumers/rabbitmq-consumer.ts

import * as amqp from 'amqplib';
import { MessagePayload, MessageTriggerConfig } from '../message-trigger.types';

/**
 * Connection settings for the RabbitMQ consumer.
 *
 * Provide either a complete `url`, or the individual fields from which the
 * consumer assembles one. `url` is optional because the consumer falls back
 * to the individual fields when it is absent.
 */
interface RabbitMQConnectionConfig {
    /** Full AMQP URL; takes precedence over the individual fields */
    url?: string;
    // or individual fields
    /** Broker host; the consumer uses 'localhost' when unset */
    host?: string;
    /** Broker port; the consumer uses 5672 when unset */
    port?: number;
    /** User name; credentials are omitted from the URL when unset */
    username?: string;
    /** Password for `username` */
    password?: string;
    /** Virtual host */
    vhost?: string;
}

/**
 * Queue consumer backed by an amqplib channel.
 *
 * Delivery semantics:
 * - Malformed JSON is rejected without requeue (a poison message would
 *   otherwise be redelivered forever).
 * - When the handler asks for a retry, the message is re-published with an
 *   incremented `x-retry-count` header and the original delivery is acked.
 *   A plain requeue would redeliver the message unchanged, so the counter
 *   would never advance and the retry limit would never be reached.
 *   (Publish and ack are not atomic; a crash in between can duplicate the message.)
 * - In 'AUTO' ack mode the consumer subscribes with `noAck`, so ack/nack are
 *   never sent; acknowledging such a delivery is a channel-level error.
 */
export class RabbitMQConsumer implements QueueConsumer {
    private connection: amqp.Connection | null = null;
    private channel: amqp.Channel | null = null;
    private messageHandler:
        | ((message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>)
        | null = null;

    constructor(
        private connectionConfig: RabbitMQConnectionConfig,
        private triggerConfig: MessageTriggerConfig,
    ) {}

    /** Registers the callback invoked for every delivery. Call before `start()`. */
    onMessage(handler: (message: MessagePayload) => Promise<{ success: boolean; retry?: boolean }>): void {
        this.messageHandler = handler;
    }

    /**
     * Connects, asserts the queue and begins consuming.
     *
     * Idempotent while connected. If any setup step fails the connection is
     * closed again before the error is re-thrown.
     */
    async start(): Promise<void> {
        if (this.connection) {
            return;
        }

        const url = this.connectionConfig.url || this.buildUrl();
        const connection = await amqp.connect(url);

        try {
            const channel = await connection.createChannel();

            // Ensure queue exists
            await channel.assertQueue(this.triggerConfig.queue, { durable: true });

            // Set prefetch for batch processing
            await channel.prefetch(this.triggerConfig.batchSize || 1);

            // Publish the references before consuming: deliveries may arrive immediately.
            this.connection = connection;
            this.channel = channel;

            // Start consuming
            await channel.consume(
                this.triggerConfig.queue,
                (msg: amqp.ConsumeMessage | null) =>
                    // amqplib ignores the returned promise, so rejections must be handled here.
                    this.handleDelivery(msg).catch((error: unknown) => {
                        console.error('RabbitMQ delivery handling failed:', error);
                    }),
                { noAck: this.isAutoAck() },
            );
        } catch (error) {
            this.connection = null;
            this.channel = null;
            // Best effort: the original error is the one worth reporting.
            await connection.close().catch(() => undefined);
            throw error;
        }
    }

    /** Closes channel and connection; the connection is closed even if closing the channel fails. */
    async stop(): Promise<void> {
        const channel = this.channel;
        const connection = this.connection;
        this.channel = null;
        this.connection = null;

        try {
            if (channel) {
                await channel.close();
            }
        } finally {
            if (connection) {
                await connection.close();
            }
        }
    }

    /** True when the broker acknowledges deliveries on our behalf. */
    private isAutoAck(): boolean {
        return this.triggerConfig.ackMode === 'AUTO';
    }

    /** Parses one delivery, runs the handler and settles the message. */
    private async handleDelivery(msg: amqp.ConsumeMessage | null): Promise<void> {
        const handler = this.messageHandler;
        if (!msg || !handler) return;

        const manualAck = !this.isAutoAck();
        const headers = msg.properties.headers ?? {};
        const retryCount = Number(headers['x-retry-count']) || 0;

        let body: unknown;
        try {
            body = JSON.parse(msg.content.toString());
        } catch (error) {
            console.error('Discarding RabbitMQ message with invalid JSON body:', error);
            if (manualAck) {
                this.channel?.nack(msg, false, false);
            }
            return;
        }

        let result: { success: boolean; retry?: boolean };
        try {
            result = await handler({
                id: msg.properties.messageId || msg.properties.correlationId || '',
                body,
                headers: headers as Record<string, string>,
                timestamp: new Date().toISOString(),
                retryCount,
            });
        } catch (error) {
            // The handler itself failed unexpectedly: hand the message back to the broker.
            if (manualAck) {
                this.channel?.nack(msg, false, true);
            }
            return;
        }

        if (!result.success && result.retry) {
            // Requeue with retry count
            this.channel?.sendToQueue(this.triggerConfig.queue, msg.content, {
                persistent: true,
                messageId: msg.properties.messageId,
                correlationId: msg.properties.correlationId,
                contentType: msg.properties.contentType,
                contentEncoding: msg.properties.contentEncoding,
                headers: { ...headers, 'x-retry-count': retryCount + 1 },
            });
        }

        if (manualAck) {
            this.channel?.ack(msg);
        }
    }

    /**
     * Assembles an AMQP URL from the individual connection fields.
     * Credentials and vhost are percent-encoded so characters such as
     * '@', ':' or '/' cannot corrupt the URL.
     */
    private buildUrl(): string {
        const { host, port, username, password, vhost } = this.connectionConfig;
        const auth = username
            ? `${encodeURIComponent(username)}:${encodeURIComponent(password ?? '')}@`
            : '';
        return `amqp://${auth}${host || 'localhost'}:${port || 5672}/${encodeURIComponent(vhost || '')}`;
    }
}

Step 4: Register the Trigger Handler

// src/triggers/message-trigger.module.ts

import { Module } from '@nestjs/common';
import { MessageTriggerHandler } from './message-trigger.handler';

/**
 * Nest module that provides the message trigger handler and exports it so
 * importing modules can inject it.
 */
@Module({
    providers: [MessageTriggerHandler],
    exports: [MessageTriggerHandler],
})
export class MessageTriggerModule {}
// In your plugin
import { VendurePlugin } from '@vendure/core';
import { DataHubPlugin } from '@oronts/vendure-data-hub-plugin';
import { MessageTriggerModule } from './triggers/message-trigger.module';

/**
 * Vendure plugin that imports the Data Hub plugin together with the custom
 * message trigger module.
 */
@VendurePlugin({
    imports: [DataHubPlugin, MessageTriggerModule],
})
export class MyQueueTriggersPlugin {}

Using Message Triggers in Pipelines

Pipeline Definition

import { createPipeline } from '@oronts/vendure-data-hub-plugin';

// Pipeline started by messages on the 'orders.created' queue: the message body
// is extracted from the trigger payload, validated, then loaded as an order.
const orderSyncPipeline = createPipeline()
    .name('order-queue-sync')
    .description('Process orders from message queue')
    .trigger('queue-trigger', {
        type: 'MESSAGE',
        message: {
            // Must match the `code` of a configured connection
            connectionCode: 'rabbitmq-main',
            queue: 'orders.created',
            batchSize: 10,
            ackMode: 'MANUAL',
            deadLetterQueue: 'orders.dead-letter',
            maxRetries: 3,
        },
    })
    .extract('from-payload', {
        adapterCode: 'inMemory',
        // Data comes from trigger payload
    })
    .transform('validate', {
        adapterCode: 'validateRequired',
        fields: ['orderId', 'customerId', 'items'],
    })
    .load('create-order', {
        adapterCode: 'orderLoader',
    })
    .build();

Connection Configuration

// Registers three example queue connections. Triggers refer to a connection
// through its `code` (see `connectionCode` in the trigger config).
// The credentials below are placeholders for local development.
DataHubPlugin.init({
    connections: [
        {
            code: 'rabbitmq-main',
            type: 'rabbitmq',
            config: {
                host: 'localhost',
                port: 5672,
                username: 'guest',
                password: 'guest',
                vhost: '/',
            },
        },
        {
            code: 'redis-queue',
            type: 'redis',
            config: {
                host: 'localhost',
                port: 6379,
                password: 'secret',
            },
        },
        {
            code: 'kafka-cluster',
            type: 'kafka',
            config: {
                brokers: ['kafka1:9092', 'kafka2:9092'],
                clientId: 'vendure-datahub',
                ssl: true,
                sasl: {
                    mechanism: 'plain',
                    username: 'user',
                    password: 'pass',
                },
            },
        },
    ],
});

Bi-directional Queue Integration

Consuming + Producing

// Queue-to-queue pipeline: triggered by 'orders.pending', maps each record,
// then hands the result to the custom 'queue-producer' sink for 'orders.processed'.
const fullQueuePipeline = createPipeline()
    .name('queue-to-queue')
    .description('Consume from one queue, process, produce to another')
    // Consume from input queue
    .trigger('input-queue', {
        type: 'MESSAGE',
        message: {
            connectionCode: 'rabbitmq-main',
            queue: 'orders.pending',
        },
    })
    .extract('from-payload', { adapterCode: 'inMemory' })
    .transform('process', {
        adapterCode: 'map',
        // NOTE(review): values look like expressions evaluated by the 'map' adapter — confirm its syntax
        mapping: {
            'orderId': 'id',
            'status': '"processed"',
            'processedAt': 'new Date().toISOString()',
        },
    })
    // Produce to output queue
    .sink('output-queue', {
        adapterCode: 'queue-producer',  // Custom sink
        connectionCode: 'rabbitmq-main',
        queue: 'orders.processed',
    })
    .build();

Queue Producer Sink

import { SinkAdapter, SinkContext, SinkResult } from '@oronts/vendure-data-hub-plugin';

/**
 * Skeleton of a custom sink adapter registered under the code 'queue-producer'.
 *
 * `index` is a stub: it resolves the connection but does not publish anything
 * yet, and reports every record as indexed.
 */
export const queueProducerSink: SinkAdapter = {
    type: 'SINK',
    code: 'queue-producer',
    name: 'Queue Producer',
    sinkType: 'CUSTOM',
    // Configuration fields of the sink step
    schema: {
        fields: [
            { key: 'connectionCode', type: 'string', required: true, label: 'Connection' },
            { key: 'queue', type: 'string', required: true, label: 'Queue/Topic' },
            { key: 'routingKey', type: 'string', label: 'Routing Key' },
        ],
    },

    async index(context, config, records): Promise<SinkResult> {
        // Resolved but not used until the producer logic below is implemented
        const connection = await context.connections.get(config.connectionCode);

        // Implement queue-specific producer logic
        // ...

        return { indexed: records.length, deleted: 0, failed: 0 };
    },
};

Error Handling

Retry Configuration

// Trigger fragment: allow up to `maxRetries` attempts, then use 'orders.dlq'
// as the dead letter queue.
trigger('queue-trigger', {
    type: 'MESSAGE',
    message: {
        connectionCode: 'rabbitmq-main',
        queue: 'orders',
        maxRetries: 5,
        deadLetterQueue: 'orders.dlq',
    },
})

Dead Letter Queue Processing

// Pipeline that consumes the dead letter queue itself and forwards each
// failed message to a monitoring webhook.
const dlqProcessingPipeline = createPipeline()
    .name('dlq-processor')
    .trigger('dlq-trigger', {
        type: 'MESSAGE',
        message: {
            connectionCode: 'rabbitmq-main',
            queue: 'orders.dlq',
        },
    })
    .extract('from-payload', { adapterCode: 'inMemory' })
    .load('log-error', {
        adapterCode: 'webhook',
        url: 'https://monitoring.example.com/dlq-alert',
    })
    .build();

FILE Triggers - Automatic File Detection

FILE triggers automatically monitor remote file systems (FTP, SFTP, S3) and trigger pipelines when new files are detected.

How It Works

  1. Poll for Files: Service polls remote path at configured intervals
  2. Pattern Matching: Filters files using glob patterns (e.g., *.csv, orders-*.json)
  3. Change Detection: Tracks processed files using timestamps
  4. Auto-Trigger: Executes pipeline with file metadata when new files appear
  5. Distributed Lock: Prevents duplicate processing in multi-instance deployments

Configuration

import { createPipeline } from '@oronts/vendure-data-hub-plugin';

// Pipeline started by a FILE trigger: watches an SFTP directory, reads each
// newly detected CSV file, validates the rows and loads them as orders.
const autoImportPipeline = createPipeline()
    .name('auto-import-orders')
    .description('Automatically import orders from FTP when files arrive')
    .trigger('file-watcher', {
        type: 'FILE',
        fileWatch: {
            connectionCode: 'sftp-main',      // FTP/SFTP/S3 connection
            path: '/incoming/orders',          // Remote path to watch
            pattern: 'orders-*.csv',           // Glob pattern (optional)
            pollIntervalMs: 60000,             // Poll every 60 seconds
            minFileAge: 30,                    // Wait 30 sec after modification
            recursive: true,                   // Watch subdirectories
        },
    })
    .extract('read-file', {
        adapterCode: 'ftp',
        connectionCode: 'sftp-main',
        // NOTE(review): the empty string looks like a template expression lost in
        // rendering; presumably it should reference the detected file path — confirm
        remotePath: '',  // Use detected file path
        format: 'csv',
    })
    .transform('validate', {
        adapterCode: 'validateRequired',
        fields: ['orderId', 'customerId'],
    })
    .load('create-orders', {
        adapterCode: 'orderLoader',
    })
    .build();

Connection Setup

FTP/SFTP Connection

// Registers the SFTP connection that FILE triggers reference as 'sftp-main'.
// The password is a placeholder.
DataHubPlugin.init({
    connections: [
        {
            code: 'sftp-main',
            type: 'sftp',
            name: 'SFTP Server',
            config: {
                host: 'ftp.example.com',
                port: 22,
                username: 'ftpuser',
                password: 'secret',
                // Or use private key
                // privateKey: fs.readFileSync('/path/to/key'),
            },
        },
    ],
});

S3 Connection

// Registers the S3 connection that FILE triggers reference as 's3-bucket'.
// The key values are placeholders.
DataHubPlugin.init({
    connections: [
        {
            code: 's3-bucket',
            type: 's3',
            name: 'AWS S3',
            config: {
                region: 'us-east-1',
                bucket: 'my-data-bucket',
                accessKeyId: 'AKIA...',
                secretAccessKey: 'secret',
            },
        },
    ],
});

Trigger Payload

When a file is detected, the pipeline receives:

// Example trigger payload: `data` describes the detected file, `meta` echoes
// the trigger that detected it.
{
    type: 'FILE',
    timestamp: '2026-02-23T10:30:00.000Z',
    data: {
        path: '/incoming/orders/orders-2026-02-23.csv',
        name: 'orders-2026-02-23.csv',
        modifiedAt: '2026-02-23T10:29:45.000Z',
        size: 1024000,  // bytes
        connectionCode: 'sftp-main',
    },
    meta: {
        triggerKey: 'file-watcher',
        watchPath: '/incoming/orders',
        pattern: 'orders-*.csv',
    },
}

Accessing File Data in Pipeline

Use template expressions to access trigger data:

// NOTE(review): the two empty strings below appear to be template expressions
// lost in rendering; presumably they referenced the trigger payload's
// `data.connectionCode` and `data.path` — confirm the expression syntax.
.extract('read-file', {
    adapterCode: 'ftp',
    connectionCode: '',
    remotePath: '',  // Detected file path
    format: 'csv',
})

Glob Patterns

FILE triggers support powerful glob patterns:

Pattern Matches Example
*.csv All CSV files orders.csv, products.csv
orders-*.json Files starting with “orders-” orders-2026-02-23.json
**/*.xml XML files in any subdirectory 2026/02/data.xml
{orders,products}-*.csv Multiple prefixes orders-*.csv, products-*.csv

Advanced Configuration

Multiple Watch Paths

Create separate triggers for different paths:

// One pipeline with two FILE triggers: each watches its own path and pattern
// with its own poll interval.
const pipeline = createPipeline()
    .name('multi-source-import')
    .trigger('watch-orders', {
        type: 'FILE',
        fileWatch: {
            connectionCode: 'sftp-main',
            path: '/incoming/orders',
            pattern: '*.csv',
            pollIntervalMs: 60000,
        },
    })
    .trigger('watch-products', {
        type: 'FILE',
        fileWatch: {
            connectionCode: 'sftp-main',
            path: '/incoming/products',
            pattern: '*.json',
            pollIntervalMs: 120000,  // Poll less frequently
        },
    })
    // ... extraction and loading steps
    .build();

Conditional Processing with ROUTE Step

Process different file types differently:

// Watches every file under '/incoming' and routes records to a different
// transform step depending on a per-route condition.
const pipeline = createPipeline()
    .name('smart-file-processor')
    .trigger('watch-all', {
        type: 'FILE',
        fileWatch: {
            connectionCode: 's3-bucket',
            path: '/incoming',
            pattern: '*',  // Watch all files
        },
    })
    .extract('read-file', {
        adapterCode: 's3',
        // ... config
    })
    .route('by-file-type', {
        // NOTE(review): both `condition` values are empty, which looks like template
        // expressions lost in rendering; presumably they tested the file
        // extension (CSV vs JSON) — confirm
        routes: [
            {
                condition: '',
                stepKey: 'csv-transform',
            },
            {
                condition: '',
                stepKey: 'json-transform',
            },
        ],
    })
    // ... different transform steps
    .build();

File Age Filtering

minFileAge prevents processing files that are still being written:

// Fragment of a FILE trigger config; `minFileAge` is given in seconds.
fileWatch: {
    minFileAge: 60,  // Wait 60 seconds after last modification
}

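Why This Matters: a file that is still being uploaded can be picked up half-written. Waiting until it has gone unmodified for minFileAge seconds avoids importing incomplete data.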

Polling Interval Tuning

Choose based on your requirements:

Interval Use Case Trade-off
30s Real-time processing Higher load on file system
5 min Standard batch imports Balanced
1 hour Large file processing Lower overhead

Minimum: 30 seconds (enforced)

Checkpoint Management

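FILE triggers automatically track processed files using timestamps, so only new files trigger the pipeline. If processing fails, the checkpoint is not updated and the file is retried on the next poll.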

Distributed Deployment

FILE triggers use distributed locks to prevent duplicate processing in multi-instance deployments.

Error Handling

If file processing fails:

  1. Error logged with file details
  2. Checkpoint NOT updated (file will be retried on next poll)
  3. Configure pipeline-level retry/error handling
  4. Use GATE step for manual review of problematic files

Monitoring

Monitor file trigger activity:

Best Practices

  1. Use Specific Patterns: Narrow patterns reduce false matches
    pattern: 'orders-YYYY-MM-DD-*.csv'  // Good
    pattern: '*'                          // Too broad
    
  2. Set Appropriate minFileAge: Allow time for uploads to complete
    minFileAge: 60  // Good for large files
    minFileAge: 0   // Risk processing incomplete files
    
  3. Test Poll Intervals: Balance responsiveness vs. system load
    pollIntervalMs: 300000  // 5 min - good default
    
  4. Use Archive Pattern: Move processed files to archive folder
    // After successful load, use webhook or custom step to archive
    .sink('archive-file', {
        adapterCode: 'ftp',
        operation: 'move',
        sourcePath: '',
        targetPath: '/archive/',
    })
    
  5. Handle File Conflicts: Use timestamps in filenames
    pattern: 'orders-<timestamp>-*.csv'  // <timestamp> is a placeholder; unique names prevent overwrites
    

Testing Triggers

import { describe, it, expect, vi } from 'vitest';
import { MessageTriggerHandler } from './message-trigger.handler';

describe('MessageTriggerHandler', () => {
    it('should handle message and execute pipeline', async () => {
        const mockPipelineService = {
            executePipeline: vi.fn().mockResolvedValue({
                status: 'COMPLETED',
                metrics: { totalRecords: 5 },
            }),
        };

        // Stub logger covering the levels the handler uses; without it the
        // logger factory below would reference an undefined variable.
        const mockLogger = {
            debug: vi.fn(),
            info: vi.fn(),
            warn: vi.fn(),
            error: vi.fn(),
        };

        const handler = new MessageTriggerHandler(
            mockPipelineService as any,
            {} as any,
            {} as any,
            { createLogger: () => mockLogger } as any,
        );

        // `handleMessage` is private; bracket access is the type-checked way
        // to reach it from a test.
        const result = await handler['handleMessage'](
            'pipeline-1',
            'test-pipeline',
            { connectionCode: 'test', queue: 'test' },
            { id: 'msg-1', body: { orderId: '123' }, timestamp: new Date().toISOString() },
        );

        expect(result).toEqual({ success: true });
        expect(mockPipelineService.executePipeline).toHaveBeenCalledWith(
            expect.anything(),
            'test-pipeline',
            expect.objectContaining({
                triggerPayload: expect.objectContaining({
                    type: 'MESSAGE',
                    data: { orderId: '123' },
                }),
            }),
        );
    });
});