README
🚀 Queue Pilot
Queue Pilot 是一款用于消息队列开发的 MCP 服务器,它将消息检查与 JSON Schema 验证相结合,支持 RabbitMQ 和 Kafka。专为多个团队通过消息代理进行通信的集成项目而设计,你可以通过 AI 助手检查队列/主题、查看消息,并根据商定的模式验证有效负载。
🚀 快速开始
1. 定义你的模式
在一个目录中创建 JSON Schema 文件,例如 schemas/order.created.json:
{
"$id": "order.created",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Order Created",
"description": "Emitted when a new order is placed",
"version": "1.0.0",
"type": "object",
"required": ["orderId", "amount"],
"properties": {
"orderId": { "type": "string" },
"amount": { "type": "number" }
}
}
2. 添加到你的 MCP 客户端
使用 queue-pilot init 为你的客户端生成配置:
npx queue-pilot init --schemas /absolute/path/to/your/schemas --client <name>
支持的客户端有:claude-code、claude-desktop、vscode、cursor、windsurf。如果省略 --client,则生成通用的 JSON 配置。
对于 Kafka,添加 --broker kafka。生成的配置会自动包含所需的 @confluentinc/kafka-javascript 对等依赖项。
非默认凭证以环境变量的形式包含,以避免在 ps 输出中暴露机密信息:
npx queue-pilot init --schemas ./schemas --rabbitmq-user admin --rabbitmq-pass secret
运行 npx queue-pilot init --help 以查看包括 Kafka SASL 认证在内的所有选项。
⚠️ 重要提示
Windows 用户注意:如果
npx无法解析包,请尝试cmd /c npx queue-pilot init ...。
手动配置(不使用 init)
将以下服务器配置添加到你的 MCP 客户端:
RabbitMQ:
{
"mcpServers": {
"queue-pilot": {
"command": "npx",
"args": [
"-y",
"queue-pilot",
"--schemas", "/absolute/path/to/your/schemas"
]
}
}
}
Kafka:
{
"mcpServers": {
"queue-pilot": {
"command": "npx",
"args": [
"-y",
"--package=@confluentinc/kafka-javascript",
"--package=queue-pilot",
"queue-pilot",
"--schemas", "/absolute/path/to/your/schemas",
"--broker", "kafka"
],
"env": {
"KAFKA_BROKERS": "localhost:9092"
}
}
}
}
💡 使用建议
模式路径提示:
--schemas请使用绝对路径。相对路径从 MCP 客户端的工作目录解析,可能不是你的项目根目录。
| 客户端 | 配置文件 |
|--------|------------|
| Claude Code | .mcp.json(项目)或 ~/.claude.json(用户) |
| Claude Desktop | claude_desktop_config.json |
| Cursor | .cursor/mcp.json |
| VS Code (Copilot) | .vscode/mcp.json(使用 "servers" 而不是 "mcpServers") |
| Windsurf | ~/.codeium/windsurf/mcp_config.json |
开发(从源代码运行)
{
"mcpServers": {
"queue-pilot": {
"command": "npx",
"args": [
"tsx",
"src/index.ts",
"--schemas", "./schemas"
],
"cwd": "/path/to/queue-pilot"
}
}
}
3. 使用它
向你的助手询问以下问题:
- "有哪些队列,每个队列有多少条消息?"
- "显示订单队列中的消息"
- "检查注册队列,查看所有消息是否有效"
- "有哪些可用的模式?"
- "根据 order.created 模式验证此消息"
- "向 events 交换器发布一个 order.created 事件"
- "创建一个名为 dead-letters 的队列,并将其绑定到 events 交换器"
- "清空订单队列中的所有消息"
- "列出所有消费者组"(Kafka)
- "显示订单主题的分区详细信息"(Kafka)
MCP 工具
通用工具(所有代理)
| 工具 | 描述 |
|------|-------------|
| list_schemas | 列出所有已加载的消息模式 |
| get_schema | 获取特定模式的完整定义 |
| validate_message | 根据模式验证 JSON 消息 |
| list_queues | 列出所有队列/主题及其消息数量 |
| peek_messages | 查看队列/主题中的消息而不消费它们 |
| inspect_queue | 查看消息并根据其模式验证每条消息 |
| get_overview | 获取代理集群概述 |
| check_health | 检查代理健康状态 |
| get_queue | 获取特定队列/主题的详细信息 |
| list_consumers | 列出消费者(RabbitMQ)或消费者组(Kafka) |
| publish_message | 发布消息,并可选择进行模式验证 |
| purge_queue | 从队列/主题中删除所有消息 |
| create_queue | 创建新的队列/主题 |
| delete_queue | 删除队列/主题 |
RabbitMQ 特定工具
| 工具 | 描述 |
|------|-------------|
| list_exchanges | 列出所有 RabbitMQ 交换器 |
| create_exchange | 创建新的交换器 |
| delete_exchange | 删除交换器 |
| list_bindings | 列出交换器和队列之间的绑定 |
| create_binding | 使用路由键将队列绑定到交换器 |
| delete_binding | 删除绑定 |
| list_connections | 列出所有连接到代理的客户端连接 |
Kafka 特定工具
| 工具 | 描述 |
|------|-------------|
| list_consumer_groups | 列出所有消费者组及其状态 |
| describe_consumer_group | 显示消费者组的成员、分配和状态 |
| list_partitions | 显示主题的分区详细信息(领导者、副本、ISR) |
| get_offsets | 显示每个分区的最早/最新偏移量 |
MCP 提示与资源
提示
预构建的工作流模板,可指导你的 AI 助手完成多步骤操作。
| 提示 | 参数 | 描述 |
|--------|-----------|-------------|
| debug-flow | exchange, queue | 跟踪从交换器到队列的绑定,查看消息,并根据其模式验证每条消息 |
| health-report | (无) | 检查代理健康状况,获取集群概述,标记有积压消息的队列 |
| schema-compliance | queue (可选) | 查看消息并根据其模式验证每条消息 — 针对一个队列或所有队列 |
使用示例(在任何支持 MCP 的客户端中):
"对交换器 'events' 和队列 'orders' 使用 debug-flow 提示"
资源
每个加载的模式都作为可读的 MCP 资源暴露在 schema:///<schema-name>。
支持 MCP 资源的客户端可以直接读取模式定义,而无需调用工具。例如,从 order.created.json 加载的模式可在 schema:///order.created 访问。
✨ 主要特性
- 多代理支持 — 通过统一的适配器接口支持 RabbitMQ 和 Apache Kafka。
- 消息检查 — 浏览队列/主题,查看消息而不消费它们。
- 模式验证 — 根据 JSON Schema 定义验证消息有效负载。
- 组合检查 —
inspect_queue查看消息并根据其模式验证每条消息。 - 验证发布 —
publish_message在发送前根据模式进行验证,无效消息不会到达代理。 - 队列管理 — 创建队列/主题、绑定,并为开发/测试工作流清除消息。
- 代理信息 — 列出交换器、绑定、消费者组和分区详细信息。
📦 安装指南
前提条件
- Node.js >= 22 — 必需的运行时(使用
node --version检查)。 - 消息代理:
- RabbitMQ,启用 管理插件(HTTP API 端口为 15672)。
- Apache Kafka(需要
@confluentinc/kafka-javascript作为对等依赖项)。
- 兼容 MCP 的客户端 — Claude Code、Claude Desktop、Cursor、VS Code (Copilot)、Windsurf 等。
📚 详细文档
模式格式
模式遵循 JSON Schema draft-07,并遵循以下约定:
$id— 消息类型标识符(与消息上的type属性匹配)。version— 模式版本(自定义字段,JSON Schema 不验证)。- 标准 JSON Schema 验证,包括
required、properties、format等。
模式匹配:检查队列时,消息的 type 属性用于通过 $id 查找相应的模式。
配置
CLI 参数优先于环境变量,环境变量优先于默认值。
| 设置 | CLI 标志 | 环境变量 | 默认值 |
|---------|----------|---------|---------|
| 模式目录 | --schemas | — | (必需) |
| 代理类型 | --broker | — | rabbitmq |
| RabbitMQ URL | --rabbitmq-url | RABBITMQ_URL | http://localhost:15672 |
| RabbitMQ 用户 | --rabbitmq-user | RABBITMQ_USER | guest |
| RabbitMQ 密码 | --rabbitmq-pass | RABBITMQ_PASS | guest |
| Kafka 代理 | --kafka-brokers | KAFKA_BROKERS | localhost:9092 |
| Kafka 客户端 ID | --kafka-client-id | KAFKA_CLIENT_ID | queue-pilot |
| SASL 机制 | --kafka-sasl-mechanism | KAFKA_SASL_MECHANISM | (无) |
| SASL 用户名 | --kafka-sasl-username | KAFKA_SASL_USERNAME | (无) |
| SASL 密码 | --kafka-sasl-password | KAFKA_SASL_PASSWORD | (无) |
在 MCP 客户端的 env 块中使用环境变量,以避免在 ps 输出中暴露凭证。
🔧 技术细节
开发
npm install
npm test # 单元测试
npm run test:coverage # 覆盖率报告
npm run build # TypeScript 编译
npm run typecheck # 类型检查
# 集成测试(需要 RabbitMQ)
docker compose up -d --wait
npm run test:integration
技术栈
- TypeScript(严格模式,ESM)
- MCP SDK v1.26.0
- Ajv 用于 JSON Schema 验证
- Zod 用于 MCP 工具参数定义
- Vitest 用于测试
- RabbitMQ 管理 HTTP API
- Confluent Kafka JavaScript(可选,用于 Kafka 支持)
📄 许可证
本项目采用 MIT 许可证。
Scan to join WeChat group