You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

60 KiB

用户管理服务

文档版本:v2.5
最后更新:2026-01-28


目录

  1. 服务概述
  2. 核心功能
  3. 服务实现
  4. API 接口
  5. 数据库设计
  6. 数据模型

服务概述

用户管理服务负责处理用户登录、认证、授权、用户信息管理、AI 积分管理等功能。

职责

  • 用户登录(手机号验证码、微信扫码)
  • JWT 认证和授权
  • 用户信息 CRUD 操作
  • 账号绑定(手机号、微信、邮箱)
  • AI 积分管理
  • 会话管理

核心功能

1. 用户登录

支持以下登录方式:

  • 手机号 + 验证码登录(主要方式)
  • 微信扫码登录(支持公众号和开放平台)

登录时自动注册,无需显式注册接口。

2. 账号绑定

  • 绑定手机号
  • 绑定微信
  • 绑定邮箱

3. 用户名管理

  • 首次登录自动生成用户名:user_{timestamp}_{random4}
  • 允许修改一次(username_changed 标记)

4. 用户信息管理

  • 查询用户信息
  • 更新用户资料
  • 上传头像

5. 算力积分管理

  • 查询积分余额
  • 查询充值记录
  • 查询消耗记录
  • 积分统计信息

服务实现

UserService 类

# app/services/user_service.py
from typing import Optional, Dict, Any
from uuid import UUID
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.user import User, UserSession
from app.repositories.user_repository import UserRepository
from app.schemas.user import UserUpdate
from app.core.exceptions import NotFoundError, ValidationError, AuthenticationError
from app.core.security import create_access_token, create_refresh_token
from datetime import datetime, timedelta, timezone
import time
import random
import string

class UserService:
    def __init__(self, db: AsyncSession):
        self.repository = UserRepository(db)
        self.db = db

    # ==================== 登录相关 ====================

    async def login_with_phone(
        self,
        phone: str,
        country_code: str,
        code: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> Dict[str, Any]:
        """手机号验证码登录"""
        # 验证验证码
        from app.services.sms_service import SmsService
        sms_service = SmsService(self.db)
        await sms_service.verify_code(phone, country_code, code, purpose='login')

        # 查找用户
        user = await self.repository.get_by_phone(phone, country_code)

        # 用户不存在,自动注册
        if not user:
            username = self.generate_username()
            user = User(
                phone=phone,
                country_code=country_code,
                phone_verified=True,
                username=username,
                ai_credits_balance=100,  # 新用户赠送 100 积分
                total_recharged_amount=0.00,
                total_credits_earned=100,  # 记录赠送积分
                total_credits_consumed=0,
                username_changed=False
            )
            user = await self.repository.create(user)

            # 记录赠送积分交易
            from app.services.credit_service import CreditService
            credit_service = CreditService(self.db)
            await credit_service.record_gift_credits(
                user_id=user.user_id,
                amount=100,
                description='新用户注册赠送',
                expires_days=30  # 有效期 1 个月
            )

        # 生成 Token
        access_token = create_access_token(data={'user_id': str(user.user_id)})
        refresh_token = create_refresh_token(data={'user_id': str(user.user_id)})

        # 创建会话
        await self._create_session(
            user_id=user.user_id,
            access_token=access_token,
            refresh_token=refresh_token,
            ip_address=ip_address,
            user_agent=user_agent
        )

        return {
            'user': user,
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'bearer'
        }

    async def login_with_wechat(
        self,
        scene_id: str
    ) -> Dict[str, Any]:
        """微信扫码登录"""
        from app.services.wechat_service import WechatService
        wechat_service = WechatService(self.db)

        return await wechat_service.get_login_result(scene_id)

    async def refresh_token(
        self,
        refresh_token: str
    ) -> Dict[str, str]:
        """刷新 Token"""
        # 验证 Refresh Token
        from app.core.security import decode_token
        payload = decode_token(refresh_token)
        if not payload:
            raise AuthenticationError("无效的 Refresh Token")

        user_id = payload.get('user_id')
        if not user_id:
            raise AuthenticationError("无效的 Refresh Token")

        # 查找会话
        session = await self.repository.get_session_by_refresh_token(refresh_token)
        if not session:
            raise AuthenticationError("会话不存在")

        # 生成新的 Access Token
        new_access_token = create_access_token(data={'user_id': user_id})

        # 更新会话
        await self.repository.update_session(session.session_id, {
            'token': new_access_token,
            'last_used_at': datetime.now(timezone.utc)
        })

        return {
            'access_token': new_access_token,
            'token_type': 'bearer'
        }

    async def logout(
        self,
        user_id: UUID,
        access_token: str
    ) -> None:
        """用户登出(应用层验证会话所有权)"""
        # 查找会话并验证所有权
        session = await self.repository.get_session_by_token(access_token)
        if not session:
            raise NotFoundError("会话不存在")
        
        # 验证会话是否属于该用户(安全性)
        if session.user_id != user_id:
            raise AuthenticationError("无权删除该会话")
        
        # 删除会话
        await self.repository.delete_session(session.session_id)

    async def get_user(
        self,
        user_id: UUID
    ) -> User:
        """获取用户信息"""
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")
        return user

    async def update_user(
        self,
        user_id: UUID,
        user_data: UserUpdate
    ) -> User:
        """更新用户信息"""
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")

        # 如果更新邮箱,检查是否已被使用
        if user_data.email and user_data.email.lower() != user.email:
            existing_user = await self.repository.get_by_email(user_data.email.lower())
            if existing_user:
                raise ValidationError("邮箱已被使用")

        update_data = user_data.dict(exclude_unset=True)
        if 'email' in update_data:
            update_data['email'] = update_data['email'].lower()

        return await self.repository.update(user_id, update_data)

    async def update_username(
        self,
        user_id: UUID,
        username: str
    ) -> User:
        """修改用户名(仅允许一次)"""
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")

        # 检查是否已修改过
        if user.username_changed:
            raise ValidationError("用户名只能修改一次")

        # 检查用户名是否已被使用
        existing_user = await self.repository.get_by_username(username)
        if existing_user:
            raise ValidationError("用户名已被使用")

        return await self.repository.update(user_id, {
            'username': username,
            'username_changed': True
        })

    async def upload_avatar(
        self,
        user_id: UUID,
        file_content: bytes,
        filename: str
    ) -> User:
        """上传头像"""
        from app.services.storage_service import StorageService
        storage_service = StorageService()

        # 上传文件到对象存储
        avatar_url = await storage_service.upload_file(
            file_content=file_content,
            filename=filename,
            folder='avatars'
        )

        # 如果使用 avatar_id 关联附件表,需要验证附件是否存在
        # 应用层引用完整性验证
        # from app.repositories.attachment_repository import AttachmentRepository
        # attachment_repo = AttachmentRepository(self.db)
        # if avatar_id:
        #     attachment = await attachment_repo.get_by_id(avatar_id)
        #     if not attachment:
        #         raise NotFoundError("附件不存在")

        # 更新用户头像
        return await self.repository.update(user_id, {
            'avatar_url': avatar_url
        })

    # ==================== 账号绑定 ====================

    async def bind_phone(
        self,
        user_id: UUID,
        phone: str,
        country_code: str,
        code: str
    ) -> User:
        """绑定手机号"""
        # 验证验证码
        from app.services.sms_service import SmsService
        sms_service = SmsService(self.db)
        await sms_service.verify_code(phone, country_code, code, purpose='bind_phone')

        # 检查手机号是否已被其他账号使用
        existing_user = await self.repository.get_by_phone(phone, country_code)
        if existing_user and existing_user.user_id != user_id:
            raise ValidationError("该手机号已被其他用户绑定")

        return await self.repository.update(user_id, {
            'phone': phone,
            'country_code': country_code,
            'phone_verified': True
        })

    async def bind_wechat(
        self,
        user_id: UUID,
        code: str,
        platform: str
    ) -> User:
        """绑定微信账号"""
        from app.services.wechat_service import WechatService
        wechat_service = WechatService(self.db)

        return await wechat_service.bind_wechat(user_id, code, platform)

    async def bind_email(
        self,
        user_id: UUID,
        email: str
    ) -> User:
        """绑定邮箱"""
        # 检查邮箱是否已被其他账号使用
        existing_user = await self.repository.get_by_email(email.lower())
        if existing_user and existing_user.user_id != user_id:
            raise ValidationError("该邮箱已被其他用户绑定")

        return await self.repository.update(user_id, {
            'email': email.lower()
        })

    # ==================== 积分信息查询 ====================

    async def get_credits_info(
        self,
        user_id: UUID
    ) -> Dict[str, Any]:
        """查询用户积分信息"""
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")

        return {
            'balance': user.ai_credits_balance,
            'total_earned': user.total_credits_earned,
            'total_consumed': user.total_credits_consumed,
            'total_recharged_amount': float(user.total_recharged_amount)
        }

    # ==================== 用户删除 ====================

    async def delete_user(
        self,
        user_id: UUID
    ) -> None:
        """删除用户(应用层级联删除)"""
        # 验证用户是否存在
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")
        
        # 级联删除:删除用户的所有会话
        await self.repository.delete_sessions_by_user_id(user_id)
        
        # 软删除用户
        await self.repository.update(user_id, {
            'deleted_at': datetime.now(timezone.utc)
        })
        
        # 注意:不删除用户的头像附件(可能被其他用户使用)
        # 注意:不删除用户的积分记录(保留审计日志)

    # ==================== 私有方法 ====================

    def generate_username(self) -> str:
        """自动生成用户名"""
        timestamp = int(time.time())
        random_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=4))
        return f"user_{timestamp}_{random_suffix}"

    async def _create_session(
        self,
        user_id: UUID,
        access_token: str,
        refresh_token: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> UserSession:
        """创建会话(应用层验证用户存在)"""
        # 验证用户是否存在(应用层引用完整性)
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise NotFoundError("用户不存在")
        
        session = UserSession(
            user_id=user_id,
            token=access_token,
            refresh_token=refresh_token,
            expires_at=datetime.now(timezone.utc) + timedelta(days=7),
            ip_address=ip_address,
            user_agent=user_agent
        )

        return await self.repository.create_session(session)

UserRepository 完整实现

详见"引用完整性保证"章节中的 Repository 层实现。


---

## 引用完整性保证

### 架构约束

根据项目架构规范(`.claude/skills/jointo-tech-stack/references/database.md`):

1. **禁止数据库物理外键约束**:
   - 数据库层禁止创建 `FOREIGN KEY` 约束
   - 所有引用完整性在应用层 Service/Repository 中验证
   - 所有逻辑关联字段必须手动创建索引

2. **原因**:
   - 为分库分表做准备
   - 提升数据库性能
   - 增强系统扩展性
   - 统一业务逻辑在应用层

---

### 关联关系表

| 表名 | 字段 | 关联目标 | 验证策略 | 删除策略 |
|------|------|---------|---------|---------|
| `users` | `avatar_id` | `attachments.attachment_id` | 创建/更新时验证附件存在 | 用户删除时不删除附件(可能被其他用户使用) |
| `user_sessions` | `user_id` | `users.user_id` | 创建会话时验证用户存在 | 用户删除时级联删除所有会话 |

---

### 验证策略实现

#### 1. 创建会话时验证用户存在

```python
async def _create_session(
    self,
    user_id: UUID,
    access_token: str,
    refresh_token: str,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None
) -> UserSession:
    """创建会话(应用层验证用户存在)"""
    # ✅ 验证用户是否存在(应用层引用完整性)
    user = await self.repository.get_by_id(user_id)
    if not user:
        raise NotFoundError("用户不存在")
    
    session = UserSession(
        user_id=user_id,
        token=access_token,
        refresh_token=refresh_token,
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
        ip_address=ip_address,
        user_agent=user_agent
    )

    return await self.repository.create_session(session)

2. 上传头像时验证附件存在

async def upload_avatar(
    self,
    user_id: UUID,
    file_content: bytes,
    filename: str
) -> User:
    """上传头像"""
    from app.services.storage_service import StorageService
    storage_service = StorageService()

    # 上传文件到对象存储
    avatar_url = await storage_service.upload_file(
        file_content=file_content,
        filename=filename,
        folder='avatars'
    )

    # 如果使用 avatar_id 关联附件表,需要验证附件是否存在
    # ✅ 应用层引用完整性验证
    # from app.repositories.attachment_repository import AttachmentRepository
    # attachment_repo = AttachmentRepository(self.db)
    # if avatar_id:
    #     attachment = await attachment_repo.get_by_id(avatar_id)
    #     if not attachment:
    #         raise NotFoundError("附件不存在")

    # 更新用户头像
    return await self.repository.update(user_id, {
        'avatar_url': avatar_url
    })

3. 登出时验证会话所有权

async def logout(
    self,
    user_id: UUID,
    access_token: str
) -> None:
    """用户登出(应用层验证会话所有权)"""
    # ✅ 查找会话并验证所有权
    session = await self.repository.get_session_by_token(access_token)
    if not session:
        raise NotFoundError("会话不存在")
    
    # ✅ 验证会话是否属于该用户(安全性)
    if session.user_id != user_id:
        raise AuthenticationError("无权删除该会话")
    
    # 删除会话
    await self.repository.delete_session(session.session_id)

4. 删除用户时级联删除会话

async def delete_user(
    self,
    user_id: UUID
) -> None:
    """删除用户(应用层级联删除)"""
    # ✅ 验证用户是否存在
    user = await self.repository.get_by_id(user_id)
    if not user:
        raise NotFoundError("用户不存在")
    
    # ✅ 级联删除:删除用户的所有会话
    await self.repository.delete_sessions_by_user_id(user_id)
    
    # ✅ 软删除用户
    await self.repository.update(user_id, {
        'deleted_at': datetime.now(timezone.utc)
    })
    
    # 注意:不删除用户的头像附件(可能被其他用户使用)
    # 注意:不删除用户的积分记录(保留审计日志)

Repository 层实现

# app/repositories/user_repository.py
from typing import Optional, List
from uuid import UUID
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.user import User, UserSession
from datetime import datetime, timezone

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    # ==================== User CRUD ====================

    async def create(self, user: User) -> User:
        """创建用户"""
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user

    async def get_by_id(self, user_id: UUID) -> Optional[User]:
        """通过 ID 查询用户"""
        statement = select(User).where(
            User.user_id == user_id,
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_by_phone(self, phone: str, country_code: str) -> Optional[User]:
        """通过手机号查询用户"""
        statement = select(User).where(
            User.phone == phone,
            User.country_code == country_code,
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> Optional[User]:
        """通过邮箱查询用户(大小写不敏感)"""
        from sqlalchemy import func
        statement = select(User).where(
            func.lower(User.email) == email.lower(),
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_by_username(self, username: str) -> Optional[User]:
        """通过用户名查询用户"""
        statement = select(User).where(
            User.username == username,
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_by_wechat_openid(
        self,
        openid: str,
        platform: int
    ) -> Optional[User]:
        """通过微信 openid 查询用户"""
        statement = select(User).where(
            User.wechat_openid == openid,
            User.wechat_platform == platform,
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def update(self, user_id: UUID, data: dict) -> User:
        """更新用户信息"""
        user = await self.get_by_id(user_id)
        if not user:
            raise ValueError("用户不存在")
        
        for key, value in data.items():
            setattr(user, key, value)
        
        user.updated_at = datetime.now(timezone.utc)
        await self.db.commit()
        await self.db.refresh(user)
        return user

    # ==================== Session CRUD ====================

    async def create_session(self, session: UserSession) -> UserSession:
        """创建会话"""
        self.db.add(session)
        await self.db.commit()
        await self.db.refresh(session)
        return session

    async def get_session_by_token(self, token: str) -> Optional[UserSession]:
        """通过 token 查询会话"""
        statement = select(UserSession).where(
            UserSession.token == token
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_session_by_refresh_token(
        self,
        refresh_token: str
    ) -> Optional[UserSession]:
        """通过 refresh_token 查询会话"""
        statement = select(UserSession).where(
            UserSession.refresh_token == refresh_token
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()

    async def get_sessions_by_user_id(self, user_id: UUID) -> List[UserSession]:
        """查询用户的所有会话"""
        statement = select(UserSession).where(
            UserSession.user_id == user_id
        ).order_by(UserSession.last_used_at.desc())
        result = await self.db.execute(statement)
        return result.scalars().all()

    async def update_session(self, session_id: UUID, data: dict) -> Optional[UserSession]:
        """更新会话信息"""
        statement = select(UserSession).where(
            UserSession.session_id == session_id
        )
        result = await self.db.execute(statement)
        session = result.scalar_one_or_none()
        
        if not session:
            return None
        
        for key, value in data.items():
            setattr(session, key, value)
        
        await self.db.commit()
        await self.db.refresh(session)
        return session

    async def delete_session(self, session_id: UUID) -> None:
        """删除会话"""
        statement = select(UserSession).where(
            UserSession.session_id == session_id
        )
        result = await self.db.execute(statement)
        session = result.scalar_one_or_none()
        
        if session:
            await self.db.delete(session)
            await self.db.commit()

    async def delete_sessions_by_user_id(self, user_id: UUID) -> None:
        """删除用户的所有会话(级联删除)"""
        statement = select(UserSession).where(
            UserSession.user_id == user_id
        )
        result = await self.db.execute(statement)
        sessions = result.scalars().all()
        
        for session in sessions:
            await self.db.delete(session)
        
        await self.db.commit()
    
    async def get_by_wechat_unionid(self, unionid: str) -> Optional[User]:
        """通过微信 UnionID 查询用户"""
        from sqlalchemy import func
        statement = select(User).where(
            User.wechat_unionid == unionid,
            User.deleted_at.is_(None)
        )
        result = await self.db.execute(statement)
        return result.scalar_one_or_none()
    
    async def update_wechat_info(
        self,
        user_id: UUID,
        openid: str,
        unionid: Optional[str],
        platform: int
    ) -> Optional[User]:
        """更新用户微信信息
        
        Args:
            user_id: 用户 ID
            openid: 微信 OpenID
            unionid: 微信 UnionID
            platform: 平台类型(整数枚举值:1=mp, 2=open)
        """
        user = await self.get_by_id(user_id)
        if not user:
            return None
        
        user.wechat_openid = openid
        user.wechat_unionid = unionid
        user.wechat_platform = platform
        user.updated_at = datetime.now(timezone.utc)
        
        await self.db.commit()
        await self.db.refresh(user)
        return user
    
    async def clear_wechat_info(self, user_id: UUID) -> Optional[User]:
        """清除用户微信信息"""
        user = await self.get_by_id(user_id)
        if not user:
            return None
        
        user.wechat_openid = None
        user.wechat_unionid = None
        user.wechat_platform = None
        user.updated_at = datetime.now(timezone.utc)
        
        await self.db.commit()
        await self.db.refresh(user)
        return user
    
    async def update_session_last_used(self, session_id: UUID) -> Optional[UserSession]:
        """更新会话最后使用时间"""
        statement = select(UserSession).where(
            UserSession.session_id == session_id
        )
        result = await self.db.execute(statement)
        session = result.scalar_one_or_none()
        
        if not session:
            return None
        
        session.last_used_at = datetime.now(timezone.utc)
        await self.db.commit()
        await self.db.refresh(session)
        return session

性能优化

1. 索引策略

所有逻辑关联字段都已创建索引:

-- users 表
CREATE INDEX idx_users_avatar_id ON users (avatar_id) WHERE avatar_id IS NOT NULL;

-- user_sessions 表
CREATE INDEX idx_user_sessions_user_id ON user_sessions (user_id);
CREATE INDEX idx_user_sessions_token ON user_sessions (token);

2. 批量操作优化

删除用户时批量删除会话:

# ✅ 推荐:批量删除
async def delete_sessions_by_user_id(self, user_id: UUID) -> None:
    """批量删除用户的所有会话"""
    statement = select(UserSession).where(
        UserSession.user_id == user_id
    )
    result = await self.db.execute(statement)
    sessions = result.scalars().all()
    
    for session in sessions:
        await self.db.delete(session)
    
    await self.db.commit()

# ❌ 不推荐:逐个删除
# for session in sessions:
#     await self.repository.delete_session(session.session_id)

错误处理

常见错误场景

场景 错误类型 HTTP 状态码 错误消息
创建会话时用户不存在 NotFoundError 404 "用户不存在"
上传头像时附件不存在 NotFoundError 404 "附件不存在"
登出时会话不存在 NotFoundError 404 "会话不存在"
登出时会话不属于该用户 AuthenticationError 403 "无权删除该会话"
删除用户时用户不存在 NotFoundError 404 "用户不存在"

API 错误响应示例

{
  "error": {
    "code": "NOT_FOUND",
    "message": "用户不存在",
    "details": {
      "user_id": "550e8400-e29b-41d4-a716-446655440000"
    }
  }
}
{
  "error": {
    "code": "FORBIDDEN",
    "message": "无权删除该会话",
    "details": {
      "session_id": "660e8400-e29b-41d4-a716-446655440001",
      "user_id": "550e8400-e29b-41d4-a716-446655440000"
    }
  }
}

API 接口

1. 发送验证码

POST /api/v1/auth/sms/send

请求体

{
  "phone": "13800138000",
  "country_code": "+86",
  "purpose": "login"
}

响应

{
  "message": "验证码已发送",
  "expires_in": 600
}

2. 手机号登录

POST /api/v1/auth/login/phone

请求体

{
  "phone": "13800138000",
  "country_code": "+86",
  "code": "123456"
}

响应

{
  "user": {
    "user_id": "550e8400-e29b-41d4-a716-446655440000",
    "username": "user_1738012345_a3f9",
    "phone": "13800138000",
    "phone_verified": true,
    "ai_credits_balance": 100,
    "created_at": "2025-01-14T10:00:00Z"
  },
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer"
}

3. 获取微信登录二维码

GET /api/v1/auth/wechat/qrcode?platform=mp

响应

{
  "scene_id": "550e8400-e29b-41d4-a716-446655440000",
  "qrcode_url": "https://open.weixin.qq.com/connect/oauth2/authorize?...",
  "expires_in": 300
}

4. 轮询微信登录结果

GET /api/v1/auth/wechat/result?scene_id=xxx

响应(未完成)

{
  "status": "pending"
}

响应(已完成)

{
  "status": "success",
  "user": {
    "user_id": "550e8400-e29b-41d4-a716-446655440000",
    "username": "user_1738012345_a3f9",
    "wechat_openid": "oxxxxxxxxxxxxxx",
    "avatar_url": "https://wx.qlogo.cn/..."
  },
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer"
}

5. 刷新 Token

POST /api/v1/auth/refresh

请求体

{
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}

6. 用户登出

POST /api/v1/auth/logout

7. 获取当前用户信息

GET /api/v1/users/me

响应

{
  "user_id": "550e8400-e29b-41d4-a716-446655440000",
  "username": "user_1738012345_a3f9",
  "nickname": "我的昵称",
  "phone": "13800138000",
  "email": "user@example.com",
  "avatar_url": "https://storage.jointo.ai/avatars/123.jpg",
  "ai_credits_balance": 150,
  "total_recharged_amount": 20.0,
  "username_changed": false,
  "created_at": "2025-01-14T10:00:00Z"
}

8. 更新用户信息

PUT /api/v1/users/me

请求体

{
  "nickname": "新昵称",
  "avatar_url": "https://storage.jointo.ai/avatars/456.jpg"
}

9. 修改用户名

PUT /api/v1/users/me/username

请求体

{
  "username": "my_new_username"
}

响应

{
  "message": "用户名修改成功",
  "user": {
    "user_id": "550e8400-e29b-41d4-a716-446655440000",
    "username": "my_new_username",
    "username_changed": true
  }
}

10. 上传头像

POST /api/v1/users/me/avatar
Content-Type: multipart/form-data

请求体

file: <binary>

11. 绑定手机号

POST /api/v1/users/me/bind/phone

请求体

{
  "phone": "13800138000",
  "country_code": "+86",
  "code": "123456"
}

12. 绑定微信

POST /api/v1/users/me/bind/wechat

请求体

{
  "code": "微信授权码",
  "platform": "mp"
}

13. 绑定邮箱

POST /api/v1/users/me/bind/email

请求体

{
  "email": "user@example.com"
}

14. 查询积分信息

GET /api/v1/users/me/credits

响应

{
  "balance": 85,
  "total_earned": 100,
  "total_consumed": 15,
  "total_recharged_amount": 0.0
}

15. 积分充值和消耗

积分充值和消耗功能由 积分管理服务 提供,详见相关文档。


16. 删除用户

DELETE /api/v1/users/me

响应

{
  "message": "用户已删除"
}

说明

  • 软删除用户(设置 deleted_at 字段)
  • 级联删除用户的所有会话
  • 不删除用户的头像附件(可能被其他用户使用)
  • 不删除用户的积分记录(保留审计日志)

数据库设计

3.1 users(用户表)

CREATE TABLE users (
    user_id UUID PRIMARY KEY,

    -- 登录方式
    phone VARCHAR(20),
    country_code VARCHAR(10) DEFAULT '+86',
    phone_verified BOOLEAN NOT NULL DEFAULT false,
    wechat_openid VARCHAR(100),
    wechat_unionid VARCHAR(100),
    
    -- 微信平台 (使用 SMALLINT 存储)
    -- 1: mp (公众号), 2: open (开放平台)
    wechat_platform SMALLINT CHECK (wechat_platform IN (1, 2)),

    -- 基本信息
    email VARCHAR(255),
    username VARCHAR(255) NOT NULL,
    username_changed BOOLEAN NOT NULL DEFAULT false,
    nickname VARCHAR(255),
    password_hash VARCHAR(255),
    avatar_url VARCHAR(500),
    avatar_id UUID,

    -- 算力积分信息
    ai_credits_balance INTEGER NOT NULL DEFAULT 100,
    total_recharged_amount NUMERIC(10, 2) NOT NULL DEFAULT 0.00,
    total_credits_earned INTEGER NOT NULL DEFAULT 100,
    total_credits_consumed INTEGER NOT NULL DEFAULT 0,

    -- 时间戳(使用 TIMESTAMPTZ 记录事件时间)
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at TIMESTAMPTZ,

    -- 唯一约束(仅保留必要的表级约束)
    CONSTRAINT users_username_unique UNIQUE (username) NULLS NOT DISTINCT
);

-- 索引
CREATE UNIQUE INDEX idx_users_phone ON users (phone, country_code) WHERE deleted_at IS NULL AND phone IS NOT NULL;
CREATE UNIQUE INDEX idx_users_email_lower ON users (LOWER(email)) WHERE deleted_at IS NULL AND email IS NOT NULL;
CREATE INDEX idx_users_username_lower ON users (LOWER(username)) WHERE deleted_at IS NULL;
CREATE UNIQUE INDEX idx_users_wechat_openid ON users (wechat_openid, wechat_platform) WHERE deleted_at IS NULL AND wechat_openid IS NOT NULL;
CREATE INDEX idx_users_credits_balance ON users (ai_credits_balance) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_created_at ON users (created_at) WHERE deleted_at IS NULL;
CREATE INDEX idx_users_avatar_id ON users (avatar_id) WHERE avatar_id IS NOT NULL;

-- 表级注释
COMMENT ON TABLE users IS '用户表 - 应用层保证引用完整性';

-- 列级注释
COMMENT ON COLUMN users.user_id IS '用户唯一标识';
COMMENT ON COLUMN users.phone IS '手机号';
COMMENT ON COLUMN users.country_code IS '国家区号';
COMMENT ON COLUMN users.phone_verified IS '手机号是否已验证';
COMMENT ON COLUMN users.wechat_openid IS '微信 OpenID';
COMMENT ON COLUMN users.wechat_unionid IS '微信 UnionID';
COMMENT ON COLUMN users.wechat_platform IS '微信平台:1=mp(公众号), 2=open(开放平台)';
COMMENT ON COLUMN users.email IS '邮箱地址';
COMMENT ON COLUMN users.username IS '用户名(唯一标识)';
COMMENT ON COLUMN users.username_changed IS '用户名是否已修改过';
COMMENT ON COLUMN users.nickname IS '昵称(显示名称)';
COMMENT ON COLUMN users.password_hash IS '密码哈希值';
COMMENT ON COLUMN users.avatar_url IS '头像 URL';
COMMENT ON COLUMN users.avatar_id IS '头像附件 ID - 应用层验证,关联 attachments.attachment_id';
COMMENT ON COLUMN users.ai_credits_balance IS '当前剩余算力积分';
COMMENT ON COLUMN users.total_recharged_amount IS '累计充值金额(元)';
COMMENT ON COLUMN users.total_credits_earned IS '累计获得积分';
COMMENT ON COLUMN users.total_credits_consumed IS '累计消耗积分';
COMMENT ON COLUMN users.created_at IS '创建时间(自动记录时区)';
COMMENT ON COLUMN users.updated_at IS '更新时间(自动记录时区)';
COMMENT ON COLUMN users.deleted_at IS '软删除时间(自动记录时区)';

-- 触发器:自动更新 updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

设计说明

  1. 主键设计

    • 使用 UUID PRIMARY KEY 作为主键
    • UUID v7 基于时间戳排序,天然有序,索引性能更好
  2. 多种登录方式

    • 手机号登录phone + country_code(支持国际区号)
    • 微信登录wechat_openid + wechat_platform(mp=公众号,open=开放平台)
    • 邮箱登录email + password_hash(可选,后期支持)
    • 所有登录字段均可为 NULL,支持灵活绑定
  3. 唯一性约束

    • 手机号 + 区号组合唯一(使用条件唯一索引)
    • 邮箱唯一(使用条件唯一索引,可选绑定)
    • 用户名唯一(表级约束,必填)
    • 微信 openid + 平台组合唯一(使用条件唯一索引)
  4. 用户名与昵称管理

    • username:必填,唯一标识,首次登录自动生成(格式:user_{timestamp}_{random4}
    • username_changed:标记是否已修改过用户名(仅允许修改一次)
    • nickname:可选,显示名称,允许重复,可随时修改
  5. 头像管理

    • avatar_url:直接存储头像 URL(兼容旧数据或外部头像)
    • avatar_id:关联 attachments 表(推荐方式,便于统一管理)
    • 应用层验证: Service 层验证 avatar_id 引用完整性
  6. 算力积分系统

    • ai_credits_balance:当前剩余积分(新用户默认 100)
    • total_recharged_amount:累计充值金额(元)
    • total_credits_earned:累计获得积分(包含充值、赠送、活动等,新用户默认 100)
    • total_credits_consumed:累计消耗积分
  7. 软删除

    • 使用 deleted_at 字段实现软删除
    • 所有索引都包含 WHERE deleted_at IS NULL 条件
  8. 索引策略

    • 条件唯一索引:手机号、邮箱(大小写不敏感)、微信 openid(仅索引未删除且非空记录)
    • 表级唯一约束:用户名(必填字段)
    • 普通索引:积分余额、创建时间、头像 ID
    • 所有索引都包含 WHERE deleted_at IS NULL 条件以提升性能
  9. 安全性

    • 密码使用 bcrypt 哈希存储(可选)
    • 邮箱统一转小写存储(LOWER(email)
    • 用户名大小写不敏感查询

微信平台枚举值映射表

名称 说明
1 mp 公众号
2 open 开放平台

3.2 user_sessions(用户会话表)

CREATE TABLE user_sessions (
    session_id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    token VARCHAR(500) NOT NULL UNIQUE,
    refresh_token VARCHAR(500),
    expires_at TIMESTAMPTZ NOT NULL,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_used_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- 索引
CREATE INDEX idx_user_sessions_user_id ON user_sessions (user_id);
CREATE INDEX idx_user_sessions_token ON user_sessions (token);
CREATE INDEX idx_user_sessions_expires_at ON user_sessions (expires_at);

-- 表级注释
COMMENT ON TABLE user_sessions IS '用户会话表 - 应用层保证引用完整性';

-- 列级注释
COMMENT ON COLUMN user_sessions.session_id IS '会话唯一标识';
COMMENT ON COLUMN user_sessions.user_id IS '用户 ID - 应用层验证,关联 users.user_id';
COMMENT ON COLUMN user_sessions.token IS '访问令牌(JWT Access Token)';
COMMENT ON COLUMN user_sessions.refresh_token IS '刷新令牌(JWT Refresh Token)';
COMMENT ON COLUMN user_sessions.expires_at IS '会话过期时间(自动记录时区)';
COMMENT ON COLUMN user_sessions.ip_address IS '登录 IP 地址';
COMMENT ON COLUMN user_sessions.user_agent IS '用户代理字符串(浏览器/设备信息)';
COMMENT ON COLUMN user_sessions.created_at IS '创建时间(自动记录时区)';
COMMENT ON COLUMN user_sessions.last_used_at IS '最后使用时间(自动记录时区)';

设计说明

  1. 会话管理

    • token:访问令牌(JWT Access Token)
    • refresh_token:刷新令牌(JWT Refresh Token)
    • expires_at:会话过期时间
  2. 安全追踪

    • ip_address:登录 IP 地址(使用 INET 类型,支持 IPv4/IPv6)
    • user_agent:用户代理字符串(浏览器/设备信息)
    • last_used_at:最后使用时间
  3. 引用完整性

    • 不使用数据库外键约束(ON DELETE CASCADE
    • 在应用层 Service 中验证 user_id 是否存在
    • 用户删除时,通过 Service 层逻辑删除所有会话
  4. 索引策略

    • user_id:快速查询用户的所有会话
    • token:快速验证访问令牌
    • expires_at:定期清理过期会话

3.3 数据类型说明

字段类型 说明 示例
UUID 通用唯一标识符,使用 UUID v7 user_id, session_id, avatar_id
VARCHAR(N) 可变长度字符串,最大 N 字符 username, email, phone
BOOLEAN 布尔值 phone_verified, username_changed
NUMERIC(10, 2) 精确数值,10位总长度,2位小数 total_recharged_amount
INTEGER 32位整数 ai_credits_balance
SMALLINT 16位整数,用于枚举值 wechat_platform
TIMESTAMPTZ 带时区的时间戳(事件时间) created_at, updated_at
INET IP 地址(IPv4/IPv6) ip_address
TEXT 无长度限制的文本 user_agent

3.4 约束说明

约束类型 说明 示例
PRIMARY KEY 主键约束 user_id
UNIQUE 唯一约束 phone + country_code, email, username
CHECK 检查约束 wechat_platform IN (1, 2)
NOT NULL 非空约束 username NOT NULL
DEFAULT 默认值 country_code DEFAULT '+86'
NULLS NOT DISTINCT NULL 值参与唯一性检查 UNIQUE (phone, country_code) NULLS NOT DISTINCT

注意:本项目禁止使用数据库物理外键约束(REFERENCES),所有引用完整性在应用层 Service 中验证。


3.5 触发器说明

update_updated_at_column()

自动更新 updated_at 字段的触发器函数,适用于所有需要跟踪更新时间的表。

CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

使用方式

CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

3.6 数据迁移脚本

初始化用户表

-- 1. 创建触发器函数(如果不存在)
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = now();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 2. 创建用户表(无外键约束,使用 TIMESTAMPTZ)
CREATE TABLE IF NOT EXISTS users (
    user_id UUID PRIMARY KEY,
    phone VARCHAR(20),
    country_code VARCHAR(10) DEFAULT '+86',
    phone_verified BOOLEAN NOT NULL DEFAULT false,
    wechat_openid VARCHAR(100),
    wechat_unionid VARCHAR(100),
    wechat_platform SMALLINT CHECK (wechat_platform IN (1, 2)),
    email VARCHAR(255),
    username VARCHAR(255) NOT NULL,
    username_changed BOOLEAN NOT NULL DEFAULT false,
    nickname VARCHAR(255),
    password_hash VARCHAR(255),
    avatar_url VARCHAR(500),
    avatar_id UUID,
    ai_credits_balance INTEGER NOT NULL DEFAULT 100,
    total_recharged_amount NUMERIC(10, 2) NOT NULL DEFAULT 0.00,
    total_credits_earned INTEGER NOT NULL DEFAULT 100,
    total_credits_consumed INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    deleted_at TIMESTAMPTZ,
    
    -- 仅保留必要的表级约束
    CONSTRAINT users_username_unique UNIQUE (username) NULLS NOT DISTINCT
);

-- 3. 创建索引(条件唯一索引替代表级约束)
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_phone ON users (phone, country_code) WHERE deleted_at IS NULL AND phone IS NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_lower ON users (LOWER(email)) WHERE deleted_at IS NULL AND email IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_users_username_lower ON users (LOWER(username)) WHERE deleted_at IS NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_users_wechat_openid ON users (wechat_openid, wechat_platform) WHERE deleted_at IS NULL AND wechat_openid IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_users_credits_balance ON users (ai_credits_balance) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users (created_at) WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_users_avatar_id ON users (avatar_id) WHERE avatar_id IS NOT NULL;

-- 4. 添加表和列注释
-- 表级注释
COMMENT ON TABLE users IS '用户表 - 应用层保证引用完整性';

-- 列级注释
COMMENT ON COLUMN users.user_id IS '用户唯一标识';
COMMENT ON COLUMN users.phone IS '手机号';
COMMENT ON COLUMN users.country_code IS '国家区号';
COMMENT ON COLUMN users.phone_verified IS '手机号是否已验证';
COMMENT ON COLUMN users.wechat_openid IS '微信 OpenID';
COMMENT ON COLUMN users.wechat_unionid IS '微信 UnionID';
COMMENT ON COLUMN users.wechat_platform IS '微信平台:1=mp(公众号), 2=open(开放平台)';
COMMENT ON COLUMN users.email IS '邮箱地址';
COMMENT ON COLUMN users.username IS '用户名(唯一标识)';
COMMENT ON COLUMN users.username_changed IS '用户名是否已修改过';
COMMENT ON COLUMN users.nickname IS '昵称(显示名称)';
COMMENT ON COLUMN users.password_hash IS '密码哈希值';
COMMENT ON COLUMN users.avatar_url IS '头像 URL';
COMMENT ON COLUMN users.avatar_id IS '头像附件 ID - 应用层验证,关联 attachments.attachment_id';
COMMENT ON COLUMN users.ai_credits_balance IS '当前剩余算力积分';
COMMENT ON COLUMN users.total_recharged_amount IS '累计充值金额(元)';
COMMENT ON COLUMN users.total_credits_earned IS '累计获得积分';
COMMENT ON COLUMN users.total_credits_consumed IS '累计消耗积分';
COMMENT ON COLUMN users.created_at IS '创建时间(自动记录时区)';
COMMENT ON COLUMN users.updated_at IS '更新时间(自动记录时区)';
COMMENT ON COLUMN users.deleted_at IS '软删除时间(自动记录时区)';

-- 5. 创建触发器
DROP TRIGGER IF EXISTS update_users_updated_at ON users;
CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- 6. 创建会话表(无外键约束,使用 TIMESTAMPTZ)
CREATE TABLE IF NOT EXISTS user_sessions (
    session_id UUID PRIMARY KEY,
    user_id UUID NOT NULL,
    token VARCHAR(500) NOT NULL UNIQUE,
    refresh_token VARCHAR(500),
    expires_at TIMESTAMPTZ NOT NULL,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_used_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- 7. 创建会话表索引
CREATE INDEX IF NOT EXISTS idx_user_sessions_user_id ON user_sessions (user_id);
CREATE INDEX IF NOT EXISTS idx_user_sessions_token ON user_sessions (token);
CREATE INDEX IF NOT EXISTS idx_user_sessions_expires_at ON user_sessions (expires_at);

-- 8. 添加会话表注释
-- 表级注释
COMMENT ON TABLE user_sessions IS '用户会话表 - 应用层保证引用完整性';

-- 列级注释
COMMENT ON COLUMN user_sessions.session_id IS '会话唯一标识';
COMMENT ON COLUMN user_sessions.user_id IS '用户 ID - 应用层验证,关联 users.user_id';
COMMENT ON COLUMN user_sessions.token IS '访问令牌(JWT Access Token)';
COMMENT ON COLUMN user_sessions.refresh_token IS '刷新令牌(JWT Refresh Token)';
COMMENT ON COLUMN user_sessions.expires_at IS '会话过期时间(自动记录时区)';
COMMENT ON COLUMN user_sessions.ip_address IS '登录 IP 地址';
COMMENT ON COLUMN user_sessions.user_agent IS '用户代理字符串(浏览器/设备信息)';
COMMENT ON COLUMN user_sessions.created_at IS '创建时间(自动记录时区)';
COMMENT ON COLUMN user_sessions.last_used_at IS '最后使用时间(自动记录时区)';

3.7 常用查询示例

查询用户信息(包含积分)

SELECT
    user_id,
    username,
    phone,
    email,
    avatar_url,
    ai_credits_balance,
    total_recharged_amount,
    total_credits_earned,
    total_credits_consumed,
    created_at
FROM users
WHERE user_id = $1 AND deleted_at IS NULL;

通过手机号查找用户

SELECT user_id, username, phone_verified
FROM users
WHERE phone = $1
  AND country_code = $2
  AND deleted_at IS NULL;

通过微信 openid 查找用户

SELECT user_id, username, wechat_unionid
FROM users
WHERE wechat_openid = $1
  AND wechat_platform = $2
  AND deleted_at IS NULL;

查询用户的所有活跃会话

SELECT
    session_id,
    token,
    expires_at,
    ip_address,
    user_agent,
    last_used_at
FROM user_sessions
WHERE user_id = $1
  AND expires_at > now()
ORDER BY last_used_at DESC;

清理过期会话

DELETE FROM user_sessions
WHERE expires_at < now();

3.8 性能优化建议

  1. 定期清理过期会话

    -- 每天执行一次
    DELETE FROM user_sessions WHERE expires_at < now() - INTERVAL '7 days';
    
  2. 监控积分余额索引

    -- 查询积分不足的用户
    SELECT user_id, username, ai_credits_balance
    FROM users
    WHERE ai_credits_balance < 10 AND deleted_at IS NULL;
    
  3. 分析表统计信息

    ANALYZE users;
    ANALYZE user_sessions;
    
  4. 查看表大小

    SELECT
        pg_size_pretty(pg_total_relation_size('users')) AS users_size,
        pg_size_pretty(pg_total_relation_size('user_sessions')) AS sessions_size;
    

数据模型

User 模型

# app/models/user.py
from sqlmodel import SQLModel, Field, Relationship
from sqlalchemy import Column, String, Integer, SmallInteger, Boolean, Numeric
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from uuid import UUID
from enum import IntEnum
from app.utils.id_generator import generate_uuid

if TYPE_CHECKING:
    from app.models.attachment import Attachment

class WechatPlatform(IntEnum):
    """微信平台枚举"""
    MP = 1    # 公众号
    OPEN = 2  # 开放平台
    
    @classmethod
    def from_string(cls, value: str) -> 'WechatPlatform':
        """从字符串转换为枚举值"""
        mapping = {
            'mp': cls.MP,
            'open': cls.OPEN
        }
        result = mapping.get(value.lower())
        if result is None:
            raise ValueError(f"Invalid platform: {value}")
        return result
    
    def to_string(self) -> str:
        """转换为字符串"""
        mapping = {
            self.MP: 'mp',
            self.OPEN: 'open'
        }
        return mapping[self]
    
    @classmethod
    def get_display_name(cls, value: int) -> str:
        """获取显示名称"""
        names = {
            cls.MP: "公众号",
            cls.OPEN: "开放平台"
        }
        return names.get(value, "未知平台")

class User(SQLModel, table=True):
    __tablename__ = "users"

    # 主键
    user_id: UUID = Field(
        default_factory=generate_uuid,
        sa_column=Column(PG_UUID(as_uuid=True), primary_key=True),
        description="用户ID"
    )

    # 登录方式
    phone: Optional[str] = Field(default=None, max_length=20)
    country_code: str = Field(default='+86', max_length=10)
    phone_verified: bool = Field(default=False)
    wechat_openid: Optional[str] = Field(default=None, max_length=100)
    wechat_unionid: Optional[str] = Field(default=None, max_length=100)
    wechat_platform: Optional[int] = Field(
        default=None,
        sa_column=Column(SmallInteger),
        description="微信平台:1=mp(公众号), 2=open(开放平台)"
    )

    # 基本信息
    email: Optional[str] = Field(default=None, max_length=255)
    username: str = Field(max_length=255)
    username_changed: bool = Field(default=False)
    nickname: Optional[str] = Field(default=None, max_length=255)
    password_hash: Optional[str] = Field(default=None, max_length=255)
    avatar_url: Optional[str] = Field(default=None, max_length=500)
    avatar_id: Optional[UUID] = Field(
        default=None,
        description="头像附件ID - 应用层验证,关联 attachments.attachment_id"
    )

    # 算力积分信息
    ai_credits_balance: int = Field(default=100)
    total_recharged_amount: float = Field(
        default=0.00,
        sa_column=Column(Numeric(10, 2))
    )
    total_credits_earned: int = Field(default=100)
    total_credits_consumed: int = Field(default=0)

    # 时间戳(使用 UTC aware datetime)
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="创建时间(UTC)"
    )
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="更新时间(UTC)"
    )
    deleted_at: Optional[datetime] = Field(
        default=None,
        description="软删除时间(UTC)"
    )
    
    # Relationship 配置(使用 primaryjoin,因为无物理外键)
    avatar: Optional["Attachment"] = Relationship(
        sa_relationship_kwargs={
            "primaryjoin": "User.avatar_id == Attachment.attachment_id",
            "foreign_keys": "[User.avatar_id]",
        }
    )

class UserSession(SQLModel, table=True):
    __tablename__ = "user_sessions"

    session_id: UUID = Field(
        default_factory=generate_uuid,
        sa_column=Column(PG_UUID(as_uuid=True), primary_key=True),
        description="会话唯一标识"
    )
    user_id: UUID = Field(
        sa_column=Column(PG_UUID(as_uuid=True)),
        description="用户ID - 应用层验证,关联 users.user_id"
    )
    token: str = Field(max_length=500, unique=True)
    refresh_token: Optional[str] = Field(default=None, max_length=500)
    expires_at: datetime = Field(description="会话过期时间(UTC)")
    ip_address: Optional[str] = Field(default=None)
    user_agent: Optional[str] = Field(default=None)
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="创建时间(UTC)"
    )
    last_used_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        description="最后使用时间(UTC)"
    )

Schema 定义

# app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field, field_validator, field_serializer
from typing import Optional
from datetime import datetime, timezone
from uuid import UUID
from enum import Enum

class MemberRoleEnum(str, Enum):
    """API 层角色枚举(字符串)"""
    OWNER = "owner"
    EDITOR = "editor"
    VIEWER = "viewer"

class WechatPlatformEnum(str, Enum):
    """API 层微信平台枚举(字符串)"""
    MP = "mp"
    OPEN = "open"

class PhoneLoginRequest(BaseModel):
    phone: str = Field(..., min_length=11, max_length=20)
    country_code: str = Field(default='+86')
    code: str = Field(..., min_length=6, max_length=6)

class SendSmsRequest(BaseModel):
    phone: str = Field(..., min_length=11, max_length=20)
    country_code: str = Field(default='+86')
    purpose: str = Field(default='login')

class UserUpdate(BaseModel):
    nickname: Optional[str] = Field(None, min_length=1, max_length=50)
    avatar_url: Optional[str] = None

class UsernameUpdate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_\u4e00-\u9fa5-]+$")

class BindPhoneRequest(BaseModel):
    phone: str = Field(..., min_length=11, max_length=20)
    country_code: str = Field(default='+86')
    code: str = Field(..., min_length=6, max_length=6)

class BindWechatRequest(BaseModel):
    code: str
    platform: WechatPlatformEnum
    
    @field_validator('platform', mode='before')
    @classmethod
    def convert_platform_to_int(cls, v):
        """API 输入:字符串 → 整数"""
        from app.models.user import WechatPlatform
        if isinstance(v, str):
            return WechatPlatform.from_string(v).value
        return v

class BindEmailRequest(BaseModel):
    email: EmailStr

class UserResponse(BaseModel):
    user_id: UUID
    phone: Optional[str] = None
    email: Optional[str] = None
    username: str
    nickname: Optional[str] = None
    avatar_url: Optional[str] = None
    wechat_platform: Optional[str] = None
    ai_credits_balance: int
    total_recharged_amount: float
    username_changed: bool
    created_at: datetime
    
    @field_serializer('user_id')
    def serialize_user_id(self, value: UUID) -> str:
        """UUID 序列化为字符串"""
        return str(value)
    
    @field_serializer('wechat_platform')
    def serialize_platform(self, value: Optional[int]) -> Optional[str]:
        """API 输出:整数 → 字符串"""
        if value is None:
            return None
        from app.models.user import WechatPlatform
        return WechatPlatform(value).to_string()

    class Config:
        from_attributes = True

class CreditsInfoResponse(BaseModel):
    balance: int
    total_earned: int
    total_consumed: int
    total_recharged_amount: float

    class Config:
        from_attributes = True

相关文档


文档版本:v2.6
最后更新:2026-01-28

变更记录

v2.6 (2026-01-28)

  • 补充 Repository 层缺失方法
    • 添加 get_by_wechat_unionid - 通过微信 UnionID 查询用户
    • 添加 update_wechat_info - 更新用户微信信息
    • 添加 clear_wechat_info - 清除用户微信信息
    • 添加 update_session_last_used - 更新会话最后使用时间
  • 修正返回类型
    • update_session 返回类型从 UserSession 改为 Optional[UserSession]
    • 移除 ValueError 异常,改为返回 None
  • 原因:确保文档与实际代码实现保持一致

v2.5 (2026-01-28)

  • 补充完整的 SQL 注释
    • users 表的所有列添加 COMMENT ON COLUMN 注释
    • user_sessions 表的所有列添加 COMMENT ON COLUMN 注释
    • 使用规范的 COMMENT ON TABLE/COLUMN 语法(符合 PostgreSQL 标准)
  • 修复代码导入问题
    • 在所有代码示例中添加 timezone 导入
    • 修正 refresh_token 方法中的字段名错误(session.idsession.session_id
  • 原因:确保文档符合 jointo-tech-stack 数据库设计规范,提升代码可维护性

v2.4 (2026-01-28)

  • 补充应用层关联关系处理说明
    • 新增"引用完整性保证"专门章节
    • 列出所有关联关系及其验证策略
    • 提供完整的验证代码示例和错误处理说明
    • 补充 UserRepository 完整实现
  • 修复 Service 代码安全漏洞
    • 修复 logout 方法:验证会话是否属于该用户
    • 补充 delete_user 方法:实现级联删除逻辑
    • 优化 upload_avatar 注释说明
  • 补充 API 接口
    • 添加"删除用户"接口文档
    • 补充错误响应示例
  • 修复 Model 定义
    • 修正 UserSession.user_id 字段的 sa_column 定义
  • 原因:确保文档完整性,修复安全漏洞,提供清晰的实现指导

v2.3 (2026-01-27)

  • 移除数据库外键约束,改为应用层验证
    • 移除 users.avatar_id 的外键约束
    • 移除 user_sessions.user_id 的外键约束
    • 在 Service 层添加引用完整性验证逻辑
    • 添加表和列注释说明应用层验证
  • 优化唯一性约束策略
    • 移除表级约束 users_phone_unique, users_email_unique, users_wechat_openid_unique
    • 改用条件唯一索引(仅索引未删除且非空记录)
    • 保留 users_username_unique 表级约束(必填字段)
  • 完善 Schema 序列化
    • UserResponse 中添加 wechat_platform 字段序列化器
    • 确保 API 输出时整数枚举正确转换为字符串
  • 原因:遵循项目架构规范,提升性能和扩展性,为分库分表做准备

v2.2 (2026-01-27)

  • 统一使用 UUID 类型作为主键:
    • user_idsession_id 统一为 UUID 类型
    • 字符串字段使用 VARCHAR(N) 而非 TEXT(提升性能)
    • 更新 Python Model 使用 SQLModel + UUID Field
    • 添加完整的枚举转换方法(from_string, to_string, get_display_name)
    • 更新 Schema 定义,添加 field_validator 和 field_serializer
    • 修正 API 响应中 UUID 的序列化
  • 原因:与项目核心表(users, projects)保持一致,提升性能和类型安全

v2.1 (2025-01-27)

  • 重构枚举字段实现方式:
    • wechat_platform 字段从 TEXT CHECK 改为 SMALLINT (1=mp, 2=open)
    • Python 模型从 str, enum.Enum 改为 IntEnum
    • 添加 from_string()to_string() 转换方法
    • 添加枚举值映射表
  • 原因:更好的性能、更容易扩展、与项目其他模块保持一致

v2.0 (2025-01-14)

  • 初始版本
  • 定义用户管理服务的核心功能和 API 接口