README
🚀 MCP Kafka 工具
这是一个基于 MCP(模型上下文协议)协议的 Kafka 工具,借助大模型实现与 Kafka 的连接。它具备多种实用功能,可查询指定主题信息、在指定主题中读写数据,还能回放指定时间和主题的数据。
🚀 快速开始
安装依赖
pip install -r requirements.txt
使用方式
本地模式(标准输入输出)
直接运行 mcp_kafka_server.py 文件:
python mcp_kafka_server.py
之后在 MCP 客户端(如 Claude、Cherry Studio 等)中连接本地 MCP 服务器。
Web 服务模式(SSE)
运行 mcp_kafka_sse_server.py 文件启动 Web 服务:
python mcp_kafka_sse_server.py
服务将启动在 http://localhost:8000,MCP 端点为 http://localhost:8000/mcp。然后在 MCP 客户端中连接这个远程 MCP 服务器。
✨ 主要特性
1. 列出所有主题
list_topics()
此功能可返回 Kafka 中所有可用的主题列表。
2. 获取主题信息
get_topic_info(topic_name)
该功能用于获取指定主题的详细信息,涵盖分区数量、每个分区的起始和结束偏移量以及消息数量。
3. 发送消息
send_message(topic_name, message, key)
向指定主题发送消息,message 可以是 JSON 字符串或普通字符串,key 为可选参数。
4. 读取消息
read_messages(topic_name, max_messages, timeout_ms)
从指定主题读取最新的消息,max_messages 指定最大读取消息数(默认 10 条),timeout_ms 指定超时时间(默认 1000 毫秒)。
5. 回放历史消息
replay_messages(topic_name, start_time, end_time, max_messages)
回放指定时间段内的消息,start_time 和 end_time 为 ISO 格式的时间字符串(如 '2023-01-01T00:00:00'),max_messages 指定最大回放消息数(默认 100 条)。
📚 详细文档
配置 Kafka 连接
默认连接到 localhost:9092,如需修改连接地址,请在 KafkaManager 类的初始化参数中修改 bootstrap_servers。
微信扫一扫