@bratsos/workflow-engine Skill
Type-safe workflow engine for building AI-powered, multi-stage pipelines with persistence and batch processing support. Uses a command kernel architecture with environment-agnostic design.
Architecture Overview
The engine follows a kernel + host pattern:
- Core library (
@bratsos/workflow-engine) - Command kernel, stage/workflow definitions, persistence adapters - Node Host (
@bratsos/workflow-engine-host-node) - Long-running worker with polling loops and signal handling - Serverless Host (
@bratsos/workflow-engine-host-serverless) - Stateless single-invocation for edge/lambda/workers
The kernel is a pure command dispatcher. All workflow operations are expressed as typed commands dispatched via kernel.dispatch(). Hosts wrap the kernel with environment-specific process management.
When to Apply
- User wants to create workflow stages or pipelines
- User mentions
defineStage,defineAsyncBatchStage,WorkflowBuilder - User is implementing workflow persistence with Prisma
- User needs AI integration (generateText, generateObject, embeddings, batch)
- User is building multi-stage data processing pipelines
- User mentions kernel, command dispatch, or job execution
- User wants to set up a Node.js worker or serverless worker
- User wants to rerun a workflow from a specific stage
- User needs to test workflows with in-memory adapters
Quick Start
import { defineStage, WorkflowBuilder } from "@bratsos/workflow-engine";
import { createKernel } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import {
createPrismaWorkflowPersistence,
createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { z } from "zod";
// 1. Define a stage
const processStage = defineStage({
id: "process",
name: "Process Data",
schemas: {
input: z.object({ data: z.string() }),
output: z.object({ result: z.string() }),
config: z.object({ verbose: z.boolean().default(false) }),
},
async execute(ctx) {
return { output: { result: ctx.input.data.toUpperCase() } };
},
});
// 2. Build a workflow
const workflow = new WorkflowBuilder(
"my-workflow", "My Workflow", "Processes data",
z.object({ data: z.string() }),
z.object({ result: z.string() })
)
.pipe(processStage)
.build();
// 3. Create kernel
const kernel = createKernel({
persistence: createPrismaWorkflowPersistence(prisma),
blobStore: myBlobStore,
jobTransport: createPrismaJobQueue(prisma),
eventSink: myEventSink,
scheduler: myScheduler,
clock: { now: () => new Date() },
registry: { getWorkflow: (id) => (id === "my-workflow" ? workflow : undefined) },
});
// 4. Start a Node host
const host = createNodeHost({
kernel,
jobTransport: createPrismaJobQueue(prisma),
workerId: "worker-1",
});
await host.start();
// 5. Dispatch a command
await kernel.dispatch({
type: "run.create",
idempotencyKey: crypto.randomUUID(),
workflowId: "my-workflow",
input: { data: "hello" },
});
Core Exports Reference
| Export | Type | Import Path | Purpose |
|--------|------|-------------|---------|
| defineStage | Function | @bratsos/workflow-engine | Create sync stages |
| defineAsyncBatchStage | Function | @bratsos/workflow-engine | Create async/batch stages |
| WorkflowBuilder | Class | @bratsos/workflow-engine | Chain stages into workflows |
| createKernel | Function | @bratsos/workflow-engine/kernel | Create command kernel |
| createNodeHost | Function | @bratsos/workflow-engine-host-node | Create Node.js host |
| createServerlessHost | Function | @bratsos/workflow-engine-host-serverless | Create serverless host |
| createAIHelper | Function | @bratsos/workflow-engine | AI operations (text, object, embed, batch) |
| registerEmbeddingProvider | Function | @bratsos/workflow-engine | Register custom embedding providers (Voyage, Cohere, etc.) |
| createStageIds | Function | @bratsos/workflow-engine | Create stage ID constants from a workflow |
| defineStageIds | Function | @bratsos/workflow-engine | Define stage ID constants from a tuple |
| isValidStageId | Function | @bratsos/workflow-engine | Runtime stage ID validation |
| assertValidStageId | Function | @bratsos/workflow-engine | Assert stage ID validity (throws) |
| definePlugin | Function | @bratsos/workflow-engine/kernel | Define kernel plugins |
| createPluginRunner | Function | @bratsos/workflow-engine/kernel | Create plugin event processor |
Kernel Commands
All operations go through kernel.dispatch(command):
| Command | Description |
|---------|-------------|
| run.create | Create a new workflow run |
| run.claimPending | Claim pending runs, enqueue first-stage jobs |
| run.transition | Advance to next stage group or complete |
| run.cancel | Cancel a running workflow (authoritative: cascades to stages + jobs) |
| run.rerunFrom | Rerun from a specific stage (cleans up blob artifacts by prefix) |
| job.execute | Execute a single stage (uses multi-phase transactions; see 08-common-patterns.md) |
| stage.pollSuspended | Poll suspended stages for readiness (skips cancelled runs; per-stage transactions) |
| lease.reapStale | Release stale job leases |
| run.reapStuck | Detect and fail RUNNING runs with no recent activity |
| outbox.flush | Publish pending outbox events |
| plugin.replayDLQ | Replay dead-letter queue events |
Stage Definition
Sync Stage
const myStage = defineStage({
id: "my-stage",
name: "My Stage",
description: "Optional",
dependencies: ["prev"],
schemas: {
input: InputSchema, // Zod schema or "none"
output: OutputSchema,
config: ConfigSchema,
},
async execute(ctx) {
const { input, config, workflowContext } = ctx;
const prevOutput = ctx.require("prev");
const optOutput = ctx.optional("other");
await ctx.log("INFO", "Processing...");
return {
output: { ... },
customMetrics: { itemsProcessed: 10 },
};
},
});
Async Batch Stage
const batchStage = defineAsyncBatchStage({
id: "batch-process",
name: "Batch Process",
mode: "async-batch",
schemas: { input: "none", output: OutputSchema, config: ConfigSchema },
async execute(ctx) {
if (ctx.resumeState) {
return { output: ctx.resumeState.cachedResult };
}
const batchId = await submitBatchJob(ctx.input);
return {
suspended: true,
state: {
batchId,
submittedAt: new Date().toISOString(),
pollInterval: 60000,
maxWaitTime: 3600000,
},
pollConfig: { pollInterval: 60000, maxWaitTime: 3600000, nextPollAt: new Date(Date.now() + 60000) },
};
},
async checkCompletion(suspendedState, ctx) {
const status = await checkBatchStatus(suspendedState.batchId);
if (status === "completed") return { ready: true, output: { results } };
if (status === "failed") return { ready: false, error: "Batch failed" };
return { ready: false, nextCheckIn: 60000 };
},
});
WorkflowBuilder
Workflows are linear pipelines of execution groups. .pipe() creates single-stage groups; .parallel() creates multi-stage groups. Parallel group outputs are keyed by stage ID in the workflow context.
const workflow = new WorkflowBuilder(
"workflow-id", "Workflow Name", "Description",
InputSchema, OutputSchema
)
.pipe(stage1) // Group 0
.pipe(stage2) // Group 1
.parallel([stage3a, stage3b]) // Group 2 (concurrent, output: { "stage3a-id": ..., "stage3b-id": ... })
.pipe(stage4) // Group 3
.build();
// In stage4, access parallel outputs by stage ID:
ctx.require("stage3a-id") // output of stage3a
ctx.require("stage3b-id") // output of stage3b
workflow.getStageIds();
workflow.getExecutionPlan();
workflow.getDefaultConfig();
workflow.validateConfig(config);
When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.
Kernel Setup
import { createKernel } from "@bratsos/workflow-engine/kernel";
import type { Kernel, KernelConfig, Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";
const kernel = createKernel({
persistence, // Persistence port - runs, stages, logs, outbox, idempotency
blobStore, // BlobStore port - large payload storage
jobTransport, // JobTransport port - job queue
eventSink, // EventSink port - async event publishing
scheduler, // Scheduler port - deferred command triggers
clock, // Clock port - injectable time source
registry, // WorkflowRegistry - { getWorkflow(id) }
});
// Dispatch typed commands
const { workflowRunId } = await kernel.dispatch({
type: "run.create",
idempotencyKey: "unique-key",
workflowId: "my-workflow",
input: { data: "hello" },
});
Node Host
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
const host = createNodeHost({
kernel,
jobTransport,
workerId: "worker-1",
orchestrationIntervalMs: 10_000,
jobPollIntervalMs: 1_000,
staleLeaseThresholdMs: 60_000,
});
await host.start(); // Starts polling loops + signal handlers
await host.stop(); // Graceful shutdown
host.getStats(); // { workerId, jobsProcessed, orchestrationTicks, isRunning, uptimeMs }
Serverless Host
import {
createServerlessHost,
type ServerlessHost,
type ServerlessHostConfig,
type JobMessage,
type JobResult,
type ProcessJobsResult,
type MaintenanceTickResult,
} from "@bratsos/workflow-engine-host-serverless";
const host = createServerlessHost({
kernel,
jobTransport,
workerId: "my-worker",
// Optional tuning (same defaults as Node host)
staleLeaseThresholdMs: 60_000,
maxClaimsPerTick: 10,
maxSuspendedChecksPerTick: 10,
maxOutboxFlushPerTick: 100,
});
handleJob(msg: JobMessage): Promise<JobResult>
Execute a single pre-dequeued job. Consumers wire platform-specific ack/retry around the result.
// JobMessage shape (matches queue message body)
interface JobMessage {
jobId: string;
workflowRunId: string;
workflowId: string;
stageId: string;
attempt: number;
maxAttempts?: number;
payload: Record<string, unknown>;
}
// JobResult
interface JobResult {
outcome: "completed" | "suspended" | "failed";
error?: string;
}
const result = await host.handleJob(msg);
if (result.outcome === "completed") msg.ack();
else if (result.outcome === "suspended") msg.ack();
else msg.retry();
processAvailableJobs(opts?): Promise<ProcessJobsResult>
Dequeue and process jobs from the job transport. Defaults to 1 job (safe for edge runtimes with CPU limits).
const result = await host.processAvailableJobs({ maxJobs: 5 });
// { processed: number, succeeded: number, failed: number }
runMaintenanceTick(): Promise<MaintenanceTickResult>
Run one bounded maintenance cycle: claim pending, poll suspended, reap stale, flush outbox, reap stuck runs.
const tick = await host.runMaintenanceTick();
// { claimed, suspendedChecked, staleReleased, eventsFlushed, stuckReaped }
// Note: resumed suspended stages are automatically followed by run.transition.
AI Integration & Cost Tracking
const ai = createAIHelper(
`workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
aiCallLogger,
);
const { text, cost } = await ai.generateText("gemini-2.5-flash", prompt);
const { object } = await ai.generateObject("gemini-2.5-flash", prompt, schema);
const { embedding } = await ai.embed("text-embedding-004", ["text1"], { dimensions: 768 });
// OpenRouter embedding models (OpenAI, Cohere, etc.)
const { embedding } = await ai.embed("openai/text-embedding-3-small", ["text1"]);
// Provider-specific options passthrough (Voyage, Cohere, etc.)
const { embedding } = await ai.embed("voyage-4-large", ["text1"], {
providerOptions: { voyage: { outputDimension: 512, inputType: "document" } },
});
// Custom embedding providers (Voyage, Cohere, Jina, etc.)
import { registerEmbeddingProvider } from "@bratsos/workflow-engine";
import { voyage } from "voyage-ai-provider";
registerEmbeddingProvider("voyage", (modelId) => voyage.embeddingModel(modelId));
// Then register models with provider: "voyage" and use ai.embed() as usual
Persistence Setup
Required Prisma Models (ALL are required)
Copy the complete schema from the package README. This includes: WorkflowRun, WorkflowStage, WorkflowLog, WorkflowArtifact, AICall, JobQueue, OutboxEvent, IdempotencyKey.
Create Persistence
import {
createPrismaWorkflowPersistence,
createPrismaJobQueue,
createPrismaAICallLogger,
} from "@bratsos/workflow-engine/persistence/prisma";
const persistence = createPrismaWorkflowPersistence(prisma);
const jobQueue = createPrismaJobQueue(prisma);
const aiCallLogger = createPrismaAICallLogger(prisma);
// SQLite - MUST pass databaseType option
const persistence = createPrismaWorkflowPersistence(prisma, { databaseType: "sqlite" });
const jobQueue = createPrismaJobQueue(prisma, { databaseType: "sqlite" });
Testing
// In-memory persistence and job queue
import {
InMemoryWorkflowPersistence,
InMemoryJobQueue,
InMemoryAICallLogger,
} from "@bratsos/workflow-engine/testing";
// Kernel-specific test adapters
import {
FakeClock,
InMemoryBlobStore,
CollectingEventSink,
NoopScheduler,
} from "@bratsos/workflow-engine/kernel/testing";
// Create kernel with all in-memory adapters
const persistence = new InMemoryWorkflowPersistence();
const jobQueue = new InMemoryJobQueue();
const kernel = createKernel({
persistence,
blobStore: new InMemoryBlobStore(),
jobTransport: jobQueue,
eventSink: new CollectingEventSink(),
scheduler: new NoopScheduler(),
clock: new FakeClock(),
registry: { getWorkflow: (id) => workflows.get(id) },
});
// Test a full workflow lifecycle
await kernel.dispatch({ type: "run.create", idempotencyKey: "test", workflowId: "my-wf", input: {} });
await kernel.dispatch({ type: "run.claimPending", workerId: "test-worker" });
const job = await jobQueue.dequeue();
await kernel.dispatch({ type: "job.execute", workflowRunId: job.workflowRunId, workflowId: job.workflowId, stageId: job.stageId, config: {} });
await kernel.dispatch({ type: "run.transition", workflowRunId: job.workflowRunId });
Reference Files
- 01-stage-definitions.md - Complete stage API
- 02-workflow-builder.md - WorkflowBuilder patterns
- 03-kernel-host-setup.md - Kernel & host configuration
- 04-ai-integration.md - AI helper methods
- 05-persistence-setup.md - Database setup
- 06-async-batch-stages.md - Async operations
- 07-testing-patterns.md - Testing with kernel
- 08-common-patterns.md - Kernel patterns & best practices
- 09-troubleshooting.md - Debugging stuck runs, P2002 errors, ghost jobs
Key Principles
- Type Safety: All schemas are Zod - types flow through the entire pipeline
- Command Kernel: All operations are typed commands dispatched through
kernel.dispatch() - Environment-Agnostic: Kernel has no timers, no signals, no global state
- Context Access: Use
ctx.require()andctx.optional()for type-safe stage output access - Transactional Outbox: Events written to outbox, published via
outbox.flushcommand.job.executeandstage.pollSuspendeduse multi-phase transactions to avoid holding connections during external I/O - Idempotency:
run.createandjob.executereplay cached results by key; concurrent same-key dispatch throwsIdempotencyInProgressError - Authoritative Cancellation:
run.cancelcascades to stages + jobs. Ghost jobs (running against non-RUNNING runs) are detected viaghost: trueflag and not retried - Self-Healing: Stage creation is idempotent (upsert), orchestration steps are isolated, stuck runs are automatically reaped
- Cost Tracking: All AI calls automatically track tokens and costs
- BlobStore-Only Artifacts: All artifact storage goes through the BlobStore port.
run.rerunFromcleans up artifacts by key prefix
微信扫一扫