Complete reference for all data extractors.
Code: httpApi
Fetch data from REST APIs with automatic pagination, authentication, and retry support.
| Field | Type | Required | Description |
|---|---|---|---|
| url | string | Yes | API endpoint URL (or path if using connection) |
| method | select | No | HTTP method: GET, POST, PUT, PATCH (default: GET) |
| headers | json | No | Request headers (JSON object) |
| body | json | No | Request body for POST/PUT/PATCH (JSON) |
| connectionCode | string | No | HTTP connection to use (optional) |
| dataPath | string | No | JSON path to records array (e.g., "data.items") |
| pagination.type | select | No | Pagination type: NONE, OFFSET, CURSOR, PAGE, LINK_HEADER |
| pagination.limit | number | No | Page size (records per page) |
| pagination.maxPages | number | No | Maximum pages to fetch |
| pagination.cursorPath | string | No | JSON path to cursor (for cursor pagination) |
| rateLimit.requestsPerSecond | number | No | Maximum requests per second |
| retry.maxAttempts | number | No | Maximum retry attempts |
| timeoutMs | number | No | Request timeout in milliseconds |
```ts
.extract('fetch-products', {
  adapterCode: 'httpApi',
  url: 'https://api.example.com/products',
  method: 'GET',
  headers: {
    'Accept': 'application/json',
  },
  dataPath: 'data.products',
  pagination: {
    type: 'PAGE',
    limit: 100,
    maxPages: 50,
  },
})
```
Page-based:

```ts
{
  pagination: {
    type: 'PAGE',
    limit: 100,
  },
  dataPath: 'items',
}
```

Offset-based:

```ts
{
  pagination: {
    type: 'OFFSET',
    limit: 100,
  },
}
```

Cursor-based:

```ts
{
  pagination: {
    type: 'CURSOR',
    cursorPath: 'meta.nextCursor',
  },
}
```
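The field table also lists a LINK_HEADER pagination type, which has no example above. A hedged sketch of what such a configuration might look like, assuming the adapter follows the common `Link: <url>; rel="next"` response-header convention (as used by e.g. the GitHub API):

```ts
{
  pagination: {
    type: 'LINK_HEADER',
    maxPages: 20,
  },
}
```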
Code: file
Parse files in multiple formats (CSV, JSON, XML, XLSX, NDJSON, TSV). PARQUET is supported as an export format but not for extraction parsing.
| Field | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | File path or glob pattern (e.g., /data/*.csv) |
| format | select | No | File format: CSV, JSON, XML, XLSX, NDJSON, TSV (auto-detected if not specified) |
| delimiter | string | No | Field delimiter for CSV/TSV. Default: `,` for CSV, `\t` for TSV |
| hasHeader | boolean | No | Whether first row is header (CSV/TSV). Default: true |
| encoding | string | No | File encoding. Default: utf-8 |
| dataPath | string | No | JSON path to records array (for JSON/XML) |
| sheet | string | No | Sheet name or index for XLSX |
```ts
.extract('parse-csv', {
  adapterCode: 'file',
  path: '/uploads/products.csv',
  format: 'CSV',
  delimiter: ',',
  hasHeader: true,
})
```

```ts
.extract('parse-json', {
  adapterCode: 'file',
  path: '/data/products.json',
  format: 'JSON',
  dataPath: 'data.products',
})
```

```ts
.extract('parse-excel', {
  adapterCode: 'file',
  path: '/uploads/inventory.xlsx',
  format: 'XLSX',
  sheet: 'Products',
})
```

```ts
.extract('parse-all-csv', {
  adapterCode: 'file',
  path: '/imports/*.csv',
  format: 'CSV',
})
```
Code: graphql
Query external GraphQL endpoints with cursor/offset/Relay pagination support.
| Field | Type | Required | Description |
|---|---|---|---|
| endpoint | string | Yes | GraphQL endpoint URL |
| query | string | Yes | GraphQL query |
| connectionCode | string | No | HTTP connection to use (optional) |
| headers | json | No | Request headers (JSON object) |
| variables | json | No | Query variables (JSON object) |
| itemsField | string | No | Field name containing items in response |
| edgesField | string | No | Field name for Relay-style edges |
| nodeField | string | No | Field name for node within each edge |
| cursorVar | string | No | Variable name for cursor pagination |
| nextCursorField | string | No | Field name for next cursor in response |
| pageInfoField | string | No | Field name for pageInfo object |
| hasNextPageField | string | No | Field name for hasNextPage boolean |
| endCursorField | string | No | Field name for endCursor in pageInfo |
| timeoutMs | number | No | Request timeout in milliseconds |
```ts
.extract('query-graphql', {
  adapterCode: 'graphql',
  endpoint: 'https://api.example.com/graphql',
  query: `
    query GetProducts($limit: Int) {
      products(limit: $limit) {
        id
        name
        price
      }
    }
  `,
  variables: { limit: 100 },
  itemsField: 'products',
})
```

```ts
.extract('query-with-offset', {
  adapterCode: 'graphql',
  endpoint: 'https://api.example.com/graphql',
  query: `
    query GetProducts($skip: Int, $take: Int) {
      products(skip: $skip, take: $take) {
        items { id name price }
        totalItems
      }
    }
  `,
  itemsField: 'products.items',
})
```

```ts
.extract('query-with-cursor', {
  adapterCode: 'graphql',
  endpoint: 'https://api.example.com/graphql',
  query: `
    query GetProducts($cursor: String) {
      products(first: 100, after: $cursor) {
        edges {
          node {
            id
            name
          }
        }
        pageInfo {
          hasNextPage
          endCursor
        }
      }
    }
  `,
  edgesField: 'products.edges',
  pageInfoField: 'products.pageInfo',
  hasNextPageField: 'hasNextPage',
  endCursorField: 'endCursor',
  cursorVar: 'cursor',
})
```
Code: vendureQuery
Extract data directly from Vendure entities with automatic pagination and translation support.
| Field | Type | Required | Description |
|---|---|---|---|
| entity | string | Yes | Entity type to query |
| relations | string | No | Comma-separated relations to include |
| languageCode | string | No | Language code for translations (e.g., en, de) |
| flattenTranslations | boolean | No | Merge translation fields to root level |
| batchSize | number | No | Number of records per batch |
| sortBy | string | No | Field to sort by |
| sortOrder | string | No | Sort order: ASC or DESC |
Supported entity types:

- PRODUCT - Products
- PRODUCT_VARIANT - Product Variants
- CUSTOMER - Customers
- ORDER - Orders
- COLLECTION - Collections
- FACET - Facets
- FACET_VALUE - Facet Values
- PROMOTION - Promotions
- ASSET - Assets

```ts
.extract('query-products', {
  adapterCode: 'vendureQuery',
  entity: 'PRODUCT',
  relations: 'variants,featuredAsset,translations',
  languageCode: 'en',
  flattenTranslations: true,
  batchSize: 500,
  sortBy: 'updatedAt',
  sortOrder: 'DESC',
})
```

```ts
.extract('query-customers', {
  adapterCode: 'vendureQuery',
  entity: 'CUSTOMER',
  relations: 'addresses',
  batchSize: 1000,
})
```

```ts
.extract('query-orders', {
  adapterCode: 'vendureQuery',
  entity: 'ORDER',
  relations: 'lines,customer',
  sortBy: 'orderPlacedAt',
  sortOrder: 'DESC',
})
```
Code: s3
Fetch and parse files from S3-compatible storage (AWS S3, MinIO, DigitalOcean Spaces, etc.).
| Field | Type | Required | Description |
|---|---|---|---|
| connectionCode | string | Yes | S3 connection code |
| bucket | string | Yes | S3 bucket name |
| key | string | No | Object key (file path) |
| prefix | string | No | Key prefix to list objects |
| format | select | No | File format: CSV, JSON, XML, XLSX, NDJSON, TSV |
| dataPath | string | No | JSON path to records (for JSON files) |
```ts
.extract('s3-products', {
  adapterCode: 's3',
  connectionCode: 'aws-s3',
  bucket: 'product-feeds',
  key: 'imports/products.csv',
  format: 'CSV',
})
```
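The field table also allows a `prefix` instead of a single `key`, to list and process multiple objects. A hedged sketch (the bucket and prefix names are illustrative):

```ts
.extract('s3-daily-feeds', {
  adapterCode: 's3',
  connectionCode: 'aws-s3',
  bucket: 'product-feeds',
  prefix: 'imports/daily/',
  format: 'CSV',
})
```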
Code: ftp
Fetch and parse files from FTP or SFTP servers.
| Field | Type | Required | Description |
|---|---|---|---|
| connectionCode | string | No | FTP/SFTP connection code |
| protocol | select | Yes | Protocol: ftp or sftp |
| host | string | Yes | FTP/SFTP server hostname or IP |
| port | number | No | Server port (FTP: 21, SFTP: 22) |
| username | string | No | FTP/SFTP username |
| passwordSecretCode | string | No | Secret code for password |
| remotePath | string | Yes | Remote directory path |
| filePattern | string | No | File name pattern (e.g., *.csv, products-*.json) |
| format | select | No | File format: CSV, JSON, XML, XLSX (auto-detected if not specified) |
| deleteAfterProcess | boolean | No | Delete files after processing |
| modifiedAfter | string | No | Only process files modified after this date |
| maxFiles | number | No | Maximum number of files to process |
```ts
.extract('sftp-inventory', {
  adapterCode: 'ftp',
  protocol: 'sftp',
  host: 'ftp.supplier.com',
  username: 'ftpuser',
  passwordSecretCode: 'supplier-ftp-pass',
  remotePath: '/exports',
  filePattern: 'inventory-*.csv',
  format: 'CSV',
})
```

```ts
.extract('sftp-products', {
  adapterCode: 'ftp',
  connectionCode: 'supplier-sftp',
  remotePath: '/data/products',
  format: 'JSON',
})
```
Code: database
Query SQL databases (PostgreSQL, MySQL, SQLite, MSSQL, Oracle) with pagination support.
| Field | Type | Required | Description |
|---|---|---|---|
| connectionCode | string | Yes | Database connection code |
| query | string | Yes | SQL query to execute |
| pagination.type | select | No | Pagination type: NONE, OFFSET, KEYSET |
| pagination.limit | number | No | Page size |
| incrementalColumn | string | No | Column for incremental extraction |
```ts
.extract('query-products', {
  adapterCode: 'database',
  connectionCode: 'supplier-db',
  query: 'SELECT * FROM products WHERE updated_at > :lastRun',
  pagination: {
    type: 'OFFSET',
    limit: 1000,
  },
})
```
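The field table also lists KEYSET pagination, which has no example above. A hedged sketch, assuming the adapter pages on the query's ORDER BY column (the exact keyset-column handling may differ):

```ts
.extract('query-products-keyset', {
  adapterCode: 'database',
  connectionCode: 'supplier-db',
  query: 'SELECT * FROM products ORDER BY id',
  pagination: {
    type: 'KEYSET',
    limit: 1000,
  },
})
```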
Code: webhook
Receive data from webhook payloads. Used when pipelines are triggered via webhooks.
| Field | Type | Required | Description |
|---|---|---|---|
| dataPath | string | No | JSON path to records in webhook payload |
```ts
.extract('webhook-data', {
  adapterCode: 'webhook',
  dataPath: 'data.items',
})
```
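Several extractors accept a `dataPath` such as `data.items`. A minimal sketch of how a dot-separated path is typically resolved against a payload; `resolvePath` is a hypothetical helper for illustration, not part of the adapter API:

```ts
// Hypothetical illustration of dot-path resolution (e.g. dataPath: 'data.items').
// Walks the payload one key at a time; returns undefined if any segment is missing.
function resolvePath(payload: unknown, path: string): unknown {
  return path
    .split('.')
    .reduce<any>((node, key) => (node == null ? undefined : node[key]), payload);
}

// A webhook payload where the records live under data.items:
const payload = {
  event: 'order.created',
  data: { items: [{ sku: 'ABC-001' }, { sku: 'ABC-002' }] },
};

const records = resolvePath(payload, 'data.items') as Array<{ sku: string }>;
// records is the two-element items array
```

With `dataPath` omitted, the whole payload is treated as the record source.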
Code: cdc
Poll a database table for changes using a timestamp or version column. Tracks INSERT, UPDATE, and DELETE operations with checkpointing for incremental extraction.
| Field | Type | Required | Description |
|---|---|---|---|
| connectionCode | string | Yes | Database connection code |
| table | string | Yes | Table name to monitor for changes |
| trackingColumn | string | Yes | Timestamp or version column used to detect changes (e.g., updated_at, version) |
| trackingType | select | No | Column type: TIMESTAMP or VERSION (default: TIMESTAMP) |
| primaryKey | string | Yes | Primary key column name |
| databaseType | select | Yes | Database type: POSTGRESQL or MYSQL |
| columns | array | No | Specific columns to select (omit for all columns) |
| batchSize | number | No | Number of records per batch (default: 1000) |
| pollIntervalMs | number | No | Polling interval in milliseconds (default: 5000) |
| includeDeletes | boolean | No | Whether to track soft-deletes |
| deleteColumn | string | No | Column that indicates deletion timestamp (required when includeDeletes is true) |
```ts
.extract('product-changes', {
  adapterCode: 'cdc',
  connectionCode: 'main-db',
  table: 'products',
  trackingColumn: 'updated_at',
  trackingType: 'TIMESTAMP',
  databaseType: 'POSTGRESQL',
  primaryKey: 'id',
  columns: ['id', 'name', 'price', 'updated_at'],
  batchSize: 500,
})
```

```ts
.extract('inventory-changes', {
  adapterCode: 'cdc',
  connectionCode: 'warehouse-db',
  table: 'inventory',
  trackingColumn: 'version',
  trackingType: 'VERSION',
  databaseType: 'MYSQL',
  primaryKey: 'id',
  batchSize: 1000,
})
```

```ts
.extract('product-changes-with-deletes', {
  adapterCode: 'cdc',
  connectionCode: 'main-db',
  table: 'products',
  trackingColumn: 'updated_at',
  trackingType: 'TIMESTAMP',
  databaseType: 'POSTGRESQL',
  primaryKey: 'id',
  includeDeletes: true,
  deleteColumn: 'deleted_at',
})
```
To track soft-deletes, set includeDeletes: true and provide a deleteColumn that indicates when a row was soft-deleted.
Code: inMemory
Reads records directly from inline data provided in the step configuration. Useful for testing, seed data, and webhook-triggered pipelines where data is passed at runtime.
Note: The inMemory extractor reads records from the data field (not records). The data field accepts an array of objects or a single object (which will be wrapped in an array).
| Field | Type | Required | Description |
|---|---|---|---|
| data | array/object | Yes | Inline records to extract. An array of objects, or a single object. |
```ts
.extract('inline-data', {
  adapterCode: 'inMemory',
  data: [
    { sku: 'ABC-001', name: 'Widget A', price: 1999 },
    { sku: 'ABC-002', name: 'Widget B', price: 2999 },
  ],
})
```
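Per the note above, `data` also accepts a single object, which is wrapped in a one-element array. A sketch with an illustrative record:

```ts
.extract('single-record', {
  adapterCode: 'inMemory',
  data: { sku: 'ABC-003', name: 'Widget C', price: 3999 },
})
```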
| Code | Source Type | Use Case |
|---|---|---|
| httpApi | REST API | External APIs with pagination, authentication, and retry support |
| graphql | GraphQL API | External GraphQL services with cursor/offset/Relay pagination |
| vendureQuery | Vendure | Internal data extraction for feeds, exports, and transformations |
| file | Files | Parse CSV, JSON, XML, XLSX, NDJSON, TSV files |
| s3 | S3 Storage | Fetch and parse files from S3-compatible storage |
| ftp | FTP/SFTP | Fetch files from FTP or SFTP servers |
| database | SQL Database | Query PostgreSQL, MySQL, SQLite, MSSQL, Oracle databases |
| cdc | CDC | Poll database tables for changes using timestamp or version tracking |
| webhook | Webhook | Receive data from webhook payloads |
| inMemory | In-Memory | Inline data for testing, seed data, and webhook payloads |
HTTP-based extractors (httpApi, graphql) support connection-based authentication via connectionCode, which provides: