Upgrading between versions
When the user wants to upgrade @bratsos/workflow-engine in their project (e.g., "upgrade my project to the latest workflow-engine version", "I just bumped workflow-engine, walk me through the migration"), route to migrations/README.md — the upgrade router. It explains how to detect the installed and previous versions, find the relevant migration guides, and apply them in order. Multi-version upgrades (e.g., 0.6 → 0.8) load and apply multiple migration files sequentially.
@bratsos/workflow-engine Skill
Type-safe workflow engine for building AI-powered, multi-stage pipelines with persistence and batch processing support. Uses a command kernel architecture with environment-agnostic design.
Architecture Overview
The engine follows a kernel + host pattern:
- Core library (@bratsos/workflow-engine) - Command kernel, stage/workflow definitions, persistence adapters
- Node Host (@bratsos/workflow-engine-host-node) - Long-running worker with polling loops and signal handling
- Serverless Host (@bratsos/workflow-engine-host-serverless) - Stateless single-invocation host for edge/lambda/workers
- Remote Host (@bratsos/workflow-engine-host-remote) - Credential-free remote activity workers: run a stage's execute() on a separate, disposable machine (no DB connection, no root object-store credentials)
The kernel is a pure command dispatcher. All workflow operations are expressed as typed commands dispatched via kernel.dispatch(). Hosts wrap the kernel with environment-specific process management.
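Here is a minimal sketch of the dispatcher pattern. It is not the library's implementation. Commands form a discriminated union on type, each command type maps to its own result type, and dispatch is a plain lookup with no timers or global state. The command fields, result shapes, and the createDispatcher helper are hypothetical simplifications. Unlike the real kernel.dispatch(), this toy version is synchronous.

```typescript
// Hypothetical, simplified command union; the real kernel defines many more commands and fields.
type Command =
  | { type: "run.create"; idempotencyKey: string; workflowId: string; input: unknown }
  | { type: "run.cancel"; workflowRunId: string };

// Each command type maps to its own result type.
interface CommandResults {
  "run.create": { workflowRunId: string };
  "run.cancel": { cancelled: boolean };
}

type Handlers = {
  [K in Command["type"]]: (cmd: Extract<Command, { type: K }>) => CommandResults[K];
};

// A "pure" dispatcher: no timers, no signals, just a lookup from command type to handler.
function createDispatcher(handlers: Handlers) {
  return function dispatch<C extends Command>(cmd: C): CommandResults[C["type"]] {
    const handler = handlers[cmd.type] as unknown as (c: C) => CommandResults[C["type"]];
    return handler(cmd);
  };
}

// Usage: the "persistence" here is just a Map owned by the caller.
const runs = new Map<string, string>(); // workflowRunId -> status
let nextId = 0;

const dispatch = createDispatcher({
  "run.create": () => {
    const workflowRunId = `run-${++nextId}`;
    runs.set(workflowRunId, "PENDING");
    return { workflowRunId };
  },
  "run.cancel": (cmd) => {
    const existed = runs.has(cmd.workflowRunId);
    if (existed) runs.set(cmd.workflowRunId, "CANCELLED");
    return { cancelled: existed };
  },
});

const { workflowRunId } = dispatch({ type: "run.create", idempotencyKey: "k1", workflowId: "wf", input: {} });
const cancelResult = dispatch({ type: "run.cancel", workflowRunId });
```

Because handlers are keyed by command type, the compiler rejects a dispatcher that forgets a command or returns the wrong result shape.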
When to Apply
- User wants to create workflow stages or pipelines
- User mentions defineStage, defineAsyncBatchStage, or WorkflowBuilder
- User is implementing workflow persistence with Prisma
- User needs AI integration (generateText, generateObject, embeddings, batch)
- User is building multi-stage data processing pipelines
- User mentions kernel, command dispatch, or job execution
- User wants to set up a Node.js worker or serverless worker
- User wants to run a stage on a separate, remote, or disposable machine, or mentions credential-free workers, defineRemoteStage, the ActivityExecutor port, or offloading heavy stages (transcoding, ffmpeg, batch inference)
- User wants to rerun a workflow from a specific stage
- User needs to test workflows with in-memory adapters
Quick Start
import { PrismaClient } from "@prisma/client";
import {
defineStage,
WorkflowBuilder,
createPrismaWorkflowPersistence,
createPrismaJobQueue,
} from "@bratsos/workflow-engine";
import { createKernel } from "@bratsos/workflow-engine/kernel";
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
import { z } from "zod";
const prisma = new PrismaClient();
// 1. Define a stage
const processStage = defineStage({
id: "process",
name: "Process Data",
schemas: {
input: z.object({ data: z.string() }),
output: z.object({ result: z.string() }),
config: z.object({ verbose: z.boolean().default(false) }),
},
async execute(ctx) {
return { output: { result: ctx.input.data.toUpperCase() } };
},
});
// 2. Build a workflow
const workflow = new WorkflowBuilder(
"my-workflow", "My Workflow", "Processes data",
z.object({ data: z.string() }),
z.object({ result: z.string() })
)
.pipe(processStage)
.build();
// 3. Create kernel (myBlobStore, myEventSink, myScheduler are your own port implementations)
const jobQueue = createPrismaJobQueue(prisma); // one queue instance shared by kernel and host
const kernel = createKernel({
persistence: createPrismaWorkflowPersistence(prisma),
blobStore: myBlobStore,
jobTransport: jobQueue,
eventSink: myEventSink,
scheduler: myScheduler,
clock: { now: () => new Date() },
registry: { getWorkflow: (id) => (id === "my-workflow" ? workflow : undefined) },
});
// 4. Start a Node host
const host = createNodeHost({
kernel,
jobTransport: jobQueue,
workerId: "worker-1",
});
await host.start();
// 5. Dispatch a command
await kernel.dispatch({
type: "run.create",
idempotencyKey: crypto.randomUUID(),
workflowId: "my-workflow",
input: { data: "hello" },
});
Core Exports Reference
| Export | Type | Import Path | Purpose |
|--------|------|-------------|---------|
| defineStage | Function | @bratsos/workflow-engine | Create sync stages |
| defineAsyncBatchStage | Function | @bratsos/workflow-engine | Create async/batch stages |
| WorkflowBuilder | Class | @bratsos/workflow-engine | Chain stages into workflows |
| createKernel | Function | @bratsos/workflow-engine/kernel | Create command kernel |
| createNodeHost | Function | @bratsos/workflow-engine-host-node | Create Node.js host |
| createServerlessHost | Function | @bratsos/workflow-engine-host-serverless | Create serverless host |
| defineRemoteStage / createActivityWorker | Function | @bratsos/workflow-engine-host-remote | Run a stage on a credential-free remote worker (see 11-remote-activity-workers.md) |
| createRoutingExecutor / createLocalExecutor | Function | @bratsos/workflow-engine/kernel | ActivityExecutor port: route specific stages to a remote executor / default in-process executor |
| createAIHelper | Function | @bratsos/workflow-engine | AI operations (text, object, embed, batch) |
| registerEmbeddingProvider | Function | @bratsos/workflow-engine | Register custom embedding providers (Voyage, Cohere, etc.) |
| createStageIds | Function | @bratsos/workflow-engine | Create stage ID constants from a workflow |
| defineStageIds | Function | @bratsos/workflow-engine | Define stage ID constants from a tuple |
| isValidStageId | Function | @bratsos/workflow-engine | Runtime stage ID validation |
| assertValidStageId | Function | @bratsos/workflow-engine | Assert stage ID validity (throws) |
| definePlugin | Function | @bratsos/workflow-engine/kernel | Define kernel plugins |
| createPluginRunner | Function | @bratsos/workflow-engine/kernel | Create plugin event processor |
| typedKey | Function | @bratsos/workflow-engine/conventions | Define a well-known annotation key with linked value type |
| Trigger / Decision / Approval / Revision | Constants | @bratsos/workflow-engine/conventions | Well-known annotation key namespaces (v0.8+) |
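The typedKey entry describes a key whose value type travels with it. One way to get that in TypeScript is a phantom type parameter. The sketch below assumes that approach and is not the library's source. The TypedKey interface, the AnnotationBuffer class, and the illustrative Decision keys (including the "low" | "medium" | "high" value type) are all hypothetical.

```typescript
// Hypothetical sketch: a key that carries its value type at compile time only.
interface TypedKey<T> {
  readonly name: string;
  readonly __valueType?: T; // phantom field: never set at runtime, used only for type checking
}

function typedKey<T>(name: string): TypedKey<T> {
  return { name };
}

// Illustrative keys in the spirit of a conventions catalog (names and value types are assumptions).
const Decision = {
  outcome: typedKey<"low" | "medium" | "high">("decision.outcome"),
  confidence: typedKey<number>("decision.confidence"),
};

class AnnotationBuffer {
  readonly entries: Array<[string, unknown]> = [];

  // Typed keys enforce the linked value type; plain string keys accept any value.
  annotate<T>(key: TypedKey<T>, value: T): void;
  annotate(key: string, value: unknown): void;
  annotate(key: TypedKey<unknown> | string, value: unknown): void {
    this.entries.push([typeof key === "string" ? key : key.name, value]);
  }
}

const buf = new AnnotationBuffer();
buf.annotate(Decision.outcome, "low");
buf.annotate(Decision.confidence, 0.42);
buf.annotate("acme.compliance.signoff", "alice@acme.com");
// buf.annotate(Decision.confidence, "high"); // would not compile: the value must be a number
```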
Kernel Commands
All operations go through kernel.dispatch(command):
| Command | Description |
|---------|-------------|
| run.create | Create a new workflow run |
| run.claimPending | Claim pending runs, enqueue first-stage jobs |
| run.transition | Advance to next stage group or complete |
| run.cancel | Cancel a running workflow (authoritative: cascades to stages + jobs) |
| run.rerunFrom | Rerun from a specific stage (cleans up blob artifacts by prefix) |
| job.execute | Execute a single stage (uses multi-phase transactions; see 08-common-patterns.md) |
| stage.pollSuspended | Poll suspended stages for readiness (skips cancelled runs; per-stage transactions) |
| lease.reapStale | Release stale job leases |
| run.reapStuck | Detect and fail RUNNING runs with no recent activity |
| outbox.flush | Publish pending outbox events |
| plugin.replayDLQ | Replay dead-letter queue events |
Stage Definition
Sync Stage
const myStage = defineStage({
id: "my-stage",
name: "My Stage",
description: "Optional",
dependencies: ["prev"],
schemas: {
input: InputSchema, // Zod schema or "none"
output: OutputSchema,
config: ConfigSchema,
},
async execute(ctx) {
const { input, config, workflowContext } = ctx;
const prevOutput = ctx.require("prev");
const optOutput = ctx.optional("other");
await ctx.log("INFO", "Processing...");
return {
output: { /* fields matching OutputSchema */ },
customMetrics: { itemsProcessed: 10 },
};
},
});
Async Batch Stage
const batchStage = defineAsyncBatchStage({
id: "batch-process",
name: "Batch Process",
mode: "async-batch",
schemas: { input: "none", output: OutputSchema, config: ConfigSchema },
async execute(ctx) {
if (ctx.resumeState) {
return { output: ctx.resumeState.cachedResult };
}
const batchId = await submitBatchJob(ctx.input);
return {
suspended: true,
state: {
batchId,
submittedAt: new Date().toISOString(),
pollInterval: 60000,
maxWaitTime: 3600000,
},
pollConfig: { pollInterval: 60000, maxWaitTime: 3600000, nextPollAt: new Date(Date.now() + 60000) },
};
},
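async checkCompletion(suspendedState, ctx) {
// submitBatchJob, checkBatchStatus, and fetchBatchResults are placeholders for your batch provider's API
const status = await checkBatchStatus(suspendedState.batchId);
if (status === "completed") {
const results = await fetchBatchResults(suspendedState.batchId);
return { ready: true, output: { results } };
}
if (status === "failed") return { ready: false, error: "Batch failed" };
return { ready: false, nextCheckIn: 60000 };
},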
});
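The suspend/resume cycle above can be modeled in a few lines. This is a hypothetical, synchronous simplification of what a poll of suspended stages has to decide. It is not the engine's stage.pollSuspended implementation. The model skips stages that are not yet due, asks checkCompletion, and then completes, fails, or reschedules. Two behaviors are assumptions: an exceeded maxWaitTime counts as a failure, and pollInterval is the fallback when nextCheckIn is absent.

```typescript
// Hypothetical, simplified model of one suspended stage (all times in epoch ms).
interface SuspendedStage {
  batchId: string;
  submittedAt: number;
  pollInterval: number;
  maxWaitTime: number;
  nextPollAt: number;
}

type Completion =
  | { ready: true; output: unknown }
  | { ready: false; error: string }
  | { ready: false; nextCheckIn?: number };

type PollOutcome =
  | { kind: "not-due" }
  | { kind: "completed"; output: unknown }
  | { kind: "failed"; error: string }
  | { kind: "rescheduled"; nextPollAt: number };

function pollOnce(stage: SuspendedStage, now: number, check: (s: SuspendedStage) => Completion): PollOutcome {
  if (now < stage.nextPollAt) return { kind: "not-due" }; // not due yet: don't call the provider
  const result = check(stage);
  if (result.ready) return { kind: "completed", output: result.output };
  if ("error" in result) return { kind: "failed", error: result.error };
  if (now - stage.submittedAt >= stage.maxWaitTime) {
    return { kind: "failed", error: "maxWaitTime exceeded" }; // assumption: timeouts fail the stage
  }
  stage.nextPollAt = now + (result.nextCheckIn ?? stage.pollInterval);
  return { kind: "rescheduled", nextPollAt: stage.nextPollAt };
}

// Usage: a fake provider that finishes on the third check.
const stage: SuspendedStage = { batchId: "b1", submittedAt: 0, pollInterval: 60_000, maxWaitTime: 3_600_000, nextPollAt: 60_000 };
let calls = 0;
const check = (): Completion => (++calls < 3 ? { ready: false, nextCheckIn: 60_000 } : { ready: true, output: { results: [] } });
const outcomes = [30_000, 60_000, 120_000, 180_000].map((t) => pollOnce(stage, t, check));
const timedOut = pollOnce({ ...stage, nextPollAt: 0 }, 3_600_000, () => ({ ready: false }));
```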
WorkflowBuilder
Workflows are linear pipelines of execution groups. .pipe() creates single-stage groups; .parallel() creates multi-stage groups. Parallel group outputs are keyed by stage ID in the workflow context.
const workflow = new WorkflowBuilder(
"workflow-id", "Workflow Name", "Description",
InputSchema, OutputSchema
)
.pipe(stage1) // Group 0
.pipe(stage2) // Group 1
.parallel([stage3a, stage3b]) // Group 2 (concurrent, output: { "stage3a-id": ..., "stage3b-id": ... })
.pipe(stage4) // Group 3
.build();
// In stage4, access parallel outputs by stage ID:
ctx.require("stage3a-id") // output of stage3a
ctx.require("stage3b-id") // output of stage3b
workflow.getStageIds();
workflow.getExecutionPlan();
workflow.getDefaultConfig();
workflow.validateConfig(config);
When a workflow completes, the final execution group's output is persisted in WorkflowRun.output and included in the workflow:completed event.
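The following hypothetical synchronous runner gives a rough mental model of execution groups. The real engine runs stages as queued jobs and persists their outputs. In this model, every stage output is recorded in a context keyed by stage ID for require() lookups, and a parallel group's output is an object keyed by stage ID. One behavior is an assumption of the sketch: each group receives the previous group's output as its input.

```typescript
// Hypothetical context and stage shapes, much simpler than the real defineStage API.
interface StageContext {
  input: unknown;
  require: (stageId: string) => unknown;
}
interface Stage {
  id: string;
  run: (ctx: StageContext) => unknown;
}

function runGroups(groups: Stage[][], input: unknown) {
  const outputs = new Map<string, unknown>(); // workflow context: every stage output keyed by stage ID
  let groupInput: unknown = input;

  for (const group of groups) {
    const results: Record<string, unknown> = {};
    const ctx: StageContext = {
      input: groupInput,
      require: (stageId) => {
        if (!outputs.has(stageId)) throw new Error(`no output recorded for stage "${stageId}"`);
        return outputs.get(stageId);
      },
    };
    // Stages in one group only see outputs from earlier groups, as if they ran concurrently.
    for (const stage of group) results[stage.id] = stage.run(ctx);
    for (const [id, out] of Object.entries(results)) outputs.set(id, out);
    // A single-stage group yields its output directly; a parallel group yields { [stageId]: output }.
    groupInput = group.length === 1 ? results[group[0].id] : results;
  }
  return { output: groupInput, outputs };
}

const upper: Stage = { id: "upper", run: (ctx) => String(ctx.input).toUpperCase() };
const len: Stage = { id: "len", run: (ctx) => String(ctx.input).length };
const rev: Stage = { id: "rev", run: (ctx) => String(ctx.input).split("").reverse().join("") };
const join: Stage = { id: "join", run: (ctx) => `${ctx.require("len")}:${ctx.require("rev")}` };

// Equivalent shape: .pipe(upper).parallel([len, rev]).pipe(join)
const result = runGroups([[upper], [len, rev], [join]], "abc");
```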
Kernel Setup
import { createKernel } from "@bratsos/workflow-engine/kernel";
import type { Kernel, KernelConfig, Persistence, BlobStore, JobTransport, EventSink, Scheduler, Clock } from "@bratsos/workflow-engine/kernel";
const kernel = createKernel({
persistence, // Persistence port - runs, stages, logs, outbox, idempotency
blobStore, // BlobStore port - large payload storage
jobTransport, // JobTransport port - job queue
eventSink, // EventSink port - async event publishing
scheduler, // Scheduler port - deferred command triggers
clock, // Clock port - injectable time source
registry, // WorkflowRegistry - { getWorkflow(id) }
// executor, // optional ActivityExecutor port - defaults to in-process; inject to run stages on remote workers (see 11-remote-activity-workers.md)
});
// Dispatch typed commands
const { workflowRunId } = await kernel.dispatch({
type: "run.create",
idempotencyKey: "unique-key",
workflowId: "my-workflow",
input: { data: "hello" },
});
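The optional executor routes stage execution through a port. The following is a hypothetical sketch of what createRoutingExecutor({ remote, remoteStageIds }) does conceptually: stage IDs on the list go to the remote executor, and everything else stays in-process. The Executor and StageTask shapes are simplified stand-ins for the real ActivityExecutor interface. For self-containment, this sketch takes the local executor explicitly instead of defaulting it.

```typescript
// Simplified stand-ins for the ActivityExecutor port (the real interface may differ).
interface StageTask {
  stageId: string;
  input: unknown;
}
interface Executor {
  execute(task: StageTask): Promise<unknown>;
}

// Hypothetical router: only listed stage IDs leave the process.
function routingExecutor(opts: { local: Executor; remote: Executor; remoteStageIds: Iterable<string> }): Executor {
  const remoteIds = new Set(opts.remoteStageIds);
  return {
    execute(task) {
      const target = remoteIds.has(task.stageId) ? opts.remote : opts.local;
      return target.execute(task);
    },
  };
}

const local: Executor = { execute: async (t) => `local:${t.stageId}` };
const remote: Executor = { execute: async (t) => `remote:${t.stageId}` };
const executor = routingExecutor({ local, remote, remoteStageIds: ["transcode"] });

const routed = Promise.all([
  executor.execute({ stageId: "transcode", input: {} }),
  executor.execute({ stageId: "summarize", input: {} }),
]);
```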
Node Host
import { createNodeHost } from "@bratsos/workflow-engine-host-node";
const host = createNodeHost({
kernel,
jobTransport,
workerId: "worker-1",
orchestrationIntervalMs: 10_000,
jobPollIntervalMs: 1_000,
staleLeaseThresholdMs: 60_000,
});
await host.start(); // Starts polling loops + signal handlers
await host.stop(); // Graceful shutdown
host.getStats(); // { workerId, jobsProcessed, orchestrationTicks, isRunning, uptimeMs }
Serverless Host
import {
createServerlessHost,
type ServerlessHost,
type ServerlessHostConfig,
type JobMessage,
type JobResult,
type ProcessJobsResult,
type MaintenanceTickResult,
} from "@bratsos/workflow-engine-host-serverless";
const host = createServerlessHost({
kernel,
jobTransport,
workerId: "my-worker",
// Optional tuning (same defaults as Node host)
staleLeaseThresholdMs: 60_000,
maxClaimsPerTick: 10,
maxSuspendedChecksPerTick: 10,
maxOutboxFlushPerTick: 100,
});
handleJob(msg: JobMessage): Promise<JobResult>
Execute a single pre-dequeued job. Consumers wire platform-specific ack/retry around the result.
// JobMessage shape (matches queue message body)
interface JobMessage {
jobId: string;
workflowRunId: string;
workflowId: string;
stageId: string;
attempt: number;
maxAttempts?: number;
payload: Record<string, unknown>;
}
// JobResult
interface JobResult {
outcome: "completed" | "suspended" | "failed";
error?: string;
}
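// `message` is the platform's queue message (e.g. a Cloudflare Queues Message with body, ack(), retry())
const result = await host.handleJob(message.body);
if (result.outcome === "failed") message.retry();
else message.ack(); // "completed" and "suspended" both end this delivery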
processAvailableJobs(opts?): Promise<ProcessJobsResult>
Dequeue and process jobs from the job transport. Defaults to 1 job (safe for edge runtimes with CPU limits).
const result = await host.processAvailableJobs({ maxJobs: 5 });
// { processed: number, succeeded: number, failed: number }
runMaintenanceTick(): Promise<MaintenanceTickResult>
Run one bounded maintenance cycle: claim pending, poll suspended, reap stale, flush outbox, reap stuck runs.
const tick = await host.runMaintenanceTick();
// { claimed, suspendedChecked, staleReleased, eventsFlushed, stuckReaped }
// Note: resumed suspended stages are automatically followed by run.transition.
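A scheduled (cron-style) serverless entry point typically combines the two calls above. The sketch below shows one possible arrangement, not a prescribed one. HostLike is a local interface mirroring the documented result shapes, and onSchedule is a hypothetical helper. This sketch runs maintenance first so that newly claimed runs have their first-stage jobs enqueued before job processing starts.

```typescript
// Local mirrors of the documented result shapes; real code would import the types from the serverless host package.
interface MaintenanceTickResult {
  claimed: number;
  suspendedChecked: number;
  staleReleased: number;
  eventsFlushed: number;
  stuckReaped: number;
}
interface ProcessJobsResult {
  processed: number;
  succeeded: number;
  failed: number;
}
interface HostLike {
  runMaintenanceTick(): Promise<MaintenanceTickResult>;
  processAvailableJobs(opts?: { maxJobs?: number }): Promise<ProcessJobsResult>;
}

// Hypothetical scheduled handler: one bounded maintenance pass, then process up to maxJobs jobs.
async function onSchedule(host: HostLike, maxJobs = 5) {
  const tick = await host.runMaintenanceTick(); // claims pending runs, which enqueues first-stage jobs
  const jobs = await host.processAvailableJobs({ maxJobs });
  return { tick, jobs };
}

// Usage with a fake host that records call order.
const calls: string[] = [];
const fakeHost: HostLike = {
  async runMaintenanceTick() {
    calls.push("tick");
    return { claimed: 1, suspendedChecked: 0, staleReleased: 0, eventsFlushed: 2, stuckReaped: 0 };
  },
  async processAvailableJobs(opts) {
    const n = opts?.maxJobs ?? 1;
    calls.push(`jobs:${n}`);
    return { processed: n, succeeded: n, failed: 0 };
  },
};
const scheduled = onSchedule(fakeHost, 3);
```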
Remote Activity Workers
Run a stage's execute() on a separate, credential-free machine (no database connection, no root object-store credentials) via the @bratsos/workflow-engine-host-remote package. The orchestrator owns all state; a remote worker leases the task, runs the real stage code, writes large artifacts directly to object storage by reference, and reports back — all driven through the engine's existing suspend/resume machinery (no new DB table).
Two wiring models:
- Proxy stage (recommended for long stages): defineRemoteStage(realStage, transport, opts?) suspends immediately (releasing the kernel job lease) and resumes when the worker reports.
- ActivityExecutor port (short stages / in-core routing): inject createRemoteExecutor(transport), or createRoutingExecutor({ remote, remoteStageIds }) to route only specific stages, via createKernel({ executor }). Backward-compatible: the default createLocalExecutor() is byte-for-byte the in-process behavior.
import { defineRemoteStage } from "@bratsos/workflow-engine-host-remote";
// Orchestrator: wrap a heavy stage so it runs on a remote worker
const workflow = new WorkflowBuilder(...)
.pipe(defineRemoteStage(heavyStage, oTransport, { maxWaitMs: 3_600_000, stageCodeVersion: "v1" }))
.pipe(coreStage)
.build();
The worker runs in a separate process/machine with zero standing credentials (createActivityWorker + createHttpWorkerTransport), receiving a presigned URL per artifact. See references/11-remote-activity-workers.md for the worker, broker, HTTP transport, S3/R2 artifacts, durability, and limitations.
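A toy in-memory broker can illustrate the proxy-stage flow. Everything here (ToyBroker, proxyStage, workerStep, the Report shape) is hypothetical. The sketch deliberately ignores leases, artifacts, credentials, and durability. It only shows the control flow: the stage suspends on hand-off, a worker runs the real stage code and reports, and the next completion check resumes the stage.

```typescript
type Report = { status: "done"; output: unknown } | { status: "failed"; error: string };

// Hypothetical broker standing in for the real transport between orchestrator and worker.
class ToyBroker {
  readonly queue: Array<{ taskId: string; input: unknown }> = [];
  private readonly reports = new Map<string, Report>();
  enqueue(taskId: string, input: unknown): void { this.queue.push({ taskId, input }); }
  report(taskId: string, r: Report): void { this.reports.set(taskId, r); }
  lookup(taskId: string): Report | undefined { return this.reports.get(taskId); }
}

function proxyStage(broker: ToyBroker) {
  return {
    // Hand the work off and suspend immediately, so no job lease is held while the worker runs.
    execute(input: unknown, taskId: string) {
      broker.enqueue(taskId, input);
      return { suspended: true as const, state: { taskId } };
    },
    // Called on later polls: resume only once the worker has reported.
    checkCompletion(state: { taskId: string }) {
      const r = broker.lookup(state.taskId);
      if (!r) return { ready: false as const };
      return r.status === "done"
        ? { ready: true as const, output: r.output }
        : { ready: false as const, error: r.error };
    },
  };
}

// Worker side: take one task, run the real stage code, report the result.
function workerStep(broker: ToyBroker, realExecute: (input: unknown) => unknown): void {
  const task = broker.queue.shift();
  if (!task) return;
  try {
    broker.report(task.taskId, { status: "done", output: realExecute(task.input) });
  } catch (err) {
    broker.report(task.taskId, { status: "failed", error: String(err) });
  }
}

const broker = new ToyBroker();
const stage = proxyStage(broker);
const suspended = stage.execute({ path: "in.mp4" }, "task-1");
const before = stage.checkCompletion(suspended.state);
workerStep(broker, (input) => ({ transcoded: (input as { path: string }).path }));
const after = stage.checkCompletion(suspended.state);
```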
Annotations (Provenance)
Attach typed key-value facts to runs and stages for queryable provenance — trigger context, decisions, approvals, anything else you'd want to ask back later. Writes are buffered during a stage and flushed atomically with the stage outcome (durable, not fire-and-forget).
import { Decision, Trigger } from "@bratsos/workflow-engine/conventions";
// Inside a stage's execute()
ctx.annotate(Decision.outcome, "low"); // typed
ctx.annotate(Decision.confidence, 0.42); // typed
ctx.annotate("acme.compliance.signoff", "alice@acme.com"); // custom key
ctx.annotate({
actor: { kind: "agent", id: "triage-v3" },
attributes: {
"decision.outcome": "low",
"decision.rationale": "below threshold",
"decision.used_fallback": true,
},
});
// At run creation — captures trigger context
await kernel.dispatch({
type: "run.create",
workflowId: "ticket-triage",
input: { ticket },
annotations: [{
actor: { kind: "system", id: "zendesk" },
attributes: {
"trigger.source": "webhook:zendesk",
"trigger.parent_run_id": previousRunId,
},
}],
});
// External attach (plugins, post-hoc reviews)
await kernel.annotations.attach(runId, {
actor: { kind: "user", id: "alice@acme.com" },
attributes: { "review.disposition": "approved-anyway" },
idempotencyKey: "review-2026-05-24-alice",
});
// Query — flat key namespace, indexed
await kernel.annotations.list(runId);
await kernel.annotations.list(runId, { keyPrefix: "decision." });
await kernel.annotations.list(runId, { actorId: "triage-v3" });
Annotations replace the deprecated WorkflowRun.metadata column. Existing metadata is automatically surfaced as legacy.metadata.* virtual rows when consumers call kernel.annotations.list() (no dual-write, lazy synthesis). See references/10-annotations.md for the full API and conventions catalog.
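A toy model can illustrate two properties described above: annotations are flushed atomically with the stage outcome, and the key namespace is flat and filterable. ToyStore, completeStage, and listAnnotations are hypothetical. They use in-memory copy-on-write in place of a database transaction. The filter semantics (prefix match on key, exact match on actor ID) are an assumption based on the list() examples above.

```typescript
interface Annotation {
  key: string;
  value: unknown;
  actorId?: string;
}
interface Draft {
  annotations: Annotation[];
  stageStatus: Map<string, string>;
}

// Toy store with all-or-nothing writes: mutate a copy, publish it only if fn returns normally.
class ToyStore {
  annotations: Annotation[] = [];
  stageStatus = new Map<string, string>();

  transaction(fn: (tx: Draft) => void): void {
    const draft: Draft = { annotations: [...this.annotations], stageStatus: new Map(this.stageStatus) };
    fn(draft); // if fn throws, the draft is discarded and nothing is written
    this.annotations = draft.annotations;
    this.stageStatus = draft.stageStatus;
  }
}

// Buffered annotations are written in the same transaction as the stage outcome.
function completeStage(store: ToyStore, stageId: string, buffered: Annotation[], failBeforeCommit = false): void {
  store.transaction((tx) => {
    tx.stageStatus.set(stageId, "COMPLETED");
    tx.annotations.push(...buffered);
    if (failBeforeCommit) throw new Error("simulated failure inside the transaction");
  });
}

// Flat-namespace query with optional key-prefix and actor filters.
function listAnnotations(store: ToyStore, filter: { keyPrefix?: string; actorId?: string } = {}): Annotation[] {
  return store.annotations.filter(
    (a) =>
      (filter.keyPrefix === undefined || a.key.startsWith(filter.keyPrefix)) &&
      (filter.actorId === undefined || a.actorId === filter.actorId),
  );
}

const store = new ToyStore();
const buffered: Annotation[] = [
  { key: "decision.outcome", value: "low", actorId: "triage-v3" },
  { key: "decision.confidence", value: 0.42, actorId: "triage-v3" },
  { key: "acme.compliance.signoff", value: "alice@acme.com" },
];
try {
  completeStage(store, "triage", buffered, true);
} catch {
  // The failed attempt left neither a stage update nor any annotations behind.
}
const afterFailure = { annotations: store.annotations.length, status: store.stageStatus.get("triage") };
completeStage(store, "triage", buffered);
```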
AI Integration & Cost Tracking
import { createAIHelper } from "@bratsos/workflow-engine";
// Inside a stage's execute(ctx); aiCallLogger comes from createPrismaAICallLogger(prisma)
const ai = createAIHelper(
`workflow.${ctx.workflowRunId}.stage.${ctx.stageId}`,
aiCallLogger,
);
const { text, cost } = await ai.generateText("gemini-2.5-flash", prompt);
const { object } = await ai.generateObject("gemini-2.5-flash", prompt, schema);
const { embedding } = await ai.embed("text-embedding-004", ["text1"], { dimensions: 768 });
// OpenRouter embedding models (OpenAI, Cohere, etc.)
const { embedding: openRouterEmbedding } = await ai.embed("openai/text-embedding-3-small", ["text1"]);
// Provider-specific options passthrough (Voyage, Cohere, etc.)
const { embedding: voyageEmbedding } = await ai.embed("voyage-4-large", ["text1"], {
providerOptions: { voyage: { outputDimension: 512, inputType: "document" } },
});
// Custom embedding providers (Voyage, Cohere, Jina, etc.)
import { registerEmbeddingProvider } from "@bratsos/workflow-engine";
import { voyage } from "voyage-ai-provider";
registerEmbeddingProvider("voyage", (modelId) => voyage.embeddingModel(modelId));
// Then register models with provider: "voyage" and use ai.embed() as usual
Persistence Setup
Required Prisma Models (ALL are required)
Copy the complete schema from the package README. This includes: WorkflowRun, WorkflowStage, WorkflowLog, WorkflowArtifact, AICall, JobQueue, OutboxEvent, IdempotencyKey.
Create Persistence
import {
createPrismaWorkflowPersistence,
createPrismaJobQueue,
createPrismaAICallLogger,
} from "@bratsos/workflow-engine/persistence/prisma";
const persistence = createPrismaWorkflowPersistence(prisma);
const jobQueue = createPrismaJobQueue(prisma);
const aiCallLogger = createPrismaAICallLogger(prisma);
// SQLite - MUST pass the databaseType option (use these instead of the adapters above)
const sqlitePersistence = createPrismaWorkflowPersistence(prisma, { databaseType: "sqlite" });
const sqliteJobQueue = createPrismaJobQueue(prisma, { databaseType: "sqlite" });
Testing
// In-memory persistence and job queue
import {
InMemoryWorkflowPersistence,
InMemoryJobQueue,
InMemoryAICallLogger,
} from "@bratsos/workflow-engine/testing";
import { createKernel } from "@bratsos/workflow-engine/kernel";
// Kernel-specific test adapters
import {
FakeClock,
InMemoryBlobStore,
CollectingEventSink,
NoopScheduler,
} from "@bratsos/workflow-engine/kernel/testing";
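// Create kernel with all in-memory adapters
// (myWorkflow is assumed to be a workflow built with WorkflowBuilder using the ID "my-wf")
const workflows = new Map([["my-wf", myWorkflow]]);
const persistence = new InMemoryWorkflowPersistence();
const jobQueue = new InMemoryJobQueue();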
const kernel = createKernel({
persistence,
blobStore: new InMemoryBlobStore(),
jobTransport: jobQueue,
eventSink: new CollectingEventSink(),
scheduler: new NoopScheduler(),
clock: new FakeClock(),
registry: { getWorkflow: (id) => workflows.get(id) },
});
// Test a full workflow lifecycle
await kernel.dispatch({ type: "run.create", idempotencyKey: "test", workflowId: "my-wf", input: {} });
await kernel.dispatch({ type: "run.claimPending", workerId: "test-worker" });
const job = await jobQueue.dequeue();
if (!job) throw new Error("expected run.claimPending to enqueue a first-stage job");
await kernel.dispatch({ type: "job.execute", workflowRunId: job.workflowRunId, workflowId: job.workflowId, stageId: job.stageId, config: {} });
await kernel.dispatch({ type: "run.transition", workflowRunId: job.workflowRunId });
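Injecting the clock is what makes time-dependent behavior testable. The sketch below is hypothetical. ManualClock is a stand-in rather than the library's FakeClock, whose API may differ. findStaleLeases is a simplified check against a threshold such as staleLeaseThresholdMs, not the engine's lease.reapStale logic.

```typescript
// Hypothetical manual clock matching the shape of the Clock port used above ({ now(): Date }).
class ManualClock {
  private current: number;
  constructor(start = 0) {
    this.current = start;
  }
  now(): Date {
    return new Date(this.current);
  }
  advance(ms: number): void {
    this.current += ms;
  }
}

interface Lease {
  jobId: string;
  heartbeatAt: Date;
}

// A lease counts as stale once no heartbeat has been seen for at least thresholdMs.
function findStaleLeases(leases: Lease[], clock: { now(): Date }, thresholdMs: number): string[] {
  const cutoff = clock.now().getTime() - thresholdMs;
  return leases.filter((l) => l.heartbeatAt.getTime() <= cutoff).map((l) => l.jobId);
}

const clock = new ManualClock(1_000_000);
const leases: Lease[] = [{ jobId: "a", heartbeatAt: clock.now() }];
clock.advance(30_000);
leases.push({ jobId: "b", heartbeatAt: clock.now() });
const staleAt30s = findStaleLeases(leases, clock, 60_000);
clock.advance(30_000);
const staleAt60s = findStaleLeases(leases, clock, 60_000);
```

No real time passes in the test, so the result does not depend on machine speed or timer jitter.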
Reference Files
- 01-stage-definitions.md - Complete stage API
- 02-workflow-builder.md - WorkflowBuilder patterns
- 03-kernel-host-setup.md - Kernel & host configuration
- 04-ai-integration.md - AI helper methods
- 05-persistence-setup.md - Database setup
- 06-async-batch-stages.md - Async operations
- 07-testing-patterns.md - Testing with kernel
- 08-common-patterns.md - Kernel patterns & best practices
- 09-troubleshooting.md - Debugging stuck runs, P2002 errors, ghost jobs
- 10-annotations.md - First-class provenance surface: ctx.annotate, kernel.annotations.*, conventions catalog
- 11-remote-activity-workers.md - Credential-free remote workers: defineRemoteStage, broker, worker SDK, HTTP transport, S3/R2 artifacts, ActivityExecutor port
Key Principles
- Type Safety: All schemas are Zod - types flow through the entire pipeline
- Command Kernel: All operations are typed commands dispatched through kernel.dispatch()
- Environment-Agnostic: Kernel has no timers, no signals, no global state
- Context Access: Use ctx.require() and ctx.optional() for type-safe stage output access
- Transactional Outbox: Events are written to the outbox and published via the outbox.flush command. job.execute and stage.pollSuspended use multi-phase transactions to avoid holding connections during external I/O
- Idempotency: run.create and job.execute replay cached results by key; a concurrent dispatch with the same key throws IdempotencyInProgressError
- Authoritative Cancellation: run.cancel cascades to stages + jobs. Ghost jobs (running against non-RUNNING runs) are detected via the ghost: true flag and not retried
- Self-Healing: Stage creation is idempotent (upsert), orchestration steps are isolated, stuck runs are automatically reaped
- Cost Tracking: All AI calls automatically track tokens and costs
- BlobStore-Only Artifacts: All artifact storage goes through the BlobStore port. run.rerunFrom cleans up artifacts by key prefix
- Durable Provenance: ctx.annotate(...) writes are buffered and flushed inside the stage-completion transaction. Annotations are atomic with the stage outcome: a stage's annotations either all persist or all roll back together with the stage update and outbox events.
- Pluggable Execution: Stage execution goes through an injectable ActivityExecutor port (default in-process LocalExecutor). Inject a remote executor, or wrap a stage with defineRemoteStage, to run execute() on a separate credential-free machine without changing kernel internals.
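The idempotency principle can be sketched as a small in-memory guard. The first call with a key runs. A call that arrives while the first is still running throws. Later calls replay the cached result. IdempotencyGuard is hypothetical, and the error class is a local stand-in named after the documented IdempotencyInProgressError. The engine keeps idempotency records through the Persistence port (the IdempotencyKey model), not in process memory.

```typescript
// Local stand-in for the documented error; not imported from the library.
class IdempotencyInProgressError extends Error {
  constructor(key: string) {
    super(`command with idempotency key "${key}" is already in progress`);
    this.name = "IdempotencyInProgressError";
  }
}

// Hypothetical in-memory guard: first call runs, concurrent calls throw, completed calls replay.
class IdempotencyGuard {
  private readonly inProgress = new Set<string>();
  private readonly results = new Map<string, unknown>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    if (this.results.has(key)) return this.results.get(key) as T; // replay the cached result
    if (this.inProgress.has(key)) throw new IdempotencyInProgressError(key);
    this.inProgress.add(key);
    try {
      const result = await fn();
      this.results.set(key, result);
      return result;
    } finally {
      this.inProgress.delete(key); // a failed attempt may be retried with the same key
    }
  }
}

const guard = new IdempotencyGuard();
let executions = 0;
const createRun = () => guard.run("run-key", async () => ({ workflowRunId: `run-${++executions}` }));

async function demo() {
  const first = createRun(); // starts executing and marks the key in progress
  const concurrent = createRun().catch((e: Error) => e.name); // same key while the first is still running
  const [a, b] = await Promise.all([first, concurrent]);
  const replay = await createRun(); // after completion: served from the cache
  return { a, b, replay, executions };
}
```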