13 KiB
AI 任务监控和管理功能实现
变更日期:2026-01-29
变更类型:功能增强
影响范围:AI Service、Celery Tasks
向后兼容:✅ 是
变更概述
实现了完整的 AI 任务监控和管理功能,包括任务批量查询、统计分析、超时处理和队列监控,提升了系统的可观测性和自动化运维能力。
变更详情
1. Repository 层扩展
文件:server/app/repositories/ai_job_repository.py
新增方法:
1.1 get_jobs_by_filters()
批量查询和筛选任务,支持多维度过滤和分页。
功能:
- 按用户 ID 筛选
- 按任务类型筛选
- 按任务状态筛选
- 按时间范围筛选
- 支持排序(升序/降序)
- 支持分页
参数:
async def get_jobs_by_filters(
user_id: Optional[str] = None,
job_type: Optional[int] = None,
status: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
limit: int = 50,
offset: int = 0,
order_by: str = 'created_at',
order_desc: bool = True
) -> tuple[List[AIJob], int]
返回:(任务列表, 总数)
1.2 get_job_statistics()
获取任务统计信息,支持多维度分析。
统计维度:
- 总任务数
- 已完成任务数
- 失败任务数
- 待处理任务数
- 处理中任务数
- 成功率
- 总积分消耗
- 平均执行时间
- 按任务类型分组统计
- 按模型分组统计
参数:
async def get_job_statistics(
user_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> dict
返回示例:
{
'total_jobs': 100,
'completed_jobs': 85,
'failed_jobs': 10,
'pending_jobs': 3,
'processing_jobs': 2,
'success_rate': 85.0,
'total_credits': 1250,
'avg_execution_time': 15.5,
'by_type': {
1: {'count': 50, 'credits': 500}, # 图片生成
2: {'count': 30, 'credits': 600} # 视频生成
},
'by_model': {
'gpt-4': {'count': 40, 'credits': 800},
'dall-e-3': {'count': 50, 'credits': 500}
}
}
1.3 get_timeout_jobs()
获取超时的任务,用于自动化处理。
超时判断逻辑:
- PENDING 状态:从创建时间开始计算
- PROCESSING 状态:从开始处理时间计算
参数:
async def get_timeout_jobs(
timeout_minutes: int = 30
) -> List[AIJob]
2. Service 层扩展
文件:server/app/services/ai_service.py
新增方法:
2.1 get_user_jobs()
批量查询用户任务,支持分页和筛选。
功能:
- 验证用户是否存在(应用层引用完整性保证)
- 支持多维度筛选
- 自动计算分页信息
- 格式化返回数据
参数:
async def get_user_jobs(
user_id: str,
job_type: Optional[int] = None,
status: Optional[int] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
page: int = 1,
page_size: int = 20,
order_by: str = 'created_at',
order_desc: bool = True
) -> Dict[str, Any]
返回示例:
{
'items': [
{
'job_id': '019d1234-5678-7abc-def0-111111111111',
'job_type': 1,
'status': 3,
'progress': 100,
'model_name': 'dall-e-3',
'credits_used': 10,
'created_at': '2026-01-29T10:00:00Z',
'started_at': '2026-01-29T10:00:05Z',
'completed_at': '2026-01-29T10:00:30Z',
'error_message': None
}
],
'total': 100,
'page': 1,
'page_size': 20,
'total_pages': 5
}
2.2 get_job_statistics()
获取任务统计信息,支持全局和用户级别统计。
功能:
- 支持全局统计(不传 user_id)
- 支持用户级别统计
- 支持时间范围筛选
- 多维度数据分析
参数:
async def get_job_statistics(
user_id: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]
2.3 get_queue_status()
获取任务队列状态,实时监控系统负载。
监控指标:
- Worker 数量和状态
- 活跃任务数(Celery 层)
- 预定任务数(Celery 层)
- 保留任务数(Celery 层)
- 待处理任务数(数据库层)
- 处理中任务数(数据库层)
- 队列长度
返回示例:
{
'workers': {
'total': 2,
'active': True
},
'tasks': {
'active': 5,
'scheduled': 10,
'reserved': 3,
'pending_in_db': 15,
'processing_in_db': 5
},
'queue_length': 20
}
2.4 handle_timeout_jobs()
处理超时任务,自动化运维核心功能。
处理流程:
- 获取超时任务列表
- 取消 Celery 任务(如果存在)
- 更新任务状态为失败
- 退还积分(如果已扣除)
- 记录日志
参数:
async def handle_timeout_jobs(
timeout_minutes: int = 30
) -> Dict[str, Any]
返回示例:
{
'handled_count': 5,
'refunded_credits': 50,
'timeout_minutes': 30
}
3. Celery 定时任务
文件:server/app/tasks/ai_tasks.py
3.1 check_timeout_jobs_task
定时检测超时任务并自动处理。
任务配置:
- 任务名称:
ai_tasks.check_timeout_jobs - 执行频率:每 10 分钟
- 超时阈值:30 分钟(可配置)
功能:
- 自动检测超时任务
- 自动标记失败
- 自动退还积分
- 记录详细日志
实现:
@celery_app.task(name='ai_tasks.check_timeout_jobs')
def check_timeout_jobs_task(timeout_minutes: int = 30):
"""定时检测超时任务并处理"""
async def _execute():
async with async_session_maker() as session:
ai_service = AIService(session)
result = await ai_service.handle_timeout_jobs(timeout_minutes)
return result
return asyncio.run(_execute())
4. Celery Beat 配置
文件:server/app/core/celery_app.py
新增定时任务调度:
beat_schedule={
# ... 其他定时任务 ...
# 每 10 分钟检测超时的 AI 任务
"check-timeout-ai-jobs": {
"task": "ai_tasks.check_timeout_jobs",
"schedule": crontab(minute="*/10"),
"kwargs": {"timeout_minutes": 30},
},
}
技术亮点
1. 应用层引用完整性保证
所有查询方法都验证关联实体是否存在,遵循 Jointo 技术栈规范。
2. 高效的统计查询
使用 SQLAlchemy 的聚合函数和分组查询,一次查询获取多维度统计数据。
3. 自动化运维
通过 Celery Beat 定时任务,实现超时任务的自动检测和处理,无需人工干预。
4. 完整的积分闭环
超时任务自动退还积分,保证用户权益。
5. 详细的日志记录
所有关键操作都记录日志,便于问题排查和数据分析。
使用示例
1. 查询用户任务列表
from app.services.ai_service import AIService
# 查询用户最近的图片生成任务
result = await ai_service.get_user_jobs(
user_id='019d1234-5678-7abc-def0-000000000001',
job_type=1, # 图片生成
status=3, # 已完成
page=1,
page_size=20
)
print(f"总任务数: {result['total']}")
print(f"当前页: {result['page']}/{result['total_pages']}")
for job in result['items']:
print(f"任务 {job['job_id']}: {job['status']}")
2. 获取任务统计
from datetime import datetime, timedelta
# 获取最近 7 天的统计
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=7)
stats = await ai_service.get_job_statistics(
user_id='019d1234-5678-7abc-def0-000000000001',
start_date=start_date,
end_date=end_date
)
print(f"总任务数: {stats['total_jobs']}")
print(f"成功率: {stats['success_rate']}%")
print(f"平均执行时间: {stats['avg_execution_time']}秒")
print(f"总积分消耗: {stats['total_credits']}")
3. 监控队列状态
# 获取实时队列状态
status = await ai_service.get_queue_status()
print(f"Worker 数量: {status['workers']['total']}")
print(f"活跃任务: {status['tasks']['active']}")
print(f"队列长度: {status['queue_length']}")
# 告警判断
if status['queue_length'] > 100:
print("⚠️ 队列积压严重,需要扩容 Worker")
4. 手动处理超时任务
# 手动触发超时任务处理(通常由定时任务自动执行)
result = await ai_service.handle_timeout_jobs(timeout_minutes=30)
print(f"处理超时任务: {result['handled_count']} 个")
print(f"退还积分: {result['refunded_credits']} 分")
性能优化
1. 索引优化
所有查询字段都已建立索引,确保查询性能:
idx_ai_jobs_user_ididx_ai_jobs_typeidx_ai_jobs_statusidx_ai_jobs_created_atidx_ai_jobs_status_created_at(复合索引)
2. 分页查询
使用 LIMIT/OFFSET 分页,避免一次性加载大量数据。
3. 聚合查询优化
统计查询使用数据库聚合函数,避免在应用层计算。
4. 异步执行
所有方法都是异步的,充分利用 asyncio 的并发能力。
监控和告警
1. 日志记录
所有关键操作都记录日志,包括:
- 查询操作(user_id、查询条件、结果数量)
- 统计操作(统计范围、结果摘要)
- 超时处理(处理数量、退还积分)
- 错误信息(异常堆栈、上下文)
2. 指标监控
建议监控以下指标:
- 队列长度(queue_length)
- 超时任务数(handled_count)
- 成功率(success_rate)
- 平均执行时间(avg_execution_time)
- Worker 状态(workers.active)
3. 告警规则
建议配置以下告警:
- 队列长度 > 100:需要扩容 Worker
- 超时任务数 > 10/小时:检查 Worker 性能
- 成功率 < 90%:检查 AI Provider 状态
- Worker 离线:立即告警
测试建议
1. 单元测试
# 测试任务查询
async def test_get_user_jobs():
result = await ai_service.get_user_jobs(
user_id=test_user_id,
page=1,
page_size=10
)
assert result['total'] >= 0
assert len(result['items']) <= 10
# 测试统计功能
async def test_get_job_statistics():
stats = await ai_service.get_job_statistics(
user_id=test_user_id
)
assert 'total_jobs' in stats
assert 'success_rate' in stats
assert stats['success_rate'] >= 0 and stats['success_rate'] <= 100
2. 集成测试
# 测试超时处理
async def test_handle_timeout_jobs():
# 创建一个超时任务
job = await create_test_job(status=AIJobStatus.PROCESSING)
# 等待超时
await asyncio.sleep(timeout_minutes * 60 + 10)
# 触发超时处理
result = await ai_service.handle_timeout_jobs(timeout_minutes)
# 验证任务已标记为失败
job = await ai_service.get_job_status(job.ai_job_id)
assert job['status'] == AIJobStatus.FAILED
assert '超时' in job['error_message']
后续优化建议
1. 缓存优化
对于频繁查询的统计数据,可以使用 Redis 缓存:
# 缓存统计数据 5 分钟
@cache(ttl=300)
async def get_job_statistics(user_id, start_date, end_date):
...
2. 实时通知
超时任务处理后,可以通过 WebSocket 实时通知用户:
# 发送 WebSocket 通知
await websocket_manager.send_to_user(
user_id=job.user_id,
message={
'type': 'job_timeout',
'job_id': job.ai_job_id,
'refunded_credits': job.credits_used
}
)
3. 数据归档
定期归档历史任务数据,保持查询性能:
# 归档 90 天前的已完成任务
async def archive_old_jobs():
cutoff_date = datetime.utcnow() - timedelta(days=90)
await job_repository.archive_jobs(cutoff_date)
4. 智能超时阈值
根据任务类型和历史数据,动态调整超时阈值:
# 图片生成:10 分钟
# 视频生成:30 分钟
# 文本处理:5 分钟
timeout_by_type = {
AIJobType.IMAGE: 10,
AIJobType.VIDEO: 30,
AIJobType.TEXT_PROCESSING: 5
}
相关文档
总结
本次变更实现了完整的 AI 任务监控和管理功能,包括:
✅ 任务批量查询和筛选(支持多维度过滤、分页、排序)
✅ 任务统计和分析(成功率、执行时间、积分消耗、多维度分组)
✅ 任务超时处理(自动检测、自动标记失败、自动退还积分)
✅ 任务队列监控(Worker 状态、队列长度、实时任务数)
✅ Celery Beat 定时任务(每 10 分钟自动检测超时任务)
✅ 完整的日志记录和错误处理
✅ 应用层引用完整性保证
这些功能显著提升了系统的可观测性和自动化运维能力,为生产环境的稳定运行提供了坚实保障。