article
README
🚀 MCP 服务器演示
本项目是一个基于 MCP(模型控制协议)和 Kafka 构建的生产就绪任务管理系统,它实现了 AI 代理与 Kafka 任务队列的交互,能高效管理任务并处理通知。
🚀 快速开始
本系统支持任务管理、通知处理以及通过 Kafka 实时处理事件。以下是快速启动系统的步骤:
- 克隆仓库并安装依赖:
# 克隆仓库
git clone https://github.com/yourusername/mcp-server-demo.git
cd mcp-server-demo
# 安装依赖项
pip install -e .
- 更新
kafka_config.py中的 Kafka 配置为您的实际 Kafka 集群详细信息:
KAFKA_CONFIG = {
'bootstrap.servers': 'your-kafka-bootstrap-servers',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'SCRAM-SHA-512',
'sasl.username': 'your-username',
'sasl.password': 'your-password',
}
- 启动服务器:
python main.py
- 加载测试数据:
python kafka_test_data.py
✨ 主要特性
- 任务管理:可创建、更新、调整优先级和完成生产任务。
- 通知系统:具备实时通知功能,且带有优先级级别。
- Kafka 集成:实现可靠的消息队列和事件流处理。
- MCP 工具:为 AI 代理提供友好型接口,用于任务和通知操作。
- 消费者服务:基于 Kafka 消息进行背景处理。
📦 安装指南
环境要求
- Python 3.13+
- Kafka 集群(本地或 AWS MSK)
- Confluent Kafka Python 客户端
安装步骤
# 克隆仓库
git clone https://github.com/yourusername/mcp-server-demo.git
cd mcp-server-demo
# 安装依赖项
pip install -e .
💻 使用示例
基础用法
启动服务器
python main.py
加载测试数据
python kafka_test_data.py
高级用法
该系统为 AI 代理暴露了一系列 MCP 工具,可用于任务和通知管理。
任务管理
fetch_queue:获取待处理任务列表change_task_priority:更新任务优先级pickup_task:标记任务为进行中complete_task:标记任务为完成get_task_details:获取任务详细信息check_task_status:检查任务当前状态
通知管理
check_notification_count:获取未读通知数量get_notification_list:获取过滤后的通知列表mark_notification_as_read:标记通知为已读
📚 详细文档
架构
该系统包含以下组件:
- MCP 服务器:为 AI 代理提供交互接口。
- Kafka 生产者:向 Kafka 主题发送消息。
- Kafka 消费者:从 Kafka 主题处理消息。
- 任务服务:负责任务管理的业务逻辑。
- 通知服务:负责通知处理的业务逻辑。
项目结构
mcp-server-demo/
├── main.py # MCP 服务器初始化
├── kafka_config.py # Kafka 配置
├── consumer_service.py # Kafka 消费者服务
├── task_service.py # 任务管理逻辑
├── notification_service.py # 通知处理逻辑
└── kafka_test_data.py # 测试数据生成器
📄 许可证
本项目采用 MIT 许可证。
贡献
欢迎贡献!请随意提交 Pull Request。
Scan to join WeChat group