Back to MCP directory
publicPublicdnsLocal runtime

queue-pilot

Queue Pilot是一个用于消息队列开发的MCP服务器,支持RabbitMQ和Kafka,提供消息检查、JSON Schema验证和队列管理功能,可通过AI助手进行消息代理的开发和调试。

article

README

🚀 Queue Pilot

Queue Pilot 是一款用于消息队列开发的 MCP 服务器,它将消息检查与 JSON Schema 验证相结合,支持 RabbitMQKafka。专为多个团队通过消息代理进行通信的集成项目而设计,你可以通过 AI 助手检查队列/主题、查看消息,并根据商定的模式验证有效负载。

🚀 快速开始

1. 定义你的模式

在一个目录中创建 JSON Schema 文件,例如 schemas/order.created.json

{
  "$id": "order.created",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order Created",
  "description": "Emitted when a new order is placed",
  "version": "1.0.0",
  "type": "object",
  "required": ["orderId", "amount"],
  "properties": {
    "orderId": { "type": "string" },
    "amount": { "type": "number" }
  }
}

2. 添加到你的 MCP 客户端

使用 queue-pilot init 为你的客户端生成配置:

npx queue-pilot init --schemas /absolute/path/to/your/schemas --client <name>

支持的客户端有:claude-codeclaude-desktopvscodecursorwindsurf。如果省略 --client,则生成通用的 JSON 配置。

对于 Kafka,添加 --broker kafka。生成的配置会自动包含所需的 @confluentinc/kafka-javascript 对等依赖项。

非默认凭证以环境变量的形式包含,以避免在 ps 输出中暴露机密信息:

npx queue-pilot init --schemas ./schemas --rabbitmq-user admin --rabbitmq-pass secret

运行 npx queue-pilot init --help 以查看包括 Kafka SASL 认证在内的所有选项。

⚠️ 重要提示

Windows 用户注意:如果 npx 无法解析包,请尝试 cmd /c npx queue-pilot init ...

手动配置(不使用 init)

将以下服务器配置添加到你的 MCP 客户端:

RabbitMQ:

{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas"
      ]
    }
  }
}

Kafka:

{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "--package=@confluentinc/kafka-javascript",
        "--package=queue-pilot",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas",
        "--broker", "kafka"
      ],
      "env": {
        "KAFKA_BROKERS": "localhost:9092"
      }
    }
  }
}

💡 使用建议

模式路径提示:--schemas 请使用绝对路径。相对路径从 MCP 客户端的工作目录解析,可能不是你的项目根目录。

| 客户端 | 配置文件 | |--------|------------| | Claude Code | .mcp.json(项目)或 ~/.claude.json(用户) | | Claude Desktop | claude_desktop_config.json | | Cursor | .cursor/mcp.json | | VS Code (Copilot) | .vscode/mcp.json(使用 "servers" 而不是 "mcpServers") | | Windsurf | ~/.codeium/windsurf/mcp_config.json |

开发(从源代码运行)
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "tsx",
        "src/index.ts",
        "--schemas", "./schemas"
      ],
      "cwd": "/path/to/queue-pilot"
    }
  }
}

3. 使用它

向你的助手询问以下问题:

  • "有哪些队列,每个队列有多少条消息?"
  • "显示订单队列中的消息"
  • "检查注册队列,查看所有消息是否有效"
  • "有哪些可用的模式?"
  • "根据 order.created 模式验证此消息"
  • "向 events 交换器发布一个 order.created 事件"
  • "创建一个名为 dead-letters 的队列,并将其绑定到 events 交换器"
  • "清空订单队列中的所有消息"
  • "列出所有消费者组"(Kafka)
  • "显示订单主题的分区详细信息"(Kafka)
MCP 工具

通用工具(所有代理)

| 工具 | 描述 | |------|-------------| | list_schemas | 列出所有已加载的消息模式 | | get_schema | 获取特定模式的完整定义 | | validate_message | 根据模式验证 JSON 消息 | | list_queues | 列出所有队列/主题及其消息数量 | | peek_messages | 查看队列/主题中的消息而不消费它们 | | inspect_queue | 查看消息并根据其模式验证每条消息 | | get_overview | 获取代理集群概述 | | check_health | 检查代理健康状态 | | get_queue | 获取特定队列/主题的详细信息 | | list_consumers | 列出消费者(RabbitMQ)或消费者组(Kafka) | | publish_message | 发布消息,并可选择进行模式验证 | | purge_queue | 从队列/主题中删除所有消息 | | create_queue | 创建新的队列/主题 | | delete_queue | 删除队列/主题 |

RabbitMQ 特定工具

| 工具 | 描述 | |------|-------------| | list_exchanges | 列出所有 RabbitMQ 交换器 | | create_exchange | 创建新的交换器 | | delete_exchange | 删除交换器 | | list_bindings | 列出交换器和队列之间的绑定 | | create_binding | 使用路由键将队列绑定到交换器 | | delete_binding | 删除绑定 | | list_connections | 列出所有连接到代理的客户端连接 |

Kafka 特定工具

| 工具 | 描述 | |------|-------------| | list_consumer_groups | 列出所有消费者组及其状态 | | describe_consumer_group | 显示消费者组的成员、分配和状态 | | list_partitions | 显示主题的分区详细信息(领导者、副本、ISR) | | get_offsets | 显示每个分区的最早/最新偏移量 |

MCP 提示与资源

提示

预构建的工作流模板,可指导你的 AI 助手完成多步骤操作。

| 提示 | 参数 | 描述 | |--------|-----------|-------------| | debug-flow | exchange, queue | 跟踪从交换器到队列的绑定,查看消息,并根据其模式验证每条消息 | | health-report | (无) | 检查代理健康状况,获取集群概述,标记有积压消息的队列 | | schema-compliance | queue (可选) | 查看消息并根据其模式验证每条消息 — 针对一个队列或所有队列 |

使用示例(在任何支持 MCP 的客户端中):

"对交换器 'events' 和队列 'orders' 使用 debug-flow 提示"

资源

每个加载的模式都作为可读的 MCP 资源暴露在 schema:///<schema-name>

支持 MCP 资源的客户端可以直接读取模式定义,而无需调用工具。例如,从 order.created.json 加载的模式可在 schema:///order.created 访问。

✨ 主要特性

  • 多代理支持 — 通过统一的适配器接口支持 RabbitMQ 和 Apache Kafka。
  • 消息检查 — 浏览队列/主题,查看消息而不消费它们。
  • 模式验证 — 根据 JSON Schema 定义验证消息有效负载。
  • 组合检查inspect_queue 查看消息并根据其模式验证每条消息。
  • 验证发布publish_message 在发送前根据模式进行验证,无效消息不会到达代理。
  • 队列管理 — 创建队列/主题、绑定,并为开发/测试工作流清除消息。
  • 代理信息 — 列出交换器、绑定、消费者组和分区详细信息。

📦 安装指南

前提条件

  • Node.js >= 22 — 必需的运行时(使用 node --version 检查)。
  • 消息代理
    • RabbitMQ,启用 管理插件(HTTP API 端口为 15672)。
    • Apache Kafka(需要 @confluentinc/kafka-javascript 作为对等依赖项)。
  • 兼容 MCP 的客户端 — Claude Code、Claude Desktop、Cursor、VS Code (Copilot)、Windsurf 等。

📚 详细文档

模式格式

模式遵循 JSON Schema draft-07,并遵循以下约定:

  • $id — 消息类型标识符(与消息上的 type 属性匹配)。
  • version — 模式版本(自定义字段,JSON Schema 不验证)。
  • 标准 JSON Schema 验证,包括 requiredpropertiesformat 等。

模式匹配:检查队列时,消息的 type 属性用于通过 $id 查找相应的模式。

配置

CLI 参数优先于环境变量,环境变量优先于默认值。

| 设置 | CLI 标志 | 环境变量 | 默认值 | |---------|----------|---------|---------| | 模式目录 | --schemas | — | (必需) | | 代理类型 | --broker | — | rabbitmq | | RabbitMQ URL | --rabbitmq-url | RABBITMQ_URL | http://localhost:15672 | | RabbitMQ 用户 | --rabbitmq-user | RABBITMQ_USER | guest | | RabbitMQ 密码 | --rabbitmq-pass | RABBITMQ_PASS | guest | | Kafka 代理 | --kafka-brokers | KAFKA_BROKERS | localhost:9092 | | Kafka 客户端 ID | --kafka-client-id | KAFKA_CLIENT_ID | queue-pilot | | SASL 机制 | --kafka-sasl-mechanism | KAFKA_SASL_MECHANISM | (无) | | SASL 用户名 | --kafka-sasl-username | KAFKA_SASL_USERNAME | (无) | | SASL 密码 | --kafka-sasl-password | KAFKA_SASL_PASSWORD | (无) |

在 MCP 客户端的 env 块中使用环境变量,以避免在 ps 输出中暴露凭证。

🔧 技术细节

开发

npm install
npm test                    # 单元测试
npm run test:coverage       # 覆盖率报告
npm run build               # TypeScript 编译
npm run typecheck           # 类型检查

# 集成测试(需要 RabbitMQ)
docker compose up -d --wait
npm run test:integration

技术栈

📄 许可证

本项目采用 MIT 许可证。

help

Runtime guide

cloud

Hosted runtime

Hosted servers run from a provider-managed environment. You usually connect the MCP client to the hosted endpoint or follow the provider's authorization flow, without keeping a local process alive

  1. Open provider connection page
  2. Authorize or copy endpoint
  3. Connect from your MCP client
terminal

Local runtime / other methods

Local servers run on your own machine or infrastructure. You normally copy the server_config into your MCP client, install the required package, and provide env variables from env_schema when needed

  1. Copy server_config
  2. Install required package
  3. Fill env variables and restart client