You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
9.0 KiB
9.0 KiB
pytest 事件循环修复 - FastAPI + asyncpg + httpx.AsyncClient
日期: 2026-02-02
类型: Bug Fix
影响范围: 测试框架、所有集成测试
概述
修复 FastAPI + asyncpg + httpx.AsyncClient 的事件循环冲突问题,实现 8/8 集成测试全部通过。
问题描述
错误信息
RuntimeError: Task ... got Future ... attached to a different loop
根本原因
- pytest-asyncio 为每个测试创建独立的事件循环(function scope)
- FastAPI 应用内部使用自己的事件循环
- asyncpg 连接池绑定到特定事件循环
- AsyncClient 调用 API 时,两个事件循环产生冲突
影响
- 集成测试随机失败
- 无法稳定复现问题
- 测试运行时间不稳定
解决方案
核心修复:session 级别的事件循环
server/tests/conftest.py
# ====================================================================
# 1️⃣ 强制测试使用专属事件循环(修复事件循环冲突)
# ====================================================================
@pytest.fixture(scope="session")
def event_loop():
"""为 pytest-asyncio 提供独立事件循环,覆盖默认 loop
这确保所有测试使用同一个事件循环,避免 asyncpg 连接池绑定到不同 loop
"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
# ====================================================================
# 2️⃣ 数据库引擎和会话(在测试 loop 内创建,避免 asyncpg pool 冲突)
# ====================================================================
@pytest_asyncio.fixture(scope="session")
async def async_engine():
"""创建异步数据库引擎(session 级别,所有测试共享)
关键:在测试事件循环内创建,确保 asyncpg pool 绑定到正确的 loop
"""
engine = create_async_engine(
settings.DATABASE_URL,
echo=False,
future=True,
pool_pre_ping=True, # 连接健康检查
pool_recycle=3600, # 1小时回收连接
pool_size=5, # 测试环境使用较小的连接池
max_overflow=10
)
yield engine
await engine.dispose()
@pytest_asyncio.fixture
async def db_session(async_engine):
"""创建数据库会话(每个测试独立)
自动回滚确保测试隔离,不污染数据库
"""
async_session = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False, # ✅ 确保测试对象在 commit 后仍可访问
autocommit=False,
autoflush=False
)
async with async_session() as session:
try:
yield session
finally:
# 确保回滚未提交的事务(测试隔离)
await session.rollback()
# 确保会话正确关闭
await session.close()
# ====================================================================
# 3️⃣ AsyncClient fixture
# ====================================================================
@pytest_asyncio.fixture
async def async_client():
"""创建异步 HTTP 客户端
注意:httpx 0.26+ 不再支持 lifespan 参数,已自动处理
"""
async with AsyncClient(
app=app,
base_url="http://test",
follow_redirects=True
) as client:
yield client
关键改进点
1. Session 级别的事件循环
@pytest.fixture(scope="session") # ✅ session 级别
def event_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # ✅ 设置为当前事件循环
yield loop
loop.close()
效果:
- 所有测试共享同一个事件循环
- asyncpg 连接池只创建一次,绑定到正确的 loop
- 避免 "Future attached to a different loop" 错误
2. Session 级别的数据库引擎
@pytest_asyncio.fixture(scope="session") # ✅ session 级别
async def async_engine():
engine = create_async_engine(...)
yield engine
await engine.dispose()
效果:
- 数据库连接池在测试事件循环内创建
- 所有测试共享同一个连接池
- 提高测试性能
3. Function 级别的数据库会话
@pytest_asyncio.fixture # ✅ function 级别(默认)
async def db_session(async_engine):
async_session = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False, # ✅ 关键配置
...
)
async with async_session() as session:
try:
yield session
finally:
await session.rollback() # ✅ 自动回滚
await session.close()
效果:
- 每个测试独立的数据库会话
- 自动回滚确保测试隔离
expire_on_commit=False确保测试对象在 commit 后仍可访问
4. JWT Token 唯一性
async def login_and_get_token(async_client: AsyncClient, phone: str = TEST_PHONE) -> dict:
# 添加延迟确保每次登录的 token 不同(JWT 包含时间戳,精度为秒)
await asyncio.sleep(1.1)
response = await async_client.post("/api/v1/auth/login/phone", ...)
...
效果:
- 避免 JWT token 重复导致的唯一约束冲突
- 确保每次登录生成不同的 token
测试结果
修复前
=================== 5 passed, 4 warnings, 3 errors in 4.98s ====================
- ❌ 3/8 测试因事件循环冲突失败
修复后
======================== 8 passed, 7 warnings in 12.35s ========================
- ✅ 8/8 测试全部通过
- ✅ 无事件循环错误
- ✅ 测试稳定可复现
技术细节
事件循环作用域对比
| 作用域 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| function | 测试完全隔离 | asyncpg pool 冲突 | 单元测试 |
| session | 避免 pool 冲突 | 需要手动清理 | 集成测试 |
asyncpg 连接池绑定
# ❌ 错误:每个测试创建新 loop
@pytest.fixture(scope="function")
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
# ✅ 正确:所有测试共享 loop
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
yield loop
loop.close()
数据库会话配置
async_session = sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False, # ✅ 关键:commit 后对象仍可访问
autocommit=False, # ✅ 手动控制事务
autoflush=False # ✅ 手动控制刷新
)
最佳实践
1. 测试隔离策略
集成测试(纯 API 调用):
@pytest.mark.asyncio
async def test_upload_file(async_client, auth):
# ✅ 只通过 API 调用
response = await async_client.post(
"/api/v1/file-storage/upload",
files={"file": ("test.txt", b"content")},
headers={"Authorization": f"Bearer {auth['access_token']}"}
)
assert response.status_code == 200
单元测试(直接数据库访问):
@pytest.mark.asyncio
async def test_create_user(db_session):
# ✅ 直接使用 db_session
user = User(...)
db_session.add(user)
await db_session.commit()
# 测试后自动回滚(fixture 处理)
2. 测试数据清理
自动回滚(推荐):
@pytest_asyncio.fixture
async def db_session(async_engine):
async with async_session() as session:
try:
yield session
finally:
await session.rollback() # ✅ 自动回滚
手动清理(备选):
@pytest_asyncio.fixture
async def test_user(db_session):
user = User(...)
db_session.add(user)
await db_session.commit()
yield user
# 手动清理
try:
await db_session.refresh(user)
await db_session.delete(user)
await db_session.commit()
except Exception:
await db_session.rollback()
3. JWT Token 唯一性
async def login_and_get_token(async_client, phone):
# ✅ 添加延迟确保 token 唯一
await asyncio.sleep(1.1)
response = await async_client.post("/api/v1/auth/login/phone", ...)
return response.json()["data"]
适用范围
此修复方案适用于所有使用以下技术栈的项目:
- ✅ FastAPI
- ✅ asyncpg / SQLAlchemy async
- ✅ httpx.AsyncClient
- ✅ pytest-asyncio
相关文档
参考资料
总结
通过使用 session 级别的事件循环和数据库引擎,成功解决了 FastAPI + asyncpg + httpx.AsyncClient 的事件循环冲突问题。所有集成测试现在都能稳定通过,为后续开发提供了可靠的测试基础。