README
🚀 Apache Airflow MCP 服务器
Apache Airflow MCP 服务器是一个用于管理与 Apache Airflow 集成的第三方服务的实用工具。借助该服务器,用户能够轻松执行各类维护命令并处理相关任务,为 Airflow 的使用提供了更多便利。
✨ 主要特性
简介
MCP(Maintenance Command Processor)作为 Apache Airflow 的工具,专注于管理与 Airflow 集成的第三方服务。通过它,用户可执行多种维护命令并处理相关任务,提升 Airflow 的管理效率。
支持的 API 组
以下是 MCP 服务器支持的所有 API 组及其对应的接口: | 分组 | 接口 | | ---- | ---- | | config | GET /api/v1/config | | connections | GET, DELETE /api/v1/connections/{connection_id} | | dag | GET, POST /api/v1/dags, GET, DELETE /api/v1/dags/{dag_id}, PATCH /api/v1/dags/{dag_id}/paused | | dagrun | GET, POST /api/v1/dagruns, GET /api/v1/dagruns/{dag_run_id} | | dagstats | GET /api/v1/dags/statistics | | dataset | GET, DELETE /api/v1/datasets/{dataset_uri}, GET, DELETE /api/v1/datasets/{dataset_uri}/dag_runs/queued_events | | eventlog | GET, DELETE /api/v1/event_logs/{event_log_id} | | importerror | GET, DELETE /api/v1/import_errors/{import_error_id} | | monitoring | GET /api/v1/health | | plugin | GET /api/v1/plugins | | provider | GET /api/v1/providers | | taskinstance | GET, PATCH, DELETE /api/v1/task_instances/{task_instance_id}, GET /api/v1/dags/{dag_id}/task_instances | | variable | GET, DELETE /api/v1/variables/{variable_key} | | xcom | GET, DELETE /api/v1/xcoms/{xcom_key} |
📦 安装指南
依赖项
该项目依赖于官方 Apache Airflow 客户端库 (apache-airflow-client),安装时会自动处理。
环境变量
设置以下环境变量:
AIRFLOW_HOST=<你的-Airflow-主机>
AIRFLOW_USERNAME=<你的-Airflow-用户名>
AIRFLOW_PASSWORD=<你的-Airflow-密码>
与 Claude Desktop 集成
将以下内容添加到 claude_desktop_config.json 文件中:
{
"mcpServers": {
"mcp-server-apache-airflow": {
"command": "uvx",
"args": ["mcp-server-apache-airflow"],
"env": {
"AIRFLOW_HOST": "https://你的-Airflow-主机",
"AIRFLOW_USERNAME": "你的用户名",
"AIRFLOW_PASSWORD": "你的密码"
}
}
}
}
使用 uv 的替代配置如下:
{
"mcpServers": {
"mcp-server-apache-airflow": {
"command": "uv",
"args": [
"--directory",
"/实际路径/mcp-server-apache-airflow",
"run",
"mcp-server-apache-airflow"
],
"env": {
"AIRFLOW_HOST": "https://你的-Airflow-主机",
"AIRFLOW_USERNAME": "你的用户名",
"AIRFLOW_PASSWORD": "你的密码"
}
}
}
}
请将 /实际路径/mcp-server-apache-airflow 替换为您实际克隆代码的位置。
自定义 API 组
通过 --apis 标志选择要使用的 API 组:
# 启用所有默认的 API 组
$ mcp-server-apache-airflow --apis default
# 启用特定的 API 组(例如:config, dag)
$ mcp-server-apache-airflow --apis config,dag
💻 使用示例
环境变量配置
设置以下环境变量:
AIRFLOW_HOST:Airflow 服务器地址,默认为http://localhost:8080AIRFLOW_USERNAME:登录用户名,默认为adminAIRFLOW_PASSWORD:用户密码,默认为<PASSWORD>
启动 MCP 服务
在项目根目录下执行以下命令启动 MCP 服务器:
$ python -m mcp_server --apis default
🔧 技术细节
API 端点
配置接口
- 获取配置
GET /api/v1/config
连接接口
- 列出所有连接
GET /api/v1/connections - 删除指定连接
DELETE /api/v1/connections/{connection_id}
DAG 接口
- 创建或列出 DAGs
POST, GET /api/v1/dags - 获取指定 DAG 的信息
GET /api/v1/dags/{dag_id} - 删除指定 DAG
DELETE /api/v1/dags/{dag_id} - 暂停或恢复 DAG
PATCH /api/v1/dags/{dag_id}/paused
DAG 运行接口
- 创建或列出 DAG 运行
POST, GET /api/v1/dagruns - 获取指定 DAG 运行的信息
GET /api/v1/dagruns/{dag_run_id}
数据集接口
- 创建或列出数据集
POST, GET /api/v1/datasets - 删除指定数据集
DELETE /api/v1/datasets/{dataset_uri} - 获取与数据集关联的排队事件
GET /api/v1/datasets/{dataset_uri}/dag_runs/queued_events - 删除与数据集关联的排队事件
DELETE /api/v1/datasets/{dataset_uri}/dag_runs/queued_events
事件日志接口
- 获取指定事件日志
GET /api/v1/event_logs/{event_log_id} - 删除指定事件日志
DELETE /api/v1/event_logs/{event_log_id}
导入错误接口
- 获取指定导入错误
GET /api/v1/import_errors/{import_error_id} - 删除指定导入错误
DELETE /api/v1/import_errors/{import_error_id}
监控接口
- 获取系统健康状态
GET /api/v1/health
插件接口
- 列出所有插件
GET /api/v1/plugins
提供程序接口
- 列出所有提供程序
GET /api/v1/providers
任务实例接口
- 创建或列出任务实例
POST, GET /api/v1/task_instances - 获取指定任务实例的信息
GET /api/v1/task_instances/{task_instance_id} - 更新或删除指定任务实例
PATCH, DELETE /api/v1/task_instances/{task_instance_id} - 列出 DAG 下的任务实例
GET /api/v1/dags/{dag_id}/task_instances
变量接口
- 创建或列出变量
POST, GET /api/v1/variables - 获取指定变量的值
GET /api/v1/variables/{variable_key} - 删除指定变量
DELETE /api/v1/variables/{variable_key}
XCom 接口
- 创建或列出 XCom 数据
POST, GET /api/v1/xcoms - 获取指定 XCom 的值
GET /api/v1/xcoms/{xcom_key} - 删除指定 XCom
DELETE /api/v1/xcoms/{xcom_key}
🤝 贡献指南
如何贡献代码
- Fork 本仓库
- 创建功能分支
- 提交代码并创建 Pull Request
如何报告问题
请在 Issues 中提交您遇到的问题。
📄 许可证
[此处添加许可证信息]
Scan to join WeChat group