workflow-engine

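Guide for @bratsos/workflow-engine, a type-safe workflow engine with AI integration, stage pipelines, and persistence. Use it when building multi-stage workflows or AI-powered pipelines, implementing workflow persistence, defining stages, or handling batch AI operations.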

Author: jakexiaohub (GitHub)

@bratsos/workflow-engine Skill

Type-safe workflow engine for building AI-powered, multi-stage pipelines with persistence and batch processing support. Uses a command kernel architecture with environment-agnostic design.

Architecture Overview

The engine follows a kernel + host pattern:

  • Core library (@bratsos/workflow-engine) - Command kernel, stage/workflow definitions, persistence adapters
  • Node Host (@bratsos/workflow-engine-host-node) - Long-running worker with polling loops and signal handling
  • Serverless Host (@bratsos/workflow-engine-host-serverless) - Stateless single-invocation for edge/lambda/workers

The kernel is a pure command dispatcher. All workflow operations are expressed as typed commands dispatched via kernel.dispatch(). Hosts wrap the kernel with environment-specific process management.
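
To make the command-kernel idea concrete, here is a minimal, self-contained sketch of a typed dispatcher built on a discriminated union. It is a toy model, not the library's implementation: `ToyCommand`, `createToyKernel`, and the result shapes are hypothetical names chosen for illustration, only two command types are modeled, and the real `kernel.dispatch()` is asynchronous.

```typescript
// Toy model of a command kernel. Not the real @bratsos/workflow-engine API.
type CreateRun = { type: "run.create"; idempotencyKey: string; workflowId: string; input: unknown };
type CancelRun = { type: "run.cancel"; workflowRunId: string };
type ToyCommand = CreateRun | CancelRun;

type ToyRun = { workflowId: string; status: "PENDING" | "CANCELLED" };

function createToyKernel() {
  const runs = new Map<string, ToyRun>();
  let nextId = 1;

  // Overloads tie each command type to the result shape of its handler.
  function dispatch(command: CreateRun): { workflowRunId: string };
  function dispatch(command: CancelRun): { cancelled: boolean };
  function dispatch(command: ToyCommand): { workflowRunId: string } | { cancelled: boolean } {
    switch (command.type) {
      case "run.create": {
        const workflowRunId = `run-${nextId++}`;
        runs.set(workflowRunId, { workflowId: command.workflowId, status: "PENDING" });
        return { workflowRunId };
      }
      case "run.cancel": {
        const run = runs.get(command.workflowRunId);
        if (run) run.status = "CANCELLED";
        return { cancelled: run !== undefined };
      }
    }
  }

  return { dispatch, runs };
}
```

Because every operation is a plain data object, a host only has to decide when to call `dispatch`; the toy kernel itself holds no timers or signal handlers.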

When to Apply

  • User wants to create workflow stages or pipelines
  • User mentions defineStage, defineAsyncBatchStage, WorkflowBuilder
  • User is implementing workflow persistence with Prisma
  • User needs AI integration (generateText, generateObject, embeddings, batch)
  • User is building multi-stage data processing pipelines
  • User mentions kernel, command dispatch, or job execution
  • User wants to set up a Node.js worker or serverless worker
  • User wants to rerun a workflow from a specific stage
  • User needs to test workflows with in-memory adapters

Quick Start

import { defineStage, WorkflowBuilder } from "@bratsos/workflow-engine";
import { createKernel } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { z } from "zod";

// 1. Define a stage
const processStage = defineStage({
  id: "process",
  name: "Process Data",
  schemas: {
    input: z.object({ data: z.string() }),
    output: z.object({ result: z.string() }),
    config: z.object({ verbose: z.boolean().default(false) }),
  },
  async execute(ctx) {
    return { output: { result: ctx.input.data.toUpperCase() } };
  },
});

// 2. Build a workflow
const workflow = new WorkflowBuilder(
  "my-workflow", "My Workflow", "Processes data",
  z.object({ data: z.string() }),
  z.object({ result: z.string() })
)
  .pipe(processStage)
  .build();

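// 3. Create kernel
// `prisma`, `myBlobStore`, `myEventSink`, and `myScheduler` are placeholders for
// instances your application provides; they are not defined in this snippet.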
const kernel = createKernel({
  persistence: createPrismaWorkflowPersistence(prisma),
  blobStore: myBlobStore,
  jobTransport: createPrismaJobQueue(prisma),
  eventSink: myEventSink,
  scheduler: myScheduler,
  clock: { now: () => new Date() },
  registry: { getWorkflow: (id) => (id === "my-workflow" ? workflow : undefined) },
});

// 4. Start a Node host
const host = createNodeHost({
  kernel,
  jobTransport: createPrismaJobQueue(prisma),
  workerId: "worker-1",
});
await host.start();

// 5. Dispatch a command
await kernel.dispatch({
  type: "run.create",
  idempotencyKey: crypto.randomUUID(),
  workflowId: "my-workflow",
  input: { data: "hello" },
});

Core Exports Reference

| Export | Type | Import Path | Purpose |
|--------|------|-------------|---------|
| defineStage | Function | @bratsos/workflow-engine | Create sync stages |
| defineAsyncBatchStage | Function | @bratsos/workflow-engine | Create async/batch stages |
| WorkflowBuilder | Class | @bratsos/workflow-engine | Chain stages into workflows |
| createKernel | Function | @bratsos/workflow-engine/kernel | Create command kernel |
| createNodeHost | Function | @bratsos/workflow-engine-host-node | Create Node.js host |
| createServerlessHost | Function | @bratsos/workflow-engine-host-serverless | Create serverless host |
| createAIHelper | Function | @bratsos/workflow-engine | AI operations (text, object, embed, batch) |
| registerEmbeddingProvider | Function | @bratsos/workflow-engine | Register custom embedding providers (Voyage, Cohere, etc.) |
| createStageIds | Function | @bratsos/workflow-engine | Create stage ID constants from a workflow |
| defineStageIds | Function | @bratsos/workflow-engine | Define stage ID constants from a tuple |
| isValidStageId | Function | @bratsos/workflow-engine | Runtime stage ID validation |
| assertValidStageId | Function | @bratsos/workflow-engine | Assert stage ID validity (throws) |
| definePlugin | Function | @bratsos/workflow-engine/kernel | Define kernel plugins |
| createPluginRunner | Function | @bratsos/workflow-engine/kernel | Create plugin event processor |

Kernel Commands

All operations go through kernel.dispatch(command):

| Command | Description |
|---------|-------------|
| run.create | Create a new workflow run |
| run.claimPending | Claim pending runs, enqueue first-stage jobs |
| run.transition | Advance to next stage group or complete |
| run.cancel | Cancel a running workflow (authoritative: cascades to stages + jobs) |
| run.rerunFrom | Rerun from a specific stage (cleans up blob artifacts by prefix) |
| job.execute | Execute a single stage (uses multi-phase transactions; see 08-common-patterns.md) |
| stage.pollSuspended | Poll suspended stages for readiness (skips cancelled runs; per-stage transactions) |
| lease.reapStale | Release stale job leases |
| run.reapStuck | Detect and fail RUNNING runs with no recent activity |
| outbox.flush | Publish pending outbox events |
| plugin.replayDLQ | Replay dead-letter queue events |

Stage Definition

Sync Stage

const myStage = defineStage({
  id: "my-stage",
  name: "My Stage",
  description: "Optional",
  dependencies: ["prev"],

  schemas: {
    input: InputSchema,     // Zod schema or "none"
    output: OutputSchema,
    config: ConfigSchema,
  },

  async execute(ctx) {
    const { input, config, workflowContext } = ctx;
    const prevOutput = ctx.require("prev");
    const optOutput = ctx.optional("other");

    await ctx.log("INFO", "Processing...");

    return {
      output: { ... },
      customMetrics: { itemsProcessed: 10 },
    };
  },
});

Async Batch Stage

const batchStage = defineAsyncBatchStage({
  id: "batch-process",
  name: "Batch Process",
  mode: "async-batch",
  schemas: { input: "none", output: OutputSchema, config: ConfigSchema },

  async execute(ctx) {
    if (ctx.resumeState) {
      return { output: ctx.resumeState.cachedResult };
    }

    const batchId = await submitBatchJob(ctx.input);
    return {
      suspended: true,
      state: {
        batchId,
        submittedAt: new Date().toISOString(),
        pollInterval: 60000,
        maxWaitTime: 3600000,
      },
      pollConfig: { pollInterval: 60000, maxWaitTime: 3600000, nextPollAt: new Date(Date.now() + 60000) },
    };
  },

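  async checkCompletion(suspendedState, ctx) {
    // checkBatchStatus and fetchBatchResults are placeholder helpers for your batch provider
    const status = await checkBatchStatus(suspendedState.batchId);
    if (status === "completed") {
      const results = await fetchBatchResults(suspendedState.batchId);
      return { ready: true, output: { results } };
    }
    if (status === "failed") return { ready: false, error: "Batch failed" };
    return { ready: false, nextCheckIn: 60000 };
  },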
});
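
The three return values of checkCompletion above suggest a small decision step on the polling side. The following is a hedged sketch of how a poller might interpret them; `CompletionCheck`, `NextAction`, and `decideNextAction` are hypothetical names, and failing the stage once the next poll would pass the deadline is an assumption about how `maxWaitTime` could be enforced, not documented engine behavior.

```typescript
// Hypothetical result shape, mirroring the three return values shown above.
type CompletionCheck =
  | { ready: true; output: unknown }
  | { ready: false; error: string }
  | { ready: false; nextCheckIn: number };

type NextAction =
  | { kind: "complete"; output: unknown }
  | { kind: "fail"; error: string }
  | { kind: "poll-again"; at: Date };

function decideNextAction(check: CompletionCheck, now: Date, deadline: Date): NextAction {
  if (check.ready) return { kind: "complete", output: check.output };
  if ("error" in check) return { kind: "fail", error: check.error };

  // Schedule the next poll nextCheckIn milliseconds from now.
  const at = new Date(now.getTime() + check.nextCheckIn);
  // Assumption: give up when the next poll would land past the maximum wait time.
  if (at.getTime() > deadline.getTime()) {
    return { kind: "fail", error: "maxWaitTime exceeded" };
  }
  return { kind: "poll-again", at };
}
```

Passing `now` in as an argument, rather than calling `Date.now()` inside, keeps the decision testable with a fake clock.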

WorkflowBuilder

Workflows are linear pipelines of execution groups. .pipe() creates single-stage groups; .parallel() creates multi-stage groups. Parallel group outputs are keyed by stage ID in the workflow context.

const workflow = new WorkflowBuilder(
  "workflow-id", "Workflow Name", "Description",
  InputSchema, OutputSchema
)
  .pipe(stage1)                          // Group 0
  .pipe(stage2)                          // Group 1
  .parallel([stage3a, stage3b])          // Group 2 (concurrent, output: { "stage3a-id": ..., "stage3b-id": ... })
  .pipe(stage4)                          // Group 3
  .build();

// In stage4, access parallel outputs by stage ID:
ctx.require("stage3a-id")  // output of stage3a
ctx.require("stage3b-id")  // output of stage3b

workflow.getStageIds();
workflow.getExecutionPlan();
workflow.getDefaultConfig();
workflow.validateConfig(config);

When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.
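
The execution-group model described in this section can be illustrated with a toy pipeline. This is a sketch only: `ToyStage` and `ToyPipeline` are hypothetical, stages here receive the raw map of earlier outputs instead of a typed context, and nothing below reflects the internals of WorkflowBuilder.

```typescript
// Toy model of execution groups. Hypothetical types; not the library's internals.
type ToyStage = {
  id: string;
  run: (outputs: Record<string, unknown>) => Promise<unknown>;
};

class ToyPipeline {
  private groups: ToyStage[][] = [];

  // A piped stage forms a group of its own.
  pipe(stage: ToyStage): this {
    this.groups.push([stage]);
    return this;
  }

  // Parallel stages share one group.
  parallel(stages: ToyStage[]): this {
    this.groups.push(stages);
    return this;
  }

  // Stage IDs per group, in execution order.
  plan(): string[][] {
    return this.groups.map((group) => group.map((stage) => stage.id));
  }

  // Groups run one after another; stages inside a group run concurrently.
  async run(): Promise<Record<string, unknown>> {
    const outputs: Record<string, unknown> = {};
    for (const group of this.groups) {
      const results = await Promise.all(group.map((stage) => stage.run(outputs)));
      // Outputs are recorded by stage ID only after the whole group finishes.
      group.forEach((stage, index) => {
        outputs[stage.id] = results[index];
      });
    }
    return outputs;
  }
}
```

In this model a stage can read any output from an earlier group by stage ID, which mirrors how stage4 reads the outputs of stage3a and stage3b.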

Kernel Setup

import { createKernel } from "@bratsos/workflow-engine/kernel";
import type { Kernel, KernelConfig, Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";

const kernel = createKernel({
  persistence,   // Persistence port - runs, stages, logs, outbox, idempotency
  blobStore,     // BlobStore port - large payload storage
  jobTransport,  // JobTransport port - job queue
  eventSink,     // EventSink port - async event publishing
  scheduler,     // Scheduler port - deferred command triggers
  clock,         // Clock port - injectable time source
  registry,      // WorkflowRegistry - { getWorkflow(id) }
});

// Dispatch typed commands
const { workflowRunId } = await kernel.dispatch({
  type: "run.create",
  idempotencyKey: "unique-key",
  workflowId: "my-workflow",
  input: { data: "hello" },
});

Node Host

import { createNodeHost } from "@bratsos/workflow-engine-host-node";

const host = createNodeHost({
  kernel,
  jobTransport,
  workerId: "worker-1",
  orchestrationIntervalMs: 10_000,
  jobPollIntervalMs: 1_000,
  staleLeaseThresholdMs: 60_000,
});

await host.start();   // Starts polling loops + signal handlers
await host.stop();    // Graceful shutdown
host.getStats();      // { workerId, jobsProcessed, orchestrationTicks, isRunning, uptimeMs }

Serverless Host

import {
  createServerlessHost,
  type ServerlessHost,
  type ServerlessHostConfig,
  type JobMessage,
  type JobResult,
  type ProcessJobsResult,
  type MaintenanceTickResult,
} from "@bratsos/workflow-engine-host-serverless";

const host = createServerlessHost({
  kernel,
  jobTransport,
  workerId: "my-worker",
  // Optional tuning (same defaults as Node host)
  staleLeaseThresholdMs: 60_000,
  maxClaimsPerTick: 10,
  maxSuspendedChecksPerTick: 10,
  maxOutboxFlushPerTick: 100,
});

handleJob(msg: JobMessage): Promise<JobResult>

Execute a single pre-dequeued job. Consumers wire platform-specific ack/retry around the result.

// JobMessage shape (matches queue message body)
interface JobMessage {
  jobId: string;
  workflowRunId: string;
  workflowId: string;
  stageId: string;
  attempt: number;
  maxAttempts?: number;
  payload: Record<string, unknown>;
}

// JobResult
interface JobResult {
  outcome: "completed" | "suspended" | "failed";
  error?: string;
}

const result = await host.handleJob(msg);
if (result.outcome === "completed") msg.ack();
else if (result.outcome === "suspended") msg.ack();
else msg.retry();
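
In practice the queue envelope (which carries ack and retry) is usually a separate object from the job body. The sketch below separates the two and adds error handling around the job handler. `ToyQueueMessage` and `consumeBatch` are hypothetical names, `ToyJobResult` mirrors the JobResult shape above, and retrying on a thrown error is an assumption about a sensible default, not documented behavior.

```typescript
// Mirrors the JobResult shape documented above.
type ToyJobResult = { outcome: "completed" | "suspended" | "failed"; error?: string };

// Hypothetical queue envelope: the platform message wraps the job body.
interface ToyQueueMessage<T> {
  body: T;
  ack(): void;
  retry(): void;
}

async function consumeBatch<T>(
  messages: ToyQueueMessage<T>[],
  handleJob: (body: T) => Promise<ToyJobResult>,
): Promise<{ acked: number; retried: number }> {
  const counts = { acked: 0, retried: 0 };
  for (const message of messages) {
    try {
      const result = await handleJob(message.body);
      if (result.outcome === "failed") {
        message.retry();
        counts.retried++;
      } else {
        // "completed" and "suspended" are both acknowledged.
        message.ack();
        counts.acked++;
      }
    } catch {
      // Assumption: an unexpected throw is treated like a failed job.
      message.retry();
      counts.retried++;
    }
  }
  return counts;
}
```

With the serverless host, `handleJob` would be something like `(body) => host.handleJob(body)`.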

processAvailableJobs(opts?): Promise<ProcessJobsResult>

Dequeue and process jobs from the job transport. Defaults to 1 job (safe for edge runtimes with CPU limits).

const result = await host.processAvailableJobs({ maxJobs: 5 });
// { processed: number, succeeded: number, failed: number }

runMaintenanceTick(): Promise<MaintenanceTickResult>

Run one bounded maintenance cycle: claim pending, poll suspended, reap stale, flush outbox, reap stuck runs.

const tick = await host.runMaintenanceTick();
// { claimed, suspendedChecked, staleReleased, eventsFlushed, stuckReaped }
// Note: resumed suspended stages are automatically followed by run.transition.
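
A maintenance cycle like the one above runs several independent steps in sequence. The sketch below shows one way to keep such steps isolated so that a failure in one does not block the rest. `ToyStep` and `runToyTick` are hypothetical, and the error-collection behavior is an illustration, not a description of what runMaintenanceTick does internally.

```typescript
// Hypothetical step: returns how many items it handled.
type ToyStep = { name: string; run: () => Promise<number> };

async function runToyTick(steps: ToyStep[]) {
  const counts: Record<string, number> = {};
  const errors: Record<string, string> = {};
  for (const step of steps) {
    try {
      counts[step.name] = await step.run();
    } catch (error) {
      // Record the failure and continue with the remaining steps.
      counts[step.name] = 0;
      errors[step.name] = error instanceof Error ? error.message : String(error);
    }
  }
  return { counts, errors };
}
```

A scheduled trigger (cron, alarm, or similar) would call such a function once per interval; each step should cap its own work so the whole tick stays bounded.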

AI Integration & Cost Tracking

const ai = createAIHelper(
  `workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
  aiCallLogger,
);

const { text, cost } = await ai.generateText("gemini-2.5-flash", prompt);
const { object } = await ai.generateObject("gemini-2.5-flash", prompt, schema);
const { embedding } = await ai.embed("text-embedding-004", ["text1"], { dimensions: 768 });
// OpenRouter embedding models (OpenAI, Cohere, etc.)
const { embedding: openRouterEmbedding } = await ai.embed("openai/text-embedding-3-small", ["text1"]);

// Provider-specific options passthrough (Voyage, Cohere, etc.)
const { embedding: voyageEmbedding } = await ai.embed("voyage-4-large", ["text1"], {
  providerOptions: { voyage: { outputDimension: 512, inputType: "document" } },
});

// Custom embedding providers (Voyage, Cohere, Jina, etc.)
import { registerEmbeddingProvider } from "@bratsos/workflow-engine";
import { voyage } from "voyage-ai-provider";
registerEmbeddingProvider("voyage", (modelId) => voyage.embeddingModel(modelId));
// Then register models with provider: "voyage" and use ai.embed() as usual
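
The dotted topic string passed to createAIHelper (`workflow.<runId>.stage.<stageId>`) lends itself to aggregating costs per run or per stage. The following sketch assumes a hypothetical record shape, `ToyAICallRecord`; the columns of the real AICall model may differ, so treat the field names as placeholders.

```typescript
// Hypothetical record shape; the real AICall model may use different fields.
type ToyAICallRecord = {
  topic: string;
  inputTokens: number;
  outputTokens: number;
  cost: number;
};

// Sum the cost of every call whose topic equals the prefix or sits below it.
function sumCostByTopicPrefix(records: ToyAICallRecord[], prefix: string): number {
  return records
    .filter((record) => record.topic === prefix || record.topic.startsWith(`${prefix}.`))
    .reduce((total, record) => total + record.cost, 0);
}
```

Matching on `prefix + "."` rather than a bare `startsWith(prefix)` avoids counting `workflow.run-10` when asking for `workflow.run-1`.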

Persistence Setup

Required Prisma Models (ALL are required)

Copy the complete schema from the package README. This includes: WorkflowRun, WorkflowStage, WorkflowLog, WorkflowArtifact, AICall, JobQueue, OutboxEvent, IdempotencyKey.

Create Persistence

import {
  createPrismaWorkflowPersistence,
  createPrismaJobQueue,
  createPrismaAICallLogger,
} from "@bratsos/workflow-engine/persistence/prisma";

const persistence = createPrismaWorkflowPersistence(prisma);
const jobQueue = createPrismaJobQueue(prisma);
const aiCallLogger = createPrismaAICallLogger(prisma);

// SQLite - MUST pass the databaseType option (use these in place of the calls above)
const sqlitePersistence = createPrismaWorkflowPersistence(prisma, { databaseType: "sqlite" });
const sqliteJobQueue = createPrismaJobQueue(prisma, { databaseType: "sqlite" });

Testing

// In-memory persistence and job queue
import {
  InMemoryWorkflowPersistence,
  InMemoryJobQueue,
  InMemoryAICallLogger,
} from "@bratsos/workflow-engine/testing";

// Kernel-specific test adapters
import {
  FakeClock,
  InMemoryBlobStore,
  CollectingEventSink,
  NoopScheduler,
} from "@bratsos/workflow-engine/kernel/testing";

// Create kernel with all in-memory adapters
const persistence = new InMemoryWorkflowPersistence();
const jobQueue = new InMemoryJobQueue();
const kernel = createKernel({
  persistence,
  blobStore: new InMemoryBlobStore(),
  jobTransport: jobQueue,
  eventSink: new CollectingEventSink(),
  scheduler: new NoopScheduler(),
  clock: new FakeClock(),
  registry: { getWorkflow: (id) => workflows.get(id) },
});

// Test a full workflow lifecycle
await kernel.dispatch({ type: "run.create", idempotencyKey: "test", workflowId: "my-wf", input: {} });
await kernel.dispatch({ type: "run.claimPending", workerId: "test-worker" });
const job = await jobQueue.dequeue();
await kernel.dispatch({ type: "job.execute", workflowRunId: job.workflowRunId, workflowId: job.workflowId, stageId: job.stageId, config: {} });
await kernel.dispatch({ type: "run.transition", workflowRunId: job.workflowRunId });
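
The `workflows` map used by the registry in the test kernel is not defined in the snippet above. A minimal sketch of a Map-backed registry follows; `createMapRegistry` is a hypothetical helper, and it assumes each built workflow exposes an `id` property, which the source does not confirm.

```typescript
// Assumption: each workflow object carries its ID on an `id` property.
interface HasId {
  id: string;
}

function createMapRegistry<W extends HasId>(workflows: W[]) {
  // Index once up front so lookups are constant time.
  const byId = new Map(workflows.map((workflow) => [workflow.id, workflow] as const));
  return {
    getWorkflow: (id: string): W | undefined => byId.get(id),
  };
}
```

The returned object has the `{ getWorkflow(id) }` shape that the registry option expects, and unknown IDs resolve to `undefined`.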

Key Principles

  1. Type Safety: All schemas are Zod - types flow through the entire pipeline
  2. Command Kernel: All operations are typed commands dispatched through kernel.dispatch()
  3. Environment-Agnostic: Kernel has no timers, no signals, no global state
  4. Context Access: Use ctx.require() and ctx.optional() for type-safe stage output access
  5. Transactional Outbox: Events are written to an outbox and published via the outbox.flush command. job.execute and stage.pollSuspended use multi-phase transactions to avoid holding connections during external I/O
  6. Idempotency: run.create and job.execute replay cached results by key; a concurrent dispatch with the same key throws IdempotencyInProgressError
  7. Authoritative Cancellation: run.cancel cascades to stages + jobs. Ghost jobs (jobs running against non-RUNNING runs) are detected via a ghost: true flag and are not retried
  8. Self-Healing: Stage creation is idempotent (upsert), orchestration steps are isolated, stuck runs are automatically reaped
  9. Cost Tracking: All AI calls automatically track tokens and costs
  10. BlobStore-Only Artifacts: All artifact storage goes through the BlobStore port. run.rerunFrom cleans up artifacts by key prefix
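
The idempotency principle in the list above (replay a cached result by key, reject a concurrent dispatch with the same key) can be sketched with an in-memory store. `ToyIdempotencyStore` and `ToyIdempotencyInProgressError` are hypothetical stand-ins; the real engine persists keys through its Persistence port, and releasing the key when the operation fails is an assumption.

```typescript
class ToyIdempotencyInProgressError extends Error {
  constructor(key: string) {
    super(`Idempotency key "${key}" is already in progress`);
    this.name = "ToyIdempotencyInProgressError";
  }
}

type ToyEntry = { status: "in-progress" } | { status: "done"; result: unknown };

class ToyIdempotencyStore {
  private entries = new Map<string, ToyEntry>();

  async run<T>(key: string, operation: () => Promise<T>): Promise<T> {
    const existing = this.entries.get(key);
    // Replay the cached result for a key that already completed.
    if (existing && existing.status === "done") return existing.result as T;
    // Reject a second caller while the first is still running.
    if (existing && existing.status === "in-progress") {
      throw new ToyIdempotencyInProgressError(key);
    }

    this.entries.set(key, { status: "in-progress" });
    try {
      const result = await operation();
      this.entries.set(key, { status: "done", result });
      return result;
    } catch (error) {
      // Assumption: release the key so a later retry can run the operation again.
      this.entries.delete(key);
      throw error;
    }
  }
}
```

A caller that retries `run.create` with the same key after a timeout would, under this model, receive the original result instead of creating a second run.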