vendure-data-hub-plugin

Extractors Reference

Complete reference for all data extractors.

Table of Contents


HTTP API Extractor

Code: httpApi

Fetch data from REST APIs with automatic pagination, authentication, and retry support.

Configuration

Field Type Required Description
url string Yes API endpoint URL (or path if using connection)
method select No HTTP method: GET, POST, PUT, PATCH (default: GET)
headers json No Request headers (JSON object)
body json No Request body for POST/PUT/PATCH (JSON)
connectionCode string No HTTP connection to use (optional)
dataPath string No JSON path to records array (e.g., “data.items”)
pagination.type select No Pagination type: NONE, OFFSET, CURSOR, PAGE, LINK_HEADER
pagination.limit number No Page size (records per page)
pagination.maxPages number No Maximum pages to fetch
pagination.cursorPath string No JSON path to cursor (for cursor pagination)
rateLimit.requestsPerSecond number No Maximum requests per second
retry.maxAttempts number No Maximum retry attempts
timeoutMs number No Request timeout in milliseconds

Example

.extract('fetch-products', {
    adapterCode: 'httpApi',
    url: 'https://api.example.com/products',
    method: 'GET',
    headers: {
        'Accept': 'application/json',
    },
    dataPath: 'data.products',
    pagination: {
        type: 'PAGE',
        limit: 100,
        maxPages: 50,
    },
})

Pagination Modes

Page-based:

{
    pagination: {
        type: 'PAGE',
        limit: 100,
    },
    dataPath: 'items',
}

Offset-based:

{
    pagination: {
        type: 'OFFSET',
        limit: 100,
    },
}

Cursor-based:

{
    pagination: {
        type: 'CURSOR',
        cursorPath: 'meta.nextCursor',
    },
}

File Extractor

Code: file

Parse files in multiple formats (CSV, JSON, XML, XLSX, NDJSON, TSV). PARQUET is supported as an export format but not for extraction parsing.

Configuration

Field Type Required Description
path string Yes File path or glob pattern (e.g., /data/*.csv)
format select No File format: CSV, JSON, XML, XLSX, NDJSON, TSV (auto-detected if not specified)
delimiter string No Field delimiter for CSV/TSV. Default: , for CSV, \t for TSV
hasHeader boolean No Whether first row is header (CSV/TSV). Default: true
encoding string No File encoding. Default: utf-8
dataPath string No JSON path to records array (for JSON/XML)
sheet string No Sheet name or index for XLSX

Example - CSV File

.extract('parse-csv', {
    adapterCode: 'file',
    path: '/uploads/products.csv',
    format: 'CSV',
    delimiter: ',',
    hasHeader: true,
})

Example - JSON File

.extract('parse-json', {
    adapterCode: 'file',
    path: '/data/products.json',
    format: 'JSON',
    dataPath: 'data.products',
})

Example - Excel File

.extract('parse-excel', {
    adapterCode: 'file',
    path: '/uploads/inventory.xlsx',
    format: 'XLSX',
    sheet: 'Products',
})

Example - Glob Pattern

.extract('parse-all-csv', {
    adapterCode: 'file',
    path: '/imports/*.csv',
    format: 'CSV',
})

GraphQL Extractor

Code: graphql

Query external GraphQL endpoints with cursor/offset/Relay pagination support.

Configuration

Field Type Required Description
endpoint string Yes GraphQL endpoint URL
query string Yes GraphQL query
connectionCode string No HTTP connection to use (optional)
headers json No Request headers (JSON object)
variables json No Query variables (JSON object)
itemsField string No Field name containing items in response
edgesField string No Field name for Relay-style edges
nodeField string No Field name for node within each edge
cursorVar string No Variable name for cursor pagination
nextCursorField string No Field name for next cursor in response
pageInfoField string No Field name for pageInfo object
hasNextPageField string No Field name for hasNextPage boolean
endCursorField string No Field name for endCursor in pageInfo
timeoutMs number No Request timeout in milliseconds

Example - Basic Query

.extract('query-graphql', {
    adapterCode: 'graphql',
    endpoint: 'https://api.example.com/graphql',
    query: `
        query GetProducts($limit: Int) {
            products(limit: $limit) {
                id
                name
                price
            }
        }
    `,
    variables: { limit: 100 },
    itemsField: 'products',
})

Example - Offset Pagination

.extract('query-with-offset', {
    adapterCode: 'graphql',
    endpoint: 'https://api.example.com/graphql',
    query: `
        query GetProducts($skip: Int, $take: Int) {
            products(skip: $skip, take: $take) {
                items { id name price }
                totalItems
            }
        }
    `,
    itemsField: 'products.items',
})

Example - Relay-style Pagination

.extract('query-with-cursor', {
    adapterCode: 'graphql',
    endpoint: 'https://api.example.com/graphql',
    query: `
        query GetProducts($cursor: String) {
            products(first: 100, after: $cursor) {
                edges {
                    node {
                        id
                        name
                    }
                }
                pageInfo {
                    hasNextPage
                    endCursor
                }
            }
        }
    `,
    edgesField: 'products.edges',
    pageInfoField: 'products.pageInfo',
    hasNextPageField: 'hasNextPage',
    endCursorField: 'endCursor',
    cursorVar: 'cursor',
})

Vendure Query Extractor

Code: vendureQuery

Extract data directly from Vendure entities with automatic pagination and translation support.

Configuration

Field Type Required Description
entity string Yes Entity type to query
relations string No Comma-separated relations to include
languageCode string No Language code for translations (e.g., en, de)
flattenTranslations boolean No Merge translation fields to root level
batchSize number No Number of records per batch
sortBy string No Field to sort by
sortOrder string No Sort order: ASC or DESC

Supported Entity Types

Example - Products with Relations

.extract('query-products', {
    adapterCode: 'vendureQuery',
    entity: 'PRODUCT',
    relations: 'variants,featuredAsset,translations',
    languageCode: 'en',
    flattenTranslations: true,
    batchSize: 500,
    sortBy: 'updatedAt',
    sortOrder: 'DESC',
})

Example - Customers

.extract('query-customers', {
    adapterCode: 'vendureQuery',
    entity: 'CUSTOMER',
    relations: 'addresses',
    batchSize: 1000,
})

Example - Orders

.extract('query-orders', {
    adapterCode: 'vendureQuery',
    entity: 'ORDER',
    relations: 'lines,customer',
    sortBy: 'orderPlacedAt',
    sortOrder: 'DESC',
})

S3 Extractor

Code: s3

Fetch and parse files from S3-compatible storage (AWS S3, MinIO, DigitalOcean Spaces, etc.).

Configuration

Field Type Required Description
connectionCode string Yes S3 connection code
bucket string Yes S3 bucket name
key string No Object key (file path)
prefix string No Key prefix to list objects
format select No File format: CSV, JSON, XML, XLSX, NDJSON, TSV
dataPath string No JSON path to records (for JSON files)

Example

.extract('s3-products', {
    adapterCode: 's3',
    connectionCode: 'aws-s3',
    bucket: 'product-feeds',
    key: 'imports/products.csv',
    format: 'CSV',
})

FTP/SFTP Extractor

Code: ftp

Fetch and parse files from FTP or SFTP servers.

Configuration

Field Type Required Description
connectionCode string No FTP/SFTP connection code
protocol select Yes Protocol: ftp or sftp
host string Yes FTP/SFTP server hostname or IP
port number No Server port (FTP: 21, SFTP: 22)
username string No FTP/SFTP username
passwordSecretCode string No Secret code for password
remotePath string Yes Remote directory path
filePattern string No File name pattern (e.g., *.csv, products-*.json)
format select No File format: CSV, JSON, XML, XLSX (auto-detected if not specified)
deleteAfterProcess boolean No Delete files after processing
modifiedAfter string No Only process files modified after this date
maxFiles number No Maximum number of files to process

Example

.extract('sftp-inventory', {
    adapterCode: 'ftp',
    protocol: 'sftp',
    host: 'ftp.supplier.com',
    username: 'ftpuser',
    passwordSecretCode: 'supplier-ftp-pass',
    remotePath: '/exports',
    filePattern: 'inventory-*.csv',
    format: 'CSV',
})

Example - Using Connection

.extract('sftp-products', {
    adapterCode: 'ftp',
    connectionCode: 'supplier-sftp',
    remotePath: '/data/products',
    format: 'JSON',
})

Database Extractor

Code: database

Query SQL databases (PostgreSQL, MySQL, SQLite, MSSQL, Oracle) with pagination support.

Configuration

Field Type Required Description
connectionCode string Yes Database connection code
query string Yes SQL query to execute
pagination.type select No Pagination type: NONE, OFFSET, KEYSET
pagination.limit number No Page size
incrementalColumn string No Column for incremental extraction

Example

.extract('query-products', {
    adapterCode: 'database',
    connectionCode: 'supplier-db',
    query: 'SELECT * FROM products WHERE updated_at > :lastRun',
    pagination: {
        type: 'OFFSET',
        limit: 1000,
    },
    incrementalColumn: 'updated_at',
})

Webhook Extractor

Code: webhook

Receive data from webhook payloads. Used when pipelines are triggered via webhooks.

Configuration

Field Type Required Description
dataPath string No JSON path to records in webhook payload

Example

.extract('webhook-data', {
    adapterCode: 'webhook',
    dataPath: 'data.items',
})

CDC (Change Data Capture) Extractor

Code: cdc

Poll a database table for changes using a timestamp or version column. Tracks INSERT, UPDATE, and DELETE operations with checkpointing for incremental extraction.

Configuration

Field Type Required Description
connectionCode string Yes Database connection code
table string Yes Table name to monitor for changes
trackingColumn string Yes Timestamp or version column used to detect changes (e.g., updated_at, version)
trackingType select No Column type: TIMESTAMP or VERSION (default: TIMESTAMP)
primaryKey string Yes Primary key column name
databaseType select Yes Database type: POSTGRESQL or MYSQL
columns array No Specific columns to select (omit for all columns)
batchSize number No Number of records per batch (default: 1000)
pollIntervalMs number No Polling interval in milliseconds (default: 5000)
includeDeletes boolean No Whether to track soft-deletes
deleteColumn string No Column that indicates deletion timestamp (required when includeDeletes is true)

Example - Track Product Changes

.extract('product-changes', {
    adapterCode: 'cdc',
    connectionCode: 'main-db',
    table: 'products',
    trackingColumn: 'updated_at',
    trackingType: 'TIMESTAMP',
    databaseType: 'POSTGRESQL',
    primaryKey: 'id',
    columns: ['id', 'name', 'price', 'updated_at'],
    batchSize: 500,
})

Example - Version-Based Tracking

.extract('inventory-changes', {
    adapterCode: 'cdc',
    connectionCode: 'warehouse-db',
    table: 'inventory',
    trackingColumn: 'version',
    trackingType: 'VERSION',
    databaseType: 'MYSQL',
    primaryKey: 'id',
    batchSize: 1000,
})

Example - With Soft-Delete Tracking

.extract('product-changes-with-deletes', {
    adapterCode: 'cdc',
    connectionCode: 'main-db',
    table: 'products',
    trackingColumn: 'updated_at',
    trackingType: 'TIMESTAMP',
    databaseType: 'POSTGRESQL',
    primaryKey: 'id',
    includeDeletes: true,
    deleteColumn: 'deleted_at',
})

How It Works

  1. On first run, the extractor reads all rows from the table
  2. It stores a checkpoint with the highest value of the tracking column
  3. On subsequent runs, it queries only rows where the tracking column exceeds the checkpoint
  4. DELETE tracking requires includeDeletes: true and a deleteColumn that indicates when a row was soft-deleted

In-Memory Extractor

Code: inMemory

Reads records directly from inline data provided in the step configuration. Useful for testing, seed data, and webhook-triggered pipelines where data is passed at runtime.

Note: The inMemory extractor reads records from the data field (not records). The data field accepts an array of objects or a single object (which will be wrapped in an array).

Configuration

Field Type Required Description
data array/object Yes Inline records to extract. An array of objects, or a single object.

Example

.extract('inline-data', {
    adapterCode: 'inMemory',
    data: [
        { sku: 'ABC-001', name: 'Widget A', price: 1999 },
        { sku: 'ABC-002', name: 'Widget B', price: 2999 },
    ],
})

Quick Reference

Code Source Type Use Case
httpApi REST API External APIs with pagination, authentication, and retry support
graphql GraphQL API External GraphQL services with cursor/offset/Relay pagination
vendureQuery Vendure Internal data extraction for feeds, exports, and transformations
file Files Parse CSV, JSON, XML, XLSX, NDJSON, TSV files
s3 S3 Storage Fetch and parse files from S3-compatible storage
ftp FTP/SFTP Fetch files from FTP or SFTP servers
database SQL Database Query PostgreSQL, MySQL, SQLite, MSSQL, Oracle databases
cdc CDC Poll database tables for changes using timestamp or version tracking
webhook Webhook Receive data from webhook payloads
inMemory In-Memory Inline data for testing, seed data, and webhook payloads

Authentication Options

HTTP-based extractors (httpApi, graphql) support connection-based authentication via connectionCode which provides: