Data Hub includes integrations for external services with built-in fault tolerance.
Index your data to search engines with automatic circuit breaker protection.
| Engine | Adapter Code | Features |
|---|---|---|
| MeiliSearch | `meilisearch` | Full-text search, faceting |
| Elasticsearch | `elasticsearch` | Advanced querying, analytics |
| OpenSearch | `opensearch` | AWS-compatible, Elasticsearch-compatible API |
| Algolia | `algolia` | Instant search, relevance tuning |
| Typesense | `typesense` | Typo-tolerant, fast |
// Pipeline with search sink
.sink('index-products', {
    adapterCode: 'meilisearch',
    host: 'http://localhost:7700',
    apiKeySecretCode: 'meili-api-key',
    indexName: 'products',
    primaryKey: 'id',
    batchSize: 100,
})
Send data to external HTTP endpoints with automatic retries and circuit breaker protection.
.sink('notify-external', {
    adapterCode: 'webhook',
    url: 'https://api.example.com/webhook',
    method: 'POST',
    bearerTokenSecretCode: 'webhook-token',
    batchSize: 50,
    timeoutMs: 30000,
    retries: 3,
})
Publish messages to message queues. See Queue & Messaging for details.
.sink('to-queue', {
    adapterCode: 'queueProducer',
    queueType: 'rabbitmq-amqp',
    connectionCode: 'my-rabbitmq',
    queueName: 'product-updates',
})
All external sinks include automatic circuit breaker protection to prevent cascading failures.
The circuit breaker uses the following defaults from the plugin configuration:
| Setting | Default | Description |
|---|---|---|
| `failureThreshold` | 5 | Failures before opening |
| `successThreshold` | 3 | Successes to close |
| `resetTimeoutMs` | 30000 | Time before half-open |
| `failureWindowMs` | 60000 | Window for counting failures |
When the circuit breaker opens, a warning is logged:
WARN [SinkExecutor] Circuit breaker OPEN for sink:elasticsearch:https://search.example.com
- Blocking requests until reset
- Time until half-open: 25000ms
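The four settings above can be read as a small state machine. The following is a minimal TypeScript sketch of that behaviour, not Data Hub's actual implementation: the `CircuitBreaker` class, its method names, and the injectable `now` clock are hypothetical, and the rule that a single failure while half-open reopens the circuit is an assumption.

```typescript
// Illustrative sketch only; class and method names are hypothetical.
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

interface CircuitBreakerOptions {
    failureThreshold: number; // failures before opening
    successThreshold: number; // successes while half-open needed to close
    resetTimeoutMs: number;   // time spent open before moving to half-open
    failureWindowMs: number;  // window for counting failures
}

class CircuitBreaker {
    private state: CircuitState = 'CLOSED';
    private failureTimes: number[] = [];
    private halfOpenSuccesses = 0;
    private openedAt = 0;
    private opts: CircuitBreakerOptions;
    private now: () => number;

    constructor(opts: CircuitBreakerOptions, now: () => number = () => Date.now()) {
        this.opts = opts;
        this.now = now;
    }

    // Returns the current state, moving OPEN -> HALF_OPEN once the reset timeout has elapsed.
    getState(): CircuitState {
        if (this.state === 'OPEN' && this.now() - this.openedAt >= this.opts.resetTimeoutMs) {
            this.state = 'HALF_OPEN';
            this.halfOpenSuccesses = 0;
        }
        return this.state;
    }

    // Requests are blocked only while the circuit is open.
    canRequest(): boolean {
        return this.getState() !== 'OPEN';
    }

    recordSuccess(): void {
        if (this.getState() !== 'HALF_OPEN') return;
        this.halfOpenSuccesses += 1;
        if (this.halfOpenSuccesses >= this.opts.successThreshold) {
            this.state = 'CLOSED';
            this.failureTimes = [];
        }
    }

    recordFailure(): void {
        const state = this.getState();
        const t = this.now();
        if (state === 'OPEN') return;
        if (state === 'HALF_OPEN') {
            this.open(t); // assumption: one failure while half-open reopens the circuit
            return;
        }
        // Keep only failures inside the counting window, then add this one.
        this.failureTimes = this.failureTimes.filter((x) => t - x < this.opts.failureWindowMs);
        this.failureTimes.push(t);
        if (this.failureTimes.length >= this.opts.failureThreshold) this.open(t);
    }

    private open(at: number): void {
        this.state = 'OPEN';
        this.openedAt = at;
        this.failureTimes = [];
    }
}
```

A caller would check `canRequest()` before each send and report the outcome with `recordSuccess()` or `recordFailure()`. With the defaults from the table, five failures within 60 seconds open the circuit, and three successful requests after the 30-second reset timeout close it again.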
Enrich records by fetching data from external APIs during transformation.
.transform('enrich-inventory', {
    operators: [
        {
            op: 'httpLookup',
            args: {
                url: 'https://api.inventory.com/stock/',
                target: 'stockLevel',
                responsePath: 'data.available',
                bearerTokenSecretCode: 'inventory-api-key',
                cacheTtlSec: 300,
                rateLimitPerSecond: 50,
                maxRetries: 3,
                default: 0,
            },
        },
    ],
})
| Option | Description |
|---|---|
| `url` | Endpoint URL (supports placeholders) |
| `method` | HTTP method: `GET` or `POST` |
| `target` | Field in which to store the response |
| `responsePath` | JSON path to extract from the response |
| `cacheTtlSec` | Cache duration in seconds (0 = disabled) |
| `rateLimitPerSecond` | Maximum requests per second |
| `maxRetries` | Retry attempts on failure |
| `timeoutMs` | Request timeout in milliseconds |
| `bearerTokenSecretCode` | Secret code for the bearer token |
| `apiKeySecretCode` | Secret code for the API key |
| `default` | Value to use if the lookup fails |
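To make the interplay of `responsePath`, `cacheTtlSec`, and `default` concrete, here is a rough TypeScript sketch of how such a lookup could behave. It is not the operator's real code: `extractPath`, `TtlCache`, `lookupWithCache`, and the injected `fetchJson` function are hypothetical names, and the sketch assumes `responsePath` is a simple dot-separated path.

```typescript
// Illustrative sketch only; all names here are hypothetical.

// Walk a dot-separated path such as 'data.available' through a parsed JSON value.
function extractPath(value: unknown, path: string): unknown {
    let current: unknown = value;
    for (const key of path.split('.')) {
        if (current === null || typeof current !== 'object') return undefined;
        current = (current as Record<string, unknown>)[key];
    }
    return current;
}

// In-memory cache whose entries expire after `ttlSec` seconds (0 disables caching).
class TtlCache<V> {
    private entries = new Map<string, { value: V; expiresAt: number }>();
    private ttlMs: number;
    private now: () => number;

    constructor(ttlSec: number, now: () => number = () => Date.now()) {
        this.ttlMs = ttlSec * 1000;
        this.now = now;
    }

    get(key: string): V | undefined {
        const entry = this.entries.get(key);
        if (!entry) return undefined;
        if (this.now() >= entry.expiresAt) {
            this.entries.delete(key);
            return undefined;
        }
        return entry.value;
    }

    set(key: string, value: V): void {
        if (this.ttlMs <= 0) return; // caching disabled
        this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
    }
}

// Serve from cache when fresh; otherwise fetch, extract, and cache.
// Any failure or missing path yields `fallback` (the role of the `default` option).
async function lookupWithCache(
    url: string,
    responsePath: string,
    fallback: unknown,
    cache: TtlCache<unknown>,
    fetchJson: (url: string) => Promise<unknown>,
): Promise<unknown> {
    const cached = cache.get(url);
    if (cached !== undefined) return cached;
    try {
        const extracted = extractPath(await fetchJson(url), responsePath);
        if (extracted === undefined) return fallback;
        cache.set(url, extracted);
        return extracted;
    } catch {
        return fallback;
    }
}
```

Keying the cache by the full URL means records that resolve to the same endpoint share one cached response for the duration of the TTL, which is where most of the load reduction would come from.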
Always enable caching for HTTP lookups to reduce load:
{ op: 'httpLookup', args: { cacheTtlSec: 300, ... } }
Respect external API limits:
{ op: 'httpLookup', args: { rateLimitPerSecond: 10, ... } }
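One common way to enforce a per-second limit like `rateLimitPerSecond` is a token bucket. The sketch below shows that technique in TypeScript; it is an assumption about how such a limit could work, not a description of Data Hub's internals, and the `RateLimiter` class and `tryAcquire` method are hypothetical.

```typescript
// Illustrative token bucket; names are hypothetical.
class RateLimiter {
    private tokens: number;
    private lastRefill: number;
    private ratePerSecond: number;
    private now: () => number;

    constructor(ratePerSecond: number, now: () => number = () => Date.now()) {
        this.ratePerSecond = ratePerSecond;
        this.now = now;
        this.tokens = ratePerSecond; // start with a full bucket
        this.lastRefill = now();
    }

    // Returns true if a request may be sent now, consuming one token.
    tryAcquire(): boolean {
        const t = this.now();
        const elapsedSec = (t - this.lastRefill) / 1000;
        // Refill in proportion to elapsed time, capped at one second's worth of tokens.
        this.tokens = Math.min(this.ratePerSecond, this.tokens + elapsedSec * this.ratePerSecond);
        this.lastRefill = t;
        if (this.tokens < 1) return false;
        this.tokens -= 1;
        return true;
    }
}
```

With a rate of 10, a burst of ten requests passes immediately, and further requests are refused until enough time has elapsed to refill the bucket.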
Handle failures gracefully:
{ op: 'httpLookup', args: { default: null, skipOn404: true, ... } }
Check logs for circuit breaker activations and investigate root causes.
Tune batch sizes based on external service capacity:
.sink('index', { batchSize: 100, ... })
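As a rough illustration of what `batchSize` controls, the TypeScript sketch below splits a record list into groups of at most that size, so each group could be sent as one request. The `toBatches` helper is hypothetical and only approximates how a sink might chunk its input.

```typescript
// Illustrative helper; not part of the Data Hub API.
function toBatches<T>(records: readonly T[], batchSize: number): T[][] {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
        throw new RangeError('batchSize must be a positive integer');
    }
    const batches: T[][] = [];
    for (let i = 0; i < records.length; i += batchSize) {
        batches.push(records.slice(i, i + batchSize));
    }
    return batches;
}
```

For 250 records and a batch size of 100, this yields three batches of 100, 100, and 50 records. A smaller batch size means more requests, each with a lighter payload for the external service.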
Tune `failureThreshold` and `maxRetries` for flaky services.