Complete reference for all sink integrations including search engines, message queues, and webhooks.
Sinks send transformed data to external systems for search indexing, message queuing, or webhook notifications.
Code: meilisearch
Index records to MeiliSearch for fast, typo-tolerant search.
| Field | Type | Required | Description |
|---|---|---|---|
| host | string | Yes | Host URL (e.g., http://localhost:7700) |
| apiKeySecretCode | string | Yes | Secret code for API key |
| indexName | string | Yes | Target index name |
| primaryKey | string | Yes | Primary key field name |
| bulkSize | number | No | Records per batch |
| searchableFields | json | No | Array of searchable field names |
| filterableFields | json | No | Array of filterable field names |
| sortableFields | json | No | Array of sortable field names |
```ts
.sink('meilisearch-products', {
  adapterCode: 'meilisearch',
  host: 'http://localhost:7700',
  apiKeySecretCode: 'meilisearch-api-key',
  indexName: 'products',
  primaryKey: 'id',
  bulkSize: 500,
  searchableFields: ['name', 'description', 'sku'],
  filterableFields: ['categoryId', 'price', 'inStock'],
  sortableFields: ['price', 'name', 'createdAt'],
})
```
The sink automatically configures index settings:

```ts
{
  searchableFields: ['name', 'description'], // Fields for full-text search
  filterableFields: ['categoryId', 'price'], // Fields for filtering
  sortableFields: ['price', 'createdAt'],    // Fields for sorting
}
```
Store the API key as a secret:

```ts
DataHubPlugin.init({
  secrets: [
    {
      code: 'meilisearch-api-key',
      provider: 'ENV',
      value: 'MEILISEARCH_API_KEY',
    },
  ],
})
```
Code: elasticsearch
Index records to Elasticsearch or OpenSearch.
| Field | Type | Required | Description |
|---|---|---|---|
| host | string | Yes | Host URL (e.g., http://localhost:9200) |
| apiKeySecretCode | string | No | Secret code for API key auth |
| basicSecretCode | string | No | Secret code for Basic auth (base64-encoded username:password) |
| indexName | string | Yes | Target index name |
| idField | string | Yes | Document ID field |
| bulkSize | number | No | Records per batch |
| refresh | boolean | No | Refresh index after indexing |
With API key authentication:

```ts
.sink('elasticsearch-products', {
  adapterCode: 'elasticsearch',
  host: 'https://elasticsearch.example.com:9200',
  apiKeySecretCode: 'elasticsearch-api-key',
  indexName: 'products',
  idField: 'id',
  bulkSize: 1000,
  refresh: true,
})
```

With Basic authentication:

```ts
.sink('elasticsearch-products', {
  adapterCode: 'elasticsearch',
  host: 'http://localhost:9200',
  basicSecretCode: 'es-basic-auth',
  indexName: 'products',
  idField: 'id',
})
```
Elasticsearch creates dynamic mappings by default. For production, define explicit index mappings beforehand:

```json
{
  "mappings": {
    "properties": {
      "id": { "type": "keyword" },
      "name": { "type": "text" },
      "sku": { "type": "keyword" },
      "price": { "type": "integer" },
      "description": { "type": "text" },
      "categories": { "type": "keyword" }
    }
  }
}
```
Works with Amazon OpenSearch Service using the same configuration.
Code: algolia
Index records to Algolia search service.
| Field | Type | Required | Description |
|---|---|---|---|
| applicationId | string | Yes | Algolia Application ID |
| apiKeySecretCode | string | Yes | Secret code for Admin API key |
| indexName | string | Yes | Target index name |
| idField | string | Yes | Field for object ID |
| bulkSize | number | No | Records per batch |
```ts
.sink('algolia-products', {
  adapterCode: 'algolia',
  applicationId: 'YOUR_APP_ID',
  apiKeySecretCode: 'algolia-admin-key',
  indexName: 'products',
  idField: 'id',
  bulkSize: 1000,
})
```
Algolia requires a unique `objectID` on every record; `idField` specifies which record field to use:

```ts
.sink('algolia-products', {
  adapterCode: 'algolia',
  applicationId: 'YOUR_APP_ID',
  apiKeySecretCode: 'algolia-admin-key',
  indexName: 'products',
  idField: 'sku', // Use SKU as the Algolia objectID
  // ...
})
```
Store the Admin API key as a secret:

```ts
DataHubPlugin.init({
  secrets: [
    {
      code: 'algolia-admin-key',
      provider: 'ENV',
      value: 'ALGOLIA_ADMIN_KEY',
    },
  ],
})
```
Code: typesense
Index records to Typesense search engine.
| Field | Type | Required | Description |
|---|---|---|---|
| host | string | Yes | Typesense host |
| port | number | Yes | Typesense port |
| protocol | select | No | http or https |
| apiKeySecretCode | string | Yes | Secret code for API key |
| collectionName | string | Yes | Target collection name |
| idField | string | Yes | Document ID field |
| bulkSize | number | No | Records per batch |
```ts
.sink('typesense-products', {
  adapterCode: 'typesense',
  host: 'localhost',
  port: 8108,
  protocol: 'http',
  apiKeySecretCode: 'typesense-api-key',
  collectionName: 'products',
  idField: 'id',
  bulkSize: 250,
})
```
Create the collection with a schema before indexing:

```json
{
  "name": "products",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "description", "type": "string" },
    { "name": "price", "type": "int32" },
    { "name": "categories", "type": "string[]" }
  ],
  "default_sorting_field": "price"
}
```
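If you prefer to create the collection from code, the request can be sketched against the Typesense REST API (`POST /collections` with the `X-TYPESENSE-API-KEY` header). The host, port, and API key below are placeholder assumptions to adapt:

```typescript
// Build a create-collection request for the Typesense REST API.
const schema = {
  name: 'products',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'name', type: 'string' },
    { name: 'description', type: 'string' },
    { name: 'price', type: 'int32' },
    { name: 'categories', type: 'string[]' },
  ],
  default_sorting_field: 'price',
};

function createCollectionRequest(host: string, port: number, apiKey: string) {
  return {
    url: `http://${host}:${port}/collections`,
    options: {
      method: 'POST',
      headers: {
        'X-TYPESENSE-API-KEY': apiKey,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(schema),
    },
  };
}

const req = createCollectionRequest('localhost', 8108, 'placeholder-api-key');
// await fetch(req.url, req.options); // run once against a live Typesense instance
console.log(req.url); // http://localhost:8108/collections
```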
A scheduled full sync to MeiliSearch every 4 hours:

```ts
createPipeline()
  .name('sync-to-search')
  .trigger('schedule', {
    type: 'SCHEDULE',
    cron: '0 */4 * * *', // Every 4 hours
  })
  .extract('query-products', {
    adapterCode: 'vendureQuery',
    entity: 'PRODUCT',
    relations: 'variants,featuredAsset,collections,facetValues,facetValues.facet,translations',
    languageCode: 'en',
    batchSize: 500,
  })
  .transform('prepare-search', {
    operators: [
      { op: 'template', args: { template: '/products/${slug}', target: 'url' } },
      { op: 'copy', args: { source: 'featuredAsset.preview', target: 'image' } },
      { op: 'copy', args: { source: 'variants.0.price', target: 'minPrice' } },
    ],
  })
  .sink('meilisearch-products', {
    adapterCode: 'meilisearch',
    host: 'http://localhost:7700',
    apiKeySecretCode: 'meilisearch-key',
    indexName: 'products',
    primaryKey: 'id',
  })
```
An event-driven sync that reindexes a product when it changes:

```ts
createPipeline()
  .name('product-updated-sync')
  .trigger('vendure-event', {
    type: 'EVENT',
    event: 'ProductEvent',
  })
  .extract('get-product', {
    adapterCode: 'vendureQuery',
    entity: 'PRODUCT',
    relations: 'variants,featuredAsset',
    batchSize: 1,
  })
  .sink('elasticsearch', {
    adapterCode: 'elasticsearch',
    host: 'http://localhost:9200',
    indexName: 'products',
    idField: 'id',
  })
```
Fanning out to multiple search systems from one pipeline:

```ts
createPipeline()
  .name('multi-search-sync')
  .extract('query-products', { /* ... */ })
  .transform('prepare', { operators: [ /* ... */ ] })
  // Primary search
  .sink('meilisearch', {
    adapterCode: 'meilisearch',
    indexName: 'products',
    // ...
  })
  // Analytics search
  .sink('elasticsearch', {
    adapterCode: 'elasticsearch',
    indexName: 'products-analytics',
    // ...
  })
```
Tune the batch size based on record size and the target system's throughput:

```ts
{
  batchSize: 500, // Start here, adjust based on performance
}
```
For large datasets, configure parallel processing per pipeline step:

```ts
{
  batchSize: 500,
  rateLimitPerSecond: 100, // Adjust based on target system capacity
}
```
For Elasticsearch, control index refresh:

```ts
{
  refresh: false, // Faster, but documents not immediately searchable
}
```
Run manual refresh after bulk indexing.
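As a sketch of that manual refresh, Elasticsearch exposes a `POST /<index>/_refresh` endpoint; the host and index name below are placeholders:

```typescript
// Build the refresh URL for an index; call it once after a bulk run
// that used refresh: false, so the new documents become searchable.
const refreshUrl = (host: string, index: string) => `${host}/${index}/_refresh`;

// Using the global fetch available in Node 18+.
async function refreshIndex(host: string, index: string): Promise<void> {
  const res = await fetch(refreshUrl(host, index), { method: 'POST' });
  if (!res.ok) throw new Error(`refresh failed: ${res.status}`);
}

console.log(refreshUrl('http://localhost:9200', 'products'));
// http://localhost:9200/products/_refresh
```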
Code: queueProducer
Publish records to a RabbitMQ message queue via the HTTP Management API (port 15672).
| Field | Type | Required | Description |
|---|---|---|---|
| queueType | select | Yes | Queue type (currently: rabbitmq) |
| connectionCode | string | Yes | Reference to queue connection configuration |
| queueName | string | Yes | RabbitMQ queue name to publish to |
| routingKey | string | No | Routing key for RabbitMQ exchanges |
| messageType | string | No | Message type header for consumers |
| headers | json | No | Static headers to include in messages |
| idField | string | No | Field to use as message ID for deduplication |
| batchSize | number | No | Number of messages to send per batch |
| persistent | boolean | No | Persist messages to disk (delivery mode 2) |
| priority | number | No | Message priority (1-10, higher = more urgent) |
| delayMs | number | No | Delay before message is available for consumption |
| ttlMs | number | No | Message time-to-live in milliseconds |
Basic queue publishing:

```ts
.sink('rabbitmq-orders', {
  adapterCode: 'queueProducer',
  queueType: 'rabbitmq',
  connectionCode: 'rabbitmq-connection',
  queueName: 'order-processing',
  messageType: 'order.created',
  idField: 'orderId',
  persistent: true,
  batchSize: 100,
})
```

Publishing with a routing key and priority:

```ts
.sink('rabbitmq-events', {
  adapterCode: 'queueProducer',
  queueType: 'rabbitmq',
  connectionCode: 'rabbitmq-connection',
  queueName: 'events',
  routingKey: 'product.updated',
  persistent: true,
  priority: 5,
})
```
Define the connection in the plugin configuration:

```ts
DataHubPlugin.init({
  connections: [
    {
      code: 'rabbitmq-connection',
      type: 'rabbitmq',
      config: {
        host: 'localhost',
        port: 15672, // HTTP Management API port
        usernameSecretCode: 'rabbitmq-user',
        passwordSecretCode: 'rabbitmq-pass',
        vhost: '/',
      },
    },
  ],
})
```
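For reference, the Management API publishes through an exchange: the default exchange routes by queue name, and the vhost `/` is URL-encoded as `%2F` in the path. A hand-rolled equivalent request might look roughly like this; treat the payload shape as an assumption to verify against your RabbitMQ version:

```typescript
// Sketch: publish one message to a queue via the RabbitMQ Management API
// (POST /api/exchanges/{vhost}/{exchange}/publish).
function publishRequest(queueName: string, message: unknown) {
  return {
    url: 'http://localhost:15672/api/exchanges/%2F/amq.default/publish',
    body: {
      properties: { delivery_mode: 2, priority: 5 }, // persistent, mid priority
      routing_key: queueName, // default exchange routes by queue name
      payload: JSON.stringify(message),
      payload_encoding: 'string',
    },
  };
}

const req = publishRequest('order-processing', { orderId: '42' });
// Then POST req.body as JSON with the basic-auth credentials from the
// connection's username/password secrets.
console.log(req.body.routing_key); // order-processing
```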
Code: webhook
Send records to an HTTP endpoint via POST requests.
| Field | Type | Required | Description |
|---|---|---|---|
| url | string | Yes | HTTP endpoint to send records to |
| method | select | No | HTTP method (POST, PUT, PATCH) |
| headers | json | No | HTTP headers as JSON object |
| bearerTokenSecretCode | string | No | Secret code for Bearer authentication |
| apiKeySecretCode | string | No | Secret code for API key authentication |
| apiKeyHeader | string | No | Header name for API key (default: X-API-Key) |
| batchSize | number | No | Records per request |
| timeoutMs | number | No | Request timeout in milliseconds (default: 30000) |
| retries | number | No | Maximum retry attempts on failure (default: 3) |
| hmacSecretCode | string | No | Secret code for HMAC signing. When set, each request includes an HMAC-SHA256 signature computed over the request body |
| signatureHeaderName | string | No | Header name for the HMAC signature (default: X-DataHub-Signature) |
Per-record webhook notifications:

```ts
.sink('webhook-notifications', {
  adapterCode: 'webhook',
  url: 'https://api.example.com/webhook/products',
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  bearerTokenSecretCode: 'webhook-bearer-token',
  batchSize: 1,
})
```

Batched delivery with API key authentication and retries:

```ts
.sink('webhook-bulk', {
  adapterCode: 'webhook',
  url: 'https://api.example.com/bulk-import',
  method: 'POST',
  apiKeySecretCode: 'webhook-api-key',
  apiKeyHeader: 'X-API-Key',
  batchSize: 100,
  timeoutMs: 60000,
  retries: 5,
})
```
Bearer Token:

```ts
DataHubPlugin.init({
  secrets: [
    {
      code: 'webhook-bearer-token',
      provider: 'ENV',
      value: 'WEBHOOK_BEARER_TOKEN',
    },
  ],
})
```

The token is sent as an `Authorization: Bearer {value}` header.
API Key:

```ts
DataHubPlugin.init({
  secrets: [
    {
      code: 'webhook-api-key',
      provider: 'ENV',
      value: 'WEBHOOK_API_KEY',
    },
  ],
})
```

The key is sent in the header specified by `apiKeyHeader` (default: `X-API-Key`).
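On the receiving side, an HMAC-signed request can be verified with Node's `crypto` module. This sketch assumes the signature is the hex-encoded HMAC-SHA256 of the raw request body; confirm the exact encoding against your DataHub version before relying on it:

```typescript
import { createHmac, timingSafeEqual } from 'crypto';

// Verify the signature header (default: X-DataHub-Signature) against the
// raw request body, using a constant-time comparison.
function verifySignature(rawBody: string, signature: string, secret: string): boolean {
  const expected = createHmac('sha256', secret).update(rawBody).digest('hex');
  const a = Buffer.from(expected);
  const b = Buffer.from(signature);
  return a.length === b.length && timingSafeEqual(a, b);
}

// Simulate a signed delivery with a placeholder shared secret.
const body = JSON.stringify([{ id: '1', name: 'Widget' }]);
const secret = 'my-webhook-secret';
const signature = createHmac('sha256', secret).update(body).digest('hex');
console.log(verifySignature(body, signature, secret)); // true
```

Always verify against the raw body bytes as received; re-serializing parsed JSON can change key order or whitespace and break the signature.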