vendure-data-hub-plugin

Core Concepts

Understanding these concepts will help you build effective data pipelines.

Pipeline

A pipeline is a complete data flow definition. It contains:

{
    version: 1,
    steps: [...],
    edges: [...],
    context: { ... },
    capabilities: { ... },
    hooks: { ... },
}
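As a concrete sketch of that shape, a minimal pipeline might look like the following. The step codes and adapter codes here are illustrative examples, not names shipped with the plugin:

```typescript
// Hypothetical pipeline definition: a CSV import with one transform step.
// Step codes ('fetch-csv', 'map-fields', 'load-products') and the adapter
// code 'csvFile' are assumptions for illustration only.
const pipeline = {
    version: 1,
    steps: [
        { code: 'fetch-csv', type: 'EXTRACT', adapterCode: 'csvFile' },
        { code: 'map-fields', type: 'TRANSFORM' },
        { code: 'load-products', type: 'LOAD', entity: 'Product' },
    ],
    edges: [
        { from: 'fetch-csv', to: 'map-fields' },
        { from: 'map-fields', to: 'load-products' },
    ],
    context: {},
    capabilities: {},
    hooks: {},
};
```

The edges array is what turns the flat list of steps into an ordered flow.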

Steps

Steps are the building blocks of a pipeline. Each step has a unique code and a type that determines its behavior.

Step Types

Type Purpose
TRIGGER Defines how the pipeline starts (MANUAL, SCHEDULE, WEBHOOK, EVENT, FILE, MESSAGE)
EXTRACT Pulls data from external sources
TRANSFORM Modifies, validates, or enriches data
VALIDATE Validates records against rules or schemas
ENRICH Adds data from external lookups
ROUTE Splits data flow based on conditions
GATE Pauses pipeline execution for human approval. Supports MANUAL, THRESHOLD, and TIMEOUT approval types
LOAD Creates or updates Vendure entities
EXPORT Sends data to external destinations
FEED Generates product feeds (Google, Meta, etc.)
SINK Indexes data to search engines

Edges

Edges connect steps to define execution order:

{
    from: 'extract-step',
    to: 'transform-step',
    branch: 'optional-branch-name'  // For routing
}

Data flows from the from step to the to step. Multiple edges can originate from one step (fan-out) and multiple edges can target one step (fan-in).
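Fan-out is typically combined with a ROUTE step and branch names. A sketch of such an edge set (all step and branch codes are illustrative):

```typescript
// Fan-out: a ROUTE step ('split-by-category') feeds two downstream steps
// via named branches, and both branches fan back in to one LOAD step.
const edges = [
    { from: 'split-by-category', to: 'enrich-electronics', branch: 'electronics' },
    { from: 'split-by-category', to: 'enrich-default', branch: 'default' },
    { from: 'enrich-electronics', to: 'load-products' }, // fan-in
    { from: 'enrich-default', to: 'load-products' },     // fan-in
];
```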

Records

A record is a single data item flowing through the pipeline. Records are typically JSON objects:

{
    "id": "123",
    "name": "Product Name",
    "price": 29.99,
    "category": "Electronics"
}

Records are processed individually through transform and load steps. Extractors typically yield multiple records from a single source.

Adapters

Adapters are reusable components that perform specific operations:

Adapter Type Purpose
Extractor Connects to data sources and yields records
Operator Transforms individual record fields
Loader Creates or updates Vendure entities
Exporter Writes data to external destinations
Feed Generator Creates product feed files
Search Sink Indexes data to search engines
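Conceptually, an extractor is a function that yields records one at a time, and an operator is a function over a single record. The plugin's actual adapter interfaces may differ; this is only a sketch of the two ideas:

```typescript
type DataRecord = Record<string, unknown>;

// Sketch of an extractor: yields records one at a time so downstream
// steps can process them as a stream. The real adapter contract is
// plugin-specific; this only illustrates the shape.
function* extractFromArray(rows: DataRecord[]): Generator<DataRecord> {
    for (const row of rows) {
        yield row;
    }
}

// Sketch of an operator: transforms a single field on a single record.
const uppercaseName = (record: DataRecord): DataRecord => ({
    ...record,
    name: String(record.name).toUpperCase(),
});
```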

Connections

Connections store reusable configuration for external systems. They are referenced by code in extract and export steps:

.extract('fetch-api', {
    adapterCode: 'httpApi',
    connectionCode: 'my-api',  // References saved connection
    url: '/products',
})

Secrets

Secrets store sensitive values like API keys and passwords. They are referenced by code, so the sensitive value itself never appears in the pipeline definition:

.extract('api-call', {
    adapterCode: 'httpApi',
    url: 'https://api.example.com/products',
    bearerTokenSecretCode: 'api-key',  // References saved secret
})

Pipeline Runs

A run is a single execution of a pipeline and tracks its own progress and results.

Checkpoints

Checkpoints allow a failed pipeline to resume from the last successful record instead of starting over. The plugin saves checkpoint data automatically as records are processed.

If a run fails, rerunning the pipeline will skip already-processed records.
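The resume behavior can be pictured as filtering out record ids captured in the last checkpoint. This is a simplified model, not the plugin's internal checkpoint format:

```typescript
// Simplified model of checkpoint resume: given the set of ids recorded as
// already processed, only the remaining records are run again.
function recordsToProcess<T extends { id: string }>(
    records: T[],
    processedIds: Set<string>,
): T[] {
    return records.filter(r => !processedIds.has(r.id));
}
```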

Templates

Templates are pre-configured pipeline blueprints for the import and export wizards. They pre-fill wizard steps with common field mappings, source types, and format options.

Built-in templates are provided for common import and export scenarios.

Custom templates can be registered via plugin options or the TemplateRegistryService.

Scripts & Hooks

Scripts are named functions that can modify records at any stage of pipeline execution. Register scripts via plugin options and reference them in pipeline hook definitions:

Hook Type Purpose Can Modify Records
INTERCEPTOR Inline JavaScript code (sandboxed) Yes
SCRIPT Pre-registered TypeScript functions Yes
WEBHOOK HTTP notifications No
EMIT Vendure domain events No
TRIGGER_PIPELINE Start another pipeline No
LOG Write to pipeline logs No
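A SCRIPT hook is ultimately just a named function that receives a record and may return a modified copy. The registration mechanics live in plugin options; the function shape below is an illustrative assumption:

```typescript
type DataRecord = Record<string, unknown>;

// Illustrative SCRIPT hook function: normalizes the sku field on each
// record. How it is registered and named in plugin options is not shown
// here and is an assumption.
const normalizeSku = (record: DataRecord): DataRecord => ({
    ...record,
    sku: String(record.sku ?? '').trim().toUpperCase(),
});
```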

Throughput

Control how records flow through steps:

{
    throughput: {
        batchSize: 100,         // Process 100 records at a time
        concurrency: 4,         // Process 4 batches in parallel
        rateLimitRps: 10,       // Max 10 requests per second
        pauseOnErrorRate: { threshold: 0.5, intervalSec: 60 },  // Pause if error rate exceeds 50%
        drainStrategy: 'BACKOFF', // 'BACKOFF' | 'SHED' | 'QUEUE'
    }
}
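The batchSize setting can be pictured as chunking the record stream; up to concurrency batches are then in flight at once. A simplified model of the chunking:

```typescript
// Simplified model of batching: split the record stream into batches of
// `batchSize`. With batchSize: 100 and concurrency: 4, up to 4 of these
// batches would be processed in parallel.
function toBatches<T>(records: T[], batchSize: number): T[][] {
    const batches: T[][] = [];
    for (let i = 0; i < records.length; i += batchSize) {
        batches.push(records.slice(i, i + batchSize));
    }
    return batches;
}
```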

Load Strategies

When loading entities, choose a strategy:

Strategy Behavior
CREATE Only create new records; skip if exists
UPDATE Only update existing records; skip if not found
UPSERT Create new or update existing records
MERGE Merge source with existing data
SOFT_DELETE Mark as deleted / logical delete
HARD_DELETE Permanently remove from database
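The first three strategies differ only in what happens when a matching entity does or does not already exist. That decision table, expressed as a small sketch:

```typescript
// Simplified decision model for the CREATE / UPDATE / UPSERT strategies:
// given whether a matching entity exists, return the action taken.
type LoadStrategy = 'CREATE' | 'UPDATE' | 'UPSERT';
type Action = 'create' | 'update' | 'skip';

function resolveAction(strategy: LoadStrategy, exists: boolean): Action {
    const table: Record<LoadStrategy, { whenExists: Action; whenMissing: Action }> = {
        CREATE: { whenExists: 'skip', whenMissing: 'create' },
        UPDATE: { whenExists: 'update', whenMissing: 'skip' },
        UPSERT: { whenExists: 'update', whenMissing: 'create' },
    };
    const row = table[strategy];
    return exists ? row.whenExists : row.whenMissing;
}
```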

Channel Strategy

When loading to Vendure, control channel assignment:

Strategy Behavior
EXPLICIT Add to specified channel(s)
INHERIT Inherit channel from parent entity
MULTI Assign to multiple channels

Validation

Records can be validated at multiple points:

  1. In transform steps - Using validation operators
  2. In validate steps - Using schema validation
  3. In load steps - Before entity creation

Invalid records are quarantined so they can be reviewed and retried.
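Conceptually, validation splits the stream into valid records that continue downstream and failures that go to quarantine with an error message. A minimal sketch (required-field checking stands in for whatever rules or schemas a real step uses):

```typescript
type DataRecord = Record<string, unknown>;

// Minimal validation sketch: records missing a required field are
// quarantined with an error instead of continuing downstream.
function validateRecords(records: DataRecord[], requiredFields: string[]) {
    const valid: DataRecord[] = [];
    const quarantined: { record: DataRecord; error: string }[] = [];
    for (const record of records) {
        const missing = requiredFields.filter(f => record[f] == null);
        if (missing.length === 0) {
            valid.push(record);
        } else {
            quarantined.push({ record, error: `Missing fields: ${missing.join(', ')}` });
        }
    }
    return { valid, quarantined };
}
```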

Error Handling

When a record fails:

  1. The error is logged with full context
  2. The record is quarantined (saved with error details)
  3. The pipeline continues processing remaining records
  4. After completion, you can review and retry failed records
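The key property of this flow is per-record error isolation: one bad record does not abort the run. A simplified model of that loop:

```typescript
// Per-record error isolation: a failing record is captured with its error
// and processing continues with the remaining records (simplified model).
function processAll<T, R>(records: T[], process: (r: T) => R) {
    const succeeded: R[] = [];
    const failed: { record: T; error: string }[] = [];
    for (const record of records) {
        try {
            succeeded.push(process(record));
        } catch (err) {
            failed.push({
                record,
                error: err instanceof Error ? err.message : String(err),
            });
        }
    }
    return { succeeded, failed };
}
```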

Execution Flow

  1. Trigger - Pipeline execution starts
  2. Extract - Data is pulled from source(s)
  3. Transform - Each record is modified
  4. Validate - Records are checked (optional)
  5. Route - Records are directed to different branches (optional)
  6. Load/Export/Feed/Sink - Data is written to destination(s)
  7. Complete - Run ends with success or partial failure

Next Steps