vendure-data-hub-plugin

Custom Sinks

Create sinks to index data to custom search engines or external systems.

Interface

/**
 * Contract a sink adapter implements: `index` is required, while `delete`
 * and `refresh` are optional.
 *
 * @typeParam TConfig - Step configuration object handed to every method; defaults to `JsonObject`.
 */
interface SinkAdapter<TConfig = JsonObject> extends BaseAdapter<TConfig> {
    /** Always the literal `'SINK'`. */
    readonly type: 'SINK';
    /** One of the `SinkType` values. */
    readonly sinkType: SinkType;

    /** Indexes `records` using `config` and resolves with a `SinkResult`. */
    index(context: SinkContext, config: TConfig, records: readonly JsonObject[]): Promise<SinkResult>;
    /** Optional. Receives the document `ids` to delete and resolves with a `SinkResult`. */
    delete?(context: SinkContext, config: TConfig, ids: readonly string[]): Promise<SinkResult>;
    /** Optional. Refresh operation for the sink target; resolves with no value. */
    refresh?(context: SinkContext, config: TConfig): Promise<void>;
}

/** Values accepted for `SinkAdapter.sinkType`; the custom adapters in this guide use `'CUSTOM'`. */
type SinkType = 'ELASTICSEARCH' | 'OPENSEARCH' | 'MEILISEARCH' | 'ALGOLIA' | 'TYPESENSE' | 'WEBHOOK' | 'CUSTOM';

/** Execution context passed as the first argument to every sink adapter method. */
interface SinkContext {
    /** Request context of the current execution. */
    readonly ctx: RequestContext;
    /** Identifier of the pipeline this step belongs to. */
    readonly pipelineId: ID;
    /** Key of the sink step inside the pipeline. */
    readonly stepKey: string;
    /** Pipeline-level context object. */
    readonly pipelineContext: PipelineContext;
    /** Resolves secrets by code, e.g. `await secrets.get(code)`. */
    readonly secrets: SecretResolver;
    /** Resolves connection configurations by code, e.g. `await connections.get(code)`. */
    readonly connections: ConnectionResolver;
    /** Logger exposing `debug`, `info`, `warn` and `error`. */
    readonly logger: AdapterLogger;
    /** When `true`, the examples in this guide log what they would do and skip external writes. */
    readonly dryRun: boolean;
}

/** Outcome summary returned by `index` and `delete`. */
interface SinkResult {
    /** Number of records indexed. */
    readonly indexed: number;
    /** Number of documents deleted. */
    readonly deleted: number;
    /** Number of records or ids that failed. */
    readonly failed: number;
    /** Optional per-record failure details. */
    readonly errors?: readonly SinkError[];
}

/** Details of a single failed record. */
interface SinkError {
    /** The record that failed. */
    readonly record: JsonObject;
    /** Human-readable failure description. */
    readonly message: string;
    /** Optional status code; the examples in this guide store the HTTP response status here. */
    readonly statusCode?: number;
}

Basic Example

import { SinkAdapter, SinkContext, SinkResult, StepConfigSchema, JsonObject } from '@oronts/vendure-data-hub-plugin';

/** Step configuration for the `my-search` example sink. */
interface MySinkConfig {
    /** Base URL of the search API; documents are POSTed to `{endpoint}/indexes/{indexName}/documents`. */
    endpoint: string;
    /** Code of the secret holding the API key (resolved through `context.secrets`). */
    apiKeySecretCode: string;
    /** Name of the target index. */
    indexName: string;
    /** Name of the record field holding the document id. NOTE(review): declared and required in the schema, but the example `index` implementation does not read it. */
    idField: string;
    /** Records per request; the example falls back to 100 when unset. */
    batchSize?: number;
}

/** Configuration form for the `my-search` sink: one field per `MySinkConfig` property. */
const mySearchSinkSchema: StepConfigSchema = {
    fields: [
        // Where to send documents and how to authenticate.
        {
            key: 'endpoint',
            label: 'API Endpoint',
            type: 'string',
            required: true,
        },
        {
            key: 'apiKeySecretCode',
            label: 'API Key Secret',
            type: 'string',
            required: true,
        },
        // Target index and document identity.
        {
            key: 'indexName',
            label: 'Index Name',
            type: 'string',
            required: true,
        },
        {
            key: 'idField',
            label: 'ID Field',
            type: 'string',
            required: true,
            placeholder: 'id',
        },
        // Optional tuning.
        {
            key: 'batchSize',
            label: 'Batch Size',
            type: 'number',
            placeholder: '100',
        },
    ],
};

/**
 * Example sink that POSTs records in batches to a custom search engine at
 * `{endpoint}/indexes/{indexName}/documents`, authenticating with a Bearer token.
 */
export const mySearchSink: SinkAdapter<MySinkConfig> = {
    type: 'SINK',
    code: 'my-search',
    name: 'My Search Engine',
    description: 'Index records to custom search engine',
    category: 'external',
    sinkType: 'CUSTOM',
    schema: mySearchSinkSchema,
    icon: 'search',

    /**
     * Sends `records` to the search engine in batches.
     *
     * A batch is all-or-nothing: a non-2xx response or a network error marks
     * every record of that batch as failed, and processing continues with the
     * next batch.
     *
     * @param context - Provides the secret resolver, logger and dry-run flag.
     * @param config - Endpoint, index name, secret code and optional batch size.
     * @param records - Records to index; an empty list performs no requests.
     * @returns Counts of indexed and failed records plus one error entry per failed record.
     */
    async index(context: SinkContext, config: MySinkConfig, records: readonly JsonObject[]): Promise<SinkResult> {
        const { secrets, logger, dryRun } = context;

        // Resolve API key from secrets
        const apiKey = await secrets.get(config.apiKeySecretCode);

        if (dryRun) {
            logger.info(`[DRY RUN] Would index ${records.length} records`);
            return { indexed: records.length, deleted: 0, failed: 0 };
        }

        // Structural type so the snippet does not depend on importing `SinkError`.
        const errors: Array<{ record: JsonObject; message: string; statusCode?: number }> = [];

        // Without a key every request would be sent as "Bearer undefined"; fail fast instead.
        if (!apiKey) {
            const message = `Secret "${config.apiKeySecretCode}" did not resolve to an API key`;
            logger.warn(message);
            for (const record of records) {
                errors.push({ record, message });
            }
            return { indexed: 0, deleted: 0, failed: records.length, errors };
        }

        let indexed = 0;
        let failed = 0;

        // A zero, negative, fractional or non-numeric batch size would stall or
        // never advance the loop below, so fall back to the default of 100.
        const requestedBatchSize = Math.floor(Number(config.batchSize));
        const batchSize = requestedBatchSize > 0 ? requestedBatchSize : 100;

        // Tolerate a trailing slash on the endpoint and escape the index name.
        const url = `${config.endpoint.replace(/\/+$/, '')}/indexes/${encodeURIComponent(config.indexName)}/documents`;

        // Process in batches
        for (let i = 0; i < records.length; i += batchSize) {
            const batch = records.slice(i, i + batchSize);

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json',
                        'Authorization': `Bearer ${apiKey}`,
                    },
                    body: JSON.stringify(batch),
                });

                if (response.ok) {
                    indexed += batch.length;
                } else {
                    failed += batch.length;
                    const errorText = await response.text();
                    for (const record of batch) {
                        errors.push({ record, message: errorText, statusCode: response.status });
                    }
                }
            } catch (err) {
                failed += batch.length;
                const message = err instanceof Error ? err.message : 'Network error';
                for (const record of batch) {
                    errors.push({ record, message });
                }
            }
        }

        logger.info(`Indexed ${indexed}, failed ${failed}`);
        return { indexed, deleted: 0, failed, errors };
    },
};

Complete Example: OpenSearch Sink

import { SinkAdapter, SinkContext, SinkResult, StepConfigSchema, JsonObject } from '@oronts/vendure-data-hub-plugin';

/** Step configuration for the OpenSearch example sink. */
interface OpenSearchSinkConfig {
    /** Host name of the OpenSearch node (no protocol or port). */
    host: string;
    /** Port; the example falls back to 9200 when unset. */
    port?: number;
    /** The example uses `https` unless this is explicitly `false`. */
    ssl?: boolean;
    /** Name of the target index. */
    indexName: string;
    /** Name of the record field whose value becomes the document `_id`. */
    idField: string;
    /** Code of the secret holding an API key; when resolved, it is sent as `Authorization: ApiKey …`. */
    apiKeySecretCode?: string;
    /** Code of the secret holding the user name for Basic auth (`index` uses it only when no API key is resolved). */
    usernameSecretCode?: string;
    /** Code of the secret holding the password for Basic auth. */
    passwordSecretCode?: string;
    /** Records per `_bulk` request; the example falls back to 100 when unset. */
    bulkSize?: number;
    /** When true, `index` calls `_refresh` on the index after indexing at least one record. */
    refreshAfterBulk?: boolean;
}

/** Configuration form for the OpenSearch sink: one field per `OpenSearchSinkConfig` property. */
const openSearchSchema: StepConfigSchema = {
    fields: [
        // Connection
        { key: 'host', label: 'OpenSearch Host', type: 'string', required: true, placeholder: 'opensearch.example.com' },
        {
            key: 'port',
            label: 'Port',
            type: 'number',
            placeholder: '9200',
        },
        {
            key: 'ssl',
            label: 'Use SSL',
            type: 'boolean',
        },
        // Target index and document identity
        {
            key: 'indexName',
            label: 'Index Name',
            type: 'string',
            required: true,
        },
        {
            key: 'idField',
            label: 'ID Field',
            type: 'string',
            required: true,
            placeholder: 'id',
        },
        // Authentication (secret codes, not raw credentials)
        {
            key: 'apiKeySecretCode',
            label: 'API Key Secret',
            type: 'string',
        },
        {
            key: 'usernameSecretCode',
            label: 'Username Secret',
            type: 'string',
        },
        {
            key: 'passwordSecretCode',
            label: 'Password Secret',
            type: 'string',
        },
        // Bulk behaviour
        {
            key: 'bulkSize',
            label: 'Bulk Size',
            type: 'number',
            placeholder: '100',
        },
        {
            key: 'refreshAfterBulk',
            label: 'Refresh after bulk',
            type: 'boolean',
        },
    ],
};

/** Context object received by the adapter methods, derived from the adapter contract. */
type OpenSearchSinkContext = Parameters<SinkAdapter<OpenSearchSinkConfig>['index']>[0];

/** Failure details for one bulk action (`undefined` elsewhere means the action succeeded). */
type OpenSearchBulkFailure = { message: string; statusCode?: number };

/** The subset of the `_bulk` response body this adapter reads. */
interface OpenSearchBulkResponse {
    errors?: boolean;
    items?: Array<Record<string, { status?: number; error?: { reason?: string } } | undefined>>;
}

/** Bulk size used when the configured value is missing or not a positive number. */
const DEFAULT_BULK_SIZE = 100;

/** Builds `protocol://host:port`; `https` unless `ssl` is explicitly `false`, port 9200 by default. */
function buildOpenSearchBaseUrl(config: OpenSearchSinkConfig): string {
    const protocol = config.ssl !== false ? 'https' : 'http';
    const port = config.port || 9200;
    return `${protocol}://${config.host}:${port}`;
}

/**
 * Resolves the configured secrets and builds the request headers.
 * An API key takes precedence; Basic auth is used only when both user name and password resolve.
 */
async function resolveOpenSearchHeaders(
    context: OpenSearchSinkContext,
    config: OpenSearchSinkConfig,
    contentType = 'application/x-ndjson',
): Promise<Record<string, string>> {
    const { secrets } = context;
    const apiKey = config.apiKeySecretCode ? await secrets.get(config.apiKeySecretCode) : undefined;
    const username = config.usernameSecretCode ? await secrets.get(config.usernameSecretCode) : undefined;
    const password = config.passwordSecretCode ? await secrets.get(config.passwordSecretCode) : undefined;

    const headers: Record<string, string> = { 'Content-Type': contentType };
    if (apiKey) {
        headers['Authorization'] = `ApiKey ${apiKey}`;
    } else if (username && password) {
        headers['Authorization'] = `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`;
    }
    return headers;
}

/** Returns a positive integer bulk size; anything else would stall or never advance the batching loops. */
function resolveBulkSize(requested: number | undefined): number {
    const size = Math.floor(Number(requested));
    return size > 0 ? size : DEFAULT_BULK_SIZE;
}

/**
 * POSTs one NDJSON body to `_bulk` and reports the outcome of each of the `count` actions.
 *
 * `_bulk` can answer 2xx while individual actions failed, so the per-item
 * `error` objects are inspected whenever the response sets `errors`.
 * A non-2xx response or a network error fails every action of the request.
 *
 * @returns One entry per action, in request order: `undefined` on success, failure details otherwise.
 */
async function sendOpenSearchBulk(
    baseUrl: string,
    headers: Record<string, string>,
    ndjsonLines: readonly string[],
    action: 'index' | 'delete',
    count: number,
): Promise<Array<OpenSearchBulkFailure | undefined>> {
    const failAll = (failure: OpenSearchBulkFailure): Array<OpenSearchBulkFailure | undefined> =>
        Array.from({ length: count }, () => failure);

    try {
        const response = await fetch(`${baseUrl}/_bulk`, {
            method: 'POST',
            headers,
            body: ndjsonLines.join('\n') + '\n',
        });

        if (!response.ok) {
            return failAll({ message: await response.text(), statusCode: response.status });
        }

        const result = (await response.json()) as OpenSearchBulkResponse;
        return Array.from({ length: count }, (_, position) => {
            const item = result.errors ? result.items?.[position]?.[action] : undefined;
            return item?.error
                ? { message: item.error.reason ?? 'Unknown bulk error', statusCode: item.status }
                : undefined;
        });
    } catch (err) {
        return failAll({ message: err instanceof Error ? err.message : 'Network error' });
    }
}

/**
 * Sink adapter that writes to an OpenSearch cluster through the `_bulk` API.
 * Supports indexing, deleting by id, and refreshing the index.
 */
export const openSearchSink: SinkAdapter<OpenSearchSinkConfig> = {
    type: 'SINK',
    code: 'opensearch',
    name: 'OpenSearch',
    description: 'Index records to OpenSearch cluster',
    category: 'external',
    sinkType: 'CUSTOM',
    schema: openSearchSchema,
    icon: 'search',

    /**
     * Indexes `records` in chunks of `bulkSize`, using each record's `idField` value as `_id`.
     *
     * @returns Counts of indexed and failed records plus one error entry per failed record.
     */
    async index(context, config, records): Promise<SinkResult> {
        const { logger, dryRun } = context;

        const baseUrl = buildOpenSearchBaseUrl(config);
        const headers = await resolveOpenSearchHeaders(context, config);

        if (dryRun) {
            logger.info(`[DRY RUN] Would index ${records.length} records to ${config.indexName}`);
            return { indexed: records.length, deleted: 0, failed: 0 };
        }

        let indexed = 0;
        let failed = 0;
        const errors: Array<{ record: JsonObject; message: string; statusCode?: number }> = [];
        const bulkSize = resolveBulkSize(config.bulkSize);

        for (let i = 0; i < records.length; i += bulkSize) {
            const batch = records.slice(i, i + bulkSize);

            // Build NDJSON bulk body: one action line followed by one document line per record
            const bulkBody: string[] = [];
            for (const record of batch) {
                const docId = String(record[config.idField] ?? '');
                bulkBody.push(JSON.stringify({ index: { _index: config.indexName, _id: docId } }));
                bulkBody.push(JSON.stringify(record));
            }

            const outcomes = await sendOpenSearchBulk(baseUrl, headers, bulkBody, 'index', batch.length);
            batch.forEach((record, position) => {
                const failure = outcomes[position];
                if (failure) {
                    failed++;
                    errors.push({ record, ...failure });
                } else {
                    indexed++;
                }
            });
        }

        // Optional refresh (best effort: a failure is logged, not returned)
        if (config.refreshAfterBulk && indexed > 0) {
            try {
                const refreshResponse = await fetch(`${baseUrl}/${config.indexName}/_refresh`, {
                    method: 'POST',
                    headers: { ...headers, 'Content-Type': 'application/json' },
                });
                if (!refreshResponse.ok) {
                    logger.warn('Index refresh failed');
                }
            } catch {
                logger.warn('Index refresh failed');
            }
        }

        logger.info(`Indexed ${indexed}, failed ${failed} to ${config.indexName}`);
        return { indexed, deleted: 0, failed, errors };
    },

    /**
     * Deletes the documents with the given `ids`, in chunks of `bulkSize`.
     *
     * Uses the same authentication as `index`, and counts an id as deleted
     * only when its bulk item carries no `error`.
     *
     * @returns Counts of deleted and failed ids plus one error entry per failed id
     *          (the entry's `record` holds the id under the configured `idField`).
     */
    async delete(context, config, ids): Promise<SinkResult> {
        const { logger, dryRun } = context;

        if (dryRun) {
            logger.info(`[DRY RUN] Would delete ${ids.length} documents`);
            return { indexed: 0, deleted: ids.length, failed: 0 };
        }

        const baseUrl = buildOpenSearchBaseUrl(config);
        const headers = await resolveOpenSearchHeaders(context, config);

        let deleted = 0;
        let failed = 0;
        const errors: Array<{ record: JsonObject; message: string; statusCode?: number }> = [];
        const bulkSize = resolveBulkSize(config.bulkSize);

        for (let i = 0; i < ids.length; i += bulkSize) {
            const batch = ids.slice(i, i + bulkSize);
            const bulkBody = batch.map(id =>
                JSON.stringify({ delete: { _index: config.indexName, _id: id } })
            );

            const outcomes = await sendOpenSearchBulk(baseUrl, headers, bulkBody, 'delete', batch.length);
            batch.forEach((id, position) => {
                const failure = outcomes[position];
                if (failure) {
                    failed++;
                    errors.push({ record: { [config.idField]: id }, ...failure });
                } else {
                    deleted++;
                }
            });
        }

        logger.info(`Deleted ${deleted}, failed ${failed} from ${config.indexName}`);
        return { indexed: 0, deleted, failed, errors };
    },

    /**
     * Refreshes the configured index.
     *
     * @throws Error when OpenSearch answers with a non-2xx status; network errors from `fetch` propagate as before.
     */
    async refresh(context, config): Promise<void> {
        const headers = await resolveOpenSearchHeaders(context, config, 'application/json');
        const response = await fetch(`${buildOpenSearchBaseUrl(config)}/${config.indexName}/_refresh`, {
            method: 'POST',
            headers,
        });
        if (!response.ok) {
            throw new Error(`Refresh of index "${config.indexName}" failed with status ${response.status}`);
        }
    },
};

Registration

Register custom sinks via the plugin’s adapters option:

import { VendureConfig } from '@vendure/core';
import { DataHubPlugin } from '@oronts/vendure-data-hub-plugin';
import { openSearchSink } from './opensearch-sink';
import { mySearchSink } from './my-search-sink';

/** Vendure configuration that registers both custom sinks through the Data Hub plugin's `adapters` option. */
export const config: VendureConfig = {
    plugins: [
        DataHubPlugin.init({ adapters: [openSearchSink, mySearchSink] }),
    ],
};

Or register programmatically:

import { VendurePlugin, OnModuleInit } from '@vendure/core';
import { DataHubPlugin, DataHubRegistryService } from '@oronts/vendure-data-hub-plugin';
import { openSearchSink } from './opensearch-sink';

/**
 * Plugin that registers the OpenSearch sink with the Data Hub registry when the module initializes.
 *
 * NOTE(review): `OnModuleInit` is normally exported by `@nestjs/common` — confirm that
 * `@vendure/core` re-exports it, or adjust the import.
 */
@VendurePlugin({ imports: [DataHubPlugin] })
export class MySinksPlugin implements OnModuleInit {
    private registry: DataHubRegistryService;

    constructor(registry: DataHubRegistryService) {
        this.registry = registry;
    }

    /** Lifecycle hook: adds the sink to the registry at runtime. */
    onModuleInit() {
        this.registry.registerRuntime(openSearchSink);
    }
}

Using in Pipelines

import { createPipeline } from '@oronts/vendure-data-hub-plugin';

// Pipeline that extracts products, flattens the fields needed for search,
// and indexes them with the custom OpenSearch sink on a schedule.
const searchSyncPipeline = createPipeline()
    .name('product-search-sync')
    // Extract step "products": product entities with their translations and featured asset.
    .extract('products', {
        adapterCode: 'vendureQuery',
        entity: 'PRODUCT',
        relations: 'translations,featuredAsset',
    })
    // Transform step "prepare": copy nested values to top-level fields, then keep only those fields.
    .transform('prepare', {
        operators: [
            { op: 'copy', args: { source: 'translations.0.name', target: 'name' } },
            { op: 'copy', args: { source: 'translations.0.slug', target: 'slug' } },
            { op: 'copy', args: { source: 'featuredAsset.preview', target: 'image' } },
            { op: 'pick', args: { fields: ['id', 'name', 'slug', 'image'] } },
        ],
    })
    // Sink step "index-products": `adapterCode` selects the adapter by its `code`;
    // the remaining keys are the adapter's own configuration (see `OpenSearchSinkConfig`).
    .sink('index-products', {
        adapterCode: 'opensearch',  // Your custom sink code
        host: 'opensearch.example.com',
        indexName: 'products',
        idField: 'id',
        apiKeySecretCode: 'opensearch-key',
        bulkSize: 200,
    })
    // Schedule trigger "sync": cron `0 */4 * * *` fires at minute 0 of every fourth hour.
    .trigger('sync', {
        type: 'SCHEDULE',
        cron: '0 */4 * * *',
    })
    .build();

Context Utilities

SecretResolver

Resolve secrets stored in Data Hub:

async index(context, config, records) {
    // Each secret is looked up by the code stored in the step configuration.
    const { secrets } = context;
    const apiKey = await secrets.get(config.apiKeySecretCode);
    const username = await secrets.get(config.usernameSecretCode);
    // Use secrets...
}

ConnectionResolver

Resolve connection configurations:

async index(context, config, records) {
    // Look up the connection by the code stored in the step configuration.
    const { connections } = context;
    const connection = await connections.get(config.connectionCode);
    const settings = connection.config;
    const { host, port, credentials } = settings;
    // Use connection config...
}

Logger

Use the provided logger:

async index(context, config, records) {
    // One call per level the logger offers; `error` also receives the error object.
    const { logger } = context;
    logger.debug(`Processing ${records.length} records`);
    logger.info(`Indexed successfully`);
    logger.warn(`Some records skipped`);
    logger.error(`Indexing failed`, error);
}

Dry Run

Check whether the pipeline is running in dry-run mode, and skip external writes when it is:

async index(context, config, records) {
    const isDryRun = context.dryRun;
    if (isDryRun) {
        // Report what would happen without touching the external system.
        const recordCount = records.length;
        context.logger.info(`[DRY RUN] Would index ${recordCount} records`);
        return { indexed: recordCount, deleted: 0, failed: 0 };
    }
    // Actual indexing...
}

Schema Field Types

Available field types for configuration schema:

| Type | UI Component | Example |
|------|--------------|---------|
| string | Text input | Host, index name |
| number | Number input | Port, batch size |
| boolean | Checkbox | SSL, refresh |
| select | Dropdown | Protocol selection |
| json | JSON editor | Custom headers |
| textarea | Multi-line text | Query templates |
| password | Password input | Inline secrets |
/** Example schema showing the `string`, `number`, `boolean`, `select` and `json` field types. */
const schema: StepConfigSchema = {
    fields: [
        // string: text input
        {
            key: 'host',
            type: 'string',
            required: true,
            label: 'Host',
        },
        // number: number input
        {
            key: 'port',
            type: 'number',
            placeholder: '9200',
        },
        // boolean: checkbox
        {
            key: 'ssl',
            type: 'boolean',
            label: 'Use SSL',
        },
        // select: dropdown with explicit options
        {
            key: 'protocol',
            type: 'select',
            options: [{ value: 'http', label: 'HTTP' }, { value: 'https', label: 'HTTPS' }],
        },
        // json: JSON editor
        {
            key: 'headers',
            type: 'json',
            label: 'Custom Headers',
        },
    ],
};

Error Handling

Return errors in the result for proper tracking:

/**
 * Indexes records one at a time and collects a `SinkError` for each failure
 * instead of aborting, so the result reports exactly which records failed.
 */
async index(context, config, records) {
    const errors: SinkError[] = [];

    for (const record of records) {
        try {
            await indexRecord(record);
        } catch (err) {
            // `err` is `unknown` (anything can be thrown), so narrow before reading properties.
            const message = err instanceof Error ? err.message : String(err);
            const statusCode =
                typeof err === 'object' && err !== null && 'statusCode' in err && typeof err.statusCode === 'number'
                    ? err.statusCode
                    : undefined;
            errors.push({
                record,
                message,
                statusCode,
            });
        }
    }

    return {
        indexed: records.length - errors.length,
        deleted: 0,
        failed: errors.length,
        errors,
    };
}

Using Hooks with Sinks

You can modify records programmatically before they reach the sink using BEFORE_SINK hooks. This is useful for adding computed search fields, normalizing text, or building facets without adding a separate transform step.

Interceptor Example

// Pipeline whose BEFORE_SINK interceptor adds search-specific fields to each record
// before the records reach the sink step.
const pipeline = createPipeline()
    .name('Product Search Sync')
    .code('product-search-sync')
    // NOTE(review): this extract config uses `source: 'VENDURE_QUERY'`, while the
    // "Using in Pipelines" example uses `adapterCode: 'vendureQuery'` — confirm which
    // form the builder expects.
    .extract('products', { source: 'VENDURE_QUERY', entity: 'PRODUCT' })
    .sink('index-meili', {
        adapterCode: 'meilisearch',
        indexName: 'products',
        host: 'http://localhost:7700',
        apiKeySecretCode: 'meili-key',
    })
    .hooks({
        // The interceptor `code` string returns the records, each extended with `searchText`
        // (lower-cased name, sku and description), `facetTags` (comma-separated `tags` split
        // and trimmed) and `_rankingScore` (1.5 when `featured`, otherwise 1.0).
        BEFORE_SINK: [{
            type: 'INTERCEPTOR',
            name: 'Build search attributes',
            code: `
                return records.map(r => ({
                    ...r,
                    searchText: [r.name, r.sku, r.description]
                        .filter(Boolean).join(' ').toLowerCase(),
                    facetTags: (r.tags || '').split(',')
                        .map(t => t.trim()).filter(Boolean),
                    _rankingScore: r.featured ? 1.5 : 1.0,
                }));
            `,
        }],
    })
    .build();

Script Example

For complex logic, register a named script function in the plugin options and reference it from the hook by its scriptName:

// In your plugin:
// Registers a script under the name 'enrichForSearch'. The script receives the records
// and returns a copy of each one with `popularity`, `priceRange` and `availability` added.
// `calculatePopularity` and `categorizePriceRange` are application helpers not shown here.
DataHubPlugin.init({
    scripts: {
        'enrichForSearch': async (records, context, args) => {
            return records.map(r => ({
                ...r,
                popularity: calculatePopularity(r),
                priceRange: categorizePriceRange(r.price),
                // 'in_stock' only when the stock level is above zero
                availability: r.stockLevel > 0 ? 'in_stock' : 'out_of_stock',
            }));
        },
    },
})

// In pipeline:
// Builder fragment: attaches the registered script to the BEFORE_SINK stage.
.hooks({
    BEFORE_SINK: [{
        type: 'SCRIPT',
        // Must match the key used under `scripts` in the plugin options.
        scriptName: 'enrichForSearch',
    }],
})

Operation-Aware Hooks

When using the __operation field for CRUD pipelines, BEFORE_SINK hooks receive records with their operation type. You can use this to apply different transformations for upserts vs deletes:

// Hook fragment: the interceptor reduces DELETE records to `id` and `__operation`,
// and adds `searchText` and `updatedAt` to every other record.
BEFORE_SINK: [{
    type: 'INTERCEPTOR',
    name: 'Operation-aware enrichment',
    code: `
        return records.map(r => {
            if (r.__operation === 'DELETE') {
                return { id: r.id, __operation: r.__operation };
            }
            return {
                ...r,
                searchText: r.name.toLowerCase(),
                updatedAt: new Date().toISOString(),
            };
        });
    `,
}],

Testing

import { describe, it, expect, vi } from 'vitest';
import { openSearchSink } from './opensearch-sink';

/**
 * Unit tests for the OpenSearch sink. Each test installs its own `fetch` mock,
 * so call history never carries over from one test to the next.
 */
describe('OpenSearch Sink', () => {
    // Minimal stand-in for `SinkContext`; only the members the sink reads are functional.
    const mockContext = {
        ctx: {} as any,
        pipelineId: '123',
        stepKey: 'test-sink',
        pipelineContext: {} as any,
        secrets: {
            get: vi.fn().mockResolvedValue('test-api-key'),
        },
        connections: { get: vi.fn() },
        logger: {
            debug: vi.fn(),
            info: vi.fn(),
            warn: vi.fn(),
            error: vi.fn(),
        },
        dryRun: false,
    };

    it('should index records successfully', async () => {
        const fetchMock = vi.fn().mockResolvedValue({
            ok: true,
            json: () => Promise.resolve({ errors: false }),
        });
        global.fetch = fetchMock;

        const records = [
            { id: '1', name: 'Product 1' },
            { id: '2', name: 'Product 2' },
        ];

        const result = await openSearchSink.index(mockContext, {
            host: 'localhost',
            indexName: 'products',
            idField: 'id',
        }, records);

        expect(result.indexed).toBe(2);
        expect(result.failed).toBe(0);
        // Both records fit into a single bulk request.
        expect(fetchMock).toHaveBeenCalledTimes(1);
    });

    it('should handle dry run', async () => {
        // Fresh mock: reusing the one from the previous test would make the
        // "not called" assertion fail because of its recorded call.
        const fetchMock = vi.fn();
        global.fetch = fetchMock;
        const dryRunContext = { ...mockContext, dryRun: true };

        const result = await openSearchSink.index(dryRunContext, {
            host: 'localhost',
            indexName: 'products',
            idField: 'id',
        }, [{ id: '1' }]);

        expect(result.indexed).toBe(1);
        expect(fetchMock).not.toHaveBeenCalled();
    });
});