Understanding the plugin architecture helps you use it effectively and extend it.
┌─────────────────────────────────────────────────────────────────┐
│ Admin UI │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Pipeline │ │ Connections │ │ Runs / Logs / │ │
│ │ Builder │ │ & Secrets │ │ Analytics │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ GraphQL
▼
┌─────────────────────────────────────────────────────────────────┐
│ Vendure Server │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ DataHub Plugin │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ GraphQL │ │ Job Queue │ │ Webhook │ │ │
│ │ │ Resolvers │ │ Handlers │ │ Controllers │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌────────────────────────────────────────────────────────┐│ │
│ │ │ Pipeline Runner Service ││ │
│ │ │ ││ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ││ │
│ │ │ │ Extract │ │Transform │ │ Validate │ │ Load │ ││ │
│ │ │ │ Executor │ │ Executor │ │ Executor │ │Executor│ ││ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └────────┘ ││ │
│ │ │ │ │ │ │ ││ │
│ │ │ ▼ ▼ ▼ ▼ ││ │
│ │ │ ┌──────────────────────────────────────────────────┐ ││ │
│ │ │ │ Adapter Registry │ ││ │
│ │ │ │ ┌──────────┐ ┌──────────┐ ┌─────────────────┐ │ ││ │
│ │ │ │ │Extractors│ │Operators │ │ Loaders │ │ ││ │
│ │ │ │ └──────────┘ └──────────┘ └─────────────────┘ │ ││ │
│ │ │ └──────────────────────────────────────────────────┘ ││ │
│ │ └────────────────────────────────────────────────────────┘│ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────────┐│ │
│ │ │ Services ││ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ ││ │
│ │ │ │ Pipeline │ │ Secrets │ │Connection│ │ Logging│ ││ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └────────┘ ││ │
│ │ └────────────────────────────────────────────────────────┘│ │
│ └─────────────────────────────────────────────────────────────┘│
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Database (TypeORM) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────────┐ │ │
│ │ │ Pipeline │ │ Runs │ │ Secrets │ │ Connections│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
DataHubPlugin is the main plugin class that registers:
| Entity | Purpose |
|---|---|
| Pipeline | Pipeline definitions |
| PipelineRun | Execution history |
| PipelineRevision | Version history |
| PipelineLog | Execution logs |
| DataHubConnection | External connections |
| DataHubSecret | Encrypted credentials |
| DataHubSettings | Plugin configuration |
| DataHubRecordError | Failed records |
| DataHubCheckpoint | Resume checkpoints |
| DataHubRecordRetryAudit | Retry audit trail |
| Service | Responsibility |
|---|---|
| PipelineService | CRUD operations for pipelines |
| PipelineRunnerService | Orchestrates pipeline execution |
| AdapterRuntimeService | Executes adapters |
| SecretService | Manages secrets |
| ConnectionService | Manages connections |
| CheckpointService | Manages checkpoints |
| PipelineLogService | Writes execution logs |
| AnalyticsService | Aggregates metrics |
| RecordErrorService | Manages failed records |
Step-type-specific execution logic:
| Executor | Step Types |
|---|---|
| ExtractExecutor | extract |
| TransformExecutor | transform, validate, enrich |
| LoadExecutor | load |
| ExportExecutor | export |
| FeedExecutor | feed |
| SinkExecutor | sink |
The registry manages all adapters via registerRuntime():
// All adapter types use the same registration method
registry.registerRuntime(httpApiExtractor);
registry.registerRuntime(databaseExtractor);
registry.registerRuntime(renameOperator);
registry.registerRuntime(setOperator);
registry.registerRuntime(productLoader);
registry.registerRuntime(customerLoader);
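To illustrate why one registration method can serve every adapter kind, the sketch below keys each adapter by its `type` and `code`. `SketchAdapterRegistry`, `AdapterDefinition`, `find()`, and the adapter codes are hypothetical names chosen for this example; the plugin's actual registry may be organised differently.

```typescript
// Hypothetical sketch, not the plugin's real registry implementation.
type AdapterType = 'EXTRACTOR' | 'OPERATOR' | 'LOADER';

interface AdapterDefinition {
  readonly type: AdapterType;
  readonly code: string;
}

class SketchAdapterRegistry {
  private readonly adapters = new Map<string, AdapterDefinition>();

  // One entry point for every adapter kind; the `type` field disambiguates.
  registerRuntime(adapter: AdapterDefinition): void {
    const key = `${adapter.type}:${adapter.code}`;
    if (this.adapters.has(key)) {
      throw new Error(`Adapter already registered: ${key}`);
    }
    this.adapters.set(key, adapter);
  }

  // Look up an adapter by kind and code; returns undefined when absent.
  find(type: AdapterType, code: string): AdapterDefinition | undefined {
    return this.adapters.get(`${type}:${code}`);
  }
}

const sketchRegistry = new SketchAdapterRegistry();
sketchRegistry.registerRuntime({ type: 'EXTRACTOR', code: 'httpApi' });
sketchRegistry.registerRuntime({ type: 'OPERATOR', code: 'rename' });
```

Keying on both fields lets an extractor and a loader share a code without colliding, while a duplicate registration of the same kind fails fast.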
A pipeline run starts when:
Runs are processed via Vendure’s job queue:
await jobQueue.add({
  type: 'data-hub.run',
  pipelineId: pipeline.id,
  triggeredBy: 'manual',
});
The runner orchestrates execution:
For each step:
Records flow through steps:
Extract → [record1, record2, ...] → Transform → Load
Each step can:
{
  version: 1,
  steps: [
    { key: 'extract', type: 'extract', config: {...} },
    { key: 'transform', type: 'transform', config: {...} },
    { key: 'load', type: 'load', config: {...} },
  ],
  edges: [
    { from: 'extract', to: 'transform' },
    { from: 'transform', to: 'load' },
  ],
}
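A definition made of `steps` and `edges` implies an execution order. One common way to derive it is a topological sort; the sketch below uses Kahn's algorithm. The function name `topologicalOrder` and the `StepRef` and `Edge` types are assumptions for illustration, and the runner may resolve ordering differently.

```typescript
// Minimal shapes for the example; only the fields needed for ordering.
interface StepRef { key: string }
interface Edge { from: string; to: string }

// Kahn's algorithm: repeatedly emit steps that have no unprocessed predecessors.
function topologicalOrder(steps: StepRef[], edges: Edge[]): string[] {
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const step of steps) {
    inDegree.set(step.key, 0);
    successors.set(step.key, []);
  }
  for (const { from, to } of edges) {
    if (!inDegree.has(from) || !inDegree.has(to)) {
      throw new Error(`Edge references unknown step: ${from} -> ${to}`);
    }
    successors.get(from)!.push(to);
    inDegree.set(to, inDegree.get(to)! + 1);
  }
  const ready = steps.map(s => s.key).filter(k => inDegree.get(k) === 0);
  const order: string[] = [];
  while (ready.length > 0) {
    const key = ready.shift()!;
    order.push(key);
    for (const next of successors.get(key)!) {
      const remaining = inDegree.get(next)! - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) ready.push(next);
    }
  }
  // Any step left unvisited sits on a cycle, so the graph is not a DAG.
  if (order.length !== steps.length) {
    throw new Error('Pipeline definition contains a cycle');
  }
  return order;
}

const order = topologicalOrder(
  [{ key: 'extract' }, { key: 'transform' }, { key: 'load' }],
  [{ from: 'extract', to: 'transform' }, { from: 'transform', to: 'load' }],
);
console.log(order); // ['extract', 'transform', 'load']
```

The cycle check doubles as validation: a definition whose edges loop back on themselves is rejected before any step runs.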
Each run has a context:
{
  runId: string;
  pipelineId: string;
  startedAt: Date;
  triggeredBy: string;
  parameters: Record<string, any>;
  variables: Record<string, any>;
  checkpoint: CheckpointData;
}
Records are JSON objects:
interface Record {
  [key: string]: JsonValue;
  _meta?: {
    sourceStep: string;
    index: number;
    hash?: string;
  };
}
Implement the ExtractorAdapter interface:
interface ExtractorAdapter {
  readonly type: 'EXTRACTOR';
  code: string;
  name: string;
  extract(context: ExtractContext, config: JsonObject): AsyncGenerator<RecordEnvelope>;
}
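As a rough sketch of what an implementation of this interface could look like, the extractor below yields records from a static list in its config. The shapes of `ExtractContext` and `RecordEnvelope` are not shown in this document, so the minimal versions here (`runId` and `data`) are assumptions, as are the `static-list` code and the `items` config key.

```typescript
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
type JsonObject = { [key: string]: JsonValue };

// Assumed placeholder shapes; the plugin's real types may carry more fields.
interface ExtractContext { runId: string }
interface RecordEnvelope { data: JsonObject }

interface ExtractorAdapter {
  readonly type: 'EXTRACTOR';
  code: string;
  name: string;
  extract(context: ExtractContext, config: JsonObject): AsyncGenerator<RecordEnvelope>;
}

// Hypothetical extractor: emits every plain object found in `config.items`.
const staticListExtractor: ExtractorAdapter = {
  type: 'EXTRACTOR',
  code: 'static-list',
  name: 'Static list (example)',
  async *extract(_context, config) {
    const items = Array.isArray(config.items) ? config.items : [];
    for (const item of items) {
      // Skip scalars, nulls and nested arrays; only objects count as records.
      if (item !== null && typeof item === 'object' && !Array.isArray(item)) {
        yield { data: item };
      }
    }
  },
};
```

Because `extract` is an async generator, a consumer can pull records one at a time with `for await` instead of holding the whole source in memory.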
Create operator definitions:
interface SingleRecordOperator<TConfig = JsonObject> {
  readonly type: 'OPERATOR';
  readonly pure: boolean;
  applyOne(record: JsonObject, config: TConfig, helpers: AdapterOperatorHelpers): JsonObject | null;
}
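A minimal operator against this interface might rename a single field. In the sketch below, `RenameConfig` and `renameFieldOperator` are hypothetical, and `AdapterOperatorHelpers` is stubbed as an open object because its real shape is not shown here.

```typescript
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue };
type JsonObject = { [key: string]: JsonValue };

// Assumed placeholder; the real helpers object is defined by the plugin.
type AdapterOperatorHelpers = Record<string, unknown>;

interface SingleRecordOperator<TConfig = JsonObject> {
  readonly type: 'OPERATOR';
  readonly pure: boolean;
  applyOne(record: JsonObject, config: TConfig, helpers: AdapterOperatorHelpers): JsonObject | null;
}

// Hypothetical config for this example only.
interface RenameConfig { from: string; to: string }

const renameFieldOperator: SingleRecordOperator<RenameConfig> = {
  type: 'OPERATOR',
  pure: true, // output depends only on the record and config
  applyOne(record, config) {
    if (!(config.from in record)) return record;
    // Copy the record without the old key, then add the value under the new key.
    const { [config.from]: value, ...rest } = record;
    return { ...rest, [config.to]: value };
  },
};

const renamed = renameFieldOperator.applyOne({ a: 1, b: 2 }, { from: 'a', to: 'x' }, {});
console.log(renamed); // { b: 2, x: 1 }
```

The operator returns a new object rather than mutating its input, which keeps it consistent with `pure: true`.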
Implement entity loading:
interface LoaderAdapter<TConfig = JsonObject> {
  readonly type: 'LOADER';
  load(context: LoadContext, config: TConfig, records: readonly JsonObject[]): Promise<LoadResult>;
}
See Extending the Plugin for details.
Code-first configurations are synced on startup:
DataHubPlugin.init()
Custom permissions protect operations:
@Allow(DataHubPipelinePermission.Read)
@Query()
dataHubPipelines() { ... }
@Allow(RunDataHubPipelinePermission)
@Mutation()
startDataHubPipelineRun() { ... }
Secrets are encrypted at rest using Vendure’s encryption utilities.
Webhook requests can be verified with HMAC signatures.
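HMAC verification generally means recomputing the signature over the raw request body and comparing it in constant time. The sketch below uses Node's `crypto` module; it assumes a hex-encoded HMAC-SHA256 signature, which is a common convention but is not confirmed by this document, and `verifyWebhookSignature` is a hypothetical helper name.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Assumption: the sender signs the raw body with HMAC-SHA256 and hex-encodes the digest.
function verifyWebhookSignature(rawBody: string, signatureHex: string, secret: string): boolean {
  const expected = createHmac('sha256', secret).update(rawBody, 'utf8').digest();
  const received = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

The signature must be computed over the exact bytes received; re-serialising a parsed JSON body can change whitespace or key order and invalidate an otherwise correct signature.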
Records are processed in batches:
{
  throughput: {
    batchSize: 100,
    concurrency: 4,
  }
}
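To make the two settings concrete, the sketch below splits records into batches of `batchSize` and keeps at most `concurrency` batches in flight. Treating `concurrency` as "batches processed in parallel" is an assumption about how the plugin interprets it, and `chunk` and `processInBatches` are hypothetical helpers.

```typescript
// Split a list into consecutive batches of at most `batchSize` items.
function chunk<T>(items: readonly T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// Run `handler` over every batch with a fixed pool of workers.
async function processInBatches<T>(
  items: readonly T[],
  options: { batchSize: number; concurrency: number },
  handler: (batch: T[]) => Promise<void>,
): Promise<void> {
  const batches = chunk(items, options.batchSize);
  let next = 0;
  // Each worker claims the next unprocessed batch until none remain.
  const worker = async (): Promise<void> => {
    while (next < batches.length) {
      const batch = batches[next++];
      await handler(batch);
    }
  };
  const workerCount = Math.max(1, Math.min(options.concurrency, batches.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
}
```

With `batchSize: 100` and `concurrency: 4`, this approach would hold at most 400 records in active processing at any moment.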
Long runs save progress periodically:
Pipeline runs use the job queue:
Pipelines execute as Directed Acyclic Graphs (DAGs):
┌─────────┐     ┌───────────┐     ┌─────────┐
│ Trigger │────▶│  Extract  │────▶│Transform│
└─────────┘     └───────────┘     └────┬────┘
                                       │
                    ┌──────────────────┴──────────────────┐
                    ▼                                     ▼
              ┌───────────┐                         ┌───────────┐
              │   Route   │                         │  Enrich   │
              └─────┬─────┘                         └─────┬─────┘
            ┌───────┴───────┐                             │
            ▼               ▼                             ▼
        ┌────────┐      ┌────────┐                  ┌───────────┐
        │ Load A │      │ Load B │                  │   Sink    │
        └────────┘      └────────┘                  └───────────┘
Features:
For multi-instance deployments:
┌─────────────────────────────────────────────────────────────┐
│ Lock Backend Selection │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Redis │ PostgreSQL │ Memory │
│ (Recommended) │ (Fallback) │ (Single Instance) │
├─────────────────┼─────────────────┼─────────────────────────┤
│ SET NX EX │ Advisory Locks │ Map<string, LockToken> │
│ Atomic ops │ pg_try_advisory │ setTimeout cleanup │
│ TTL expiration │ Transaction- │ No cluster support │
│ Cluster support │ scoped │ │
└─────────────────┴─────────────────┴─────────────────────────┘
Configuration:
DataHubPlugin.init({
  distributedLock: {
    backend: 'redis',
    redis: { host: 'localhost', port: 6379 },
    defaultTtlMs: 30000,
    waitTimeoutMs: 5000,
  },
})
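The memory backend in the table can be pictured as a map of lock tokens with a time-to-live. The sketch below is a simplified illustration, not the plugin's implementation: it checks expiry lazily on each acquire instead of using `setTimeout` cleanup, it does not wait for a lock the way `waitTimeoutMs` suggests, and `MemoryLockSketch` and the `LockToken` fields are assumed names.

```typescript
// Assumed token shape; the plugin's LockToken may differ.
interface LockToken { key: string; owner: string; expiresAt: number }

class MemoryLockSketch {
  private readonly locks = new Map<string, LockToken>();
  private readonly now: () => number;

  // The clock is injectable so expiry can be tested without real delays.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  // Returns a token on success, or null when another owner holds a live lock.
  tryAcquire(key: string, owner: string, ttlMs: number): LockToken | null {
    const existing = this.locks.get(key);
    if (existing && existing.expiresAt > this.now()) return null;
    const token: LockToken = { key, owner, expiresAt: this.now() + ttlMs };
    this.locks.set(key, token);
    return token;
  }

  // Only the current holder's token releases the lock; stale tokens are ignored.
  release(token: LockToken): boolean {
    if (this.locks.get(token.key) !== token) return false;
    this.locks.delete(token.key);
    return true;
  }
}
```

The TTL matters because a crashed holder never calls `release`; expiry lets another instance take over. Since the map lives in one process, this approach gives no protection across instances, which matches the "No cluster support" note in the table.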
Protects external service calls:
┌─────────────────────────────────────────────┐
│               Circuit States                │
├─────────────────────────────────────────────┤
│                                             │
│   ┌────────┐  5 failures  ┌─────────┐       │
│   │ CLOSED │─────────────▶│  OPEN   │       │
│   │(normal)│              │(blocked)│       │
│   └────┬───┘              └────┬────┘       │
│        ▲                       │            │
│        │    ┌───────────┐      │ 30s        │
│  3 success  │ HALF-OPEN │◀─────┘            │
│        └────│ (testing) │                   │
│             └───────────┘                   │
└─────────────────────────────────────────────┘
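The state diagram can be expressed as a small state machine. The sketch below uses the thresholds from the diagram (5 failures, 30 s, 3 successes). It makes two assumptions the diagram does not state: failures are counted consecutively, and a failure while half-open reopens the circuit. `CircuitBreakerSketch` is a hypothetical class, not the plugin's API.

```typescript
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class CircuitBreakerSketch {
  private state: CircuitState = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly now: () => number;
  private readonly failureThreshold = 5;
  private readonly resetTimeoutMs = 30_000;
  private readonly successThreshold = 3;

  // The clock is injectable so the 30 s timeout can be tested instantly.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  // OPEN moves to HALF_OPEN lazily, once the reset timeout has elapsed.
  currentState(): CircuitState {
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF_OPEN';
      this.successes = 0;
    }
    return this.state;
  }

  canRequest(): boolean {
    return this.currentState() !== 'OPEN';
  }

  recordSuccess(): void {
    const state = this.currentState();
    if (state === 'HALF_OPEN') {
      this.successes += 1;
      if (this.successes >= this.successThreshold) {
        this.state = 'CLOSED';
        this.failures = 0;
      }
    } else if (state === 'CLOSED') {
      this.failures = 0; // assumption: only consecutive failures count
    }
  }

  recordFailure(): void {
    const state = this.currentState();
    if (state === 'CLOSED') this.failures += 1;
    // Assumption: any failure while HALF_OPEN reopens the circuit.
    if (state === 'HALF_OPEN' || (state === 'CLOSED' && this.failures >= this.failureThreshold)) {
      this.state = 'OPEN';
      this.openedAt = this.now();
      this.failures = 0;
    }
  }
}
```

A caller would check `canRequest()` before each external call and report the outcome with `recordSuccess()` or `recordFailure()`, so a failing service is skipped quickly instead of consuming a timeout on every record.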
Applied to:
Message queue integration for event-driven pipelines:
┌─────────────────────────────────────────────────────────────┐
│ Queue Adapters │
├──────────────┬──────────────┬─────────────┬────────────────┤
│ RabbitMQ │ Amazon SQS │ Redis │ Internal │
│ (AMQP) │ │ Streams │ (BullMQ) │
├──────────────┼──────────────┼─────────────┼────────────────┤
│ Native AMQP │ AWS SDK │ XREAD/XADD │ Redis-backed │
│ Publisher │ Long polling │ Consumer │ Job queue │
│ confirms │ Visibility │ groups │ Delayed jobs │
│ Dead-letter │ timeout │ Pending │ Retries │
│ queues │ Batch recv │ entries │ │
└──────────────┴──────────────┴─────────────┴────────────────┘
Consumer patterns:
Pipelines support multiple concurrent triggers:
┌─────────────────────────────────────────────────────────────┐
│ Trigger Types │
├──────────┬──────────┬──────────┬──────────┬────────────────┤
│ Manual │ Schedule │ Webhook │ Event │ Message Queue │
├──────────┼──────────┼──────────┼──────────┼────────────────┤
│ UI/API │ Cron │ HTTP │ Vendure │ RabbitMQ/SQS/ │
│ trigger │ express │ endpoint │ events │ Redis Streams │
└──────────┴──────────┴──────────┴──────────┴────────────────┘
     │           │          │          │           │
     └───────────┴──────────┴──────────┴───────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │ Pipeline Runner │
                   └─────────────────┘
All triggers converge to the same execution engine, enabling:
Product feed generation for marketing channels:
┌─────────────────────────────────────────────────────────────┐
│ Feed Generation Pipeline │
│ │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │ Vendure │───▶│ Feed │───▶│ Format Generator │ │
│ │ Products │ │ Filters │ │ │ │
│ └────────────┘ └────────────┘ │ ┌──────────────┐ │ │
│ │ │ Google XML │ │ │
│ │ ├──────────────┤ │ │
│ │ │ Meta/FB │ │ │
│ │ ├──────────────┤ │ │
│ │ │ RSS/Atom │ │ │
│ │ ├──────────────┤ │ │
│ │ │ CSV/JSON │ │ │
│ │ └──────────────┘ │ │
│ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Supports:
Output to external systems:
┌─────────────────────────────────────────────────────────────┐
│ Sink Executor │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Circuit Breaker │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ Search │ │ Webhook │ │ Queue │ │
│ │ Engines │ │ │ │ Producer │ │
│ ├───────────┤ ├───────────┤ ├───────────┤ │
│ │MeiliSearch│ │ HTTP POST │ │ RabbitMQ │ │
│ │Elastic │ │ Auth │ │ SQS │ │
│ │Algolia │ │ Retry │ │ Redis │ │
│ │Typesense │ │ Timeout │ │ │ │
│ └───────────┘ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────┘
All sinks feature:
src/plugins/data-hub/
├── src/ # Backend source
│ ├── api/ # GraphQL resolvers & schema
│ ├── bootstrap/ # Plugin initialization
│ ├── constants/ # Configuration constants
│ ├── decorators/ # Custom decorators
│ ├── enrichers/ # Record enrichers
│ ├── entities/ # TypeORM entities
│ ├── extractors/ # Data extractors
│ ├── feeds/ # Feed generators
│ ├── gql/ # Generated GraphQL types
│ ├── jobs/ # Job queue handlers
│ ├── loaders/ # Entity loaders
│ ├── mappers/ # Field mappers
│ ├── operators/ # Transform operators
│ ├── parsers/ # File parsers
│ ├── runtime/ # Execution engine
│ ├── sdk/ # Public SDK & DSL
│ ├── services/ # Business logic
│ ├── templates/ # Import/export templates
│ ├── transforms/ # Transform execution
│ ├── types/ # TypeScript types
│ ├── utils/ # Utilities
│ ├── validation/ # Pipeline definition validators
│ └── vendure-schemas/ # Vendure entity schema definitions
├── connectors/ # External system connectors (e.g. Pimcore)
├── dashboard/ # React Admin UI
│ ├── components/ # UI components
│ ├── constants/ # UI constants
│ ├── gql/ # GraphQL queries
│ ├── hooks/ # React hooks
│ ├── routes/ # Route components
│ ├── types/ # UI type definitions
│ └── utils/ # UI utilities
├── shared/ # Shared code (backend + dashboard)
│ ├── constants/ # Shared constants
│ ├── types/ # Shared types
│ └── utils/ # Shared utilities
├── dev-server/ # Development server & examples
├── docs/ # Documentation
└── e2e/ # End-to-end tests