vendure-data-hub-plugin

Architecture Overview

Understanding the plugin architecture helps you use it effectively and extend it.

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                         Admin UI                                 │
│   ┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐  │
│   │ Pipeline     │  │ Connections  │  │ Runs / Logs /        │  │
│   │ Builder      │  │ & Secrets    │  │ Analytics            │  │
│   └──────────────┘  └──────────────┘  └──────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ GraphQL
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                       Vendure Server                             │
│                                                                  │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │                    DataHub Plugin                           │ │
│  │                                                             │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐ │ │
│  │  │ GraphQL     │  │ Job Queue   │  │ Webhook             │ │ │
│  │  │ Resolvers   │  │ Handlers    │  │ Controllers         │ │ │
│  │  └─────────────┘  └─────────────┘  └─────────────────────┘ │ │
│  │         │                │                  │               │ │
│  │         ▼                ▼                  ▼               │ │
│  │  ┌────────────────────────────────────────────────────────┐│ │
│  │  │              Pipeline Runner Service                    ││ │
│  │  │                                                        ││ │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────┐ ││ │
│  │  │  │ Extract  │  │Transform │  │ Validate │  │  Load  │ ││ │
│  │  │  │ Executor │  │ Executor │  │ Executor │  │Executor│ ││ │
│  │  │  └──────────┘  └──────────┘  └──────────┘  └────────┘ ││ │
│  │  │         │            │             │            │      ││ │
│  │  │         ▼            ▼             ▼            ▼      ││ │
│  │  │  ┌──────────────────────────────────────────────────┐ ││ │
│  │  │  │              Adapter Registry                     │ ││ │
│  │  │  │  ┌──────────┐  ┌──────────┐  ┌─────────────────┐ │ ││ │
│  │  │  │  │Extractors│  │Operators │  │    Loaders      │ │ ││ │
│  │  │  │  └──────────┘  └──────────┘  └─────────────────┘ │ ││ │
│  │  │  └──────────────────────────────────────────────────┘ ││ │
│  │  └────────────────────────────────────────────────────────┘│ │
│  │                                                             │ │
│  │  ┌────────────────────────────────────────────────────────┐│ │
│  │  │                     Services                            ││ │
│  │  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────┐ ││ │
│  │  │  │ Pipeline │  │ Secrets  │  │Connection│  │ Logging│ ││ │
│  │  │  └──────────┘  └──────────┘  └──────────┘  └────────┘ ││ │
│  │  └────────────────────────────────────────────────────────┘│ │
│  └─────────────────────────────────────────────────────────────┘│
│                                                                  │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │                     Database (TypeORM)                      │ │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────────┐ │ │
│  │  │ Pipeline │  │  Runs    │  │ Secrets  │  │ Connections│ │ │
│  │  └──────────┘  └──────────┘  └──────────┘  └────────────┘ │ │
│  └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Core Components

Plugin Entry Point

DataHubPlugin is the main plugin class that registers:

Entities

Entity                   Purpose
Pipeline                 Pipeline definitions
PipelineRun              Execution history
PipelineRevision         Version history
PipelineLog              Execution logs
DataHubConnection        External connections
DataHubSecret            Encrypted credentials
DataHubSettings          Plugin configuration
DataHubRecordError       Failed records
DataHubCheckpoint        Resume checkpoints
DataHubRecordRetryAudit  Retry audit trail

Services

Service                 Responsibility
PipelineService         CRUD operations for pipelines
PipelineRunnerService   Orchestrates pipeline execution
AdapterRuntimeService   Executes adapters
SecretService           Manages secrets
ConnectionService       Manages connections
CheckpointService       Manages checkpoints
PipelineLogService      Writes execution logs
AnalyticsService        Aggregates metrics
RecordErrorService      Manages failed records

Executors

Step-type-specific execution logic:

Executor            Step Types
ExtractExecutor     extract
TransformExecutor   transform, validate, enrich
LoadExecutor        load
ExportExecutor      export
FeedExecutor        feed
SinkExecutor        sink

Adapter Registry

The registry manages all adapters via registerRuntime():

// All adapter types use the same registration method
registry.registerRuntime(httpApiExtractor);
registry.registerRuntime(databaseExtractor);
registry.registerRuntime(renameOperator);
registry.registerRuntime(setOperator);
registry.registerRuntime(productLoader);
registry.registerRuntime(customerLoader);
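A minimal sketch of how a registry with a single registerRuntime() entry point could index adapters, assuming each adapter carries a `type` discriminator and a unique `code` (as the interfaces under Extension Points suggest). `AdapterRegistry`, `AnyAdapter`, `resolve()`, `list()` and the adapter codes used below are hypothetical names, not the plugin's actual API.

```typescript
// Hypothetical minimal adapter shape; the plugin's real interfaces carry more fields.
type AdapterType = "EXTRACTOR" | "OPERATOR" | "LOADER";

interface AnyAdapter {
    readonly type: AdapterType;
    readonly code: string;
}

// One registration method for every adapter type, indexed by `type:code`
// so that, for example, an extractor and a loader may share a code.
class AdapterRegistry {
    private readonly adapters = new Map<string, AnyAdapter>();

    registerRuntime(adapter: AnyAdapter): void {
        const key = `${adapter.type}:${adapter.code}`;
        if (this.adapters.has(key)) {
            throw new Error(`Adapter already registered: ${key}`);
        }
        this.adapters.set(key, adapter);
    }

    // Look up the adapter a step refers to; fail loudly if it is unknown.
    resolve(type: AdapterType, code: string): AnyAdapter {
        const adapter = this.adapters.get(`${type}:${code}`);
        if (!adapter) {
            throw new Error(`No ${type} adapter registered with code "${code}"`);
        }
        return adapter;
    }

    list(type: AdapterType): AnyAdapter[] {
        return Array.from(this.adapters.values()).filter(a => a.type === type);
    }
}

// Usage with hypothetical adapter codes
const registry = new AdapterRegistry();
registry.registerRuntime({ type: "EXTRACTOR", code: "httpApi" });
registry.registerRuntime({ type: "OPERATOR", code: "rename" });
registry.registerRuntime({ type: "LOADER", code: "productUpsert" });
```

Keying on both type and code keeps a single registration method while still letting the executors ask for "the loader called X" rather than "anything called X".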

Execution Flow

1. Trigger

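A pipeline run starts when one of its triggers fires: manually from the UI or API, on a cron schedule, through a webhook HTTP endpoint, on a Vendure event, or from a message queue (RabbitMQ, SQS or Redis Streams).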

2. Job Queue

Runs are processed via Vendure’s job queue:

await jobQueue.add({
    type: 'data-hub.run',
    pipelineId: pipeline.id,
    triggeredBy: 'manual',
});

3. Pipeline Runner

The runner orchestrates execution:

  1. Load pipeline definition
  2. Resolve step dependencies
  3. Execute steps in order
  4. Track progress and checkpoints
  5. Handle errors and retries
  6. Record final status
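Step 2 amounts to ordering the step graph so that every step runs after its upstream steps. The sketch below uses Kahn's algorithm over the `steps`/`edges` shape shown under Pipeline Definition; `topologicalOrder` is a hypothetical helper illustrating the technique, not the runner's actual code.

```typescript
interface StepDef { key: string; type: string; }
interface EdgeDef { from: string; to: string; }

// Kahn's algorithm: repeatedly emit steps whose predecessors have all been emitted.
// Throws on unknown step keys or on a cycle, since a pipeline must be a DAG.
function topologicalOrder(steps: StepDef[], edges: EdgeDef[]): string[] {
    const inDegree = new Map<string, number>();
    const successors = new Map<string, string[]>();
    for (const step of steps) {
        inDegree.set(step.key, 0);
        successors.set(step.key, []);
    }
    for (const { from, to } of edges) {
        if (!inDegree.has(from) || !inDegree.has(to)) {
            throw new Error(`Edge references unknown step: ${from} -> ${to}`);
        }
        successors.get(from)!.push(to);
        inDegree.set(to, inDegree.get(to)! + 1);
    }

    // Start with the root steps (no incoming edges), in declaration order.
    const ready = steps.filter(s => inDegree.get(s.key) === 0).map(s => s.key);
    const order: string[] = [];
    while (ready.length > 0) {
        const key = ready.shift()!;
        order.push(key);
        for (const next of successors.get(key)!) {
            const remaining = inDegree.get(next)! - 1;
            inDegree.set(next, remaining);
            if (remaining === 0) ready.push(next);
        }
    }

    // Steps left over still have unmet predecessors, which only happens in a cycle.
    if (order.length !== steps.length) {
        throw new Error("Pipeline graph contains a cycle");
    }
    return order;
}

const order = topologicalOrder(
    [
        { key: "extract", type: "extract" },
        { key: "transform", type: "transform" },
        { key: "load", type: "load" },
    ],
    [
        { from: "extract", to: "transform" },
        { from: "transform", to: "load" },
    ],
);
// order is ["extract", "transform", "load"]
```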

4. Step Execution

For each step:

  1. Get the appropriate executor
  2. Resolve adapter from registry
  3. Execute with configuration
  4. Collect output records
  5. Pass to next step(s)
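The per-step loop can be sketched as a lookup from step type to executor, following the mapping in the Executors table (transform, validate and enrich share TransformExecutor). `StepExecutor`, `passThrough` and `executeStep` are hypothetical; the stub executors simply return their input, and adapter resolution (step 2), checkpoints and error handling are left out.

```typescript
type JsonRecord = { [key: string]: unknown };

// Hypothetical executor contract: input records plus step config in, output records out.
interface StepExecutor {
    readonly name: string;
    execute(config: JsonRecord, input: JsonRecord[]): Promise<JsonRecord[]>;
}

// Stub executor that passes records through unchanged (placeholder for real logic).
const passThrough = (name: string): StepExecutor => ({
    name,
    async execute(_config, input) {
        return input;
    },
});

// Step type -> executor, mirroring the Executors table.
const transformExecutor = passThrough("TransformExecutor");
const executors: Record<string, StepExecutor> = {
    extract: passThrough("ExtractExecutor"),
    transform: transformExecutor,
    validate: transformExecutor,
    enrich: transformExecutor,
    load: passThrough("LoadExecutor"),
    export: passThrough("ExportExecutor"),
    feed: passThrough("FeedExecutor"),
    sink: passThrough("SinkExecutor"),
};

async function executeStep(
    step: { key: string; type: string; config: JsonRecord },
    input: JsonRecord[],
): Promise<JsonRecord[]> {
    const executor = executors[step.type]; // 1. get the appropriate executor
    if (!executor) throw new Error(`Unsupported step type: ${step.type}`);
    return executor.execute(step.config, input); // 3-4. execute and collect output
}
```

The returned array is what the runner would hand to the step's downstream neighbours (step 5).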

5. Record Processing

Records flow through steps:

Extract → [record1, record2, ...] → Transform → Load

Each step can:

Data Flow

Pipeline Definition

{
    version: 1,
    steps: [
        { key: 'extract', type: 'extract', config: {...} },
        { key: 'transform', type: 'transform', config: {...} },
        { key: 'load', type: 'load', config: {...} },
    ],
    edges: [
        { from: 'extract', to: 'transform' },
        { from: 'transform', to: 'load' },
    ],
}

Execution Context

Each run has a context:

{
    runId: string;
    pipelineId: string;
    startedAt: Date;
    triggeredBy: string;
    parameters: Record<string, any>;
    variables: Record<string, any>;
    checkpoint: CheckpointData;
}

Record Format

Records are JSON objects:

interface Record {
    [key: string]: JsonValue;
    _meta?: {
        sourceStep: string;
        index: number;
        hash?: string;
    };
}
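As an illustration of the `_meta` block, the sketch below tags each record leaving a step with the step key and the record's position. `TaggedRecord` and `tagRecords` are hypothetical names (chosen to avoid clashing with TypeScript's built-in `Record` type), and the optional `hash` is left unset because this page does not say how it is computed.

```typescript
type JsonObject = { [key: string]: unknown };

// Mirrors the documented _meta fields.
interface RecordMeta {
    sourceStep: string;
    index: number;
    hash?: string;
}

type TaggedRecord = JsonObject & { _meta: RecordMeta };

// Attach provenance to every record a step emits, without mutating the input records.
function tagRecords(records: readonly JsonObject[], sourceStep: string): TaggedRecord[] {
    return records.map((record, index) => ({
        ...record,
        _meta: { sourceStep, index },
    }));
}

const tagged = tagRecords([{ sku: "A-1" }, { sku: "B-2" }], "extract");
// tagged[1] is { sku: "B-2", _meta: { sourceStep: "extract", index: 1 } }
```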

Extension Points

Custom Extractors

Implement the ExtractorAdapter interface:

interface ExtractorAdapter {
    readonly type: 'EXTRACTOR';
    code: string;
    name: string;
    extract(context: ExtractContext, config: JsonObject): AsyncGenerator<RecordEnvelope>;
}
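A minimal custom extractor might look like the sketch below. The `ExtractorAdapter` interface is copied from above, but `ExtractContext` and `RecordEnvelope` are stubbed with hypothetical shapes (a `cancelled` flag and a `{ data }` wrapper) because their real definitions are not shown here; check them against the plugin SDK before relying on this. The `inlineLines` code and `text` config option are likewise illustrative.

```typescript
type JsonObject = { [key: string]: unknown };

// Hypothetical stand-ins: the real ExtractContext and RecordEnvelope live in the plugin SDK.
interface ExtractContext {
    readonly cancelled?: boolean;
}
interface RecordEnvelope {
    data: JsonObject;
}

// Interface as documented above.
interface ExtractorAdapter {
    readonly type: "EXTRACTOR";
    code: string;
    name: string;
    extract(context: ExtractContext, config: JsonObject): AsyncGenerator<RecordEnvelope>;
}

// Hypothetical extractor: emits one record per non-empty line of config.text.
const inlineLinesExtractor: ExtractorAdapter = {
    type: "EXTRACTOR",
    code: "inlineLines",
    name: "Inline lines",
    async *extract(context, config) {
        const raw = config["text"];
        const text = typeof raw === "string" ? raw : "";
        const lines = text.split("\n").map(line => line.trim()).filter(line => line !== "");
        for (let index = 0; index < lines.length; index++) {
            if (context.cancelled) return; // stop early if the run was cancelled
            yield { data: { index, value: lines[index] } };
        }
    },
};
```

Because `extract` is an async generator, records are produced lazily, so a large source does not have to be held in memory before downstream steps start.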

Custom Operators

Implement an operator interface such as SingleRecordOperator:

interface SingleRecordOperator<TConfig = JsonObject> {
    readonly type: 'OPERATOR';
    readonly pure: boolean;
    applyOne(record: JsonObject, config: TConfig, helpers: AdapterOperatorHelpers): JsonObject | null;
}
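For example, a field-rename operator could implement `SingleRecordOperator` as sketched below. The interface is copied from above; `AdapterOperatorHelpers` is stubbed as an empty hypothetical type, and `RenameConfig` with its `from`/`to` keys is illustrative rather than the shape of the plugin's own rename operator.

```typescript
type JsonObject = { [key: string]: unknown };

// Hypothetical stand-in; the real helpers type is defined by the plugin SDK.
type AdapterOperatorHelpers = Record<string, never>;

// Interface as documented above.
interface SingleRecordOperator<TConfig = JsonObject> {
    readonly type: "OPERATOR";
    readonly pure: boolean;
    applyOne(record: JsonObject, config: TConfig, helpers: AdapterOperatorHelpers): JsonObject | null;
}

interface RenameConfig {
    from: string;
    to: string;
}

// Renames one field. Marked pure: the output depends only on the record and config.
const renameField: SingleRecordOperator<RenameConfig> = {
    type: "OPERATOR",
    pure: true,
    applyOne(record, { from, to }) {
        if (!(from in record)) return record; // nothing to rename
        const copy: JsonObject = { ...record }; // never mutate the input record
        const value = copy[from];
        delete copy[from];
        copy[to] = value;
        return copy;
    },
};
```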

Custom Loaders

Implement the LoaderAdapter interface:

interface LoaderAdapter<TConfig = JsonObject> {
    readonly type: 'LOADER';
    load(context: LoadContext, config: TConfig, records: readonly JsonObject[]): Promise<LoadResult>;
}

See Extending the Plugin for details.

Configuration Sync

Code-first configurations are synced on startup:

  1. Plugin loads options from DataHubPlugin.init()
  2. Pipelines, secrets, connections are compared with database
  3. New items are created
  4. Existing items are updated if code-first takes precedence
  5. UI shows code-first items as read-only
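Steps 2 to 4 can be sketched as a diff keyed by a stable identifier. In this sketch items are matched on a hypothetical `code` field, and `codeFirstWins` stands in for the precedence rule, whose exact semantics this page does not spell out; `planSync` is not a plugin API.

```typescript
interface SyncItem {
    code: string;
    definition: unknown;
}

interface SyncPlan {
    create: SyncItem[];
    update: SyncItem[];
    unchanged: SyncItem[];
}

// Compare code-first items with database rows and decide what to write.
function planSync(codeFirst: SyncItem[], database: SyncItem[], codeFirstWins: boolean): SyncPlan {
    const existing = new Map(database.map(item => [item.code, item] as const));
    const plan: SyncPlan = { create: [], update: [], unchanged: [] };
    for (const item of codeFirst) {
        const current = existing.get(item.code);
        if (!current) {
            plan.create.push(item); // step 3: item only exists in code
        } else if (
            codeFirstWins &&
            // Naive deep comparison: JSON.stringify is sensitive to key order.
            JSON.stringify(current.definition) !== JSON.stringify(item.definition)
        ) {
            plan.update.push(item); // step 4: code-first overrides the stored copy
        } else {
            plan.unchanged.push(item);
        }
    }
    return plan;
}
```

Computing a plan before writing anything keeps the startup sync easy to log and to test.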

Security

Permissions

Custom permissions protect operations:

@Allow(DataHubPipelinePermission.Read)
@Query()
dataHubPipelines() { ... }

@Allow(RunDataHubPipelinePermission)
@Mutation()
startDataHubPipelineRun() { ... }

Secret Encryption

Secrets are encrypted at rest using Vendure’s encryption utilities.

Webhook Signatures

Webhook requests can be verified with HMAC signatures.
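A minimal verification sketch using Node's `crypto` module. The signature format (hex-encoded HMAC-SHA256 of the raw request body) is an assumption; check which algorithm, encoding and header the plugin actually uses. `signBody` and `verifySignature` are hypothetical helpers.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Assumes the sender transmits hex(HMAC-SHA256(secret, rawBody)).
function signBody(rawBody: string, secret: string): string {
    return createHmac("sha256", secret).update(rawBody, "utf8").digest("hex");
}

function verifySignature(rawBody: string, signatureHex: string, secret: string): boolean {
    const expected = Buffer.from(signBody(rawBody, secret), "hex");
    const received = Buffer.from(signatureHex, "hex");
    // timingSafeEqual throws on length mismatch, so compare lengths first.
    if (received.length !== expected.length) return false;
    // Constant-time comparison avoids leaking the signature through timing.
    return timingSafeEqual(expected, received);
}
```

The signature must be computed over the raw body bytes as received; re-serializing parsed JSON can change whitespace or key order and break verification.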

Performance Considerations

Batch Processing

Records are processed in batches:

{
    throughput: {
        batchSize: 100,
        concurrency: 4,
    }
}
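One way to honour `batchSize` and `concurrency` together is to split the records into batches and keep at most `concurrency` batches in flight. This is an illustrative sketch (`chunk` and `processInBatches` are hypothetical), not the runner's scheduler.

```typescript
// Split records into fixed-size batches.
function chunk<T>(items: readonly T[], size: number): T[][] {
    if (size < 1) throw new Error("batchSize must be >= 1");
    const batches: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        batches.push(items.slice(i, i + size));
    }
    return batches;
}

// Process batches with at most `concurrency` in flight; results keep batch order.
async function processInBatches<T, R>(
    items: readonly T[],
    options: { batchSize: number; concurrency: number },
    handle: (batch: T[]) => Promise<R>,
): Promise<R[]> {
    const batches = chunk(items, options.batchSize);
    const results: R[] = new Array(batches.length);
    let next = 0;
    // Each worker claims the next unprocessed batch until none are left.
    // JavaScript is single-threaded, so `next++` cannot race between workers.
    const worker = async (): Promise<void> => {
        while (next < batches.length) {
            const index = next++;
            results[index] = await handle(batches[index]!);
        }
    };
    const workerCount = Math.min(Math.max(1, options.concurrency), batches.length);
    await Promise.all(Array.from({ length: workerCount }, worker));
    return results;
}
```

With `batchSize: 100` and `concurrency: 4`, 250 records become batches of 100, 100 and 50, processed by three workers.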

Checkpointing

Long runs save progress periodically:

Job Queue

Pipeline runs use the job queue:

Enterprise Architecture

DAG-Based Workflow Engine

Pipelines execute as Directed Acyclic Graphs (DAGs):

┌─────────┐     ┌───────────┐     ┌─────────┐
│ Trigger │────▶│  Extract  │────▶│Transform│
└─────────┘     └───────────┘     └────┬────┘
                                       │
                    ┌──────────────────┴──────────────────┐
                    ▼                                      ▼
              ┌───────────┐                         ┌───────────┐
              │   Route   │                         │  Enrich   │
              └─────┬─────┘                         └─────┬─────┘
           ┌───────┴───────┐                              │
           ▼               ▼                              ▼
      ┌────────┐     ┌────────┐                    ┌───────────┐
      │ Load A │     │ Load B │                    │   Sink    │
      └────────┘     └────────┘                    └───────────┘

Features:

Distributed Locking

For multi-instance deployments:

┌─────────────────────────────────────────────────────────────┐
│                    Lock Backend Selection                     │
├─────────────────┬─────────────────┬─────────────────────────┤
│      Redis      │   PostgreSQL    │        Memory           │
│   (Recommended) │   (Fallback)    │   (Single Instance)     │
├─────────────────┼─────────────────┼─────────────────────────┤
│ SET NX EX       │ Advisory Locks  │ Map<string, LockToken>  │
│ Atomic ops      │ pg_try_advisory │ setTimeout cleanup      │
│ TTL expiration  │ Transaction-    │ No cluster support      │
│ Cluster support │ scoped          │                         │
└─────────────────┴─────────────────┴─────────────────────────┘

Configuration:

DataHubPlugin.init({
    distributedLock: {
        backend: 'redis',
        redis: { host: 'localhost', port: 6379 },
        defaultTtlMs: 30000,
        waitTimeoutMs: 5000,
    },
})
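The Memory column of the table describes a `Map<string, LockToken>` with `setTimeout` cleanup. A sketch of that idea follows; `MemoryLock`, `tryAcquire` and `release` are hypothetical names, and, like the backend it models, it offers no protection across instances.

```typescript
import { randomUUID } from "node:crypto";

interface LockToken {
    id: string;
    timer: ReturnType<typeof setTimeout>;
}

class MemoryLock {
    private readonly locks = new Map<string, LockToken>();

    // Returns a token id if the lock was free, or null if it is already held.
    tryAcquire(key: string, ttlMs: number): string | null {
        if (this.locks.has(key)) return null;
        const id = randomUUID();
        // Expire the lock after ttlMs so a holder that never releases cannot block forever.
        const timer = setTimeout(() => {
            if (this.locks.get(key)?.id === id) this.locks.delete(key);
        }, ttlMs);
        this.locks.set(key, { id, timer });
        return id;
    }

    // Only the holder of the matching token may release the lock.
    release(key: string, id: string): boolean {
        const token = this.locks.get(key);
        if (!token || token.id !== id) return false;
        clearTimeout(token.timer);
        this.locks.delete(key);
        return true;
    }
}
```

Requiring the token on release mirrors the usual pattern for TTL-based locks: a holder whose lock already expired cannot accidentally release a lock someone else has since acquired.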

Circuit Breaker

Protects external service calls:

         ┌─────────────────────────────────────────────┐
         │               Circuit States                │
         ├─────────────────────────────────────────────┤
         │                                             │
         │   ┌────────┐  5 failures  ┌─────────┐       │
         │   │ CLOSED │─────────────▶│  OPEN   │       │
         │   │(normal)│              │(blocked)│       │
         │   └────────┘              └────┬────┘       │
         │       ▲                        │ after 30s  │
         │       │ 3 successes            ▼            │
         │       │                  ┌───────────┐      │
         │       └──────────────────┤ HALF-OPEN │      │
         │                          │ (testing) │      │
         │                          └───────────┘      │
         └─────────────────────────────────────────────┘
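The state diagram translates into a small state machine. The thresholds (5 failures, 30 s, 3 successes) come from the diagram; counting consecutive failures, reopening on any failure while half-open, and the injectable clock are assumptions of this sketch. `CircuitBreaker`, `getState()` and `call()` are hypothetical names.

```typescript
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

class CircuitBreaker {
    private state: CircuitState = "CLOSED";
    private failures = 0;
    private successes = 0;
    private openedAt = 0;

    constructor(
        private readonly failureThreshold = 5,
        private readonly resetTimeoutMs = 30_000,
        private readonly successThreshold = 3,
        private readonly now: () => number = Date.now,
    ) {}

    getState(): CircuitState {
        // OPEN moves to HALF_OPEN once the reset timeout has elapsed.
        if (this.state === "OPEN" && this.now() - this.openedAt >= this.resetTimeoutMs) {
            this.state = "HALF_OPEN";
            this.successes = 0;
        }
        return this.state;
    }

    async call<T>(fn: () => Promise<T>): Promise<T> {
        if (this.getState() === "OPEN") {
            throw new Error("Circuit open: call blocked");
        }
        try {
            const result = await fn();
            this.onSuccess();
            return result;
        } catch (err) {
            this.onFailure();
            throw err;
        }
    }

    private onSuccess(): void {
        if (this.state === "HALF_OPEN") {
            this.successes += 1;
            if (this.successes >= this.successThreshold) {
                this.state = "CLOSED";
                this.failures = 0;
            }
        } else {
            this.failures = 0; // a success while CLOSED resets the failure streak
        }
    }

    private onFailure(): void {
        this.failures += 1;
        // Any failure while testing, or too many in a row while closed, opens the circuit.
        if (this.state === "HALF_OPEN" || this.failures >= this.failureThreshold) {
            this.state = "OPEN";
            this.openedAt = this.now();
            this.failures = 0;
        }
    }
}
```

Injecting `now` keeps the 30-second transition testable without real waiting.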

Applied to:

Queue Architecture

Message queue integration for event-driven pipelines:

┌─────────────────────────────────────────────────────────────┐
│                    Queue Adapters                            │
├──────────────┬──────────────┬─────────────┬────────────────┤
│ RabbitMQ     │ Amazon SQS   │ Redis       │ Internal       │
│ (AMQP)       │              │ Streams     │ (BullMQ)       │
├──────────────┼──────────────┼─────────────┼────────────────┤
│ Native AMQP  │ AWS SDK      │ XREAD/XADD  │ Redis-backed   │
│ Publisher    │ Long polling │ Consumer    │ Job queue      │
│ confirms     │ Visibility   │ groups      │ Delayed jobs   │
│ Dead-letter  │ timeout      │ Pending     │ Retries        │
│ queues       │ Batch recv   │ entries     │                │
└──────────────┴──────────────┴─────────────┴────────────────┘

Consumer patterns:

Multi-Trigger Architecture

Pipelines support multiple concurrent triggers:

┌─────────────────────────────────────────────────────────────┐
│                    Trigger Types                             │
├──────────┬──────────┬──────────┬──────────┬────────────────┤
│  Manual  │ Schedule │ Webhook  │  Event   │ Message Queue  │
├──────────┼──────────┼──────────┼──────────┼────────────────┤
│ UI/API   │ Cron     │ HTTP     │ Vendure  │ RabbitMQ/SQS/  │
│ trigger  │ express  │ endpoint │ events   │ Redis Streams  │
└──────────┴──────────┴──────────┴──────────┴────────────────┘
         │           │          │          │           │
         └───────────┴──────────┴──────────┴───────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │ Pipeline Runner │
                    └─────────────────┘

All triggers converge to the same execution engine, enabling:

Feed Generator Architecture

Product feed generation for marketing channels:

┌─────────────────────────────────────────────────────────────┐
│                  Feed Generation Pipeline                    │
│                                                              │
│  ┌────────────┐    ┌────────────┐    ┌──────────────────┐  │
│  │  Vendure   │───▶│ Feed       │───▶│ Format Generator │  │
│  │  Products  │    │ Filters    │    │                  │  │
│  └────────────┘    └────────────┘    │ ┌──────────────┐ │  │
│                                       │ │ Google XML  │ │  │
│                                       │ ├──────────────┤ │  │
│                                       │ │ Meta/FB     │ │  │
│                                       │ ├──────────────┤ │  │
│                                       │ │ RSS/Atom    │ │  │
│                                       │ ├──────────────┤ │  │
│                                       │ │ CSV/JSON    │ │  │
│                                       │ └──────────────┘ │  │
│                                       └──────────────────┘  │
└─────────────────────────────────────────────────────────────┘

Supported formats: Google XML, Meta (Facebook), RSS/Atom, and CSV/JSON.

Sink Architecture

Output to external systems:

┌─────────────────────────────────────────────────────────────┐
│                      Sink Executor                           │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                  Circuit Breaker                      │   │
│  └──────────────────────────────────────────────────────┘   │
│                           │                                  │
│           ┌───────────────┼───────────────┐                 │
│           ▼               ▼               ▼                 │
│     ┌───────────┐   ┌───────────┐   ┌───────────┐          │
│     │  Search   │   │  Webhook  │   │   Queue   │          │
│     │  Engines  │   │           │   │  Producer │          │
│     ├───────────┤   ├───────────┤   ├───────────┤          │
│     │MeiliSearch│   │ HTTP POST │   │ RabbitMQ  │          │
│     │Elastic    │   │ Auth      │   │ SQS       │          │
│     │Algolia    │   │ Retry     │   │ Redis     │          │
│     │Typesense  │   │ Timeout   │   │           │          │
│     └───────────┘   └───────────┘   └───────────┘          │
└─────────────────────────────────────────────────────────────┘

All sinks feature:

Directory Structure

src/plugins/data-hub/
├── src/                          # Backend source
│   ├── api/                      # GraphQL resolvers & schema
│   ├── bootstrap/                # Plugin initialization
│   ├── constants/                # Configuration constants
│   ├── decorators/               # Custom decorators
│   ├── enrichers/                # Record enrichers
│   ├── entities/                 # TypeORM entities
│   ├── extractors/               # Data extractors
│   ├── feeds/                    # Feed generators
│   ├── gql/                      # Generated GraphQL types
│   ├── jobs/                     # Job queue handlers
│   ├── loaders/                  # Entity loaders
│   ├── mappers/                  # Field mappers
│   ├── operators/                # Transform operators
│   ├── parsers/                  # File parsers
│   ├── runtime/                  # Execution engine
│   ├── sdk/                      # Public SDK & DSL
│   ├── services/                 # Business logic
│   ├── templates/                # Import/export templates
│   ├── transforms/               # Transform execution
│   ├── types/                    # TypeScript types
│   ├── utils/                    # Utilities
│   ├── validation/               # Pipeline definition validators
│   └── vendure-schemas/          # Vendure entity schema definitions
├── connectors/                   # External system connectors (e.g. Pimcore)
├── dashboard/                    # React Admin UI
│   ├── components/               # UI components
│   ├── constants/                # UI constants
│   ├── gql/                      # GraphQL queries
│   ├── hooks/                    # React hooks
│   ├── routes/                   # Route components
│   ├── types/                    # UI type definitions
│   └── utils/                    # UI utilities
├── shared/                       # Shared code (backend + dashboard)
│   ├── constants/                # Shared constants
│   ├── types/                    # Shared types
│   └── utils/                    # Shared utilities
├── dev-server/                   # Development server & examples
├── docs/                         # Documentation
└── e2e/                          # End-to-end tests