FastAPI-MCP消息队列:异步处理长时间运行任务的MCP工具
2026-02-04 05:08:18作者:段琳惟
引言:AI时代的长时任务处理挑战
在人工智能应用快速发展的今天,模型上下文协议(Model Context Protocol,MCP)已成为连接AI助手与外部服务的标准桥梁。然而,当面对需要长时间运行的计算密集型任务时,传统的同步请求-响应模式往往力不从心。你是否遇到过以下痛点:
- 超时中断:AI助手调用API时,长时间任务因超时而被中断
- 资源阻塞:同步处理占用宝贵的工作线程,影响系统吞吐量
- 状态丢失:任务执行过程中断后,难以恢复执行状态
- 进度反馈缺失:用户无法实时了解长时间任务的执行进度
FastAPI-MCP结合消息队列技术,为这些问题提供了优雅的解决方案。本文将深入探讨如何利用FastAPI-MCP构建支持异步长时间任务处理的MCP工具。
FastAPI-MCP核心架构解析
MCP协议基础架构
flowchart TD
A[MCP Client<br>AI助手] --> B[FastAPI-MCP Server]
B --> C[FastAPI Application]
C --> D[Background Tasks<br>消息队列]
D --> E[Task Workers<br>异步处理]
E --> F[Result Storage<br>Redis/Database]
F --> B
B --> A
关键技术组件对比
| 组件 | 作用 | 优势 | 适用场景 |
|---|---|---|---|
| FastAPI-MCP | MCP协议转换 | 零配置自动暴露API | 快速集成现有服务 |
| Celery | 分布式任务队列 | 支持多种消息代理 | 大规模分布式处理 |
| RQ (Redis Queue) | Redis任务队列 | 简单轻量 | 中小规模应用 |
| ARQ | 异步Redis队列 | 原生asyncio支持 | 高并发异步场景 |
实战:构建支持消息队列的MCP工具
环境准备与依赖安装
# 使用uv安装核心依赖
uv add fastapi-mcp celery redis pydantic
# 或使用pip
pip install fastapi-mcp celery redis pydantic
基础消息队列配置
from fastapi import FastAPI, BackgroundTasks, Depends
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import Optional
import redis
from celery import Celery
import json
# Redis配置
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Celery配置
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
app = FastAPI(title="Async Task Processor")
class TaskRequest(BaseModel):
"""任务请求模型"""
data: dict
timeout: int = 300
priority: int = 1
class TaskStatus(BaseModel):
"""任务状态模型"""
task_id: str
status: str # pending, processing, completed, failed
progress: int = 0
result: Optional[dict] = None
error: Optional[str] = None
长时间任务处理器实现
@celery_app.task(bind=True)
def process_long_running_task(self, task_data: dict) -> dict:
"""长时间运行的任务处理器"""
total_steps = 10
result = {"processed": False, "steps_completed": 0}
# 模拟长时间处理过程
for step in range(1, total_steps + 1):
# 执行实际处理逻辑
process_step(step, task_data)
# 更新进度
self.update_state(
state='PROGRESS',
meta={'current': step, 'total': total_steps}
)
# 模拟处理时间
import time
time.sleep(2)
result["processed"] = True
result["steps_completed"] = total_steps
return result
def process_step(step: int, data: dict):
"""处理单个步骤"""
# 实际业务逻辑处理
if "text" in data:
data["processed_text"] = f"Processed step {step}: {data['text']}"
MCP端点暴露与状态管理
# 任务相关的FastAPI端点
@app.post("/tasks/process", response_model=TaskStatus, tags=["tasks"])
async def create_processing_task(
request: TaskRequest,
background_tasks: BackgroundTasks
) -> TaskStatus:
"""创建处理任务"""
task_id = f"task_{hash(str(request.data))}"
# 启动异步任务
async_result = process_long_running_task.delay(request.data)
# 存储任务信息
initial_status = TaskStatus(
task_id=task_id,
status="pending",
progress=0
)
redis_client.set(f"task:{task_id}", async_result.id)
redis_client.set(f"status:{task_id}", initial_status.json())
return initial_status
@app.get("/tasks/{task_id}/status", response_model=TaskStatus, tags=["tasks"])
async def get_task_status(task_id: str) -> TaskStatus:
"""获取任务状态"""
celery_task_id = redis_client.get(f"task:{task_id}")
if not celery_task_id:
raise HTTPException(status_code=404, detail="Task not found")
async_result = celery_app.AsyncResult(celery_task_id)
status_data = json.loads(redis_client.get(f"status:{task_id}") or "{}")
if async_result.state == 'PROGRESS':
status_data['progress'] = async_result.info.get('current', 0) * 10
status_data['status'] = 'processing'
elif async_result.state == 'SUCCESS':
status_data['status'] = 'completed'
status_data['result'] = async_result.result
status_data['progress'] = 100
elif async_result.state == 'FAILURE':
status_data['status'] = 'failed'
status_data['error'] = str(async_result.info)
# 更新状态
redis_client.set(f"status:{task_id}", json.dumps(status_data))
return TaskStatus(**status_data)
@app.get("/tasks/{task_id}/result", tags=["tasks"])
async def get_task_result(task_id: str):
"""获取任务结果"""
status = await get_task_status(task_id)
if status.status != 'completed':
raise HTTPException(status_code=400, detail="Task not completed")
return status.result
MCP服务器集成配置
# 初始化MCP服务器
mcp = FastApiMCP(
app,
name="AsyncTaskProcessor",
description="MCP server for asynchronous long-running tasks processing"
)
# 配置包含的操作
mcp.include_operations = ["create_processing_task", "get_task_status", "get_task_result"]
# 挂载MCP服务器
mcp.mount_http(mount_path="/mcp/tasks")
# 可选:配置认证
from fastapi_mcp.auth.proxy import setup_oauth_authorize_proxy
setup_oauth_authorize_proxy(
app,
client_id="your-client-id",
authorize_url="https://your-auth-provider.com/oauth/authorize"
)
高级特性:智能任务调度与监控
任务优先级调度系统
class PriorityQueue:
"""基于Redis的优先级队列"""
def __init__(self, name: str = "task_queue"):
self.name = name
self.redis = redis_client
def enqueue(self, task_id: str, priority: int = 1):
"""入队任务"""
self.redis.zadd(self.name, {task_id: priority})
def dequeue(self) -> Optional[str]:
"""出队最高优先级任务"""
result = self.redis.zrange(self.name, 0, 0)
if result:
task_id = result[0].decode()
self.redis.zrem(self.name, task_id)
return task_id
return None
# 集成优先级调度
priority_queue = PriorityQueue()
@app.post("/tasks/priority-process")
async def create_priority_task(request: TaskRequest) -> TaskStatus:
"""创建优先级处理任务"""
task_id = f"priority_task_{hash(str(request.data))}"
priority_queue.enqueue(task_id, request.priority)
return TaskStatus(
task_id=task_id,
status="queued",
progress=0
)
实时进度监控与WebSocket集成
from fastapi import WebSocket, WebSocketDisconnect
from fastapi.routing import APIRouter
router = APIRouter()
@router.websocket("/ws/tasks/{task_id}")
async def websocket_task_progress(websocket: WebSocket, task_id: str):
"""WebSocket实时进度监控"""
await websocket.accept()
try:
while True:
# 获取最新状态
status = await get_task_status(task_id)
await websocket.send_json(status.dict())
# 任务完成时退出
if status.status in ['completed', 'failed']:
break
# 每秒更新一次
import asyncio
await asyncio.sleep(1)
except WebSocketDisconnect:
pass
性能优化与最佳实践
连接池与资源管理
from redis.asyncio import ConnectionPool, Redis
# 异步Redis连接池
redis_pool = ConnectionPool.from_url(
"redis://localhost:6379/0",
max_connections=20,
decode_responses=True
)
async def get_redis() -> Redis:
"""获取Redis连接"""
return Redis(connection_pool=redis_pool)
# Celery性能配置
celery_app.conf.update(
worker_concurrency=4,
worker_prefetch_multiplier=1,
task_acks_late=True,
task_reject_on_worker_lost=True
)
错误处理与重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def reliable_task_operation(task_id: str, operation: callable):
"""带重试的任务操作"""
try:
return await operation(task_id)
except (redis.ConnectionError, celery.exceptions.OperationalError) as e:
logger.error(f"Operation failed for task {task_id}: {e}")
raise
部署架构与扩展方案
生产环境部署架构
graph TB
subgraph "负载均衡层"
LB[Load Balancer<br>Nginx/Traefik]
end
subgraph "应用层"
A1[FastAPI App 1]
A2[FastAPI App 2]
A3[FastAPI App N]
end
subgraph "消息队列层"
MQ[Redis Cluster<br>消息代理]
end
subgraph "工作节点层"
W1[Celery Worker 1]
W2[Celery Worker 2]
W3[Celery Worker N]
end
subgraph "存储层"
DB[Database<br>PostgreSQL]
Cache[Cache<br>Redis]
end
LB --> A1
LB --> A2
LB --> A3
A1 --> MQ
A2 --> MQ
A3 --> MQ
MQ --> W1
MQ --> W2
MQ --> W3
W1 --> DB
W2 --> DB
W3 --> DB
W1 --> Cache
W2 --> Cache
W3 --> Cache
监控与告警配置
from prometheus_client import Counter, Gauge, Histogram
# 监控指标
TASKS_CREATED = Counter('tasks_created_total', 'Total tasks created')
TASKS_COMPLETED = Counter('tasks_completed_total', 'Total tasks completed')
TASKS_FAILED = Counter('tasks_failed_total', 'Total tasks failed')
TASK_DURATION = Histogram('task_duration_seconds', 'Task processing duration')
QUEUE_SIZE = Gauge('task_queue_size', 'Current task queue size')
# 集成监控到任务处理
@celery_app.task(bind=True)
def monitored_task(self, task_data: dict):
"""带监控的任务"""
start_time = time.time()
TASKS_CREATED.inc()
try:
result = process_long_running_task(task_data)
TASKS_COMPLETED.inc()
TASK_DURATION.observe(time.time() - start_time)
return result
except Exception as e:
TASKS_FAILED.inc()
raise
总结与展望
FastAPI-MCP结合消息队列技术为处理长时间运行任务提供了强大的解决方案。通过本文介绍的架构和实现方案,你可以:
- 实现零阻塞异步处理:确保AI助手调用不会因长时间任务而阻塞
- 提供实时进度反馈:通过WebSocket和状态查询接口让用户了解任务执行情况
- 支持优先级调度:根据任务重要性智能调度处理顺序
- 确保系统可靠性:通过重试机制和监控告警保障任务执行成功率
这种架构特别适用于以下场景:
- 大数据处理和分析任务
- 机器学习模型训练和推理
- 文档处理和转换任务
- 复杂业务流程执行
随着MCP协议的不断演进和AI应用场景的扩展,基于FastAPI-MCP的异步任务处理方案将成为构建智能应用的重要基础设施。通过合理的架构设计和性能优化,你可以构建出既高效又可靠的AI赋能系统。
立即行动:开始在你的FastAPI项目中集成MCP支持,体验异步任务处理带来的效率提升和用户体验改善。记住,良好的架构设计是成功的一半,而FastAPI-MCP为你提供了实现这一目标的强大工具。
登录后查看全文
热门项目推荐
相关项目推荐
Kimi-K2.5Kimi K2.5 是一款开源的原生多模态智能体模型,它在 Kimi-K2-Base 的基础上,通过对约 15 万亿混合视觉和文本 tokens 进行持续预训练构建而成。该模型将视觉与语言理解、高级智能体能力、即时模式与思考模式,以及对话式与智能体范式无缝融合。Python00
GLM-4.7-FlashGLM-4.7-Flash 是一款 30B-A3B MoE 模型。作为 30B 级别中的佼佼者,GLM-4.7-Flash 为追求性能与效率平衡的轻量化部署提供了全新选择。Jinja00
VLOOKVLOOK™ 是优雅好用的 Typora/Markdown 主题包和增强插件。 VLOOK™ is an elegant and practical THEME PACKAGE × ENHANCEMENT PLUGIN for Typora/Markdown.Less00
PaddleOCR-VL-1.5PaddleOCR-VL-1.5 是 PaddleOCR-VL 的新一代进阶模型,在 OmniDocBench v1.5 上实现了 94.5% 的全新 state-of-the-art 准确率。 为了严格评估模型在真实物理畸变下的鲁棒性——包括扫描伪影、倾斜、扭曲、屏幕拍摄和光照变化——我们提出了 Real5-OmniDocBench 基准测试集。实验结果表明,该增强模型在新构建的基准测试集上达到了 SOTA 性能。此外,我们通过整合印章识别和文本检测识别(text spotting)任务扩展了模型的能力,同时保持 0.9B 的超紧凑 VLM 规模,具备高效率特性。Python00
KuiklyUI基于KMP技术的高性能、全平台开发框架,具备统一代码库、极致易用性和动态灵活性。 Provide a high-performance, full-platform development framework with unified codebase, ultimate ease of use, and dynamic flexibility. 注意:本仓库为Github仓库镜像,PR或Issue请移步至Github发起,感谢支持!Kotlin07
compass-metrics-modelMetrics model project for the OSS CompassPython00
项目优选
收起
deepin linux kernel
C
27
11
OpenHarmony documentation | OpenHarmony开发者文档
Dockerfile
525
3.72 K
Ascend Extension for PyTorch
Python
329
391
本项目是CANN提供的数学类基础计算算子库,实现网络在NPU上加速计算。
C++
877
578
openEuler内核是openEuler操作系统的核心,既是系统性能与稳定性的基石,也是连接处理器、设备与服务的桥梁。
C
335
162
暂无简介
Dart
764
189
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
12
1
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
1.33 K
746
🔥LeetCode solutions in any programming language | 多种编程语言实现 LeetCode、《剑指 Offer(第 2 版)》、《程序员面试金典(第 6 版)》题解
Java
67
20
React Native鸿蒙化仓库
JavaScript
302
350