Optimize Data Hub for high-throughput data processing and large-scale operations.
Data Hub processes data through a multi-stage pipeline. Typical throughput per operation, which varies with configuration and data, is:
| Operation | Records/Second | Notes |
|---|---|---|
| File Extraction (CSV) | 10,000-50,000 | Depends on file size, parsing complexity |
| HTTP API Extraction | 100-1,000 | Limited by rate limits, pagination |
| Database Extraction | 5,000-20,000 | Depends on query complexity, indexes |
| Transform Operations | 50,000-200,000 | Pure operations are fastest |
| Validation | 20,000-100,000 | Schema validation is slowest |
| Product Upsert | 50-200 | Limited by Vendure entity complexity and relations |
| Variant Upsert | 100-500 | Simpler than products |
| Customer Upsert | 200-800 | Moderate complexity |
| Search Indexing | 1,000-10,000 | Bulk operations are efficient |
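As a rough worked example of reading the table: if stages overlap in a streaming fashion, the slowest stage bounds end-to-end throughput. The sketch below assumes that model and plugs in the low-end figures from the table. `estimateRun` and its types are hypothetical helpers for illustration, not part of the Data Hub API.

```typescript
// Hypothetical helper: estimates run time from per-stage throughput.
interface StageRate {
  name: string;
  recordsPerSecond: number;
}

interface RunEstimate {
  bottleneck: string;
  seconds: number;
}

function estimateRun(recordCount: number, stages: StageRate[]): RunEstimate {
  if (stages.length === 0) {
    throw new Error('At least one stage is required');
  }
  // Under the streaming assumption, the slowest stage limits the whole pipeline.
  const slowest = stages.reduce((min, stage) =>
    stage.recordsPerSecond < min.recordsPerSecond ? stage : min,
  );
  return {
    bottleneck: slowest.name,
    seconds: Math.ceil(recordCount / slowest.recordsPerSecond),
  };
}

// Low-end figures from the table (chosen for illustration only).
const estimate = estimateRun(100_000, [
  { name: 'File Extraction (CSV)', recordsPerSecond: 10_000 },
  { name: 'Transform Operations', recordsPerSecond: 50_000 },
  { name: 'Product Upsert', recordsPerSecond: 50 },
]);
console.log(estimate); // bottleneck: 'Product Upsert', seconds: 2000
```

In this scenario tuning extraction or transforms would change little; the load step dominates the run time.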
Control processing speed and resource usage with throughput settings.
Set defaults in pipeline context:
createPipeline()
.context({
throughput: {
batchSize: 100, // Records per batch
concurrency: 4, // Parallel batches
rateLimitRps: 10, // Requests per second
drainStrategy: 'BACKOFF',
},
})
Override for specific steps:
.extract('fetch-api', {
adapterCode: 'httpApi',
url: 'https://api.example.com/products',
throughput: {
batchSize: 50, // Smaller batches for API
rateLimitRps: 5, // Respect API rate limit
},
})
.load('upsert-products', {
adapterCode: 'productUpsert',
throughput: {
batchSize: 20, // Smaller batches for complex writes
concurrency: 2, // Limit parallel database writes
},
})
interface Throughput {
// Records per batch (default: 100)
// Higher = more throughput, more memory
// Lower = less memory, better error isolation
batchSize?: number;
// Parallel batch processing (default: 1)
// Higher = more throughput, more CPU/memory
// CPU-bound steps: keep ≤ CPU cores; I/O-bound steps can go higher
concurrency?: number;
// Maximum requests per second (default: unlimited)
// Prevents overwhelming external APIs
rateLimitRps?: number;
// Strategy when processing queue is full
drainStrategy?: 'BACKOFF' | 'SHED' | 'QUEUE';
// Pause on high error rate
pauseOnErrorRate?: {
threshold: number; // 0-1 (0.05 = 5%)
intervalSec: number; // Check interval
};
}
// BACKOFF - Slow down when queue is full (default)
drainStrategy: 'BACKOFF' // Reduces throughput temporarily
// SHED - Drop records when overloaded (use with caution)
drainStrategy: 'SHED' // May lose data
// QUEUE - Buffer records in memory (can cause OOM)
drainStrategy: 'QUEUE' // Unlimited buffering
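To make the difference between the three strategies concrete, here is a minimal sketch of a bounded buffer that reacts to overload according to the chosen strategy. It is an illustration of the described behavior only; the `offer` function, the `OfferResult` values, and the buffer model are assumptions, not Data Hub internals.

```typescript
type DrainStrategy = 'BACKOFF' | 'SHED' | 'QUEUE';
type OfferResult = 'ACCEPTED' | 'DROPPED' | 'RETRY_LATER';

// Illustrative only: decides what happens to a record when the buffer is full.
function offer<T>(
  buffer: T[],
  record: T,
  capacity: number,
  strategy: DrainStrategy,
): OfferResult {
  // QUEUE ignores the capacity, which is why it can exhaust memory.
  if (buffer.length < capacity || strategy === 'QUEUE') {
    buffer.push(record);
    return 'ACCEPTED';
  }
  // SHED discards the record; BACKOFF asks the producer to slow down and retry.
  return strategy === 'SHED' ? 'DROPPED' : 'RETRY_LATER';
}

const full: number[] = [1, 2]; // Already at capacity 2
console.log(offer(full, 3, 2, 'BACKOFF')); // RETRY_LATER
console.log(offer(full, 3, 2, 'SHED'));    // DROPPED
console.log(offer(full, 3, 2, 'QUEUE'));   // ACCEPTED
```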
Optimize batch sizes for different operation types.
// File extraction - Large batches
.extract('parse-csv', {
adapterCode: 'file',
format: 'CSV',
throughput: { batchSize: 1000 },
})
// API extraction - Respect pagination
.extract('fetch-api', {
adapterCode: 'httpApi',
throughput: { batchSize: 100 },
})
// Pure transforms - Large batches
.transform('normalize', {
operators: [/* pure operations */],
throughput: { batchSize: 5000 },
})
// Database writes - Small batches
.load('upsert-products', {
adapterCode: 'productUpsert',
throughput: { batchSize: 20 },
})
// Search indexing - Medium batches
.sink('index-search', {
adapterCode: 'meilisearch',
bulkSize: 500,
})
Adjust batch size based on data characteristics:
.hooks({
AFTER_EXTRACT: [{
type: 'INTERCEPTOR',
name: 'Adjust batch size',
code: `
// Larger records = smaller batches
const avgSize = records.reduce((sum, r) =>
sum + JSON.stringify(r).length, 0) / records.length;
if (avgSize > 10000) {
context.throughput.batchSize = 50;
} else if (avgSize > 5000) {
context.throughput.batchSize = 100;
} else {
context.throughput.batchSize = 500;
}
return records;
`,
}],
})
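The thresholds in the hook above can be tested in isolation by pulling them into a pure function. The sketch below mirrors the same cut-offs; `pickBatchSize` is a hypothetical name, and the empty-input fallback is an added assumption, since the hook as written would divide by zero on an empty batch. Note that `JSON.stringify(...).length` counts characters, not bytes.

```typescript
// Mirrors the thresholds used in the hook; the function name is hypothetical.
function pickBatchSize(records: unknown[]): number {
  if (records.length === 0) {
    return 500; // Nothing to measure; fall back to the largest batch size.
  }
  const totalChars = records.reduce<number>(
    (sum, record) => sum + JSON.stringify(record).length,
    0,
  );
  const avgSize = totalChars / records.length;
  // Larger records = smaller batches
  if (avgSize > 10000) return 50;
  if (avgSize > 5000) return 100;
  return 500;
}

console.log(pickBatchSize([{ sku: 'A-1', name: 'Small record' }])); // 500
console.log(pickBatchSize([{ description: 'x'.repeat(12000) }]));   // 50
```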
Enable parallel processing for independent steps.
createPipeline()
.parallel({
maxConcurrentSteps: 8,
errorPolicy: 'CONTINUE',
})
// These steps run in parallel (no data dependencies)
.extract('fetch-products', { /* ... */ })
.extract('fetch-prices', { /* ... */ })
.extract('fetch-inventory', { /* ... */ })
// Merge results
.transform('merge', { /* ... */ })
// Edges define dependencies
.edge('fetch-products', 'merge')
.edge('fetch-prices', 'merge')
.edge('fetch-inventory', 'merge')
// CPU-bound operations (transforms, validation)
.transform('heavy-processing', {
throughput: {
concurrency: 4, // ≈ CPU cores
},
})
// I/O-bound operations (API calls, database)
.extract('api-calls', {
throughput: {
concurrency: 16, // Can be > CPU cores
},
})
// Database writes
.load('upsert-entities', {
throughput: {
concurrency: 2, // Conservative for safety
},
})
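Conceptually, `concurrency` caps how many batches are in flight at once. The following is a minimal sketch of a concurrency-limited mapper that shows the idea; `mapWithConcurrency` is a hypothetical helper, not how Data Hub schedules batches internally.

```typescript
// Runs fn over items with at most `limit` calls in flight; preserves input order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array<R>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index] as T);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Usage: process three batches, at most two at a time.
mapWithConcurrency([1, 2, 3], 2, async (n) => n * 2)
  .then((doubled) => console.log(doubled)); // [2, 4, 6]
```

For I/O-bound work the limit can exceed the CPU core count because most of the time is spent waiting; for CPU-bound work on a single thread, a higher limit adds no parallelism.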
For CPU-intensive operations:
import { Worker } from 'worker_threads';
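// createWorkerPool and chunkArray are assumed helpers defined elsewhere in your application
// Create the pool once and reuse it; spawning workers on every call is expensive
const workerPool = createWorkerPool(4);
// Register custom operator with worker pool
const heavyTransform = {
  type: 'OPERATOR',
  code: 'heavy-transform',
  async apply(records, config) {
    // Split the batch so each worker receives a bounded chunk
    const chunks = chunkArray(records, 100);
    const results = await Promise.all(
      chunks.map(chunk =>
        workerPool.execute(chunk, config)
      )
    );
    return results.flat();
  },
};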
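The example above calls `chunkArray` without defining it. One possible implementation is sketched below; the name matches the call site, but the signature and error handling are assumptions.

```typescript
// Hypothetical implementation of the chunkArray helper used above.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    // slice clamps at the end, so the last chunk may be shorter.
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

console.log(chunkArray([1, 2, 3, 4, 5], 2)); // [[1, 2], [3, 4], [5]]
```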
Optimize database queries and writes.
// In vendure-config.ts
import { VendureConfig } from '@vendure/core';
export const config: VendureConfig = {
dbConnectionOptions: {
// Increase pool size for concurrent pipelines
extra: {
max: 20, // Maximum connections
min: 5, // Minimum connections
idleTimeoutMillis: 30000,
},
},
};
// Load only needed relations
.extract('query-products', {
adapterCode: 'vendureQuery',
entity: 'PRODUCT',
relations: 'variants,featuredAsset', // Only what's needed
batchSize: 500,
})
// Use indexes for lookups
.load('upsert-products', {
adapterCode: 'productUpsert',
matchField: 'slug', // Indexed field
})
// Batch related operations
.load('upsert-products', {
adapterCode: 'productUpsert',
strategy: 'UPSERT',
throughput: {
batchSize: 50, // Batch upserts together
concurrency: 2, // Limit concurrent batches
},
})
// Incremental extraction (much faster)
.extract('fetch-updates', {
adapterCode: 'database',
connectionCode: 'erp-db',
query: 'SELECT * FROM products WHERE updated_at > :checkpoint',
incremental: {
enabled: true,
field: 'updated_at',
},
throughput: { batchSize: 1000 },
})
// Use indexed columns in WHERE clauses
.extract('fetch-products', {
adapterCode: 'database',
query: `
SELECT * FROM products
WHERE status = 'active' -- indexed
AND updated_at > :checkpoint -- indexed
ORDER BY id -- indexed
`,
})
Prevent out-of-memory errors with large datasets.
// Enable streaming for large files
.extract('parse-large-csv', {
adapterCode: 'file',
path: '/data/large-file.csv',
format: 'CSV',
throughput: {
batchSize: 1000, // Process in chunks
},
})
// Note: Extractors use async generators (already streaming)
// Avoid operators that buffer all records
.transform('process', {
operators: [
// Good: operates record-by-record
{ op: 'rename', args: { from: 'old', to: 'new' } },
{ op: 'set', args: { path: 'status', value: 'active' } },
// Avoid: requires loading all records
// { op: 'deduplicate', args: { field: 'sku' } },
// { op: 'sort', args: { field: 'name' } },
],
})
.context({
checkpointing: {
enabled: true,
strategy: 'COUNT',
intervalRecords: 5000, // Checkpoint every 5K records
},
})
.hooks({
AFTER_TRANSFORM: [{
type: 'INTERCEPTOR',
name: 'Monitor memory',
code: `
const used = process.memoryUsage();
const usedMB = Math.round(used.heapUsed / 1024 / 1024);
if (usedMB > 1024) { // > 1GB
context.logger.warn(\`High memory usage: \${usedMB}MB\`);
// global.gc exists only when Node.js is started with --expose-gc
if (global.gc) {
global.gc();
}
}
return records;
`,
}],
})
Optimize network requests and API calls.
// Use connection pooling for HTTP clients
.extract('fetch-api', {
adapterCode: 'httpApi',
connectionCode: 'external-api', // Reuses HTTP agent
throughput: {
concurrency: 10, // Parallel requests share the pooled connections
},
})
// Enable compression for large payloads
.extract('fetch-api', {
adapterCode: 'httpApi',
url: 'https://api.example.com/data',
headers: {
'Accept-Encoding': 'gzip, deflate',
},
})
// Respect API rate limits
.extract('fetch-api', {
adapterCode: 'httpApi',
url: 'https://api.example.com/products',
throughput: {
rateLimitRps: 5, // 5 requests/second
batchSize: 100, // Request 100 items at a time
},
pagination: {
type: 'PAGE',
limit: 100,
},
})
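The two settings above interact: the rate limit fixes the spacing between requests, and the page size then caps how many records can arrive per second. A small sketch of that arithmetic follows. Both function names are hypothetical, and even spacing is an assumption about how a limiter might schedule requests, not a statement about Data Hub's limiter.

```typescript
// Illustrative only: evenly spaced request start times for a given rate limit.
function requestOffsetsMs(requestCount: number, rateLimitRps: number): number[] {
  if (rateLimitRps <= 0) {
    throw new RangeError('rateLimitRps must be positive');
  }
  const intervalMs = 1000 / rateLimitRps;
  return Array.from({ length: requestCount }, (_, i) => Math.round(i * intervalMs));
}

// Upper bound on records per second when each request returns one full page.
function maxRecordsPerSecond(rateLimitRps: number, pageSize: number): number {
  return rateLimitRps * pageSize;
}

console.log(requestOffsetsMs(4, 5));      // [0, 200, 400, 600]
console.log(maxRecordsPerSecond(5, 100)); // 500
```

With `rateLimitRps: 5` and 100 items per page, extraction cannot exceed 500 records per second regardless of concurrency.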
.extract('fetch-api', {
adapterCode: 'httpApi',
url: 'https://api.example.com/products',
throughput: {
rateLimitRps: 10,
},
})
.context({
errorHandling: {
maxRetries: 5,
retryDelayMs: 1000,
maxRetryDelayMs: 60000,
backoffMultiplier: 2, // Exponential backoff
},
})
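To get a feel for what these retry settings mean in wall-clock time, the sketch below computes the delay before each retry. The formula (initial delay multiplied by `backoffMultiplier` per attempt, capped at `maxRetryDelayMs`) is an assumption about how the fields combine; check the plugin's behavior before relying on exact values. `retryDelaysMs` is a hypothetical helper.

```typescript
interface RetryConfig {
  maxRetries: number;
  retryDelayMs: number;
  maxRetryDelayMs: number;
  backoffMultiplier: number;
}

// Assumed formula: delay = retryDelayMs * backoffMultiplier^(attempt - 1), capped.
function retryDelaysMs(config: RetryConfig): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= config.maxRetries; attempt++) {
    const raw = config.retryDelayMs * Math.pow(config.backoffMultiplier, attempt - 1);
    delays.push(Math.min(raw, config.maxRetryDelayMs));
  }
  return delays;
}

console.log(
  retryDelaysMs({
    maxRetries: 5,
    retryDelayMs: 1000,
    maxRetryDelayMs: 60000,
    backoffMultiplier: 2,
  }),
); // [1000, 2000, 4000, 8000, 16000]
```

Under this assumption a record that fails all five retries spends about 31 seconds waiting, which is worth factoring into throughput estimates.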
Track performance metrics and identify bottlenecks.
Monitor via the Analytics dashboard or GraphQL API:
query {
dataHubPipelinePerformance(
pipelineId: "pipeline-1"
timeRange: {
from: "2024-01-01T00:00:00Z"
to: "2024-01-31T23:59:59Z"
intervalMinutes: 60
}
) {
totalRuns
successfulRuns
failedRuns
avgDurationMs
p50DurationMs
p95DurationMs
p99DurationMs
}
}
.hooks({
BEFORE_TRANSFORM: [{
type: 'LOG',
level: 'INFO',
message: 'Starting transform step',
}],
AFTER_TRANSFORM: [{
type: 'INTERCEPTOR',
name: 'Profile step',
code: `
const duration = Date.now() - context.startTime;
const recordsPerSec = Math.round(records.length / (Math.max(duration, 1) / 1000)); // Clamp to 1 ms to avoid dividing by zero
context.logger.info(\`Transform completed: \${records.length} records in \${duration}ms (\${recordsPerSec} rec/sec)\`);
return records;
`,
}],
})
import { DataHubPlugin, DomainEventsService } from '@oronts/vendure-data-hub-plugin';
// Emit custom metrics
.hooks({
AFTER_LOAD: [{
type: 'SCRIPT',
scriptName: 'emitMetrics',
}],
})
// Register script
DataHubPlugin.init({
scripts: {
emitMetrics: async (records, context) => {
const events = context.services.get(DomainEventsService);
events.publish({
type: 'CUSTOM_METRIC',
payload: {
pipeline: context.pipelineCode,
step: context.stepKey,
recordCount: records.length,
timestamp: new Date(),
},
});
return records;
},
},
})
Identify and resolve performance issues.
Problem: Extracting data takes too long
Solutions:
// 1. Increase pagination/batch size
.extract('fetch-api', {
pagination: { limit: 500 }, // Was 100
})
// 2. Use incremental extraction
.extract('fetch-api', {
incremental: {
enabled: true,
field: 'updated_at',
},
})
// 3. Parallel API calls
.extract('fetch-api', {
throughput: {
concurrency: 10, // Was 1
},
})
// 4. Use database connection pooling
.extract('query-db', {
adapterCode: 'database',
connectionCode: 'db', // Uses pool
})
Problem: Transform step is the bottleneck
Solutions:
// 1. Increase batch size
.transform('process', {
throughput: { batchSize: 5000 }, // Was 100
})
// 2. Remove unnecessary operators
.transform('process', {
operators: [
// Only essential transforms
{ op: 'rename', args: { from: 'old', to: 'new' } },
],
})
// 3. Use parallel processing
.transform('process', {
throughput: {
concurrency: 4,
batchSize: 1000,
},
})
// 4. Move complex logic to custom operator
// (compiled code is faster than multiple small operators)
Problem: Load step takes too long
Solutions:
// 1. Reduce batch size (counterintuitive, but smaller batches can shorten transactions and reduce lock contention)
.load('upsert', {
throughput: { batchSize: 20 }, // Was 100
})
// 2. Limit concurrency
.load('upsert', {
throughput: {
batchSize: 50,
concurrency: 2, // Was 4
},
})
// 3. Use appropriate strategy
.load('upsert', {
strategy: 'CREATE', // Faster than UPSERT if records are new
})
// 4. Reduce relations loaded
.load('upsert', {
adapterCode: 'productUpsert',
config: {
loadRelations: false, // Don't load unnecessary relations
},
})
// 5. Use MERGE instead of SOURCE_WINS
.load('upsert', {
conflictStrategy: 'MERGE', // Only updates changed fields
})
Problem: Pipeline runs out of memory
Solutions:
// 1. Reduce batch sizes
.context({
throughput: { batchSize: 100 }, // Was 1000
})
// 2. Enable checkpointing
.context({
checkpointing: {
enabled: true,
intervalRecords: 1000,
},
})
// 3. Reduce concurrency
.context({
throughput: { concurrency: 2 }, // Was 8
})
// 4. Avoid buffering operators
// (deduplicate, sort, groupBy)
// 5. Use streaming extractors
// (already default for file/database)
Problem: External API returns 429 errors
Solutions:
// 1. Add rate limiting
.extract('fetch-api', {
throughput: {
rateLimitRps: 5, // Match API limit
},
})
// 2. Reduce concurrency
.extract('fetch-api', {
throughput: {
concurrency: 2, // Fewer parallel requests
},
})
// 3. Add retry with backoff
.context({
errorHandling: {
maxRetries: 5,
retryDelayMs: 2000,
backoffMultiplier: 2,
},
})
// 4. Use pagination efficiently
.extract('fetch-api', {
pagination: {
limit: 100, // Larger pages mean fewer requests
maxPages: 10, // Limit total pages
},
})
// Initial configuration (safe)
.context({
throughput: {
batchSize: 100,
concurrency: 2,
},
})
// After profiling, increase gradually
.context({
throughput: {
batchSize: 500, // 5x increase
concurrency: 4, // 2x increase
},
})
// Add timing hooks to identify bottlenecks
.hooks({
AFTER_EXTRACT: [{ type: 'LOG', level: 'INFO', message: 'Extract done' }],
AFTER_TRANSFORM: [{ type: 'LOG', level: 'INFO', message: 'Transform done' }],
AFTER_VALIDATE: [{ type: 'LOG', level: 'INFO', message: 'Validate done' }],
AFTER_LOAD: [{ type: 'LOG', level: 'INFO', message: 'Load done' }],
})
// Review logs to find slowest steps
| Operation Type | Recommended Batch Size |
|---|---|
| File parsing | 1000-5000 |
| API extraction | 50-200 |
| Pure transforms | 1000-5000 |
| Database writes | 20-100 |
| Search indexing | 500-1000 |
| Validation | 100-500 |
.context({
checkpointing: {
enabled: true,
strategy: 'COUNT',
intervalRecords: 5000, // Checkpoint every 5K
},
})
.context({
throughput: {
pauseOnErrorRate: {
threshold: 0.05, // Pause at 5% error rate
intervalSec: 60,
},
},
})
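The `pauseOnErrorRate` check can be pictured as a ratio test over the records seen in the last interval. The sketch below is one plausible reading: it assumes the pipeline pauses once the failure ratio reaches the threshold and that an empty interval never triggers a pause. `shouldPause` and `IntervalStats` are hypothetical names.

```typescript
interface IntervalStats {
  processed: number; // Records handled during the last check interval
  failed: number;    // Of those, how many failed
}

// Assumption: pause once the failure ratio reaches the configured threshold.
function shouldPause(stats: IntervalStats, threshold: number): boolean {
  if (stats.processed === 0) {
    return false; // No data in this interval, so no basis for pausing.
  }
  return stats.failed / stats.processed >= threshold;
}

console.log(shouldPause({ processed: 1000, failed: 60 }, 0.05)); // true
console.log(shouldPause({ processed: 1000, failed: 10 }, 0.05)); // false
```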
.context({
errorHandling: {
deadLetterQueue: true,
alertOnDeadLetter: true,
},
})
// If 90% of records are new, use CREATE
.load('upsert', {
strategy: 'CREATE', // Faster than UPSERT
})
// If most records exist, use UPDATE
.load('upsert', {
strategy: 'UPDATE',
})
// Mixed? Use UPSERT with MERGE
.load('upsert', {
strategy: 'UPSERT',
conflictStrategy: 'MERGE',
})
// Run multiple pipeline instances with data partitioning
.extract('fetch-api', {
url: 'https://api.example.com/products',
query: {
// Partition by ID range
minId: process.env.PARTITION_MIN,
maxId: process.env.PARTITION_MAX,
},
})
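The `PARTITION_MIN` and `PARTITION_MAX` values have to come from somewhere. One simple way, sketched below, is to split a known ID range into contiguous, inclusive sub-ranges and hand one to each pipeline instance. `partitionIdRange` is a hypothetical helper, and it assumes numeric IDs that are spread fairly evenly across the range.

```typescript
interface IdRange {
  min: number;
  max: number;
}

// Splits [minId, maxId] into at most `parts` contiguous, inclusive ranges.
function partitionIdRange(minId: number, maxId: number, parts: number): IdRange[] {
  if (parts < 1 || maxId < minId) {
    return [];
  }
  const total = maxId - minId + 1;
  const size = Math.ceil(total / parts);
  const ranges: IdRange[] = [];
  for (let start = minId; start <= maxId; start += size) {
    // The last range is clamped so it never runs past maxId.
    ranges.push({ min: start, max: Math.min(start + size - 1, maxId) });
  }
  return ranges;
}

console.log(partitionIdRange(1, 100, 4));
// [{min: 1, max: 25}, {min: 26, max: 50}, {min: 51, max: 75}, {min: 76, max: 100}]
```

Each range would then be passed to one instance as its `PARTITION_MIN` and `PARTITION_MAX` environment variables.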
// Cache HTTP lookups
.transform('enrich', {
operators: [{
op: 'httpLookup',
args: {
url: 'https://api.example.com/lookup/',
target: 'enrichedData',
cache: true, // Enable caching
cacheTtl: 3600, // 1 hour
},
}],
})
// Bad: N+1 queries
.extract('products', { /* ... */ })
.transform('enrich', {
operators: [
{ op: 'httpLookup', args: { url: '/prices/' } }, // 1 request per record
],
})
// Good: Batch request
.extract('products', { /* ... */ })
.enrich('fetch-prices', {
sourceType: 'HTTP',
endpoint: '/prices/batch', // Single batch request
matchField: 'id',
})
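The difference between the two patterns above comes down to request count: one lookup per record versus one per batch. The following sketch shows the batch pattern in plain TypeScript. The `FetchPrices` signature, the `Product` shape, and `enrichWithPrices` are all assumptions made for illustration; the actual batch endpoint and the `.enrich()` step may behave differently.

```typescript
interface Product {
  id: string;
  name: string;
}

interface PricedProduct extends Product {
  price: number | null;
}

// Hypothetical batch lookup: one request returns prices for many IDs.
type FetchPrices = (ids: string[]) => Promise<Map<string, number>>;

async function enrichWithPrices(
  products: Product[],
  batchSize: number,
  fetchPrices: FetchPrices,
): Promise<PricedProduct[]> {
  if (batchSize < 1) {
    throw new RangeError('batchSize must be at least 1');
  }
  const enriched: PricedProduct[] = [];
  for (let i = 0; i < products.length; i += batchSize) {
    const batch = products.slice(i, i + batchSize);
    // One request per batch instead of one per record.
    const prices = await fetchPrices(batch.map((p) => p.id));
    for (const product of batch) {
      // Records without a match keep a null price rather than being dropped.
      enriched.push({ ...product, price: prices.get(product.id) ?? null });
    }
  }
  return enriched;
}
```

With 1,000 products and a batch size of 100, this issues 10 requests where the per-record lookup would issue 1,000.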
Before deploying to production: