You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
48 KiB
48 KiB
充值管理服务
文档版本:v2.4
最后更新:2026-01-29
目录
服务概述
充值管理服务负责处理用户的充值订单创建、支付处理、支付回调、订单查询等功能。支持微信支付和支付宝支付。
职责
- 创建充值订单
- 调用支付接口
- 处理支付回调
- 订单状态管理
- 充值成功后增加积分
核心功能
1. 创建充值订单
- 选择套餐或自定义金额
- 生成唯一订单号
- 设置订单过期时间
2. 支付处理
- 微信支付(扫码/H5/小程序)
- 支付宝支付(扫码/H5)
- 生成支付参数
3. 支付回调
- 验证签名
- 更新订单状态
- 增加用户积分
- 记录回调日志
4. 订单管理
- 查询订单列表
- 查询订单详情
- 取消订单
- 订单超时处理
数据库设计
枚举类型定义
# app/models/recharge/enums.py
from enum import IntEnum
class PaymentMethod(IntEnum):
    """Payment channel, stored as a SMALLINT.

    The lowercase API names are ``"wechat"`` and ``"alipay"``.
    """

    WECHAT = 1
    ALIPAY = 2

    @classmethod
    def from_string(cls, name: str, *, strict: bool = False) -> "PaymentMethod":
        """Convert an API name such as ``"wechat"`` into an enum member.

        Matching ignores case and surrounding whitespace.

        Args:
            name: Channel name received from a client or a route.
            strict: When true, reject unknown names instead of falling back.

        Returns:
            The matching member. Unknown input yields ``WECHAT`` when
            ``strict`` is false, which is the historical behaviour.

        Raises:
            ValueError: If ``strict`` is true and ``name`` is not recognised.
        """
        mapping = {"wechat": cls.WECHAT, "alipay": cls.ALIPAY}
        # Non-string input (e.g. None) is treated as "unknown" instead of
        # crashing with AttributeError on .lower().
        key = name.strip().lower() if isinstance(name, str) else None
        member = mapping.get(key)
        if member is not None:
            return member
        if strict:
            raise ValueError(f"unsupported payment method: {name!r}")
        return cls.WECHAT

    def to_string(self) -> str:
        """Return the lowercase API name of this member."""
        # Members are looked up on the class rather than through the instance.
        mapping = {PaymentMethod.WECHAT: "wechat", PaymentMethod.ALIPAY: "alipay"}
        return mapping.get(self, "unknown")

    @classmethod
    def get_display_name(cls, value: int) -> str:
        """Return the user-facing label for a stored SMALLINT value."""
        names = {cls.WECHAT: "微信支付", cls.ALIPAY: "支付宝"}
        return names.get(value, "未知支付方式")
class PaymentStatus(IntEnum):
    """Payment state of a recharge order, stored as a SMALLINT."""

    PENDING = 1
    PAID = 2
    FAILED = 3
    REFUNDED = 4
    CANCELLED = 5

    @classmethod
    def from_string(cls, name: str, *, strict: bool = False) -> "PaymentStatus":
        """Convert an API name such as ``"paid"`` into an enum member.

        Matching ignores case and surrounding whitespace.

        Args:
            name: Status name received from a client or a route.
            strict: When true, reject unknown names instead of falling back.

        Returns:
            The matching member. Unknown input yields ``PENDING`` when
            ``strict`` is false, which is the historical behaviour.

        Raises:
            ValueError: If ``strict`` is true and ``name`` is not recognised.
        """
        mapping = {
            "pending": cls.PENDING,
            "paid": cls.PAID,
            "failed": cls.FAILED,
            "refunded": cls.REFUNDED,
            "cancelled": cls.CANCELLED,
        }
        # Non-string input (e.g. None) is treated as "unknown" instead of
        # crashing with AttributeError on .lower().
        key = name.strip().lower() if isinstance(name, str) else None
        member = mapping.get(key)
        if member is not None:
            return member
        if strict:
            raise ValueError(f"unsupported payment status: {name!r}")
        return cls.PENDING

    def to_string(self) -> str:
        """Return the lowercase API name of this member."""
        # Members are looked up on the class rather than through the instance.
        mapping = {
            PaymentStatus.PENDING: "pending",
            PaymentStatus.PAID: "paid",
            PaymentStatus.FAILED: "failed",
            PaymentStatus.REFUNDED: "refunded",
            PaymentStatus.CANCELLED: "cancelled",
        }
        return mapping.get(self, "pending")

    @classmethod
    def get_display_name(cls, value: int) -> str:
        """Return the user-facing label for a stored SMALLINT value."""
        names = {
            cls.PENDING: "待支付",
            cls.PAID: "已支付",
            cls.FAILED: "支付失败",
            cls.REFUNDED: "已退款",
            cls.CANCELLED: "已取消",
        }
        return names.get(value, "未知状态")
充值订单表
-- app/models/recharge/tables.sql
CREATE TABLE recharge_orders (
order_id UUID PRIMARY KEY,
-- Reference columns (no FK constraints; validated in the application layer)
user_id UUID NOT NULL,
package_id UUID,
-- Order identity
order_no TEXT NOT NULL UNIQUE,
-- Amounts
amount DECIMAL(10, 2) NOT NULL,
credits INTEGER NOT NULL,
bonus_credits INTEGER NOT NULL DEFAULT 0,
-- Payment info
payment_method SMALLINT NOT NULL DEFAULT 1,
payment_status SMALLINT NOT NULL DEFAULT 1,
transaction_id TEXT,
-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
paid_at TIMESTAMPTZ,
-- NOTE(review): nullable here, but the RechargeOrder model declares
-- expired_at as a required datetime - confirm which one is intended.
expired_at TIMESTAMPTZ,
-- Free-form remark
remark TEXT
);
-- Table comment
COMMENT ON TABLE recharge_orders IS '充值订单表 - 应用层保证引用完整性';
-- Column comments
COMMENT ON COLUMN recharge_orders.order_id IS '充值订单唯一标识(UUID v7)';
COMMENT ON COLUMN recharge_orders.user_id IS '用户 ID - 应用层验证';
COMMENT ON COLUMN recharge_orders.package_id IS '套餐 ID - 应用层验证';
COMMENT ON COLUMN recharge_orders.order_no IS '订单编号(唯一)';
COMMENT ON COLUMN recharge_orders.amount IS '订单金额(元)';
COMMENT ON COLUMN recharge_orders.credits IS '基础积分数';
COMMENT ON COLUMN recharge_orders.bonus_credits IS '赠送积分数';
COMMENT ON COLUMN recharge_orders.payment_method IS '支付方式(1=微信, 2=支付宝)';
COMMENT ON COLUMN recharge_orders.payment_status IS '支付状态(1=待支付, 2=已支付, 3=支付失败, 4=已退款, 5=已取消)';
COMMENT ON COLUMN recharge_orders.transaction_id IS '第三方支付平台交易 ID';
COMMENT ON COLUMN recharge_orders.created_at IS '订单创建时间';
COMMENT ON COLUMN recharge_orders.paid_at IS '支付完成时间';
COMMENT ON COLUMN recharge_orders.expired_at IS '订单过期时间';
COMMENT ON COLUMN recharge_orders.remark IS '备注信息';
-- Indexes (they matter more now that the foreign keys are gone)
CREATE INDEX idx_recharge_orders_user_id ON recharge_orders (user_id);
CREATE INDEX idx_recharge_orders_package_id ON recharge_orders (package_id)
WHERE package_id IS NOT NULL;
-- NOTE(review): the UNIQUE constraint on order_no already creates a unique
-- index in PostgreSQL, so this index looks redundant - confirm before dropping.
CREATE INDEX idx_recharge_orders_order_no ON recharge_orders (order_no);
CREATE INDEX idx_recharge_orders_payment_status ON recharge_orders (payment_status)
WHERE payment_status = 1; -- partial index to speed up pending-order queries
CREATE INDEX idx_recharge_orders_created_at ON recharge_orders (created_at DESC);
CREATE INDEX idx_recharge_orders_transaction_id ON recharge_orders (transaction_id)
WHERE transaction_id IS NOT NULL;
-- Composite indexes
CREATE INDEX idx_recharge_orders_user_status ON recharge_orders (user_id, payment_status);
CREATE INDEX idx_recharge_orders_user_created ON recharge_orders (user_id, created_at DESC);
支付回调日志表
CREATE TABLE payment_callbacks (
callback_id UUID PRIMARY KEY,
-- Order identity
-- NOTE(review): NOT NULL, yet the WeChat callback route builds its payload
-- with order_no = None before decryption - confirm how that row is inserted.
order_no TEXT NOT NULL,
-- Payment info
payment_method SMALLINT NOT NULL,
transaction_id TEXT,
-- Raw callback payload
callback_data JSONB NOT NULL,
-- Processing state
-- NOTE(review): is_processed has a default but no NOT NULL - confirm NULL is acceptable.
is_processed BOOLEAN DEFAULT false,
process_result TEXT,
-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
processed_at TIMESTAMPTZ
);
-- Table comment
COMMENT ON TABLE payment_callbacks IS '支付回调日志表';
-- Column comments
COMMENT ON COLUMN payment_callbacks.callback_id IS '回调记录唯一标识(UUID v7)';
COMMENT ON COLUMN payment_callbacks.order_no IS '订单编号';
COMMENT ON COLUMN payment_callbacks.payment_method IS '支付方式(1=微信, 2=支付宝)';
COMMENT ON COLUMN payment_callbacks.transaction_id IS '第三方支付平台交易 ID';
COMMENT ON COLUMN payment_callbacks.callback_data IS '回调数据(JSON 格式)';
COMMENT ON COLUMN payment_callbacks.is_processed IS '是否已处理';
COMMENT ON COLUMN payment_callbacks.process_result IS '处理结果';
COMMENT ON COLUMN payment_callbacks.created_at IS '回调接收时间';
COMMENT ON COLUMN payment_callbacks.processed_at IS '处理完成时间';
-- Indexes
CREATE INDEX idx_payment_callbacks_order_no ON payment_callbacks (order_no);
CREATE INDEX idx_payment_callbacks_processed ON payment_callbacks (is_processed);
CREATE INDEX idx_payment_callbacks_created_at ON payment_callbacks (created_at);
服务实现
RechargeService 类
# app/services/recharge_service.py
from typing import Optional, Dict, Any, List
from uuid import UUID
from decimal import Decimal
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import select, func
from app.models.recharge import RechargeOrder, PaymentCallback
from app.models.recharge.enums import PaymentMethod, PaymentStatus
from app.repositories.recharge_repository import RechargeRepository
from app.services.credit_service import CreditService
from app.services.payment_service import PaymentService
from app.core.exceptions import NotFoundError, ValidationError
from app.utils.id_generator import generate_uuid
from datetime import datetime, timedelta, timezone
from loguru import logger
import uuid
class RechargeService:
    """Business-logic layer for recharge (top-up) orders.

    Collaborators are injected through the constructor: the database session,
    the order repository, the credit service and the payment gateway facade.
    """

    def __init__(
        self,
        session: "AsyncSession",
        repository: "RechargeRepository",
        credit_service: "CreditService",
        payment_service: "PaymentService"
    ):
        self.session = session
        self.repository = repository
        self.credit_service = credit_service
        self.payment_service = payment_service

    # ==================== Order creation ====================

    async def create_order(
        self,
        user_id: UUID,
        package_id: Optional[UUID] = None,
        amount: Optional[Decimal] = None,
        payment_method: str = 'wechat'
    ) -> Dict[str, Any]:
        """Create a pending recharge order and request payment parameters.

        Args:
            user_id: Owner of the order; must exist.
            package_id: Credit package to buy. Takes precedence over ``amount``.
            amount: Custom amount in yuan, used when no package is given.
            payment_method: Channel name (``"wechat"`` or ``"alipay"``).

        Returns:
            ``{'order': <order dict>, 'paymentParams': <gateway params>}``.

        Raises:
            NotFoundError: The user or the package does not exist.
            ValidationError: Neither package nor amount given, or amount < 1.
        """
        # 1. Application-level referential integrity: the user must exist.
        if not await self.repository.exists_user(user_id):
            raise NotFoundError(f"用户不存在: {user_id}")

        payment_method_enum = PaymentMethod.from_string(payment_method)

        # 2. Resolve amount and credits from the package or the custom amount.
        if package_id:
            if not await self.repository.exists_package(package_id):
                raise NotFoundError(f"套餐不存在: {package_id}")
            package = await self.credit_service.get_package(package_id)
            order_amount = package.price
            credits = package.credits
            bonus_credits = package.bonus_credits
        elif amount is not None:
            # `is not None` so that Decimal('0') is reported as "too small"
            # instead of being mistaken for "no amount given".
            if amount < Decimal('1'):
                raise ValidationError("充值金额不能小于 1 元")
            order_amount = amount
            credits = int(amount * 10)
            bonus_credits = 0
        else:
            raise ValidationError("必须指定套餐或金额")

        order_no = self._generate_order_no()
        order = RechargeOrder(
            order_id=generate_uuid(),
            user_id=user_id,
            order_no=order_no,
            package_id=package_id,
            amount=order_amount,
            credits=credits,
            bonus_credits=bonus_credits,
            payment_method=payment_method_enum,
            payment_status=PaymentStatus.PENDING,
            expired_at=datetime.now(timezone.utc) + timedelta(minutes=30)
        )
        order = await self.repository.create(order)

        payment_params = await self.payment_service.create_payment(
            order_no=order_no,
            amount=float(order_amount),
            # Send the canonical name of the channel stored on the order, so
            # the gateway call and the persisted order can never disagree.
            payment_method=payment_method_enum.to_string(),
            description=f"充值 {credits + bonus_credits} 积分"
        )

        return {
            'order': order.to_dict(),
            'paymentParams': payment_params
        }

    # ==================== Payment callback ====================

    @staticmethod
    def _callback_identifiers(callback_data: Dict[str, Any]):
        """Return ``(order_no, transaction_id)`` from a callback payload.

        Reads the normalised keys first and falls back to Alipay's native
        notification fields ``out_trade_no`` / ``trade_no``.
        """
        order_no = callback_data.get('order_no') or callback_data.get('out_trade_no')
        transaction_id = callback_data.get('transaction_id') or callback_data.get('trade_no')
        return order_no, transaction_id

    async def handle_payment_callback(
        self,
        payment_method: str,
        callback_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Process one payment-gateway callback.

        The order update, the credit grant and the log update are committed
        together, so a failure cannot leave a PAID order without credits.

        Args:
            payment_method: Channel name of the calling gateway.
            callback_data: Raw callback payload as collected by the route.

        Returns:
            ``{'success': bool, 'message': str}``.

        Raises:
            NotFoundError: The order's user no longer exists.
            Exception: Any processing error is re-raised after the failure
                has been recorded on the callback log.
        """
        # NOTE(review): the order row is not locked, so two concurrent
        # callbacks for one order could both pass the PAID check - confirm
        # whether the repository should SELECT ... FOR UPDATE.
        order_no, transaction_id = self._callback_identifiers(callback_data)
        callback_log = PaymentCallback(
            callback_id=generate_uuid(),
            # The column is NOT NULL; store '' when the number is only known
            # after verification/decryption so the callback is still logged.
            order_no=order_no or '',
            payment_method=PaymentMethod.from_string(payment_method),
            transaction_id=transaction_id,
            callback_data=callback_data,
            is_processed=False
        )
        self.session.add(callback_log)
        await self.session.flush()

        try:
            # 1. Verify the signature.
            is_valid = await self.payment_service.verify_callback(
                payment_method=payment_method,
                callback_data=callback_data
            )
            if not is_valid:
                callback_log.process_result = "签名验证失败"
                await self.session.commit()
                return {'success': False, 'message': '签名验证失败'}

            # 2. Load the order. Identifiers are re-read because, like the
            #    original, the lookup happens after verification.
            order_no, transaction_id = self._callback_identifiers(callback_data)
            if order_no and not callback_log.order_no:
                callback_log.order_no = order_no
            if transaction_id and not callback_log.transaction_id:
                callback_log.transaction_id = transaction_id

            order = await self.repository.get_by_order_no(order_no)
            if not order:
                callback_log.process_result = "订单不存在"
                await self.session.commit()
                return {'success': False, 'message': '订单不存在'}

            # 3. Idempotency: a repeated callback for a paid order is a no-op.
            if order.payment_status == PaymentStatus.PAID:
                callback_log.process_result = "订单已支付"
                callback_log.is_processed = True
                await self.session.commit()
                return {'success': True, 'message': '订单已支付'}

            # Alipay sends a notification for every trade state; only the
            # success states may credit the user. Other states are
            # acknowledged so the gateway stops retrying.
            trade_status = callback_data.get('trade_status')
            if trade_status is not None and trade_status not in ('TRADE_SUCCESS', 'TRADE_FINISHED'):
                callback_log.process_result = f"忽略非成功交易状态: {trade_status}"
                callback_log.is_processed = True
                await self.session.commit()
                return {'success': True, 'message': '非成功状态通知已忽略'}

            # 4. Application-level referential integrity. The except block
            #    below records this failure on the callback log.
            if not await self.repository.exists_user(order.user_id):
                raise NotFoundError(f"用户不存在: {order.user_id}")

            # Snapshot values first: a commit inside a collaborator may
            # expire the ORM instance.
            order_id = order.order_id
            user_id = order.user_id
            order_amount = order.amount
            total_credits = order.credits + order.bonus_credits

            # 5. Mark the order paid in the session WITHOUT committing;
            #    repository.update() would commit before credits are granted.
            order.payment_status = PaymentStatus.PAID
            order.transaction_id = transaction_id
            order.paid_at = datetime.now(timezone.utc)

            # 6. Grant the credits.
            await self.credit_service.add_credits(
                user_id=user_id,
                amount=total_credits,
                transaction_type='recharge',
                description=f"充值 {order_amount} 元获得 {total_credits} 积分",
                related_order_id=order_id
            )

            # 7. Update the user's lifetime recharge total.
            from app.models.user import User
            user = await self.session.get(User, user_id)
            if user:
                user.total_recharged_amount += float(order_amount)

            # 8. Record success and commit everything at once.
            callback_log.is_processed = True
            callback_log.processed_at = datetime.now(timezone.utc)
            callback_log.process_result = "处理成功"
            await self.session.commit()
            return {'success': True, 'message': '充值成功'}

        except Exception as e:
            # Discard the partial work first, so a half-processed payment is
            # never committed together with the failure note.
            await self.session.rollback()
            try:
                # Rollback expunges a log row that was still pending, so it
                # is re-added; add() is a no-op if the row is already persistent.
                self.session.add(callback_log)
                callback_log.process_result = f"处理失败:{str(e)}"
                await self.session.commit()
            except Exception:
                # Failure logging must never mask the original error.
                logger.exception("failed to record payment callback failure")
                await self.session.rollback()
            raise

    # ==================== Order queries ====================

    async def get_order(self, order_id: UUID) -> "RechargeOrder":
        """Return the order with ``order_id``.

        Raises:
            NotFoundError: No such order.
        """
        order = await self.repository.get_by_id(order_id)
        if not order:
            raise NotFoundError("订单不存在")
        return order

    async def get_user_orders(
        self,
        user_id: UUID,
        payment_status: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """Return one page of a user's orders, optionally filtered by status.

        Returns:
            A dict with ``items``, ``total``, ``page``, ``pageSize`` and
            ``totalPages``.
        """
        status_enum = PaymentStatus.from_string(payment_status) if payment_status else None
        orders, total = await self.repository.get_by_user(
            user_id=user_id,
            payment_status=status_enum,
            page=page,
            page_size=page_size
        )
        return {
            'items': [order.to_dict() for order in orders],
            'total': total,
            'page': page,
            'pageSize': page_size,
            # Ceiling division without floats.
            'totalPages': (total + page_size - 1) // page_size
        }

    async def cancel_order(self, order_id: UUID, user_id: UUID) -> "RechargeOrder":
        """Cancel a pending order owned by ``user_id``.

        Raises:
            NotFoundError: No such order.
            ValidationError: The order belongs to someone else or is not pending.
        """
        order = await self.repository.get_by_id(order_id)
        if not order:
            raise NotFoundError("订单不存在")
        if order.user_id != user_id:
            raise ValidationError("无权操作此订单")
        if order.payment_status != PaymentStatus.PENDING:
            raise ValidationError("只能取消待支付订单")
        return await self.repository.update(order.order_id, {
            'payment_status': PaymentStatus.CANCELLED
        })

    # ==================== Scheduled job ====================

    async def expire_orders(self) -> int:
        """Cancel every pending order whose expiry time has passed.

        Returns:
            The number of orders cancelled.
        """
        expired_orders = await self.repository.get_expired_orders()
        # Collect the ids up front: repository.update() commits, after which
        # attribute access on the remaining instances may need a reload.
        order_ids = [order.order_id for order in expired_orders]
        for order_id in order_ids:
            await self.repository.update(order_id, {
                'payment_status': PaymentStatus.CANCELLED,
                'remark': '订单超时自动取消'
            })
        return len(order_ids)

    # ==================== Private helpers ====================

    def _generate_order_no(self) -> str:
        """Build an order number: ``RCH`` + UTC timestamp + 8 random hex chars."""
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')
        random_str = uuid.uuid4().hex[:8].upper()
        return f"RCH{timestamp}{random_str}"
Repository 层
# app/repositories/recharge_repository.py
from typing import List, Optional, Tuple, Dict, Any
from uuid import UUID
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import func, text
from app.models.recharge import RechargeOrder
from app.models.recharge.enums import PaymentStatus
from datetime import datetime, timezone
class RechargeRepository:
    """Data-access layer for recharge orders."""

    def __init__(self, session: "AsyncSession"):
        self.session = session

    # ==================== Referential-integrity checks ====================

    async def exists_user(self, user_id: UUID) -> bool:
        """Return True if a non-deleted user with ``user_id`` exists."""
        from app.models.user import User
        result = await self.session.execute(
            select(User.user_id).where(
                User.user_id == user_id,
                User.deleted_at.is_(None)
            ).limit(1)
        )
        return result.scalar_one_or_none() is not None

    async def exists_package(self, package_id: UUID) -> bool:
        """Return True if an active credit package with ``package_id`` exists."""
        from app.models.credit import CreditPackage
        result = await self.session.execute(
            select(CreditPackage.package_id).where(
                CreditPackage.package_id == package_id,
                CreditPackage.is_active.is_(True)
            ).limit(1)
        )
        return result.scalar_one_or_none() is not None

    async def get_by_id(self, order_id: UUID) -> Optional["RechargeOrder"]:
        """Return the order with ``order_id``, or None."""
        result = await self.session.execute(
            select(RechargeOrder).where(RechargeOrder.order_id == order_id)
        )
        return result.scalar_one_or_none()

    async def get_by_order_no(self, order_no: str) -> Optional["RechargeOrder"]:
        """Return the order with ``order_no``, or None."""
        result = await self.session.execute(
            select(RechargeOrder).where(RechargeOrder.order_no == order_no)
        )
        return result.scalar_one_or_none()

    async def get_by_user(
        self,
        user_id: UUID,
        payment_status: Optional["PaymentStatus"] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Tuple[List["RechargeOrder"], int]:
        """Return one page of a user's orders, newest first, plus the total count.

        Args:
            user_id: Owner of the orders.
            payment_status: Optional status filter.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.

        Returns:
            ``(orders, total)`` where ``total`` ignores pagination.
        """
        # A non-positive page would produce a negative OFFSET.
        page = max(page, 1)
        page_size = max(page_size, 1)

        filters = [RechargeOrder.user_id == user_id]
        if payment_status is not None:
            filters.append(RechargeOrder.payment_status == payment_status)

        # Count directly on the filtered table; wrapping the ORDER BY query in
        # a subquery made the database sort rows it only had to count.
        total_result = await self.session.execute(
            select(func.count()).select_from(RechargeOrder).where(*filters)
        )
        total = total_result.scalar_one()

        query = (
            select(RechargeOrder)
            .where(*filters)
            .order_by(RechargeOrder.created_at.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
        )
        result = await self.session.execute(query)
        return list(result.scalars().all()), total

    async def get_expired_orders(self) -> List["RechargeOrder"]:
        """Return pending orders whose ``expired_at`` is in the past (UTC)."""
        result = await self.session.execute(
            select(RechargeOrder).where(
                RechargeOrder.payment_status == PaymentStatus.PENDING,
                RechargeOrder.expired_at < datetime.now(timezone.utc)
            )
        )
        return list(result.scalars().all())

    async def create(self, order: "RechargeOrder") -> "RechargeOrder":
        """Insert ``order``, commit, and return the refreshed instance."""
        self.session.add(order)
        await self.session.commit()
        await self.session.refresh(order)
        return order

    async def update(self, order_id: UUID, update_data: dict) -> "RechargeOrder":
        """Apply ``update_data`` to the order, commit, and return it refreshed.

        Raises:
            NotFoundError: No order with ``order_id``.
        """
        from app.core.exceptions import NotFoundError
        order = await self.get_by_id(order_id)
        if not order:
            raise NotFoundError("订单不存在")
        for key, value in update_data.items():
            setattr(order, key, value)
        await self.session.commit()
        await self.session.refresh(order)
        return order

    # ==================== Data-integrity maintenance ====================

    async def cancel_user_pending_orders(self, user_id: UUID) -> int:
        """Cancel all pending orders of ``user_id`` and commit.

        Returns:
            The number of orders cancelled.
        """
        result = await self.session.execute(
            select(RechargeOrder).where(
                RechargeOrder.user_id == user_id,
                RechargeOrder.payment_status == PaymentStatus.PENDING
            )
        )
        orders = result.scalars().all()
        for order in orders:
            order.payment_status = PaymentStatus.CANCELLED
            order.remark = "用户删除,订单自动取消"
        await self.session.commit()
        return len(orders)

    async def check_orphan_orders(self) -> List[Dict[str, Any]]:
        """Return orders whose user or package no longer exists.

        The two SELECTs are combined with UNION, so an order that is orphaned
        on both sides is reported once.
        """
        query = text("""
            SELECT
                ro.order_no,
                ro.user_id::text,
                ro.package_id::text,
                ro.amount,
                ro.payment_status
            FROM recharge_orders ro
            LEFT JOIN users u ON ro.user_id = u.user_id AND u.deleted_at IS NULL
            WHERE u.user_id IS NULL
            UNION
            SELECT
                ro.order_no,
                ro.user_id::text,
                ro.package_id::text,
                ro.amount,
                ro.payment_status
            FROM recharge_orders ro
            LEFT JOIN credit_packages cp ON ro.package_id = cp.package_id AND cp.is_active = true
            WHERE ro.package_id IS NOT NULL AND cp.package_id IS NULL
        """)
        result = await self.session.execute(query)
        return [
            {
                'order_no': row[0],
                'user_id': row[1],
                'package_id': row[2],
                'amount': float(row[3]),
                'payment_status': int(row[4])
            }
            for row in result.fetchall()
        ]
模型定义
# app/models/recharge/order.py
from typing import Optional, TYPE_CHECKING
from uuid import UUID
from decimal import Decimal
from sqlmodel import SQLModel, Field, Column, Index, Relationship
from sqlalchemy.dialects.postgresql import UUID as PG_UUID, SMALLINT, JSONB
from datetime import datetime, timezone
from app.models.recharge.enums import PaymentMethod, PaymentStatus
from app.utils.id_generator import generate_uuid
if TYPE_CHECKING:
from app.models.user import User
from app.models.credit import CreditPackage
class RechargeOrder(SQLModel, table=True):
    """ORM model for ``recharge_orders``.

    There are no physical foreign keys; references to users and packages are
    validated in the application layer.
    """

    __tablename__ = "recharge_orders"

    order_id: UUID = Field(
        sa_column=Column(
            PG_UUID(as_uuid=True),
            primary_key=True,
            default=generate_uuid
        )
    )
    # Reference columns (no FK constraint; validated by the application).
    user_id: UUID = Field(
        sa_column=Column(PG_UUID(as_uuid=True), nullable=False, index=True),
        description="用户 ID - 应用层验证"
    )
    order_no: str = Field(max_length=64, unique=True, index=True)
    package_id: Optional[UUID] = Field(
        default=None,
        sa_column=Column(PG_UUID(as_uuid=True), nullable=True, index=True),
        description="套餐 ID - 应用层验证"
    )
    amount: Decimal = Field(decimal_places=2, max_digits=10)
    credits: int
    bonus_credits: int = Field(default=0)
    payment_method: int = Field(
        sa_column=Column(SMALLINT, nullable=False, default=PaymentMethod.WECHAT),
        description="支付方式:1=微信,2=支付宝"
    )
    payment_status: int = Field(
        sa_column=Column(SMALLINT, nullable=False, default=PaymentStatus.PENDING),
        description="支付状态:1=待支付,2=已支付,3=支付失败,4=已退款,5=已取消"
    )
    transaction_id: Optional[str] = Field(default=None)
    # NOTE(review): the datetime fields carry no explicit timezone-aware
    # column type while the DDL uses TIMESTAMPTZ and the code stores aware
    # UTC values - confirm the generated column type matches.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    paid_at: Optional[datetime] = Field(default=None)
    expired_at: datetime
    remark: Optional[str] = Field(default=None)

    # Relationships: with no physical FK, the join condition and the
    # "foreign" column are spelled out for SQLAlchemy.
    user: Optional["User"] = Relationship(
        sa_relationship_kwargs={
            "primaryjoin": "RechargeOrder.user_id == User.user_id",
            "foreign_keys": "[RechargeOrder.user_id]",
        }
    )
    package: Optional["CreditPackage"] = Relationship(
        sa_relationship_kwargs={
            "primaryjoin": "RechargeOrder.package_id == CreditPackage.package_id",
            "foreign_keys": "[RechargeOrder.package_id]",
        }
    )

    # Table-level composite indexes.
    __table_args__ = (
        Index('idx_recharge_orders_user_status', 'user_id', 'payment_status'),
        Index('idx_recharge_orders_user_created', 'user_id', 'created_at'),
    )

    def to_dict(self) -> dict:
        """Serialise the order with camelCase keys.

        Includes ``paymentMethodDisplay`` and ``paymentStatusDisplay``, which
        ``RechargeOrderResponse`` declares as required fields.
        """
        return {
            'id': str(self.order_id),
            'userId': str(self.user_id),
            'orderNo': self.order_no,
            'packageId': str(self.package_id) if self.package_id else None,
            'amount': float(self.amount),
            'credits': self.credits,
            'bonusCredits': self.bonus_credits,
            'paymentMethod': PaymentMethod(self.payment_method).to_string(),
            'paymentMethodDisplay': PaymentMethod.get_display_name(self.payment_method),
            'paymentStatus': PaymentStatus(self.payment_status).to_string(),
            'paymentStatusDisplay': PaymentStatus.get_display_name(self.payment_status),
            'transactionId': self.transaction_id,
            'createdAt': self.created_at.isoformat() if self.created_at else None,
            'paidAt': self.paid_at.isoformat() if self.paid_at else None,
            'expiredAt': self.expired_at.isoformat() if self.expired_at else None,
            'remark': self.remark
        }
class PaymentCallback(SQLModel, table=True):
    """ORM model for ``payment_callbacks``: one row per gateway callback."""

    __tablename__ = "payment_callbacks"

    callback_id: UUID = Field(
        sa_column=Column(PG_UUID(as_uuid=True), primary_key=True, default=generate_uuid)
    )
    order_no: str = Field(index=True)
    payment_method: int
    transaction_id: Optional[str] = None
    # Raw payload exactly as received, stored as JSONB.
    callback_data: dict = Field(sa_column=Column(JSONB, nullable=False))
    is_processed: bool = False
    process_result: Optional[str] = None
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    processed_at: Optional[datetime] = None
关键说明:
- TYPE_CHECKING 导入:使用 `TYPE_CHECKING` 避免循环导入问题
- Relationship 配置:
  - `primaryjoin`:明确指定关联条件(因为没有物理外键)
  - `foreign_keys`:指定外键列(用于 SQLAlchemy 推断)
- 可选关系:`package` 是可选的(用户可以自定义金额充值)
Schema 定义
请求 Schema
# app/schemas/recharge.py
from typing import Optional
from uuid import UUID
from decimal import Decimal
from pydantic import BaseModel, Field, field_validator, ConfigDict
class RechargeOrderCreateRequest(BaseModel):
    """Request body for creating a recharge order.

    Either ``packageId`` or ``amount`` is expected.
    """

    model_config = ConfigDict(populate_by_name=True)

    package_id: Optional[UUID] = Field(default=None, alias="packageId", description="套餐 ID(与 amount 二选一)")
    amount: Optional[Decimal] = Field(default=None, description="自定义金额(与 packageId 二选一)")
    payment_method: str = Field(default="wechat", alias="paymentMethod", description="支付方式:wechat/alipay")

    @field_validator('payment_method')
    @classmethod
    def validate_payment_method(cls, v: str) -> str:
        """Accept only the two supported channel names."""
        if v in ['wechat', 'alipay']:
            return v
        raise ValueError('支付方式必须是 wechat 或 alipay')

    @field_validator('amount')
    @classmethod
    def validate_amount(cls, v: Optional[Decimal]) -> Optional[Decimal]:
        """Reject a custom amount below 1 yuan; None passes through."""
        if v is None or not (v < Decimal('1')):
            return v
        raise ValueError('充值金额不能小于 1 元')
class RechargeOrderQueryRequest(BaseModel):
    """Query parameters for listing recharge orders."""

    model_config = ConfigDict(populate_by_name=True)

    payment_status: Optional[str] = Field(default=None, alias="paymentStatus", description="支付状态筛选")
    page: int = Field(default=1, ge=1, description="页码")
    page_size: int = Field(default=20, ge=1, le=100, alias="pageSize", description="每页数量")
响应 Schema
class RechargeOrderResponse(BaseModel):
    """Serialised recharge order (camelCase aliases)."""

    model_config = ConfigDict(populate_by_name=True)

    id: str
    user_id: str = Field(alias="userId")
    order_no: str = Field(alias="orderNo")
    package_id: Optional[str] = Field(default=None, alias="packageId")
    amount: float
    credits: int
    bonus_credits: int = Field(alias="bonusCredits")
    # Machine-readable value and user-facing label for each enum field.
    payment_method: str = Field(alias="paymentMethod")
    payment_method_display: str = Field(alias="paymentMethodDisplay")
    payment_status: str = Field(alias="paymentStatus")
    payment_status_display: str = Field(alias="paymentStatusDisplay")
    transaction_id: Optional[str] = Field(default=None, alias="transactionId")
    created_at: str = Field(alias="createdAt")
    paid_at: Optional[str] = Field(default=None, alias="paidAt")
    expired_at: str = Field(alias="expiredAt")
    remark: Optional[str] = None
class PaymentParamsResponse(BaseModel):
    """Parameters the client needs to start the payment."""

    model_config = ConfigDict(populate_by_name=True)

    qrcode_url: str = Field(description="支付二维码 URL", alias="qrcodeUrl")
    expires_in: int = Field(description="过期时间(秒)", alias="expiresIn")
class RechargeOrderCreateResponse(BaseModel):
    """Result of order creation: the order plus its payment parameters."""

    model_config = ConfigDict(populate_by_name=True)

    order: RechargeOrderResponse
    payment_params: PaymentParamsResponse = Field(alias="paymentParams")
class RechargeOrderListResponse(BaseModel):
    """One page of recharge orders with pagination metadata."""

    model_config = ConfigDict(populate_by_name=True)

    items: list[RechargeOrderResponse]
    total: int
    page: int
    page_size: int = Field(alias="pageSize")
    total_pages: int = Field(alias="totalPages")
API 接口
API 路由实现
# app/api/v1/recharge.py
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, Request
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_session
from app.api.deps import get_current_user
from app.models.user import User
from app.repositories.recharge_repository import RechargeRepository
from app.services.recharge_service import RechargeService
from app.services.credit_service import CreditService
from app.services.payment_service import PaymentService
from app.repositories.credit_repository import CreditRepository
from app.schemas.recharge import (
RechargeOrderCreateRequest,
RechargeOrderCreateResponse,
RechargeOrderResponse,
RechargeOrderListResponse,
RechargeOrderQueryRequest
)
from app.schemas.response import ApiResponse
from loguru import logger
router = APIRouter(prefix="/recharge", tags=["recharge"])
# ==================== 依赖注入 ====================
async def get_recharge_service(
    session: Annotated[AsyncSession, Depends(get_session)]
) -> RechargeService:
    """Build a RechargeService whose collaborators share the request session."""
    return RechargeService(
        session,
        RechargeRepository(session),
        CreditService(session, CreditRepository(session)),
        PaymentService(),
    )
# ==================== API 路由 ====================
@router.post("/orders", response_model=ApiResponse[RechargeOrderCreateResponse])
async def create_order(
    request: RechargeOrderCreateRequest,
    current_user: Annotated[User, Depends(get_current_user)],
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """Create a recharge order for the authenticated user.

    - **packageId**: package ID (either this or amount)
    - **amount**: custom amount (either this or packageId)
    - **paymentMethod**: payment channel (wechat/alipay)
    """
    created = await service.create_order(
        user_id=current_user.user_id,
        package_id=request.package_id,
        amount=request.amount,
        payment_method=request.payment_method,
    )
    return ApiResponse.success(data=created)
@router.get("/orders/{order_id}", response_model=ApiResponse[RechargeOrderResponse])
async def get_order(
    order_id: UUID,
    current_user: Annotated[User, Depends(get_current_user)],
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """Return one order; only its owner may read it."""
    order = await service.get_order(order_id)
    owned_by_caller = order.user_id == current_user.user_id
    if owned_by_caller:
        return ApiResponse.success(data=order.to_dict())
    # Ownership check failed.
    return ApiResponse.error(message="无权查看此订单", code=403)
@router.get("/orders", response_model=ApiResponse[RechargeOrderListResponse])
async def get_orders(
    query: Annotated[RechargeOrderQueryRequest, Depends()],
    current_user: Annotated[User, Depends(get_current_user)],
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """List the authenticated user's orders.

    - **paymentStatus**: status filter (pending/paid/failed/refunded/cancelled)
    - **page**: page number
    - **pageSize**: rows per page
    """
    page_payload = await service.get_user_orders(
        user_id=current_user.user_id,
        payment_status=query.payment_status,
        page=query.page,
        page_size=query.page_size,
    )
    return ApiResponse.success(data=page_payload)
@router.post("/orders/{order_id}/cancel", response_model=ApiResponse[RechargeOrderResponse])
async def cancel_order(
    order_id: UUID,
    current_user: Annotated[User, Depends(get_current_user)],
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """Cancel one of the authenticated user's orders."""
    cancelled = await service.cancel_order(order_id, current_user.user_id)
    serialised = cancelled.to_dict()
    return ApiResponse.success(data=serialised)
# ==================== 支付回调(内部接口) ====================
@router.post("/callbacks/wechat")
async def wechat_callback(
    request: Request,
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """WeChat Pay payment notification endpoint.

    Called by the WeChat Pay platform, so no user authentication applies;
    authenticity is established by signature verification in the service.

    A rejected or failed notification is answered with a non-2xx status:
    WeChat Pay treats HTTP 200/204 as "received" and stops retrying, so the
    previous 200 + ``FAIL`` body would drop notifications.
    """
    from fastapi.responses import JSONResponse

    try:
        headers = dict(request.headers)
        body = await request.body()
        callback_data = {
            'headers': headers,
            'body': body.decode('utf-8'),
            # NOTE(review): order_no / transaction_id are meant to come from
            # the decrypted payload, but nothing visible here fills them in -
            # confirm where the decrypted values are written back.
            'order_no': None,
            'transaction_id': None
        }
        result = await service.handle_payment_callback(
            payment_method='wechat',
            callback_data=callback_data
        )
    except Exception as e:
        logger.error(f"微信支付回调处理异常: {str(e)}")
        # Do not echo internal exception text to an external caller.
        return JSONResponse(
            status_code=500,
            content={"code": "FAIL", "message": "处理失败"}
        )

    if result['success']:
        return {"code": "SUCCESS", "message": "成功"}
    return JSONResponse(
        status_code=400,
        content={"code": "FAIL", "message": result['message']}
    )
@router.post("/callbacks/alipay")
async def alipay_callback(
    request: Request,
    service: Annotated[RechargeService, Depends(get_recharge_service)]
):
    """Alipay asynchronous notification endpoint.

    Called by the Alipay platform, so no user authentication applies;
    authenticity is established by signature verification in the service.

    The reply is sent as plain text. Returning a bare ``str`` would be
    JSON-encoded by FastAPI as ``"success"`` with quotes, which Alipay does
    not accept as an acknowledgement and therefore keeps retrying.
    """
    from fastapi.responses import PlainTextResponse

    try:
        form_data = await request.form()
        callback_data = dict(form_data)
        result = await service.handle_payment_callback(
            payment_method='alipay',
            callback_data=callback_data
        )
    except Exception as e:
        logger.error(f"支付宝回调处理异常: {str(e)}")
        return PlainTextResponse("fail")

    return PlainTextResponse("success" if result['success'] else "fail")
API 接口示例
1. 创建充值订单
POST /api/v1/recharge/orders
请求体:
{
"packageId": "550e8400-e29b-41d4-a716-446655440000",
"paymentMethod": "wechat"
}
或自定义金额:
{
"amount": 50.0,
"paymentMethod": "alipay"
}
响应:
{
"code": 200,
"message": "Success",
"data": {
"order": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"orderNo": "RCH20250114100000ABC123",
"amount": 49.9,
"credits": 600,
"bonusCredits": 100,
"paymentMethod": "wechat",
"paymentStatus": "pending",
"expiredAt": "2025-01-14T10:30:00Z"
},
"paymentParams": {
"qrcodeUrl": "weixin://wxpay/bizpayurl?pr=xxx",
"expiresIn": 1800
}
}
}
2. 查询订单详情
GET /api/v1/recharge/orders/{id}
响应:
{
"code": 200,
"message": "Success",
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"orderNo": "RCH20250114100000ABC123",
"amount": 49.9,
"credits": 600,
"bonusCredits": 100,
"paymentMethod": "wechat",
"paymentStatus": "paid",
"transactionId": "4200001234567890",
"createdAt": "2025-01-14T10:00:00Z",
"paidAt": "2025-01-14T10:05:00Z"
}
}
3. 查询订单列表
GET /api/v1/recharge/orders?paymentStatus=paid&page=1&pageSize=20
响应:
{
"code": 200,
"message": "Success",
"data": {
"items": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"orderNo": "RCH20250114100000ABC123",
"amount": 49.9,
"credits": 600,
"paymentStatus": "paid",
"createdAt": "2025-01-14T10:00:00Z"
}
],
"total": 10,
"page": 1,
"pageSize": 20,
"totalPages": 1
}
}
4. 取消订单
POST /api/v1/recharge/orders/{id}/cancel
响应:
{
"code": 200,
"message": "Success",
"data": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"paymentStatus": "cancelled"
}
}
5. 支付回调(内部接口)
POST /api/v1/recharge/callbacks/wechat
POST /api/v1/recharge/callbacks/alipay
支付集成
微信支付
# app/services/payment/wechat_payment.py
from app.core.config import settings
class WechatPayment:
    """Thin wrapper around the ``wechatpayv3`` SDK for Native (QR code) payments."""

    def __init__(self):
        # Imported lazily so the module can be loaded without the SDK installed.
        from wechatpayv3 import WeChatPay, WeChatPayType
        self.wxpay = WeChatPay(
            wechatpay_type=WeChatPayType.NATIVE,
            mchid=settings.WECHAT_PAY_MCHID,
            private_key=settings.WECHAT_PAY_PRIVATE_KEY,
            cert_serial_no=settings.WECHAT_PAY_CERT_SERIAL_NO,
            apiv3_key=settings.WECHAT_PAY_APIV3_KEY,
            appid=settings.WECHAT_APPID
        )

    async def create_payment(
        self,
        order_no: str,
        amount: float,
        description: str
    ) -> dict:
        """Create a Native payment and return its QR code parameters.

        Args:
            order_no: Merchant order number (``out_trade_no``).
            amount: Order amount in yuan.
            description: Text shown to the payer.

        Returns:
            ``{'qrcodeUrl': <code_url>, 'expiresIn': 1800}``.

        Raises:
            Exception: The gateway answered with a status other than 200.
        """
        import json

        # NOTE(review): the SDK call is synchronous inside an async method -
        # confirm blocking the event loop here is acceptable.
        code, message = self.wxpay.pay(
            description=description,
            out_trade_no=order_no,
            # Yuan -> fen. round() instead of int(): 19.99 * 100 evaluates to
            # 1998.9999999999998, which int() truncates to 1998.
            amount={'total': round(amount * 100)},
            notify_url=settings.WECHAT_PAY_NOTIFY_URL
        )
        if code != 200:
            raise Exception(f"微信支付创建失败:{message}")

        # The response body may arrive as JSON text; parse it before reading
        # code_url. An already-parsed mapping is used as is.
        payload = json.loads(message) if isinstance(message, (str, bytes, bytearray)) else message
        return {
            'qrcodeUrl': payload.get('code_url'),
            'expiresIn': 1800
        }

    async def verify_callback(self, callback_data: dict) -> bool:
        """Verify a payment notification via the SDK.

        Passes the stored ``headers`` and ``body`` to ``wxpay.callback`` and
        returns its result unchanged.
        """
        # NOTE(review): the SDK may return the decrypted payload (or None)
        # rather than a strict bool - callers should only rely on truthiness.
        return self.wxpay.callback(
            headers=callback_data.get('headers'),
            body=callback_data.get('body')
        )
支付宝支付
# app/services/payment/alipay_payment.py
from alipay import AliPay
from app.core.config import settings
class AlipayPayment:
    """Thin wrapper around the Alipay SDK for pre-created (QR code) trades."""

    def __init__(self):
        self.alipay = AliPay(
            appid=settings.ALIPAY_APPID,
            app_notify_url=settings.ALIPAY_NOTIFY_URL,
            app_private_key_string=settings.ALIPAY_PRIVATE_KEY,
            alipay_public_key_string=settings.ALIPAY_PUBLIC_KEY,
            sign_type="RSA2"
        )

    async def create_payment(
        self,
        order_no: str,
        amount: float,
        description: str
    ) -> dict:
        """Pre-create a trade and return its QR code parameters.

        Args:
            order_no: Merchant order number (``out_trade_no``).
            amount: Order amount in yuan.
            description: Trade subject shown to the payer.

        Returns:
            ``{'qrcodeUrl': <qr_code>, 'expiresIn': 1800}``.

        Raises:
            Exception: The gateway response contains no ``qr_code``.
        """
        # NOTE(review): the SDK call is synchronous inside an async method -
        # confirm blocking the event loop here is acceptable.
        result = self.alipay.api_alipay_trade_precreate(
            subject=description,
            out_trade_no=order_no,
            # Always two decimals, so float artefacts never reach the gateway.
            total_amount=f"{amount:.2f}"
        )
        qr_code = result.get('qr_code')
        if not qr_code:
            # Fail loudly, like the WeChat wrapper, instead of handing the
            # client a payment with no QR code.
            raise Exception(f"支付宝支付创建失败:{result}")
        return {
            'qrcodeUrl': qr_code,
            'expiresIn': 1800
        }

    async def verify_callback(self, callback_data: dict) -> bool:
        """Verify the signature of an asynchronous notification.

        Works on a copy, so the caller's payload (which is also stored in the
        callback log) keeps its ``sign`` field. A notification without a
        signature is rejected instead of raising ``KeyError``.
        """
        params = dict(callback_data)
        signature = params.pop('sign', None)
        if not signature:
            return False
        return self.alipay.verify(params, signature)
数据完整性保证
后台任务 - 孤儿记录检查
# app/tasks/recharge_tasks.py
import asyncio
from celery import shared_task
from app.core.database import async_session_maker
from app.repositories.recharge_repository import RechargeRepository
from app.services.recharge_service import RechargeService
from app.services.credit_service import CreditService
from app.services.payment_service import PaymentService
from app.repositories.credit_repository import CreditRepository
from loguru import logger
@shared_task(name="recharge.check_orphan_orders")
def check_orphan_recharge_orders():
    """Periodic task: report orphan recharge orders.

    Looks for orders whose user or package no longer exists.
    Intended schedule: daily at 02:00.
    """
    # Celery tasks are synchronous; run the coroutine to completion here.
    report = asyncio.run(_check_orphan_recharge_orders_async())
    return report
async def _check_orphan_recharge_orders_async():
    """Query orphan recharge orders, log each one and return a summary dict."""
    logger.info("开始检查孤儿充值订单...")
    async with async_session_maker() as session:
        orphan_orders = await RechargeRepository(session).check_orphan_orders()

    if not orphan_orders:
        logger.info("未发现孤儿充值订单")
    else:
        logger.error(f"发现 {len(orphan_orders)} 个孤儿充值订单")
        for order in orphan_orders:
            logger.error(
                f"孤儿订单: order_no={order['order_no']}, "
                f"user_id={order['user_id']}, "
                f"amount={order['amount']}, "
                f"payment_status={order['payment_status']}"
            )
        # TODO: send an alert notification
        # await send_alert(f"发现 {len(orphan_orders)} 个孤儿充值订单")

    return {
        'orphan_count': len(orphan_orders),
        'orphan_orders': orphan_orders
    }
@shared_task(name="recharge.cleanup_expired_orders")
def cleanup_expired_orders():
    """Periodic task: cancel pending orders that are past their expiry time.

    Intended schedule: once per hour.
    """
    # Celery tasks are synchronous; run the coroutine to completion here.
    summary = asyncio.run(_cleanup_expired_orders_async())
    return summary
async def _cleanup_expired_orders_async():
    """Cancel expired pending orders and return how many were cancelled."""
    logger.info("开始清理过期充值订单...")
    async with async_session_maker() as session:
        # Wire the service by hand; there is no request-scoped injection here.
        service = RechargeService(
            session=session,
            repository=RechargeRepository(session),
            credit_service=CreditService(session, CreditRepository(session)),
            payment_service=PaymentService(),
        )
        count = await service.expire_orders()

    if count > 0:
        logger.info(f"已取消 {count} 个过期订单")
    else:
        logger.info("没有过期订单需要处理")
    return {'cancelled_count': count}
# Celery Beat 配置示例(添加到 celery_app.py)
"""
from celery.schedules import crontab
app.conf.beat_schedule = {
# 每天凌晨 2:00 检查孤儿订单
'check-orphan-recharge-orders': {
'task': 'recharge.check_orphan_orders',
'schedule': crontab(hour=2, minute=0),
},
# 每小时清理过期订单
'cleanup-expired-orders': {
'task': 'recharge.cleanup_expired_orders',
'schedule': crontab(minute=0), # 每小时的第 0 分钟
},
}
"""
用户删除时的级联处理
# services/user_service.py
async def delete_user(self, user_id: UUID):
    """Soft-delete a user and cancel their pending recharge orders.

    Paid orders are left untouched so they remain available for auditing.
    """
    from app.repositories.recharge_repository import RechargeRepository

    # NOTE(review): cancel_user_pending_orders() commits on its own inside
    # this begin() block - confirm the two steps are atomic as intended.
    async with self.session.begin():
        # 1. Soft-delete the user.
        await self.user_repository.soft_delete(user_id)
        # 2. Cancel the user's pending recharge orders.
        cancelled_count = await RechargeRepository(self.session).cancel_user_pending_orders(user_id)
        logger.info(f"用户 {user_id} 删除,已取消 {cancelled_count} 个待支付订单")
        # 3. Paid orders are kept as-is (financial audit trail).
文档版本:v2.4
最后更新:2026-01-29
变更记录
v2.4 (2026-01-29)
- ✅ Pydantic v2 升级:`Config` → `model_config = ConfigDict(populate_by_name=True)`
- ✅ Celery 异步任务修正:使用同步包装器 + `asyncio.run()` 模式
- ✅ 补充 Repository 方法:添加 `check_orphan_orders()` 实现
- ✅ 完善类型提示:Repository 添加 `Dict[str, Any]` 导入
- ✅ 数据库会话修正:使用 `async_session_maker()` 替代 `get_session()`
- ✅ 技术栈合规性:完全符合 jointo-tech-stack 规范
v2.3 (2026-01-27)
- ✅ 修复代码示例:移除模型中的 `alias="id"`,避免序列化问题
- ✅ 修正导入路径:`app.core.id_generator` → `app.utils.id_generator`
- ✅ 统一时间戳:全部使用 `datetime.now(timezone.utc)`
- ✅ 完善导入:添加 `JSONB`、`text`、`timezone` 等缺失导入
- ✅ 添加定时任务:提供完整的 `recharge_tasks.py` 实现
- ✅ 代码可运行性:确保所有示例代码可直接使用
v2.2 (2026-01-26)
- ✅ 修正主键类型:所有 ID 字段从 TEXT 改为 UUID 类型
- ✅ 类型一致性:确保数据库 UUID 类型与 Python UUID 对象匹配
- ✅ 性能优化:UUID 类型占用 16 字节(vs TEXT 36 字节)
- ✅ 完善迁移脚本:提供完整的表创建和索引创建脚本
v2.1 (2026-01-26)
- ✅ 移除数据库外键约束,改为应用层验证
- ✅ 添加 Repository 层引用完整性验证方法
- ✅ Service 层添加用户和套餐存在性检查
- ✅ 修正依赖注入模式,添加 session 参数
- ✅ 添加 func 导入修复
- ✅ 优化索引策略,添加组合索引和条件索引
- ✅ 添加数据完整性后台检查任务
- ✅ 添加用户删除时的级联处理逻辑
- ✅ 统一时间戳使用 timezone.utc
- ✅ 完善支付回调的引用完整性检查
- ✅ 添加表级索引定义
v2.0 (2026-01-26)
- 重构依赖注入模式:通过构造函数注入依赖
- 统一枚举类型:使用 IntEnum + SMALLINT
- 修改主键类型:使用 UUID v7
- 规范 API 路由:使用复数名词遵循 REST 规范
- 统一响应格式:使用 code/message/data 结构
- 统一字段命名:使用 camelCase
- 添加数据库字段 COMMENT 注释