Neuroline
Framework-agnostic pipeline orchestration library with support for:
- Sequential and parallel job execution
- Persistent state storage (MongoDB, in-memory, or custom)
- Type-safe jobs with synapses for data transformation
- Idempotency (re-running with same input data returns existing pipeline)
Installation
yarn add neuroline
For MongoDB storage:
yarn add neuroline mongoose
Quick Start
1. Creating a Job
A Job is a pure function with a defined interface:
import type { JobDefinition, JobContext } from 'neuroline';
interface MyJobInput {
url: string;
}
interface MyJobOutput {
data: string;
fetchedAt: Date;
}
interface MyJobOptions {
timeout?: number;
}
export const fetchDataJob: JobDefinition<MyJobInput, MyJobOutput, MyJobOptions> = {
name: 'fetch-data',
async execute(input, options, ctx) {
ctx.logger.info('Fetching data', { url: input.url });
const timeout = options?.timeout ?? 5000;
const response = await fetch(input.url, { signal: AbortSignal.timeout(timeout) });
const data = await response.text();
ctx.logger.info('Data fetched', { length: data.length });
return {
data,
fetchedAt: new Date(),
};
},
};
2. Pipeline Configuration
import type { PipelineConfig, SynapseContext } from 'neuroline';
import { fetchDataJob, notifyJob, processDataJob, saveResultJob } from './jobs';
interface PipelineInput {
url: string;
userId: string;
}
export const myPipelineConfig: PipelineConfig<PipelineInput> = {
name: 'my-neuroline',
stages: [
// Stage 1: single job
fetchDataJob,
// Stage 2: two jobs execute in parallel
[
{
job: processDataJob,
// synapses transform data for the job
synapses: (ctx: SynapseContext<PipelineInput>) => ({
rawData: ctx.getArtifact<{ data: string }>('fetch-data')?.data ?? '',
userId: ctx.pipelineInput.userId,
}),
},
{
job: notifyJob,
synapses: (ctx) => ({
userId: ctx.pipelineInput.userId,
message: 'Processing started',
}),
},
],
// Stage 3: final job
{
job: saveResultJob,
synapses: (ctx) => ({
processedData: ctx.getArtifact('process-data'),
userId: ctx.pipelineInput.userId,
}),
},
],
// Optional: custom hash function
computeInputHash: (input) => `${input.userId}-${input.url}`,
};
3. Creating and Using PipelineManager
With In-Memory Storage (for testing)
import { PipelineManager, InMemoryPipelineStorage } from 'neuroline';
import { myPipelineConfig } from './pipelines';
const storage = new InMemoryPipelineStorage();
const manager = new PipelineManager({
storage,
logger: console, // or your logger
});
// Register pipeline
manager.registerPipeline(myPipelineConfig);
// Start pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-neuroline', {
data: { url: 'https://api.example.com/data', userId: 'user-123' },
jobOptions: {
'fetch-data': { timeout: 10000 },
},
});
// Poll status
const status = await manager.getStatus(pipelineId);
console.log(status);
// { status: 'processing', currentJobIndex: 1, totalJobs: 4, currentJobName: 'process-data' }
// Get result (artifact) of a specific job (default = last job)
const result = await manager.getResult(pipelineId);
console.log(result);
// { pipelineId: '...', jobName: 'save-result', status: 'done', artifact: {...} }
// Get result (artifact) by job name
const computeResult = await manager.getResult(pipelineId, 'process-data');
console.log(computeResult);
// { pipelineId: '...', jobName: 'process-data', status: 'done', artifact: {...} }
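A simple way to wait for completion is to poll getStatus in a loop. A minimal sketch (the 1-second interval is arbitrary; a browser app may prefer the client-side poll helper described below):
// Poll until the pipeline finishes
let current = await manager.getStatus(pipelineId);
while (current.status === 'processing') {
  await new Promise((resolve) => setTimeout(resolve, 1000));
  current = await manager.getStatus(pipelineId);
}
if (current.status === 'error') {
  console.error('Pipeline failed:', current.error?.message);
} else {
  console.log('Pipeline done');
}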
With MongoDB Storage
import mongoose from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';
// Create model
const PipelineModel = mongoose.model('Pipeline', PipelineSchema);
// Create manager
const storage = new MongoPipelineStorage(PipelineModel);
const manager = new PipelineManager({ storage, logger: console });
manager.registerPipeline(myPipelineConfig);
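A mongoose connection must be established before the model is used, for example (the connection string below is a placeholder):
// Connect mongoose before starting pipelines (placeholder URI)
await mongoose.connect('mongodb://localhost:27017/my-app');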
API Reference
PipelineManager
constructor(options: PipelineManagerOptions)
interface PipelineManagerOptions {
storage: PipelineStorage; // Required
logger?: JobLogger; // Optional
}
registerPipeline(config: PipelineConfig): void
Registers a pipeline configuration. Must be called before startPipeline.
startPipeline<TData>(pipelineType: string, input: PipelineInput<TData>): Promise<StartPipelineResponse>
Starts a pipeline, or returns the existing one if a pipeline with the same input data hash is found.
interface PipelineInput<TData> {
data: TData; // Input data
jobOptions?: Record<string, unknown>; // Options for jobs (key = job name)
}
interface StartPipelineResponse {
pipelineId: string; // ID for polling
isNew: boolean; // true if created, false if already existed
}
getStatus(pipelineId: string): Promise<PipelineStatusResponse>
Returns current pipeline status.
interface PipelineStatusResponse {
status: 'processing' | 'done' | 'error';
currentJobIndex: number;
totalJobs: number;
currentJobName?: string;
error?: { message: string; jobName?: string };
}
getResult(pipelineId: string, jobName?: string): Promise<PipelineResultResponse>
Returns result (artifact) for a single job. If jobName is not provided, returns the last job result.
interface PipelineResultResponse {
pipelineId: string;
jobName: string;
status: 'pending' | 'processing' | 'done' | 'error';
artifact: unknown | null | undefined; // undefined = not yet executed, null = executed but no result
}
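The undefined/null distinction lets you tell an unexecuted job from one that finished without output. A sketch using the Quick Start pipeline:
const { artifact } = await manager.getResult(pipelineId, 'fetch-data');
if (artifact === undefined) {
  // job has not executed yet
} else if (artifact === null) {
  // job executed but produced no result
} else {
  // job produced an artifact
  console.log(artifact);
}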
getPipeline(pipelineId: string): Promise<PipelineState | null>
Returns full pipeline state (for debugging).
JobDefinition
interface JobDefinition<TInput, TOutput, TOptions> {
name: string;
execute: (
input: TInput,
options: TOptions | undefined,
context: JobContext
) => Promise<TOutput | null>;
}
JobContext
interface JobContext {
pipelineId: string;
jobIndex: number;
logger: {
info: (msg: string, data?: Record<string, unknown>) => void;
error: (msg: string, data?: Record<string, unknown>) => void;
warn: (msg: string, data?: Record<string, unknown>) => void;
};
}
PipelineConfig
interface PipelineConfig<TInput> {
name: string;
stages: PipelineStage[];
computeInputHash?: (input: TInput) => string;
}
// Stage: single job or array of jobs (parallel)
type PipelineStage = StageItem | StageItem[];
// StageItem: JobDefinition or JobInPipeline
type StageItem = JobDefinition | JobInPipeline;
interface JobInPipeline<TInput, TOutput, TOptions> {
job: JobDefinition<TInput, TOutput, TOptions>;
synapses?: (ctx: SynapseContext) => TInput;
retries?: number; // Number of retries on error (default: 0)
retryDelay?: number; // Delay between retries in ms (default: 1000)
}
SynapseContext
Context passed to the synapses function:
interface SynapseContext<TPipelineInput> {
pipelineInput: TPipelineInput;
getArtifact: <T>(jobName: string) => T | undefined;
}
Storage
InMemoryPipelineStorage
Built-in in-memory storage for testing and prototyping.
import { InMemoryPipelineStorage } from 'neuroline';
const storage = new InMemoryPipelineStorage();
// For testing
storage.clear(); // Clear all data
storage.getAll(); // Get all pipelines
MongoPipelineStorage
MongoDB storage (requires mongoose as a peer dependency).
import mongoose from 'mongoose';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';
const PipelineModel = mongoose.model('Pipeline', PipelineSchema);
const storage = new MongoPipelineStorage(PipelineModel);
Custom Storage
Implement the PipelineStorage interface:
import type { PipelineStorage, PipelineState, JobStatus, PipelineStatus, JobError, PaginatedResult, PaginationParams } from 'neuroline';
class MyCustomStorage implements PipelineStorage {
async findById(pipelineId: string): Promise<PipelineState | null> { ... }
async findAll(params?: PaginationParams): Promise<PaginatedResult<PipelineState>> { ... }
async create(state: PipelineState): Promise<PipelineState> { ... }
async delete(pipelineId: string): Promise<boolean> { ... }
async updateStatus(pipelineId: string, status: PipelineStatus): Promise<void> { ... }
async updateJobStatus(pipelineId: string, jobIndex: number, status: JobStatus, startedAt?: Date): Promise<void> { ... }
async updateJobArtifact(pipelineId: string, jobIndex: number, artifact: unknown, finishedAt: Date): Promise<void> { ... }
async appendJobError(pipelineId: string, jobIndex: number, error: JobError, isFinal: boolean, finishedAt?: Date): Promise<void> { ... }
async updateCurrentJobIndex(pipelineId: string, jobIndex: number): Promise<void> { ... }
async updateJobInput(pipelineId: string, jobIndex: number, input: unknown, options?: unknown): Promise<void> { ... }
async updateJobRetryCount(pipelineId: string, jobIndex: number, retryCount: number, maxRetries: number): Promise<void> { ... }
async findAndTimeoutStaleJobs(timeoutMs?: number): Promise<number> { ... } // For stale jobs watchdog
}
NestJS Integration
import { Module, OnModuleInit, Logger } from '@nestjs/common';
import { MongooseModule, InjectModel } from '@nestjs/mongoose';
import { Model } from 'mongoose';
import { PipelineManager } from 'neuroline';
import { MongoPipelineStorage, PipelineSchema } from 'neuroline/mongo';
import { myPipelineConfig } from './pipelines';
@Module({
imports: [
MongooseModule.forFeature([
{ name: 'Pipeline', schema: PipelineSchema },
]),
],
})
export class PipelineModule implements OnModuleInit {
private manager: PipelineManager;
constructor(
@InjectModel('Pipeline') private pipelineModel: Model<any>,
private logger: Logger,
) {
const storage = new MongoPipelineStorage(this.pipelineModel);
this.manager = new PipelineManager({
storage,
logger: {
info: (msg, data) => this.logger.log({ msg, ...data }),
error: (msg, data) => this.logger.error({ msg, ...data }),
warn: (msg, data) => this.logger.warn({ msg, ...data }),
},
});
}
onModuleInit() {
this.manager.registerPipeline(myPipelineConfig);
}
}
Stages and Parallel Execution
Pipeline
├── Stage 1: [jobA] ← sequential
├── Stage 2: [jobB, jobC, jobD] ← parallel within stage
├── Stage 3: [jobE] ← sequential
└── Stage 4: [jobF, jobG] ← parallel within stage
- Stages execute sequentially (Stage 2 starts only after Stage 1 completes)
- Jobs within a stage execute in parallel
- If any job in a stage fails (after all retries), the entire pipeline is marked as error
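The diagram above corresponds to a config along these lines (jobA through jobG are hypothetical jobs):
const config: PipelineConfig = {
  name: 'my-pipeline',
  stages: [
    jobA,               // Stage 1: sequential
    [jobB, jobC, jobD], // Stage 2: parallel within the stage
    jobE,               // Stage 3: sequential
    [jobF, jobG],       // Stage 4: parallel within the stage
  ],
};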
Retry Mechanism
Jobs can be configured to automatically retry on failure:
const config: PipelineConfig = {
name: 'my-pipeline',
stages: [
// Job with 2 retries and 1.5s delay between attempts
{
job: unstableJob,
synapses: (ctx) => ({ ... }),
retries: 2, // Will try up to 3 times (1 initial + 2 retries)
retryDelay: 1500, // Wait 1.5 seconds between retries
},
// Job without retries (default behavior)
normalJob,
],
};
- retries: Number of additional attempts after the initial failure (default: 0)
- retryDelay: Delay in milliseconds before each retry (default: 1000)
- retryCount and maxRetries are tracked in job state for monitoring
Stale Jobs Watchdog
When a process crashes during job execution, the job may remain in processing status forever ("stale job"). The watchdog monitors and automatically times out such jobs.
const manager = new PipelineManager({ storage, logger });
// Start watchdog (checks every minute, times out jobs after 20 minutes)
manager.startStaleJobsWatchdog({
checkIntervalMs: 60_000, // Check every 1 minute
jobTimeoutMs: 20 * 60_000, // Timeout after 20 minutes
onStaleJobsFound: (count) => console.log(`Timed out ${count} stale jobs`),
});
// Stop watchdog on shutdown
manager.stopStaleJobsWatchdog();
// Check if watchdog is running
manager.isWatchdogRunning();
// Manual check (useful for testing)
const timedOutCount = await manager.timeoutStaleJobs();
StaleJobsWatchdogOptions
interface StaleJobsWatchdogOptions {
checkIntervalMs?: number; // Default: 60000 (1 minute)
jobTimeoutMs?: number; // Default: 1200000 (20 minutes)
onStaleJobsFound?: (count: number) => void;
}
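In a long-running service the watchdog is typically started at boot and stopped on shutdown. A sketch (signal handling details depend on your runtime):
// Start with the default interval and timeout (see StaleJobsWatchdogOptions above)
manager.startStaleJobsWatchdog({});
// Stop cleanly on shutdown
process.on('SIGTERM', () => {
  manager.stopStaleJobsWatchdog();
});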
Idempotency
Pipelines are identified by input data hash:
// First call — creates pipeline
const { pipelineId, isNew } = await manager.startPipeline('my-pipeline', { data: { url: 'https://example.com' } });
// isNew = true
// Repeated call with same data — returns existing pipeline
const result2 = await manager.startPipeline('my-pipeline', { data: { url: 'https://example.com' } });
// result2.pipelineId === pipelineId
// result2.isNew = false
For custom hashing:
const config: PipelineConfig<MyInput> = {
name: 'my-pipeline',
stages: [...],
computeInputHash: (input) => `${input.userId}-${input.date}`,
};
Invalidation on Configuration Changes
When the pipeline structure changes (adding/removing/renaming jobs), old records are automatically invalidated:
// Version 1: pipeline with two jobs
const configV1: PipelineConfig = {
name: 'my-pipeline',
stages: [jobA, jobB],
};
manager.registerPipeline(configV1);
// Start — creates record in storage
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// Pipeline saved with configHash = hash(['jobA', 'jobB'])
// Version 2: added jobC
const configV2: PipelineConfig = {
name: 'my-pipeline',
stages: [jobA, jobB, jobC],
};
manager.registerPipeline(configV2);
// Start with same data
await manager.startPipeline('my-pipeline', { data: { id: 1 } });
// configHash changed → old record deleted → pipeline starts fresh
This is useful when:
- Adding new jobs to pipeline
- Removing obsolete jobs
- Changing execution order
- Renaming jobs
Types
All types are available for import:
import type {
// Job
JobDefinition,
JobContext,
JobLogger,
JobStatus,
JobState, // JobState<TInput, TOutput, TOptions> with generics
// Pipeline
PipelineConfig,
PipelineStage,
PipelineInput,
PipelineStatus,
PipelineState,
JobError,
// Synapse
SynapseContext,
JobInPipeline,
StageItem,
// Responses
StartPipelineResponse,
PipelineStatusResponse,
PipelineResultResponse,
// Storage
PipelineStorage,
PaginatedResult,
PaginationParams,
// Watchdog
StaleJobsWatchdogOptions,
} from 'neuroline';
// MongoDB types (separate import)
import type {
MongoPipelineDocument,
MongoPipelineJobState,
} from 'neuroline/mongo';
JobState with Generics
JobState now supports generic types for input, output (artifact), and options:
interface JobError {
message: string;
stack?: string;
attempt?: number;
logs?: string[];
data?: unknown;
}
interface JobState<TInput = unknown, TOutput = unknown, TOptions = unknown> {
name: string;
status: JobStatus;
input?: TInput; // Input data (computed by synapses)
options?: TOptions; // Job options (from jobOptions)
artifact?: TOutput; // Output data (result of execute)
errors: JobError[]; // Error history (empty array when no errors)
startedAt?: Date;
finishedAt?: Date;
}
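For example, the fetch-data job from the Quick Start could have its state typed like this (a sketch; the literal object stands in for a state loaded from storage):
import type { JobState } from 'neuroline';
const fetchState: JobState<MyJobInput, MyJobOutput, MyJobOptions> = {
  name: 'fetch-data',
  status: 'done',
  input: { url: 'https://api.example.com/data' },
  options: { timeout: 10000 },
  artifact: { data: '...', fetchedAt: new Date() },
  errors: [],
};
// artifact is typed as MyJobOutput | undefined
const length = fetchState.artifact?.data.length;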
Client API (neuroline/client)
Client module for browser-side interaction with Pipeline API.
Note: One client = one pipeline. The URL determines which pipeline to run.
PipelineClient
import { PipelineClient } from 'neuroline/client';
// Client for a specific pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });
// Start pipeline
const { pipelineId, isNew } = await client.start({
input: { userId: 123 },
jobOptions: { 'fetch-data': { timeout: 5000 } },
});
// Get status
const status = await client.getStatus(pipelineId);
// Get result (artifact) of the last job
const result = await client.getResult(pipelineId);
// Get job details (input, options, artifact)
const jobDetails = await client.getJobDetails(pipelineId, 'fetch-data');
Polling
// Manual polling
const { stop, completed } = client.poll(pipelineId, (event) => {
console.log('Status:', event.status.status);
});
// Wait for completion
const finalEvent = await completed;
// Or stop polling manually
stop();
Start with Polling
// Client for specific pipeline
const client = new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' });
// Start pipeline and immediately begin polling
const { pipelineId, stop, completed } = await client.startAndPoll(
{
input: { url: 'https://example.com' },
},
(event) => {
// Called on each poll
console.log('Progress:', event.status.currentJobIndex, '/', event.status.totalJobs);
},
(error) => {
// Called on error
console.error('Pipeline error:', error);
}
);
React Hook Factory
import { useState, useCallback, useEffect, useRef, useMemo } from 'react';
import { createUsePipelineHook, PipelineClient } from 'neuroline/client';
// Create hook with React dependencies
const usePipeline = createUsePipelineHook({ useState, useCallback, useEffect, useRef });
// In component
function MyComponent() {
// One client per pipeline
const client = useMemo(() => new PipelineClient({ baseUrl: '/api/pipeline/my-pipeline' }), []);
const { start, status, isRunning, error } = usePipeline(client);
const handleStart = async () => {
await start({ input: { userId: 123 } });
};
return (
<div>
<button onClick={handleStart} disabled={isRunning}>Start</button>
{status && <div>Status: {status.status}</div>}
</div>
);
}
Exports
| Import path | Contents |
|---|---|
| neuroline | Core: types, PipelineManager, InMemoryPipelineStorage |
| neuroline/mongo | MongoDB: MongoPipelineStorage, PipelineSchema, document types |
| neuroline/client | Client: PipelineClient, createUsePipelineHook, types |
License
UNLICENSED