
FastAPI-MCP Message Queues: MCP Tools for Asynchronously Processing Long-Running Tasks

2026-02-04 05:08:18 Author: 段琳惟

Introduction: The Challenge of Long-Running Tasks in the AI Era

As AI applications evolve rapidly, the Model Context Protocol (MCP) has become the standard bridge connecting AI assistants to external services. However, when faced with compute-intensive tasks that take a long time to run, the traditional synchronous request-response model often falls short. Have you run into any of these pain points:

  • Timeout interruptions: long-running tasks are cut off by timeouts when an AI assistant calls the API
  • Resource blocking: synchronous processing ties up precious worker threads and hurts system throughput
  • State loss: once a task is interrupted mid-execution, its state is hard to recover
  • Missing progress feedback: users have no way to follow a long-running task's progress in real time

FastAPI-MCP combined with message-queue technology offers an elegant solution to these problems. This article takes a deep look at how to use FastAPI-MCP to build MCP tools that support asynchronous processing of long-running tasks.

A Look at the FastAPI-MCP Core Architecture

MCP Protocol Architecture Basics

flowchart TD
    A[MCP Client<br>AI Assistant] --> B[FastAPI-MCP Server]
    B --> C[FastAPI Application]
    C --> D[Background Tasks<br>Message Queue]
    D --> E[Task Workers<br>Async Processing]
    E --> F[Result Storage<br>Redis/Database]
    F --> B
    B --> A

Key Component Comparison

| Component        | Role                    | Strength                           | Best Fit                              |
| ---------------- | ----------------------- | ---------------------------------- | ------------------------------------- |
| FastAPI-MCP      | MCP protocol conversion | Zero-config automatic API exposure | Quickly integrating existing services |
| Celery           | Distributed task queue  | Supports multiple message brokers  | Large-scale distributed processing    |
| RQ (Redis Queue) | Redis task queue        | Simple and lightweight             | Small-to-medium applications          |
| ARQ              | Async Redis queue       | Native asyncio support             | High-concurrency async scenarios      |
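
This article's examples use Celery, but to make the "native asyncio support" row concrete, here is a rough sketch of the same idea in ARQ (the function and setting names here are illustrative, not part of the project built below):

import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def process_long_task(ctx, data: dict) -> dict:
    # ARQ tasks are plain coroutines; ctx carries the worker's Redis pool
    await asyncio.sleep(2)  # stand-in for real async work
    return {"processed": True, "input": data}

class WorkerSettings:
    # Consumed by the arq CLI: `arq yourmodule.WorkerSettings`
    functions = [process_long_task]
    redis_settings = RedisSettings()

async def enqueue_example():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('process_long_task', {"text": "hello"})
    print(await job.status())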

Hands-On: Building an MCP Tool Backed by a Message Queue

Environment Setup and Dependency Installation

# Install the core dependencies with uv
uv add fastapi-mcp celery redis pydantic

# Or with pip
pip install fastapi-mcp celery redis pydantic
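
The examples that follow assume a local Redis instance plus at least one Celery worker. A minimal way to bring everything up, assuming the application code below lives in a module named main.py:

# Start Redis locally (or point the URLs below at an existing instance)
redis-server

# Start a Celery worker for the celery_app defined below
celery -A main.celery_app worker --loglevel=info

# Start the FastAPI application
uvicorn main:app --reload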

Basic Message Queue Configuration

from fastapi import FastAPI, HTTPException
from fastapi_mcp import FastApiMCP
from pydantic import BaseModel
from typing import Optional
import redis
from celery import Celery
import json

# Redis connection used for task bookkeeping
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Celery configuration: Redis serves as both broker and result backend
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

app = FastAPI(title="Async Task Processor")

class TaskRequest(BaseModel):
    """任务请求模型"""
    data: dict
    timeout: int = 300
    priority: int = 1

class TaskStatus(BaseModel):
    """任务状态模型"""
    task_id: str
    status: str  # pending, processing, completed, failed
    progress: int = 0
    result: Optional[dict] = None
    error: Optional[str] = None

Implementing the Long-Running Task Handler

import time

@celery_app.task(bind=True)
def process_long_running_task(self, task_data: dict) -> dict:
    """Handler for a long-running task"""
    total_steps = 10
    result = {"processed": False, "steps_completed": 0}
    
    # Simulate a lengthy, multi-step job
    for step in range(1, total_steps + 1):
        # Run the actual processing logic for this step
        process_step(step, task_data)
        
        # Report progress through the Celery result backend
        self.update_state(
            state='PROGRESS',
            meta={'current': step, 'total': total_steps}
        )
        
        # Simulate per-step processing time
        time.sleep(2)
    
    result["processed"] = True
    result["steps_completed"] = total_steps
    return result

def process_step(step: int, data: dict):
    """Process a single step"""
    # Real business logic goes here
    if "text" in data:
        data["processed_text"] = f"Processed step {step}: {data['text']}"

Exposing MCP Endpoints and Managing State

import uuid

# Task-related FastAPI endpoints. The explicit operation_id values are what
# FastAPI-MCP exposes as tool names (see include_operations below).
@app.post("/tasks/process", response_model=TaskStatus, tags=["tasks"],
          operation_id="create_processing_task")
async def create_processing_task(request: TaskRequest) -> TaskStatus:
    """Create a processing task"""
    # A random id avoids collisions between identical payloads
    task_id = f"task_{uuid.uuid4().hex}"
    
    # Kick off the asynchronous Celery task
    async_result = process_long_running_task.delay(request.data)
    
    # Persist the Celery id mapping and the initial status
    initial_status = TaskStatus(
        task_id=task_id,
        status="pending",
        progress=0
    )
    redis_client.set(f"task:{task_id}", async_result.id)
    redis_client.set(f"status:{task_id}", initial_status.json())
    
    return initial_status

@app.get("/tasks/{task_id}/status", response_model=TaskStatus, tags=["tasks"],
         operation_id="get_task_status")
async def get_task_status(task_id: str) -> TaskStatus:
    """Fetch the current task status"""
    celery_task_id = redis_client.get(f"task:{task_id}")
    if not celery_task_id:
        raise HTTPException(status_code=404, detail="Task not found")
    
    # redis-py returns bytes; Celery expects a string task id
    async_result = celery_app.AsyncResult(celery_task_id.decode())
    status_data = json.loads(redis_client.get(f"status:{task_id}") or "{}")
    
    if async_result.state == 'PROGRESS':
        meta = async_result.info or {}
        current, total = meta.get('current', 0), meta.get('total', 1)
        status_data['progress'] = int(current / total * 100)
        status_data['status'] = 'processing'
    elif async_result.state == 'SUCCESS':
        status_data['status'] = 'completed'
        status_data['result'] = async_result.result
        status_data['progress'] = 100
    elif async_result.state == 'FAILURE':
        status_data['status'] = 'failed'
        status_data['error'] = str(async_result.info)
    
    # Write the refreshed status back to Redis
    redis_client.set(f"status:{task_id}", json.dumps(status_data))
    return TaskStatus(**status_data)

@app.get("/tasks/{task_id}/result", tags=["tasks"],
         operation_id="get_task_result")
async def get_task_result(task_id: str):
    """Fetch the task result"""
    status = await get_task_status(task_id)
    if status.status != 'completed':
        raise HTTPException(status_code=400, detail="Task not completed")
    return status.result
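
To exercise these endpoints end to end, a throwaway client along these lines should work (the base URL and payload are placeholders; httpx is assumed to be installed):

import time

import httpx

BASE = "http://localhost:8000"  # assumed address of the FastAPI app

# Submit a task, then poll its status until it finishes
resp = httpx.post(f"{BASE}/tasks/process", json={"data": {"text": "hello"}})
task_id = resp.json()["task_id"]

while True:
    status = httpx.get(f"{BASE}/tasks/{task_id}/status").json()
    print(f"{status['status']}: {status['progress']}%")
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(1)

if status["status"] == "completed":
    print(httpx.get(f"{BASE}/tasks/{task_id}/result").json())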

Configuring the MCP Server Integration

# Initialize the MCP server. include_operations must be passed at
# construction time, since FastApiMCP builds its tool list when instantiated.
mcp = FastApiMCP(
    app,
    name="AsyncTaskProcessor",
    description="MCP server for asynchronous long-running task processing",
    include_operations=["create_processing_task", "get_task_status", "get_task_result"]
)

# Mount the MCP server on the FastAPI app
mcp.mount_http(mount_path="/mcp/tasks")

# Optional: authentication
from fastapi_mcp.auth.proxy import setup_oauth_authorize_proxy
setup_oauth_authorize_proxy(
    app,
    client_id="your-client-id",
    authorize_url="https://your-auth-provider.com/oauth/authorize"
)
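
For a quick smoke test from the MCP side, the official mcp Python SDK can list the exposed tools. This is a sketch assuming the streamable HTTP transport and the /mcp/tasks mount path above:

import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    # Connect to the MCP server mounted on the FastAPI app
    async with streamablehttp_client("http://localhost:8000/mcp/tasks") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

asyncio.run(main())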

Advanced Features: Smart Task Scheduling and Monitoring

A Priority-Based Task Scheduler

class PriorityQueue:
    """Redis-backed priority queue (lower score = higher priority)"""
    
    def __init__(self, name: str = "task_queue"):
        self.name = name
        self.redis = redis_client
    
    def enqueue(self, task_id: str, priority: int = 1):
        """Add a task to the queue"""
        self.redis.zadd(self.name, {task_id: priority})
    
    def dequeue(self) -> Optional[str]:
        """Atomically pop the highest-priority (lowest-score) task"""
        result = self.redis.zpopmin(self.name)
        if result:
            task_id, _score = result[0]
            return task_id.decode()
        return None

# Wire up priority scheduling
priority_queue = PriorityQueue()

@app.post("/tasks/priority-process")
async def create_priority_task(request: TaskRequest) -> TaskStatus:
    """Create a priority-scheduled task"""
    task_id = f"priority_task_{uuid.uuid4().hex}"
    # Persist the payload so a dispatcher can hand it to a worker later
    redis_client.set(f"payload:{task_id}", json.dumps(request.data))
    priority_queue.enqueue(task_id, request.priority)
    
    return TaskStatus(
        task_id=task_id,
        status="queued",
        progress=0
    )
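
Note that nothing above actually drains this queue: a small dispatcher process is still needed to pop the highest-priority task and hand it to Celery. A minimal sketch, assuming it runs alongside the app and can import the objects defined earlier:

import json
import time

def dispatch_loop(poll_interval: float = 0.5):
    """Continuously drain the priority queue into Celery."""
    while True:
        task_id = priority_queue.dequeue()
        if task_id is None:
            time.sleep(poll_interval)  # queue is empty; back off briefly
            continue

        raw = redis_client.get(f"payload:{task_id}")
        if raw is None:
            continue  # payload missing or expired; skip

        # Hand the work to a Celery worker and record the mapping,
        # mirroring what create_processing_task does
        async_result = process_long_running_task.delay(json.loads(raw))
        redis_client.set(f"task:{task_id}", async_result.id)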

Real-Time Progress Monitoring with WebSocket Integration

import asyncio

from fastapi import WebSocket, WebSocketDisconnect
from fastapi.routing import APIRouter

router = APIRouter()

@router.websocket("/ws/tasks/{task_id}")
async def websocket_task_progress(websocket: WebSocket, task_id: str):
    """Stream task progress over a WebSocket"""
    await websocket.accept()
    
    try:
        while True:
            # Fetch the latest status
            status = await get_task_status(task_id)
            await websocket.send_json(status.dict())
            
            # Stop once the task has finished
            if status.status in ['completed', 'failed']:
                break
                
            # Poll once per second
            await asyncio.sleep(1)
            
    except WebSocketDisconnect:
        pass

# Register the WebSocket route on the main app
app.include_router(router)
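
A throwaway client for this endpoint, sketched with the third-party websockets package (the URL and task id are placeholders):

import asyncio
import json

import websockets  # pip install websockets

async def watch(task_id: str):
    uri = f"ws://localhost:8000/ws/tasks/{task_id}"
    async with websockets.connect(uri) as ws:
        async for message in ws:
            status = json.loads(message)
            print(f"{status['status']}: {status['progress']}%")

asyncio.run(watch("task_<id>"))  # substitute a real id from /tasks/process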

Performance Optimization and Best Practices

Connection Pooling and Resource Management

from redis.asyncio import ConnectionPool, Redis

# Async Redis connection pool
redis_pool = ConnectionPool.from_url(
    "redis://localhost:6379/0",
    max_connections=20,
    decode_responses=True
)

async def get_redis() -> Redis:
    """获取Redis连接"""
    return Redis(connection_pool=redis_pool)

# Celery performance tuning
celery_app.conf.update(
    worker_concurrency=4,
    worker_prefetch_multiplier=1,  # hand each worker one task at a time
    task_acks_late=True,           # acknowledge only after the task finishes
    task_reject_on_worker_lost=True
)

Error Handling and Retries

import logging
from typing import Callable

from celery.exceptions import OperationalError
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def reliable_task_operation(task_id: str, operation: Callable):
    """Run a task operation with exponential-backoff retries"""
    try:
        return await operation(task_id)
    except (redis.ConnectionError, OperationalError) as e:
        logger.error(f"Operation failed for task {task_id}: {e}")
        raise
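
Used like this, a transient Redis or broker hiccup gets up to three attempts with exponential backoff (get_task_status is the endpoint function defined earlier; the task id is a placeholder):

# Fetch a task's status, retrying transient Redis/Celery failures
status = await reliable_task_operation("task_<id>", get_task_status)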

Deployment Architecture and Scaling

Production Deployment Architecture

graph TB
    subgraph "负载均衡层"
        LB[Load Balancer<br>Nginx/Traefik]
    end
    
    subgraph "应用层"
        A1[FastAPI App 1]
        A2[FastAPI App 2]
        A3[FastAPI App N]
    end
    
    subgraph "消息队列层"
        MQ[Redis Cluster<br>消息代理]
    end
    
    subgraph "工作节点层"
        W1[Celery Worker 1]
        W2[Celery Worker 2]
        W3[Celery Worker N]
    end
    
    subgraph "存储层"
        DB[Database<br>PostgreSQL]
        Cache[Cache<br>Redis]
    end
    
    LB --> A1
    LB --> A2
    LB --> A3
    A1 --> MQ
    A2 --> MQ
    A3 --> MQ
    MQ --> W1
    MQ --> W2
    MQ --> W3
    W1 --> DB
    W2 --> DB
    W3 --> DB
    W1 --> Cache
    W2 --> Cache
    W3 --> Cache
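
Scaling the worker layer in this diagram is mostly operational: every additional Celery worker pointed at the same broker joins the pool automatically. A sketch, reusing the assumed main.py module name from earlier:

# Each invocation adds an independent worker node against the shared broker
celery -A main.celery_app worker --loglevel=info --concurrency=4 -n worker1@%h
celery -A main.celery_app worker --loglevel=info --concurrency=4 -n worker2@%h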

Monitoring and Alerting

import time

from prometheus_client import Counter, Gauge, Histogram

# Metrics
TASKS_CREATED = Counter('tasks_created_total', 'Total tasks created')
TASKS_COMPLETED = Counter('tasks_completed_total', 'Total tasks completed')
TASKS_FAILED = Counter('tasks_failed_total', 'Total tasks failed')
TASK_DURATION = Histogram('task_duration_seconds', 'Task processing duration')
QUEUE_SIZE = Gauge('task_queue_size', 'Current task queue size')

# Wire the metrics into task processing
@celery_app.task(bind=True)
def monitored_task(self, task_data: dict):
    """Task wrapper instrumented with Prometheus metrics"""
    start_time = time.time()
    TASKS_CREATED.inc()
    
    try:
        result = process_long_running_task(task_data)
        TASKS_COMPLETED.inc()
        TASK_DURATION.observe(time.time() - start_time)
        return result
    except Exception:
        TASKS_FAILED.inc()
        raise
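
These counters only matter if Prometheus can scrape them. On the FastAPI side, prometheus_client ships an ASGI app that can be mounted directly (a sketch; Celery worker processes would need their own exporter, which is out of scope here):

from prometheus_client import make_asgi_app

# Expose the default registry at /metrics for Prometheus to scrape
app.mount("/metrics", make_asgi_app())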

Conclusion and Outlook

FastAPI-MCP combined with message-queue technology provides a powerful solution for long-running tasks. With the architecture and implementation described in this article, you can:

  1. Achieve non-blocking asynchronous processing: AI assistant calls are never stalled by long-running work
  2. Provide real-time progress feedback: WebSocket and status-query endpoints keep users informed of task progress
  3. Support priority scheduling: process tasks in order of importance
  4. Keep the system reliable: retries, monitoring, and alerting safeguard task success rates

This architecture is a particularly good fit for:

  • Big-data processing and analytics jobs
  • Machine-learning model training and inference
  • Document processing and conversion tasks
  • Complex business-process execution

As the MCP protocol evolves and AI application scenarios expand, asynchronous task processing built on FastAPI-MCP will become key infrastructure for intelligent applications. With sound architectural design and careful performance tuning, you can build AI-powered systems that are both efficient and reliable.

Take action now: integrate MCP support into your FastAPI project and experience the efficiency and user-experience gains that asynchronous task processing brings. Remember, good architecture is half the battle, and FastAPI-MCP gives you a powerful tool for getting there.
