Integrate Data Hub with message queues for event-driven data pipelines.
Queue/messaging integration is supported for the following systems:
| System | Consume | Produce | Status |
|---|---|---|---|
| RabbitMQ (AMQP) | ✅ | ✅ | Recommended - Native AMQP 0-9-1 protocol |
| RabbitMQ (HTTP) | ✅ | ✅ | HTTP Management API fallback |
| Amazon SQS | ✅ | ✅ | Full support (requires @aws-sdk/client-sqs) |
| Redis Streams | ✅ | ✅ | Consumer groups, XACK (requires ioredis) |
| Apache Kafka | ⚠️ | ⚠️ | REST Proxy required (not native client) |
| Google Pub/Sub | ❌ | ❌ | Use custom adapter with @google-cloud/pubsub |
Install the required packages for your queue system:
```bash
# For Amazon SQS
npm install @aws-sdk/client-sqs

# For Redis Streams
npm install ioredis

# For RabbitMQ AMQP (recommended)
npm install amqplib
```
Note: If the optional dependency is not installed, the adapter will throw a helpful error message explaining which package to install.
See Custom Triggers for an implementation guide.
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Producer   │────▶│    Queue    │────▶│  Data Hub   │
│   System    │     │   Broker    │     │  Pipeline   │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │  Consumer   │
                                        │  (Vendure)  │
                                        └─────────────┘
```
Connect to RabbitMQ over native AMQP (recommended):

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'rabbitmq-main',
      type: 'rabbitmq-amqp', // Use AMQP protocol
      name: 'RabbitMQ Production',
      config: {
        host: 'rabbitmq.example.com',
        port: 5672,
        username: 'user',
        password: 'pass',
        vhost: '/',
        ssl: false,
      },
    },
  ],
});
```
Alternatively, connect through the HTTP Management API:

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'rabbitmq-http',
      type: 'rabbitmq', // HTTP Management API
      name: 'RabbitMQ via HTTP',
      config: {
        host: 'rabbitmq.example.com',
        port: 15672, // Management API port
        username: 'user',
        password: 'pass',
        vhost: '/',
      },
    },
  ],
});
```
Amazon SQS:

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'sqs-queue',
      type: 'sqs',
      name: 'AWS SQS',
      config: {
        region: 'us-east-1',
        accessKeyId: 'AKIA...',
        secretAccessKey: 'secret',
        // Optional: for LocalStack or custom endpoints
        // endpoint: 'http://localhost:4566',
        accountId: '123456789012',
      },
    },
  ],
});
```
Redis Streams:

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'redis-streams',
      type: 'redis-streams',
      name: 'Redis Streams',
      config: {
        host: 'localhost',
        port: 6379,
        password: 'your-password',
        db: 0,
        // Consumer group settings
        consumerGroup: 'datahub-consumers',
        consumerName: 'consumer-1',
      },
    },
  ],
});
```
Apache Kafka:

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'kafka-cluster',
      type: 'kafka',
      name: 'Kafka Cluster',
      config: {
        brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
        clientId: 'vendure-datahub',
        ssl: true,
        sasl: {
          mechanism: 'plain',
          username: 'api-key',
          password: 'api-secret',
        },
      },
    },
  ],
});
```
Consume messages in a pipeline with a MESSAGE trigger:

```ts
import { createPipeline } from '@oronts/vendure-data-hub-plugin';

const orderProcessor = createPipeline()
  .name('order-queue-processor')
  .description('Process orders from message queue')
  .trigger('order-queue', {
    type: 'MESSAGE',
    message: {
      connectionCode: 'rabbitmq-main',
      queue: 'orders.created',
      batchSize: 10,
      ackMode: 'MANUAL',
      maxRetries: 3,
      deadLetterQueue: 'orders.dlq',
    },
  })
  .extract('from-message', {
    adapterCode: 'inMemory',
    // Message body is automatically injected
  })
  .transform('validate', {
    adapterCode: 'validateRequired',
    fields: ['orderId', 'customerId', 'items'],
  })
  .transform('enrich', {
    adapterCode: 'map',
    mapping: {
      'processedAt': 'new Date().toISOString()',
      'source': '"queue"',
    },
  })
  .load('upsert-order', {
    adapterCode: 'orderLoader',
  })
  .build();
```
Message trigger options:

| Option | Type | Description |
|---|---|---|
| `connectionCode` | string | Reference to queue connection |
| `queue` | string | Queue or topic name |
| `batchSize` | number | Messages to process at once |
| `ackMode` | `'AUTO'` \| `'MANUAL'` | Acknowledgment mode |
| `maxRetries` | number | Retries before DLQ |
| `deadLetterQueue` | string | DLQ for failed messages |
| `consumerGroup` | string | Consumer group (Kafka) |
Send data to a queue with the `queue-producer` sink:

```ts
const stockUpdatePipeline = createPipeline()
  .name('stock-to-queue')
  .description('Send stock updates to queue')
  .trigger('schedule', {
    type: 'SCHEDULE',
    schedule: { cron: '*/5 * * * *' },
  })
  .extract('stock-changes', {
    adapterCode: 'vendureQuery',
    entity: 'PRODUCT_VARIANT',
    // Get recently updated variants
  })
  .transform('prepare-message', {
    adapterCode: 'map',
    mapping: {
      'sku': 'sku',
      'stockOnHand': 'stockOnHand',
      'timestamp': 'new Date().toISOString()',
    },
  })
  .sink('to-queue', {
    adapterCode: 'queue-producer',
    connectionCode: 'rabbitmq-main',
    queue: 'inventory.updates',
    routingKey: 'stock.updated',
  })
  .build();
```
Queue producer sink options:

| Option | Type | Description |
|---|---|---|
| `connectionCode` | string | Reference to queue connection |
| `queue` | string | Target queue or topic |
| `routingKey` | string | Routing key (RabbitMQ) |
| `partition` | string | Partition key field (Kafka) |
| `headers` | object | Message headers |
| `persistent` | boolean | Persist messages |
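Combining these options, a producer sink might look like the following sketch (the header name and values are illustrative, not defaults of the plugin):

```ts
.sink('to-queue', {
  adapterCode: 'queue-producer',
  connectionCode: 'rabbitmq-main',
  queue: 'inventory.updates',
  routingKey: 'stock.updated',        // RabbitMQ routing key
  headers: { 'x-source': 'vendure' }, // custom message headers (illustrative)
  persistent: true,                   // survive broker restarts
})
```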
```ts
// When order is placed externally, sync to Vendure
const externalOrderSync = createPipeline()
  .name('external-order-sync')
  .trigger('external-orders', {
    type: 'MESSAGE',
    message: {
      connectionCode: 'kafka-cluster',
      queue: 'ecommerce.orders',
      consumerGroup: 'vendure-sync',
    },
  })
  .extract('from-message', { adapterCode: 'inMemory' })
  .transform('map-order', {
    adapterCode: 'map',
    mapping: {
      'code': 'externalOrderId',
      'customerId': 'customer.email',
      'lines': 'items',
    },
  })
  .load('create-order', { adapterCode: 'orderLoader' })
  .build();
```
```ts
// Consume stock updates from warehouse system
const warehouseStockSync = createPipeline()
  .name('warehouse-stock-sync')
  .trigger('warehouse-updates', {
    type: 'MESSAGE',
    message: {
      connectionCode: 'rabbitmq-main',
      queue: 'warehouse.stock',
    },
  })
  .extract('from-message', { adapterCode: 'inMemory' })
  .load('update-stock', {
    adapterCode: 'stockAdjust',
    skuField: 'sku',
    stockByLocationField: 'stockByLocation',
    absolute: true,
  })
  .build();
```
```ts
// Receive price updates from ERP
const erpPriceSync = createPipeline()
  .name('erp-price-sync')
  .trigger('erp-prices', {
    type: 'MESSAGE',
    message: {
      connectionCode: 'sqs-queue',
      queue: 'erp-price-updates',
    },
  })
  .extract('from-message', { adapterCode: 'inMemory' })
  .transform('convert-price', {
    adapterCode: 'toCents',
    source: 'price',
    target: 'priceInCents',
  })
  .load('update-variant', {
    adapterCode: 'variantUpsert',
    strategy: 'UPDATE',
    matchField: 'sku',
  })
  .build();
```
```ts
// Publish product changes to multiple queues
const productChangeFanout = createPipeline()
  .name('product-change-fanout')
  .trigger('product-event', {
    type: 'EVENT',
    event: 'ProductEvent',
  })
  .extract('from-event', { adapterCode: 'inMemory' })
  .sink('to-search-queue', {
    adapterCode: 'queue-producer',
    connectionCode: 'rabbitmq-main',
    queue: 'search.reindex',
  })
  .sink('to-analytics-queue', {
    adapterCode: 'queue-producer',
    connectionCode: 'rabbitmq-main',
    queue: 'analytics.product-change',
  })
  .sink('to-feed-queue', {
    adapterCode: 'queue-producer',
    connectionCode: 'rabbitmq-main',
    queue: 'feeds.regenerate',
  })
  .build();
```
Messages that fail processing are retried with exponential backoff; after `maxRetries` attempts, they are routed to the dead-letter queue.
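The exact retry schedule is internal to the plugin, but a typical doubling schedule looks like the following sketch (the 1s base delay and 30s cap are illustrative assumptions, not documented defaults):

```ts
// Illustrative exponential backoff: the delay doubles with each retry
// attempt and is capped so late retries don't wait unboundedly long.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

// attempt 0 → 1000 ms, attempt 1 → 2000 ms, attempt 2 → 4000 ms, ...
```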
Configure a dead-letter queue (DLQ) for failed messages:
```ts
.trigger('order-queue', {
  type: 'MESSAGE',
  message: {
    connectionCode: 'rabbitmq-main',
    queue: 'orders.created',
    maxRetries: 3,
    deadLetterQueue: 'orders.dead-letter',
  },
})
```
Create a separate pipeline to handle dead letters:
```ts
const dlqProcessor = createPipeline()
  .name('order-dlq-processor')
  .trigger('dlq', {
    type: 'MESSAGE',
    message: {
      connectionCode: 'rabbitmq-main',
      queue: 'orders.dead-letter',
    },
  })
  .extract('from-message', { adapterCode: 'inMemory' })
  .transform('add-metadata', {
    adapterCode: 'enrich',
    set: {
      '_dlqProcessedAt': 'new Date().toISOString()',
      '_status': 'manual-review',
    },
  })
  .load('save-for-review', {
    adapterCode: 'restPost',
    url: 'https://api.example.com/dlq-review',
  })
  .build();
```
Monitor queue-based pipelines:
```graphql
query {
  dataHubPipelineRuns(
    filter: { pipelineCode: "order-queue-processor" }
    take: 100
  ) {
    items {
      id
      status
      startedAt
      completedAt
      metrics
      triggerPayload
    }
  }
}
```
Check queue consumer status:
```graphql
query {
  dataHubQueueStats {
    pending
    running
    failed
    completedToday
  }
  dataHubConsumers {
    pipelineCode
    queueName
    isActive
    messagesProcessed
    messagesFailed
    lastMessageAt
  }
}
```
Use a consistent message format:
```json
{
  "id": "msg-12345",
  "type": "order.created",
  "timestamp": "2024-01-15T10:30:00Z",
  "source": "external-system",
  "data": {
    "orderId": "ORD-001",
    "customerId": "CUST-123",
    "items": []
  },
  "metadata": {
    "correlationId": "abc-123",
    "version": "1.0"
  }
}
```
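If you want to enforce this envelope in a custom transform or adapter, a minimal type guard might look like this (`QueueMessage` and `isQueueMessage` are our names for illustration, not part of the plugin's API):

```ts
// Envelope shape matching the JSON example above.
interface QueueMessage {
  id: string;
  type: string;
  timestamp: string; // ISO 8601
  source: string;
  data: Record<string, unknown>;
  metadata?: { correlationId?: string; version?: string };
}

// Returns true only if the value carries all required envelope fields
// and the timestamp parses as a valid date.
function isQueueMessage(value: unknown): value is QueueMessage {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const m = value as Record<string, unknown>;
  return (
    typeof m.id === 'string' &&
    typeof m.type === 'string' &&
    typeof m.timestamp === 'string' &&
    !Number.isNaN(Date.parse(m.timestamp)) &&
    typeof m.source === 'string' &&
    typeof m.data === 'object' &&
    m.data !== null
  );
}
```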
Ensure the pipeline can handle duplicate messages:
```ts
.transform('check-idempotency', {
  adapterCode: 'deltaFilter',
  idPath: 'id',
  // Only process if record changed
})
```
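The `deltaFilter` adapter handles this for you; if you need the same idempotency guard in custom code, the underlying idea is a set of already-seen message IDs. A minimal sketch (the function and in-memory Set are illustrative; in production, back this with a TTL cache or persistent store so it doesn't grow without bound):

```ts
// Drop any message whose ID has already been processed, recording
// new IDs as they pass through.
function dedupeById<T extends { id: string }>(batch: T[], seen: Set<string>): T[] {
  return batch.filter(msg => {
    if (seen.has(msg.id)) {
      return false; // duplicate delivery, skip
    }
    seen.add(msg.id);
    return true;
  });
}
```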
Process messages in batches for efficiency:
```ts
message: {
  queue: 'high-volume-events',
  batchSize: 100, // Process 100 messages at once
}
```
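Conceptually, batched consumption splits pending messages into fixed-size groups before handing them to the pipeline; a minimal sketch of that chunking step (not the plugin's actual implementation):

```ts
// Split an array of pending messages into groups of at most `size`,
// mirroring what batchSize controls conceptually.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```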
Ensure messages are acknowledged before shutdown:
```ts
message: {
  ackMode: 'MANUAL', // Ack only after successful processing
}
```
Tune the `maxRetries` setting for reliability and `batchSize` for throughput.