You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

13 KiB

AI 任务监控和管理功能实现

变更日期:2026-01-29
变更类型:功能增强
影响范围:AI Service、Celery Tasks
向后兼容


变更概述

实现了完整的 AI 任务监控和管理功能,包括任务批量查询、统计分析、超时处理和队列监控,提升了系统的可观测性和自动化运维能力。


变更详情

1. Repository 层扩展

文件server/app/repositories/ai_job_repository.py

新增方法

1.1 get_jobs_by_filters()

批量查询和筛选任务,支持多维度过滤和分页。

功能

  • 按用户 ID 筛选
  • 按任务类型筛选
  • 按任务状态筛选
  • 按时间范围筛选
  • 支持排序(升序/降序)
  • 支持分页

参数

async def get_jobs_by_filters(
    user_id: Optional[str] = None,
    job_type: Optional[int] = None,
    status: Optional[int] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    limit: int = 50,
    offset: int = 0,
    order_by: str = 'created_at',
    order_desc: bool = True
) -> tuple[List[AIJob], int]

返回(任务列表, 总数)

1.2 get_job_statistics()

获取任务统计信息,支持多维度分析。

统计维度

  • 总任务数
  • 已完成任务数
  • 失败任务数
  • 待处理任务数
  • 处理中任务数
  • 成功率
  • 总积分消耗
  • 平均执行时间
  • 按任务类型分组统计
  • 按模型分组统计

参数

async def get_job_statistics(
    user_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> dict

返回示例

{
    'total_jobs': 100,
    'completed_jobs': 85,
    'failed_jobs': 10,
    'pending_jobs': 3,
    'processing_jobs': 2,
    'success_rate': 85.0,
    'total_credits': 1250,
    'avg_execution_time': 15.5,
    'by_type': {
        1: {'count': 50, 'credits': 500},  # 图片生成
        2: {'count': 30, 'credits': 600}   # 视频生成
    },
    'by_model': {
        'gpt-4': {'count': 40, 'credits': 800},
        'dall-e-3': {'count': 50, 'credits': 500}
    }
}

1.3 get_timeout_jobs()

获取超时的任务,用于自动化处理。

超时判断逻辑

  • PENDING 状态:从创建时间开始计算
  • PROCESSING 状态:从开始处理时间计算

参数

async def get_timeout_jobs(
    timeout_minutes: int = 30
) -> List[AIJob]

2. Service 层扩展

文件server/app/services/ai_service.py

新增方法

2.1 get_user_jobs()

批量查询用户任务,支持分页和筛选。

功能

  • 验证用户是否存在(应用层引用完整性保证)
  • 支持多维度筛选
  • 自动计算分页信息
  • 格式化返回数据

参数

async def get_user_jobs(
    user_id: str,
    job_type: Optional[int] = None,
    status: Optional[int] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    page: int = 1,
    page_size: int = 20,
    order_by: str = 'created_at',
    order_desc: bool = True
) -> Dict[str, Any]

返回示例

{
    'items': [
        {
            'job_id': '019d1234-5678-7abc-def0-111111111111',
            'job_type': 1,
            'status': 3,
            'progress': 100,
            'model_name': 'dall-e-3',
            'credits_used': 10,
            'created_at': '2026-01-29T10:00:00Z',
            'started_at': '2026-01-29T10:00:05Z',
            'completed_at': '2026-01-29T10:00:30Z',
            'error_message': None
        }
    ],
    'total': 100,
    'page': 1,
    'page_size': 20,
    'total_pages': 5
}

2.2 get_job_statistics()

获取任务统计信息,支持全局和用户级别统计。

功能

  • 支持全局统计(不传 user_id)
  • 支持用户级别统计
  • 支持时间范围筛选
  • 多维度数据分析

参数

async def get_job_statistics(
    user_id: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
) -> Dict[str, Any]

2.3 get_queue_status()

获取任务队列状态,实时监控系统负载。

监控指标

  • Worker 数量和状态
  • 活跃任务数(Celery 层)
  • 预定任务数(Celery 层)
  • 保留任务数(Celery 层)
  • 待处理任务数(数据库层)
  • 处理中任务数(数据库层)
  • 队列长度

返回示例

{
    'workers': {
        'total': 2,
        'active': True
    },
    'tasks': {
        'active': 5,
        'scheduled': 10,
        'reserved': 3,
        'pending_in_db': 15,
        'processing_in_db': 5
    },
    'queue_length': 20
}

2.4 handle_timeout_jobs()

处理超时任务,自动化运维核心功能。

处理流程

  1. 获取超时任务列表
  2. 取消 Celery 任务(如果存在)
  3. 更新任务状态为失败
  4. 退还积分(如果已扣除)
  5. 记录日志

参数

async def handle_timeout_jobs(
    timeout_minutes: int = 30
) -> Dict[str, Any]

返回示例

{
    'handled_count': 5,
    'refunded_credits': 50,
    'timeout_minutes': 30
}

3. Celery 定时任务

文件server/app/tasks/ai_tasks.py

3.1 check_timeout_jobs_task

定时检测超时任务并自动处理。

任务配置

  • 任务名称:ai_tasks.check_timeout_jobs
  • 执行频率:每 10 分钟
  • 超时阈值:30 分钟(可配置)

功能

  • 自动检测超时任务
  • 自动标记失败
  • 自动退还积分
  • 记录详细日志

实现

@celery_app.task(name='ai_tasks.check_timeout_jobs')
def check_timeout_jobs_task(timeout_minutes: int = 30):
    """定时检测超时任务并处理"""
    async def _execute():
        async with async_session_maker() as session:
            ai_service = AIService(session)
            result = await ai_service.handle_timeout_jobs(timeout_minutes)
            return result
    
    return asyncio.run(_execute())

4. Celery Beat 配置

文件server/app/core/celery_app.py

新增定时任务调度

beat_schedule={
    # ... 其他定时任务 ...
    
    # 每 10 分钟检测超时的 AI 任务
    "check-timeout-ai-jobs": {
        "task": "ai_tasks.check_timeout_jobs",
        "schedule": crontab(minute="*/10"),
        "kwargs": {"timeout_minutes": 30},
    },
}

技术亮点

1. 应用层引用完整性保证

所有查询方法都验证关联实体是否存在,遵循 Jointo 技术栈规范。

2. 高效的统计查询

使用 SQLAlchemy 的聚合函数和分组查询,一次查询获取多维度统计数据。

3. 自动化运维

通过 Celery Beat 定时任务,实现超时任务的自动检测和处理,无需人工干预。

4. 完整的积分闭环

超时任务自动退还积分,保证用户权益。

5. 详细的日志记录

所有关键操作都记录日志,便于问题排查和数据分析。


使用示例

1. 查询用户任务列表

from app.services.ai_service import AIService

# 查询用户最近的图片生成任务
result = await ai_service.get_user_jobs(
    user_id='019d1234-5678-7abc-def0-000000000001',
    job_type=1,  # 图片生成
    status=3,    # 已完成
    page=1,
    page_size=20
)

print(f"总任务数: {result['total']}")
print(f"当前页: {result['page']}/{result['total_pages']}")
for job in result['items']:
    print(f"任务 {job['job_id']}: {job['status']}")

2. 获取任务统计

from datetime import datetime, timedelta

# 获取最近 7 天的统计
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=7)

stats = await ai_service.get_job_statistics(
    user_id='019d1234-5678-7abc-def0-000000000001',
    start_date=start_date,
    end_date=end_date
)

print(f"总任务数: {stats['total_jobs']}")
print(f"成功率: {stats['success_rate']}%")
print(f"平均执行时间: {stats['avg_execution_time']}秒")
print(f"总积分消耗: {stats['total_credits']}")

3. 监控队列状态

# 获取实时队列状态
status = await ai_service.get_queue_status()

print(f"Worker 数量: {status['workers']['total']}")
print(f"活跃任务: {status['tasks']['active']}")
print(f"队列长度: {status['queue_length']}")

# 告警判断
if status['queue_length'] > 100:
    print("⚠️ 队列积压严重,需要扩容 Worker")

4. 手动处理超时任务

# 手动触发超时任务处理(通常由定时任务自动执行)
result = await ai_service.handle_timeout_jobs(timeout_minutes=30)

print(f"处理超时任务: {result['handled_count']} 个")
print(f"退还积分: {result['refunded_credits']} 分")

性能优化

1. 索引优化

所有查询字段都已建立索引,确保查询性能:

  • idx_ai_jobs_user_id
  • idx_ai_jobs_type
  • idx_ai_jobs_status
  • idx_ai_jobs_created_at
  • idx_ai_jobs_status_created_at(复合索引)

2. 分页查询

使用 LIMIT/OFFSET 分页,避免一次性加载大量数据。

3. 聚合查询优化

统计查询使用数据库聚合函数,避免在应用层计算。

4. 异步执行

所有方法都是异步的,充分利用 asyncio 的并发能力。


监控和告警

1. 日志记录

所有关键操作都记录日志,包括:

  • 查询操作(user_id、查询条件、结果数量)
  • 统计操作(统计范围、结果摘要)
  • 超时处理(处理数量、退还积分)
  • 错误信息(异常堆栈、上下文)

2. 指标监控

建议监控以下指标:

  • 队列长度(queue_length)
  • 超时任务数(handled_count)
  • 成功率(success_rate)
  • 平均执行时间(avg_execution_time)
  • Worker 状态(workers.active)

3. 告警规则

建议配置以下告警:

  • 队列长度 > 100:需要扩容 Worker
  • 超时任务数 > 10/小时:检查 Worker 性能
  • 成功率 < 90%:检查 AI Provider 状态
  • Worker 离线:立即告警

测试建议

1. 单元测试

# 测试任务查询
async def test_get_user_jobs():
    result = await ai_service.get_user_jobs(
        user_id=test_user_id,
        page=1,
        page_size=10
    )
    assert result['total'] >= 0
    assert len(result['items']) <= 10

# 测试统计功能
async def test_get_job_statistics():
    stats = await ai_service.get_job_statistics(
        user_id=test_user_id
    )
    assert 'total_jobs' in stats
    assert 'success_rate' in stats
    assert stats['success_rate'] >= 0 and stats['success_rate'] <= 100

2. 集成测试

# 测试超时处理
async def test_handle_timeout_jobs():
    # 创建一个超时任务
    job = await create_test_job(status=AIJobStatus.PROCESSING)
    
    # 等待超时
    await asyncio.sleep(timeout_minutes * 60 + 10)
    
    # 触发超时处理
    result = await ai_service.handle_timeout_jobs(timeout_minutes)
    
    # 验证任务已标记为失败
    job = await ai_service.get_job_status(job.ai_job_id)
    assert job['status'] == AIJobStatus.FAILED
    assert '超时' in job['error_message']

后续优化建议

1. 缓存优化

对于频繁查询的统计数据,可以使用 Redis 缓存:

# 缓存统计数据 5 分钟
@cache(ttl=300)
async def get_job_statistics(user_id, start_date, end_date):
    ...

2. 实时通知

超时任务处理后,可以通过 WebSocket 实时通知用户:

# 发送 WebSocket 通知
await websocket_manager.send_to_user(
    user_id=job.user_id,
    message={
        'type': 'job_timeout',
        'job_id': job.ai_job_id,
        'refunded_credits': job.credits_used
    }
)

3. 数据归档

定期归档历史任务数据,保持查询性能:

# 归档 90 天前的已完成任务
async def archive_old_jobs():
    cutoff_date = datetime.utcnow() - timedelta(days=90)
    await job_repository.archive_jobs(cutoff_date)

4. 智能超时阈值

根据任务类型和历史数据,动态调整超时阈值:

# 图片生成:10 分钟
# 视频生成:30 分钟
# 文本处理:5 分钟
timeout_by_type = {
    AIJobType.IMAGE: 10,
    AIJobType.VIDEO: 30,
    AIJobType.TEXT_PROCESSING: 5
}

相关文档


总结

本次变更实现了完整的 AI 任务监控和管理功能,包括:

任务批量查询和筛选(支持多维度过滤、分页、排序)
任务统计和分析(成功率、执行时间、积分消耗、多维度分组)
任务超时处理(自动检测、自动标记失败、自动退还积分)
任务队列监控(Worker 状态、队列长度、实时任务数)
Celery Beat 定时任务(每 10 分钟自动检测超时任务)
完整的日志记录和错误处理
应用层引用完整性保证

这些功能显著提升了系统的可观测性和自动化运维能力,为生产环境的稳定运行提供了坚实保障。