You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

8.2 KiB

Celery 异步任务系统集成

日期: 2026-01-27
类型: 功能增强
影响范围: 后端架构、Docker 配置

变更概述

为项目集成 Celery 异步任务系统,支持 AI 生成、视频导出等长时间运行的任务。

变更详情

新增服务

1. RabbitMQ 消息队列

rabbitmq:
  image: rabbitmq:3.13-management-alpine
  container_name: jointo-server-rabbitmq
  ports:
    - "6182:5672"   # AMQP 端口
    - "6186:15672"  # 管理界面

访问: http://localhost:6186 (guest/guest)

2. Celery Worker - AI 任务

celery-worker-ai:
  container_name: jointo-server-celery-ai
  command: celery -A app.core.celery_app worker -Q ai --loglevel=info --concurrency=2

处理任务:

  • 图片生成 (限流: 10/分钟)
  • 视频生成 (限流: 5/分钟)
  • 配音生成
  • 批量处理

3. Celery Worker - 导出任务

celery-worker-export:
  container_name: jointo-server-celery-export
  command: celery -A app.core.celery_app worker -Q export --loglevel=info --concurrency=1

处理任务:

  • 项目导出 (限流: 3/分钟)
  • 时间轴渲染
  • 缩略图生成
  • 批量导出

4. Celery Beat - 定时任务

celery-beat:
  container_name: jointo-server-celery-beat
  command: celery -A app.core.celery_app beat --loglevel=info

定时任务:

  • 清理过期结果 (每天凌晨 2 点)
  • 收集任务统计 (每小时)
  • 清理临时文件 (每天)

新增文件

核心配置

server/app/core/celery_app.py

Celery 应用配置,包括:

  • 任务路由规则
  • 重试策略
  • 限流配置
  • 定时任务调度

任务模块

server/app/tasks/
├── __init__.py
├── ai_tasks.py              # AI 生成任务
├── export_tasks.py          # 导出任务
└── maintenance_tasks.py     # 维护任务

更新文件

1. docker-compose.yml

  • 添加 RabbitMQ 服务
  • 添加 3 个 Celery 容器
  • 添加 rabbitmq_data 卷
  • 更新环境变量

2. requirements.txt

celery==5.3.6
celery[redis]==5.3.6
kombu==5.3.5
amqp==5.2.0

3. .env.example

CELERY_BROKER_URL=amqp://guest:guest@localhost:6182//
CELERY_RESULT_BACKEND=redis://localhost:6183/1

容器列表更新

容器名称 服务 端口 说明
jointo-server-app FastAPI 6170 API 服务
jointo-server-postgres PostgreSQL 6181 数据库
jointo-server-redis Redis 6183 缓存 + 任务结果
jointo-server-rabbitmq RabbitMQ 6182, 6186 消息队列
jointo-server-celery-ai Celery Worker - AI 任务处理
jointo-server-celery-export Celery Worker - 导出任务处理
jointo-server-celery-beat Celery Beat - 定时任务调度
jointo-server-minio MinIO 6185, 6187 对象存储
jointo-server-adminer Adminer 6184 数据库管理

使用方法

启动服务

# 启动所有服务(包括 Celery)
cd server
docker compose up -d

# 查看服务状态
docker compose ps

# 查看 Celery Worker 日志
docker logs -f jointo-server-celery-ai
docker logs -f jointo-server-celery-export
docker logs -f jointo-server-celery-beat

提交任务

from app.tasks.ai_tasks import generate_image

# 异步提交任务
task = generate_image.delay(
    prompt="一只可爱的猫咪",
    model="stable-diffusion"
)

# 返回任务 ID
return {"task_id": task.id, "status": "submitted"}

查询任务状态

from celery.result import AsyncResult

result = AsyncResult(task_id)

# 获取状态
status = result.state  # PENDING, STARTED, SUCCESS, FAILURE

# 获取结果
if result.ready():
    data = result.get()

管理界面

RabbitMQ 管理界面:

功能:

  • 查看队列状态
  • 监控消息流量
  • 管理连接和通道

任务示例

AI 图片生成

from app.tasks.ai_tasks import generate_image

# 提交任务
task = generate_image.delay(
    prompt="一只可爱的猫咪",
    model="stable-diffusion",
    width=1024,
    height=1024
)

# 任务会返回
{
    "task_id": "abc123",
    "status": "completed",
    "image_url": "https://example.com/images/abc123.png",
    "prompt": "一只可爱的猫咪",
    "model": "stable-diffusion"
}

项目导出

from app.tasks.export_tasks import export_project

# 提交导出任务
task = export_project.delay(
    project_id="proj_123",
    format="mp4",
    resolution="1920x1080",
    fps=30
)

# 任务会返回
{
    "task_id": "def456",
    "status": "completed",
    "project_id": "proj_123",
    "export_url": "https://example.com/exports/proj_123.mp4",
    "format": "mp4",
    "file_size": 52428800  # 50MB
}

监控和调试

查看任务日志

# AI Worker 日志
docker logs -f jointo-server-celery-ai

# 导出 Worker 日志
docker logs -f jointo-server-celery-export

# Beat 日志
docker logs -f jointo-server-celery-beat

# RabbitMQ 日志
docker logs -f jointo-server-rabbitmq

进入容器调试

# 进入 Celery Worker 容器
docker exec -it jointo-server-celery-ai bash

# 手动执行任务测试
python -c "from app.tasks.ai_tasks import generate_image; print(generate_image.delay('test'))"

清理任务队列

# 进入 RabbitMQ 容器
docker exec -it jointo-server-rabbitmq bash

# 清空队列
rabbitmqctl purge_queue ai
rabbitmqctl purge_queue export

性能配置

Worker 并发数

# AI 任务 Worker(CPU 密集型)
--concurrency=2

# 导出任务 Worker(I/O 密集型)
--concurrency=1

任务限流

# 图片生成:每分钟最多 10 个
"app.tasks.ai_tasks.generate_image": {"rate_limit": "10/m"}

# 视频生成:每分钟最多 5 个
"app.tasks.ai_tasks.generate_video": {"rate_limit": "5/m"}

# 项目导出:每分钟最多 3 个
"app.tasks.export_tasks.export_project": {"rate_limit": "3/m"}

重试策略

# 自动重试配置
autoretry_for = (Exception,)
retry_kwargs = {"max_retries": 3}
retry_backoff = True
retry_backoff_max = 600
retry_jitter = True

影响评估

开发者影响

  • 新功能: 支持异步任务处理
  • 性能提升: 长时间任务不阻塞 API
  • 可扩展: 可独立扩展 Worker 数量
  • ⚠️ 学习成本: 需要了解 Celery 基本概念

系统资源

  • 内存: 每个 Worker 约 200-500MB
  • CPU: 根据任务类型动态调整
  • 磁盘: RabbitMQ 数据约 100MB

端口占用

  • 6182: RabbitMQ AMQP
  • 6186: RabbitMQ 管理界面

后续计划

  1. 实现 AI 服务集成(OpenAI、Stability AI、Runway)
  2. 实现视频渲染逻辑(FFmpeg)
  3. 添加任务状态 API 端点
  4. 前端添加任务进度显示
  5. 添加 Flower 监控面板(可选)
  6. 生产环境优化和压测

故障排查

RabbitMQ 连接失败

# 检查 RabbitMQ 是否运行
docker ps | grep rabbitmq

# 检查健康状态
docker exec jointo-server-rabbitmq rabbitmq-diagnostics ping

# 重启 RabbitMQ
docker restart jointo-server-rabbitmq

Worker 无法启动

# 查看错误日志
docker logs jointo-server-celery-ai

# 检查 Celery 配置
docker exec jointo-server-celery-ai python -c "from app.core.celery_app import celery_app; print(celery_app.conf)"

任务执行失败

# 查看任务详情
docker exec jointo-server-celery-ai celery -A app.core.celery_app inspect active

# 查看失败任务
docker exec jointo-server-celery-ai celery -A app.core.celery_app inspect failed

相关文档

注意事项

⚠️ 生产环境:

  • 修改 RabbitMQ 默认密码
  • 启用 SSL/TLS 加密
  • 配置资源限制
  • 设置监控告警

⚠️ 任务设计:

  • 任务应该是幂等的(可重复执行)
  • 避免在任务中保存状态
  • 合理设置超时时间
  • 处理好异常情况

⚠️ 性能优化:

  • 根据负载调整 Worker 数量
  • 合理设置并发数
  • 监控队列长度
  • 定期清理过期结果