You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

70 KiB

Jointo - 后端技术架构设计文档

技术栈:Python + PostgreSQL
项目名称:jointo
项目域名https://www.jointo.ai
文档版本:v1.0


目录

  1. 架构概述
  2. 技术栈选型
  3. 系统架构设计
  4. 数据库设计
  5. 核心服务设计
  6. API 服务设计
  7. 异步任务处理
  8. 文件存储方案
  9. AI 服务集成
  10. 部署架构
  11. 性能优化
  12. 安全设计

1. 架构概述

1.1 系统特点

  • AI 驱动:大量使用 AI 生成功能(图片、视频、音效、字幕、配音)
  • 文件密集:需要处理大量图片、视频、音频文件
  • 异步任务:AI 生成和视频导出都是耗时操作
  • 协作支持:支持多人协同编辑项目
  • 实时性要求:部分功能需要实时反馈

1.2 架构设计原则

  • 微服务化:核心服务解耦,便于扩展
  • 异步优先:耗时操作异步处理
  • 可扩展性:支持水平扩展
  • 高可用性:关键服务冗余部署
  • 安全性:完善的认证授权机制

1.3 整体架构图

┌─────────────────────────────────────────────────────────────┐
│                        前端 (React)                          │
└───────────────────────────┬─────────────────────────────────┘
                            │ HTTPS
┌───────────────────────────▼─────────────────────────────────┐
│                    API Gateway / Nginx                     │
│              (负载均衡、SSL终止、路由)                        │
└───────────────┬───────────────────────────┬─────────────────┘
                │                           │
    ┌───────────▼──────────┐    ┌───────────▼──────────┐
    │   API 服务 (FastAPI) │    │   WebSocket 服务      │
    │   - RESTful API      │    │   - 实时协作          │
    │   - 认证授权         │    │   - 实时通知          │
    └───────────┬──────────┘    └──────────────────────┘
                │
    ┌───────────▼───────────────────────────────────────────┐
    │              业务逻辑层                                │
    │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │
    │  │项目管理  │ │分镜管理  │ │资源管理  │            │
    │  └──────────┘ └──────────┘ └──────────┘            │
    │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │
    │  │视频管理  │ │AI服务    │ │导出服务  │            │
    │  └──────────┘ └──────────┘ └──────────┘            │
    └───────────┬───────────────────────────────────────────┘
                │
    ┌───────────▼───────────────────────────────────────────┐
    │              数据访问层                                │
    │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │
    │  │PostgreSQL │ │  Redis   │ │  ORM     │            │
    │  │(主数据库) │ │ (缓存)   │ │(SQLAlchemy)            │
    │  └──────────┘ └──────────┘ └──────────┘            │
    └────────────────────────────────────────────────────────┘
                │
    ┌───────────▼───────────────────────────────────────────┐
    │              异步任务层                                │
    │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │
    │  │  Celery  │ │ RabbitMQ │ │  Worker  │            │
    │  │ (任务队列)│ │ (消息队列)│ │ (任务执行)│            │
    │  └──────────┘ └──────────┘ └──────────┘            │
    └───────────┬───────────────────────────────────────────┘
                │
    ┌───────────▼───────────────────────────────────────────┐
    │              外部服务层                                │
    │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │
    │  │对象存储   │ │ AI API   │ │ 视频处理 │            │
    │  │(MinIO/   │ │(OpenAI/  │ │(FFmpeg)  │            │
    │  │ S3)      │ │ Stable   │ │          │            │
    │  │          │ │ Diffusion)│ │          │            │
    │  └──────────┘ └──────────┘ └──────────┘            │
    └────────────────────────────────────────────────────────┘

2. 技术栈选型

2.1 核心框架

2.1.1 Web 框架:FastAPI

选择理由

  • 高性能(基于 Starlette 和 Pydantic)
  • 自动生成 API 文档(Swagger/OpenAPI)
  • 类型提示支持(与 TypeScript 前端对接友好)
  • 异步支持(async/await)
  • 现代化设计,易于维护

版本:FastAPI 0.104+

# 示例:FastAPI 应用结构
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session

app = FastAPI(
    title="Jointo API",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc"
)

# CORS 配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

2.1.2 ORM:SQLAlchemy 2.0

选择理由

  • 成熟的 Python ORM
  • 支持异步(async/await)
  • 强大的查询构建器
  • 数据库迁移支持(Alembic)
  • 类型提示支持

版本:SQLAlchemy 2.0+

# 示例:SQLAlchemy 模型
from sqlalchemy import Column, String, Integer, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Project(Base):
    __tablename__ = "projects"

    id = Column(String, primary_key=True)
    name = Column(String, nullable=False)
    description = Column(String)
    type = Column(String)  # 'mine' | 'collab'
    created_at = Column(DateTime)
    updated_at = Column(DateTime)

    storyboards = relationship("Storyboard", back_populates="project")

2.1.3 数据库迁移:Alembic

选择理由

  • SQLAlchemy 官方迁移工具
  • 版本控制数据库结构
  • 自动生成迁移脚本

2.2 异步任务处理

2.2.1 任务队列:Celery

选择理由

  • Python 生态最成熟的任务队列
  • 支持多种消息代理(RabbitMQ、Redis)
  • 支持任务重试、优先级、定时任务
  • 丰富的监控工具(Flower)

版本:Celery 5.3+

# 示例:Celery 任务定义
from celery import Celery

celery_app = Celery(
    "jointo",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(bind=True, max_retries=3)
def generate_video_task(self, prompt: str, video_type: str):
    try:
        # AI 生成视频逻辑
        result = ai_service.generate_video(prompt, video_type)
        return result
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

2.2.2 消息代理:RabbitMQ(推荐)或 Redis

选择理由

  • RabbitMQ:功能强大,支持复杂路由,适合生产环境
  • Redis:简单轻量,适合小规模部署

2.3 缓存系统

2.3.1 Redis

用途

  • 缓存热点数据(项目列表、分镜列表)
  • 会话存储
  • Celery 消息代理(可选)
  • 分布式锁
  • 实时数据(在线用户数等)

版本:Redis 7.0+

2.4 文件存储

2.4.1 对象存储:MinIO(开发)或 AWS S3 / 阿里云 OSS(生产)

选择理由

  • 支持 S3 兼容 API
  • 适合存储大量文件(图片、视频、音频)
  • 支持 CDN 加速
  • 成本低,扩展性好

MinIO:用于开发环境,本地部署 AWS S3 / 阿里云 OSS:用于生产环境

2.5 视频处理

2.5.1 FFmpeg

用途

  • 视频合成
  • 视频转码
  • 视频裁剪
  • 音频处理

Python 封装ffmpeg-python

2.6 AI 服务集成

2.6.1 AI 模型服务

图片生成

  • Stable Diffusion(开源)
  • DALL-E API(OpenAI)
  • Midjourney API(如果可用)

视频生成

  • Runway Gen-2
  • Pika Labs
  • Stable Video Diffusion

语音合成

  • OpenAI TTS
  • Azure Speech Services
  • 百度语音 / 讯飞语音

字幕生成

  • Whisper(OpenAI,开源)
  • 阿里云语音识别

2.7 其他工具

2.7.1 认证授权:JWT

python-jose[cryptography]

2.7.2 环境配置:Pydantic Settings

pydantic-settings

2.7.3 日志:Loguru

loguru

2.7.4 监控:Prometheus + Grafana(可选)


3. 系统架构设计

3.1 分层架构

┌─────────────────────────────────────────┐
│          API 层 (FastAPI)                │
│  - 路由定义                              │
│  - 请求验证                              │
│  - 响应格式化                            │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         业务逻辑层 (Services)            │
│  - 项目管理服务                          │
│  - 分镜管理服务                          │
│  - 资源管理服务                          │
│  - AI 生成服务                          │
│  - 视频导出服务                          │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         数据访问层 (Repositories)        │
│  - 数据库操作                            │
│  - 缓存操作                              │
│  - 文件操作                              │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         基础设施层                       │
│  - PostgreSQL                            │
│  - Redis                                 │
│  - 对象存储                              │
└─────────────────────────────────────────┘

3.2 项目目录结构

backend/
├── app/
│   ├── __init__.py
│   ├── main.py                    # FastAPI 应用入口
│   ├── config.py                  # 配置管理
│   │
│   ├── api/                       # API 路由
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── folders.py         # 文件夹相关 API
│   │   │   ├── projects.py        # 项目相关 API
│   │   │   ├── storyboards.py     # 分镜相关 API
│   │   │   ├── resources.py       # 资源相关 API
│   │   │   ├── videos.py          # 视频相关 API
│   │   │   ├── ai.py              # AI 生成 API
│   │   │   └── export.py          # 导出相关 API
│   │   └── dependencies.py        # 依赖注入
│   │
│   ├── core/                      # 核心功能
│   │   ├── __init__.py
│   │   ├── security.py            # 认证授权
│   │   ├── database.py           # 数据库连接
│   │   ├── cache.py              # 缓存管理
│   │   └── storage.py            # 文件存储
│   │
│   ├── models/                    # SQLAlchemy 模型
│   │   ├── __init__.py
│   │   ├── folder.py
│   │   ├── project.py
│   │   ├── storyboard.py
│   │   ├── resource.py
│   │   ├── video.py
│   │   └── user.py
│   │
│   ├── schemas/                   # Pydantic 模型(请求/响应)
│   │   ├── __init__.py
│   │   ├── folder.py
│   │   ├── project.py
│   │   ├── storyboard.py
│   │   └── common.py
│   │
│   ├── services/                  # 业务逻辑层
│   │   ├── __init__.py
│   │   ├── folder_service.py      # 文件夹管理服务
│   │   ├── project_service.py
│   │   ├── storyboard_service.py
│   │   ├── resource_service.py
│   │   ├── video_service.py
│   │   ├── ai_service.py         # AI 生成服务
│   │   └── export_service.py     # 视频导出服务
│   │
│   ├── repositories/              # 数据访问层
│   │   ├── __init__.py
│   │   ├── folder_repository.py
│   │   ├── project_repository.py
│   │   ├── storyboard_repository.py
│   │   └── base_repository.py
│   │
│   ├── tasks/                     # Celery 任务
│   │   ├── __init__.py
│   │   ├── ai_tasks.py           # AI 生成任务
│   │   ├── export_tasks.py       # 导出任务
│   │   └── celery_app.py         # Celery 配置
│   │
│   ├── utils/                     # 工具函数
│   │   ├── __init__.py
│   │   ├── file_utils.py
│   │   ├── video_utils.py
│   │   ├── folder_utils.py       # 文件夹工具函数
│   │   └── ai_utils.py
│   │
│   └── middleware/                # 中间件
│       ├── __init__.py
│       ├── auth.py
│       └── logging.py
│
├── alembic/                       # 数据库迁移
│   ├── versions/
│   └── env.py
│
├── tests/                         # 测试
│   ├── __init__.py
│   ├── test_api/
│   ├── test_services/
│   └── conftest.py
│
├── scripts/                       # 脚本
│   ├── init_db.py
│   └── seed_data.py
│
├── .env.example                   # 环境变量示例
├── .gitignore
├── requirements.txt               # Python 依赖
├── Dockerfile
├── docker-compose.yml
└── README.md

3.3 核心模块设计

3.3.1 API 层职责

  • 接收 HTTP 请求
  • 参数验证(使用 Pydantic)
  • 调用业务逻辑层
  • 格式化响应
  • 错误处理

3.3.2 业务逻辑层职责

  • 业务规则实现
  • 数据验证
  • 事务管理
  • 调用数据访问层
  • 调用外部服务(AI、存储等)

3.3.3 数据访问层职责

  • 数据库 CRUD 操作
  • 缓存操作
  • 查询优化
  • 数据转换

4. 数据库设计

4.1 数据库选型:PostgreSQL

选择理由

  • 强大的关系型数据库
  • 支持 JSON/JSONB(存储灵活数据)
  • 支持全文搜索
  • 支持数组类型
  • 事务支持完善
  • 扩展性强(PostGIS、pg_trgm 等)

版本:PostgreSQL 14+

4.2 核心数据表设计

4.2.1 用户表 (users)

CREATE TABLE users (
    id VARCHAR(36) PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(100) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    avatar_url VARCHAR(500),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_users_email ON users(email);

4.2.2 项目表 (projects)

CREATE TABLE projects (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    type VARCHAR(20) NOT NULL,  -- 'mine' | 'collab'
    owner_id VARCHAR(36) NOT NULL REFERENCES users(id),
    thumbnail_url VARCHAR(500),
    settings JSONB,  -- 项目设置(分辨率、帧率等)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    deleted_at TIMESTAMP
);

CREATE INDEX idx_projects_owner_id ON projects(owner_id);
CREATE INDEX idx_projects_type ON projects(type);
CREATE INDEX idx_projects_created_at ON projects(created_at);

4.2.3 项目成员表 (project_members)

CREATE TABLE project_members (
    id VARCHAR(36) PRIMARY KEY,
    project_id VARCHAR(36) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    user_id VARCHAR(36) NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role VARCHAR(20) NOT NULL,  -- 'owner' | 'editor' | 'viewer'
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(project_id, user_id)
);

CREATE INDEX idx_project_members_project_id ON project_members(project_id);
CREATE INDEX idx_project_members_user_id ON project_members(user_id);

4.2.4 分镜表 (storyboards)

CREATE TABLE storyboards (
    id VARCHAR(36) PRIMARY KEY,
    project_id VARCHAR(36) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    title VARCHAR(255) NOT NULL,
    description TEXT,
    thumbnail_url VARCHAR(500),
    order_index INTEGER NOT NULL,  -- 分镜顺序
    start_time DECIMAL(10, 2),  -- 开始时间(秒)
    end_time DECIMAL(10, 2),    -- 结束时间(秒)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_storyboards_project_id ON storyboards(project_id);
CREATE INDEX idx_storyboards_order ON storyboards(project_id, order_index);

4.2.5 资源表 (resources)

CREATE TABLE resources (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    type VARCHAR(20) NOT NULL,  -- 'character' | 'scene' | 'prop'
    file_url VARCHAR(500) NOT NULL,
    thumbnail_url VARCHAR(500),
    file_size BIGINT,  -- 文件大小(字节)
    mime_type VARCHAR(100),
    metadata JSONB,  -- 额外元数据
    created_by VARCHAR(36) REFERENCES users(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_resources_type ON resources(type);
CREATE INDEX idx_resources_created_at ON resources(created_at);

4.2.6 分镜资源关联表 (storyboard_resources)

CREATE TABLE storyboard_resources (
    id VARCHAR(36) PRIMARY KEY,
    storyboard_id VARCHAR(36) NOT NULL REFERENCES storyboards(id) ON DELETE CASCADE,
    resource_id VARCHAR(36) NOT NULL REFERENCES resources(id) ON DELETE CASCADE,
    resource_type VARCHAR(20) NOT NULL,  -- 'character' | 'scene' | 'prop'
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(storyboard_id, resource_id, resource_type)
);

CREATE INDEX idx_storyboard_resources_storyboard_id ON storyboard_resources(storyboard_id);
CREATE INDEX idx_storyboard_resources_resource_id ON storyboard_resources(resource_id);

4.2.7 视频表 (videos)

CREATE TABLE videos (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    type VARCHAR(20) NOT NULL,  -- 'img2video' | 'text2video' | 'keyframe' | 'fusion' | 'replace' | 'real'
    video_url VARCHAR(500),
    thumbnail_url VARCHAR(500),
    duration DECIMAL(10, 2),  -- 时长(秒)
    file_size BIGINT,
    storyboard_id VARCHAR(36) REFERENCES storyboards(id),
    start_time DECIMAL(10, 2),  -- 在时间轴上的开始时间
    end_time DECIMAL(10, 2),    -- 在时间轴上的结束时间
    metadata JSONB,  -- AI 生成参数等
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_videos_storyboard_id ON videos(storyboard_id);
CREATE INDEX idx_videos_type ON videos(type);

4.2.8 音效表 (sound_effects)

CREATE TABLE sound_effects (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    audio_url VARCHAR(500) NOT NULL,
    duration DECIMAL(10, 2),
    file_size BIGINT,
    start_time DECIMAL(10, 2),
    end_time DECIMAL(10, 2),
    volume INTEGER DEFAULT 100,  -- 0-100
    project_id VARCHAR(36) REFERENCES projects(id) ON DELETE CASCADE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_sound_effects_project_id ON sound_effects(project_id);

4.2.9 字幕表 (subtitles)

CREATE TABLE subtitles (
    id VARCHAR(36) PRIMARY KEY,
    text TEXT NOT NULL,
    start_time DECIMAL(10, 2) NOT NULL,
    end_time DECIMAL(10, 2) NOT NULL,
    style JSONB,  -- 样式配置
    project_id VARCHAR(36) REFERENCES projects(id) ON DELETE CASCADE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_subtitles_project_id ON subtitles(project_id);
CREATE INDEX idx_subtitles_time ON subtitles(project_id, start_time, end_time);

4.2.10 配音表 (voiceovers)

CREATE TABLE voiceovers (
    id VARCHAR(36) PRIMARY KEY,
    text TEXT NOT NULL,
    audio_url VARCHAR(500),
    voice_type VARCHAR(50),  -- 语音类型
    duration DECIMAL(10, 2),
    start_time DECIMAL(10, 2),
    end_time DECIMAL(10, 2),
    volume INTEGER DEFAULT 100,
    project_id VARCHAR(36) REFERENCES projects(id) ON DELETE CASCADE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_voiceovers_project_id ON voiceovers(project_id);

4.2.11 时间轴配置表 (timelines)

CREATE TABLE timelines (
    id VARCHAR(36) PRIMARY KEY,
    project_id VARCHAR(36) UNIQUE NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    tracks JSONB NOT NULL,  -- 轨道配置
    items JSONB NOT NULL,   -- 时间轴项配置
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_timelines_project_id ON timelines(project_id);

4.2.12 AI 任务表 (ai_jobs)

CREATE TABLE ai_jobs (
    id VARCHAR(36) PRIMARY KEY,
    job_type VARCHAR(50) NOT NULL,  -- 'image' | 'video' | 'sound' | 'subtitle' | 'voice'
    status VARCHAR(20) NOT NULL,    -- 'pending' | 'processing' | 'completed' | 'failed'
    progress INTEGER DEFAULT 0,     -- 0-100
    input_data JSONB NOT NULL,      -- 输入参数
    output_data JSONB,              -- 输出结果
    error_message TEXT,
    user_id VARCHAR(36) REFERENCES users(id),
    project_id VARCHAR(36) REFERENCES projects(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP
);

CREATE INDEX idx_ai_jobs_status ON ai_jobs(status);
CREATE INDEX idx_ai_jobs_user_id ON ai_jobs(user_id);
CREATE INDEX idx_ai_jobs_project_id ON ai_jobs(project_id);
CREATE INDEX idx_ai_jobs_created_at ON ai_jobs(created_at);

4.2.13 导出任务表 (export_jobs)

CREATE TABLE export_jobs (
    id VARCHAR(36) PRIMARY KEY,
    project_id VARCHAR(36) NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
    status VARCHAR(20) NOT NULL,  -- 'pending' | 'processing' | 'completed' | 'failed'
    progress INTEGER DEFAULT 0,
    format VARCHAR(20) NOT NULL,  -- 'json' | 'mp4' | 'mov'
    quality VARCHAR(20),          -- 'low' | 'medium' | 'high'
    output_url VARCHAR(500),
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP
);

CREATE INDEX idx_export_jobs_project_id ON export_jobs(project_id);
CREATE INDEX idx_export_jobs_status ON export_jobs(status);

4.3 数据库优化策略

4.3.1 索引优化

  • 为常用查询字段创建索引
  • 为外键创建索引
  • 为时间范围查询创建复合索引

4.3.2 分区策略(可选)

  • 对于大表(如 ai_jobs、export_jobs),可按时间分区

4.3.3 JSONB 使用

  • 使用 JSONB 存储灵活配置(settings、metadata)
  • 为 JSONB 字段创建 GIN 索引(如果需要查询)
CREATE INDEX idx_projects_settings ON projects USING GIN(settings);

5. 核心服务设计

5.1 文件夹管理服务

# app/services/folder_service.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from app.models.folder import Folder
from app.repositories.folder_repository import FolderRepository
from app.schemas.folder import FolderCreate, FolderUpdate, FolderMove
from app.core.exceptions import NotFoundError, ValidationError, PermissionError

class FolderService:
    def __init__(self, db: Session):
        self.repository = FolderRepository(db)
        self.db = db

    async def get_folders(
        self,
        user_id: str,
        parent_id: Optional[str] = None,
        include_projects: bool = False,
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """获取文件夹列表"""
        # 检查父文件夹权限
        if parent_id:
            await self._check_folder_permission(user_id, parent_id, 'viewer')

        folders = await self.repository.get_by_parent(
            parent_id, user_id, page, page_size
        )

        result = []
        for folder in folders:
            folder_data = folder.to_dict()

            if include_projects:
                folder_data['projects'] = await self._get_folder_projects(
                    folder.id, user_id
                )

            # 添加统计信息
            folder_data['projectCount'] = await self.repository.count_projects(folder.id)
            folder_data['subfolderCount'] = await self.repository.count_subfolders(folder.id)

            result.append(folder_data)

        return result

    async def get_folder_tree(
        self,
        user_id: str,
        max_depth: Optional[int] = None,
        include_projects: bool = False
    ) -> Dict[str, Any]:
        """获取文件夹树形结构"""
        return await self.repository.get_tree_structure(
            user_id, max_depth, include_projects
        )

    async def create_folder(
        self,
        user_id: str,
        folder_data: FolderCreate
    ) -> Folder:
        """创建文件夹"""
        # 检查父文件夹权限
        if folder_data.parent_folder_id:
            await self._check_folder_permission(
                user_id, folder_data.parent_folder_id, 'editor'
            )

        # 检查同级名称唯一性
        if await self.repository.exists_by_name(
            folder_data.name, folder_data.parent_folder_id, user_id
        ):
            raise ValidationError("同一文件夹下不能有重名的子文件夹")

        folder = Folder(
            name=folder_data.name,
            description=folder_data.description,
            parent_folder_id=folder_data.parent_folder_id,
            owner_id=user_id,
            color=folder_data.color,
            icon=folder_data.icon
        )

        return await self.repository.create(folder)

    async def move_folder(
        self,
        user_id: str,
        folder_id: str,
        move_data: FolderMove
    ) -> Folder:
        """移动文件夹"""
        # 检查权限和循环引用
        await self._check_folder_permission(user_id, folder_id, 'editor')

        if await self.repository.would_create_cycle(
            folder_id, move_data.parent_folder_id
        ):
            raise ValidationError("不能将文件夹移动到自己的子文件夹中")

        return await self.repository.move_folder(
            folder_id, move_data.parent_folder_id
        )

    async def _check_folder_permission(
        self,
        user_id: str,
        folder_id: str,
        required_role: str = 'viewer'
    ) -> None:
        """检查文件夹权限"""
        has_permission = await self.repository.check_user_permission(
            user_id, folder_id, required_role
        )
        if not has_permission:
            raise PermissionError("没有权限访问此文件夹")

5.2 项目管理服务(更新)

# app/services/project_service.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from app.models.project import Project
from app.repositories.project_repository import ProjectRepository
from app.repositories.folder_repository import FolderRepository
from app.schemas.project import ProjectCreate, ProjectUpdate, ProjectMove

class ProjectService:
    def __init__(self, db: Session):
        self.repository = ProjectRepository(db)
        self.folder_repository = FolderRepository(db)

    async def get_projects(
        self,
        user_id: str,
        project_type: Optional[str] = None,
        folder_id: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> List[Project]:
        """获取项目列表"""
        # 如果指定了文件夹,检查文件夹权限
        if folder_id:
            has_permission = await self.folder_repository.check_user_permission(
                user_id, folder_id, 'viewer'
            )
            if not has_permission:
                raise PermissionError("没有权限访问此文件夹")

        return await self.repository.get_by_user(
            user_id, project_type, folder_id, page, page_size
        )

    async def create_project(
        self,
        user_id: str,
        project_data: ProjectCreate
    ) -> Project:
        """创建项目"""
        # 如果指定了文件夹,检查权限
        if project_data.folder_id:
            has_permission = await self.folder_repository.check_user_permission(
                user_id, project_data.folder_id, 'editor'
            )
            if not has_permission:
                raise PermissionError("没有权限在此文件夹中创建项目")

        project = Project(
            name=project_data.name,
            description=project_data.description,
            type=project_data.type,
            owner_id=user_id,
            folder_id=project_data.folder_id,
            settings=project_data.settings.dict() if project_data.settings else {}
        )
        return await self.repository.create(project)

    async def move_project(
        self,
        user_id: str,
        project_id: str,
        move_data: ProjectMove
    ) -> Project:
        """移动项目到文件夹"""
        # 检查项目权限
        if not await self.repository.check_user_permission(user_id, project_id, 'editor'):
            raise PermissionError("没有权限移动此项目")

        # 检查目标文件夹权限
        if move_data.folder_id:
            has_permission = await self.folder_repository.check_user_permission(
                user_id, move_data.folder_id, 'editor'
            )
            if not has_permission:
                raise PermissionError("没有权限访问目标文件夹")

        return await self.repository.move_to_folder(project_id, move_data.folder_id)

    async def update_project(
        self,
        project_id: str,
        project_data: ProjectUpdate
    ) -> Project:
        """更新项目"""
        project = await self.repository.get_by_id(project_id)
        if not project:
            raise NotFoundError("Project not found")

        update_data = project_data.dict(exclude_unset=True)
        return await self.repository.update(project_id, update_data)

    async def delete_project(self, project_id: str) -> None:
        """删除项目(软删除)"""
        await self.repository.soft_delete(project_id)

5.3 AI 生成服务

# app/services/ai_service.py
from typing import Dict, Any
from app.tasks.ai_tasks import (
    generate_image_task,
    generate_video_task,
    generate_sound_task
)

class AIService:
    def __init__(self):
        self.providers = {
            'image': 'stable_diffusion',
            'video': 'runway',
            'sound': 'openai_tts'
        }

    async def generate_image(
        self,
        prompt: str,
        model: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """生成图片(异步)"""
        task = generate_image_task.delay(
            prompt=prompt,
            model=model or self.providers['image'],
            **kwargs
        )
        return {
            'job_id': task.id,
            'status': 'pending'
        }

    async def generate_video(
        self,
        video_type: str,
        prompt: Optional[str] = None,
        image_url: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """生成视频(异步)"""
        task = generate_video_task.delay(
            video_type=video_type,
            prompt=prompt,
            image_url=image_url,
            **kwargs
        )
        return {
            'job_id': task.id,
            'status': 'pending'
        }

    async def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """查询任务状态"""
        from app.tasks.celery_app import celery_app
        task = celery_app.AsyncResult(job_id)

        return {
            'job_id': job_id,
            'status': task.state,
            'progress': task.info.get('progress', 0) if task.info else 0,
            'result': task.result if task.ready() else None,
            'error': str(task.info) if task.failed() else None
        }

5.4 视频导出服务

# app/services/export_service.py
from app.tasks.export_tasks import export_video_task
from app.utils.video_utils import VideoComposer

class ExportService:
    async def export_project(
        self,
        project_id: str,
        format: str = 'mp4',
        quality: str = 'high',
        **kwargs
    ) -> Dict[str, Any]:
        """导出项目(异步)"""
        task = export_video_task.delay(
            project_id=project_id,
            format=format,
            quality=quality,
            **kwargs
        )
        return {
            'job_id': task.id,
            'status': 'pending'
        }

    async def compose_video(
        self,
        project_id: str,
        output_path: str
    ) -> str:
        """合成视频(同步,用于任务执行)"""
        composer = VideoComposer()
        # 1. 获取项目所有素材
        # 2. 按时间轴排列
        # 3. 使用 FFmpeg 合成
        # 4. 返回输出文件路径
        return output_path

5.5 剧本管理服务

# app/services/script_service.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from app.models.script import Script, ScriptVersion, ScriptCharacter, ScriptScene
from app.repositories.script_repository import ScriptRepository
from app.schemas.script import ScriptCreate, ScriptUpdate, ScriptVersionCreate
from app.core.exceptions import NotFoundError, ValidationError, PermissionError

class ScriptService:
    def __init__(self, db: Session):
        self.repository = ScriptRepository(db)
        self.db = db

    async def get_scripts(
        self,
        project_id: int,
        user_id: int,
        status: Optional[str] = None,
        page: int = 1,
        page_size: int = 20
    ) -> Dict[str, Any]:
        """获取剧本列表"""
        # 检查项目权限
        await self._check_project_permission(user_id, project_id, 'viewer')

        scripts = await self.repository.get_by_project(
            project_id, status, page, page_size
        )

        return {
            'items': scripts,
            'total': await self.repository.count_by_project(project_id, status),
            'page': page,
            'page_size': page_size
        }

    async def create_script(
        self,
        user_id: int,
        script_data: ScriptCreate
    ) -> Script:
        """创建剧本"""
        # 检查项目权限
        await self._check_project_permission(
            user_id, script_data.project_id, 'editor'
        )

        # 验证剧本类型和内容
        if script_data.type == 'text' and not script_data.content:
            raise ValidationError("文本剧本必须提供内容")
        if script_data.type == 'attachment' and not script_data.attachment_id:
            raise ValidationError("附件剧本必须提供附件ID")

        script = Script(
            project_id=script_data.project_id,
            name=script_data.name,
            type=script_data.type,
            content=script_data.content,
            attachment_id=script_data.attachment_id,
            status='draft',
            created_by=user_id,
            updated_by=user_id
        )

        # 计算字数(如果是文本剧本)
        if script.type == 'text' and script.content:
            script.word_count = len(script.content)

        created_script = await self.repository.create(script)

        # 创建初始版本
        await self._create_version(created_script, user_id, "初始版本")

        return created_script

    async def update_script(
        self,
        user_id: int,
        script_id: int,
        script_data: ScriptUpdate
    ) -> Script:
        """更新剧本"""
        script = await self.repository.get_by_id(script_id)
        if not script:
            raise NotFoundError("剧本不存在")

        # 检查权限
        await self._check_project_permission(user_id, script.project_id, 'editor')

        # 如果内容有变化,创建新版本
        content_changed = False
        if script_data.content and script_data.content != script.content:
            content_changed = True

        # 更新剧本
        update_data = script_data.dict(exclude_unset=True)
        update_data['updated_by'] = user_id

        # 重新计算字数
        if 'content' in update_data and update_data['content']:
            update_data['word_count'] = len(update_data['content'])

        updated_script = await self.repository.update(script_id, update_data)

        # 如果内容变化,创建新版本
        if content_changed:
            updated_script.version += 1
            await self._create_version(
                updated_script,
                user_id,
                script_data.change_summary or "内容更新"
            )

        return updated_script

    async def approve_script(
        self,
        user_id: int,
        script_id: int
    ) -> Script:
        """审批剧本"""
        script = await self.repository.get_by_id(script_id)
        if not script:
            raise NotFoundError("剧本不存在")

        # 检查权限(需要 owner 权限)
        await self._check_project_permission(user_id, script.project_id, 'owner')

        return await self.repository.update(script_id, {
            'status': 'approved',
            'approved_by': user_id,
            'approved_at': 'now()',
            'updated_by': user_id
        })

    async def get_script_versions(
        self,
        user_id: int,
        script_id: int
    ) -> List[ScriptVersion]:
        """获取剧本版本历史"""
        script = await self.repository.get_by_id(script_id)
        if not script:
            raise NotFoundError("剧本不存在")

        await self._check_project_permission(user_id, script.project_id, 'viewer')

        return await self.repository.get_versions(script_id)

    async def add_character(
        self,
        user_id: int,
        script_id: int,
        character_data: Dict[str, Any]
    ) -> ScriptCharacter:
        """添加剧本角色"""
        script = await self.repository.get_by_id(script_id)
        if not script:
            raise NotFoundError("剧本不存在")

        await self._check_project_permission(user_id, script.project_id, 'editor')

        return await self.repository.add_character(script_id, character_data)

    async def add_scene(
        self,
        user_id: int,
        script_id: int,
        scene_data: Dict[str, Any]
    ) -> ScriptScene:
        """添加剧本场景"""
        script = await self.repository.get_by_id(script_id)
        if not script:
            raise NotFoundError("剧本不存在")

        await self._check_project_permission(user_id, script.project_id, 'editor')

        return await self.repository.add_scene(script_id, scene_data)

    async def _create_version(
        self,
        script: Script,
        user_id: int,
        change_summary: str
    ) -> ScriptVersion:
        """创建剧本版本"""
        version = ScriptVersion(
            script_id=script.script_id,
            version_number=script.version,
            content_snapshot=script.content,
            change_summary=change_summary,
            word_count=script.word_count,
            scene_count=script.scene_count,
            character_count=script.character_count,
            created_by=user_id
        )
        return await self.repository.create_version(version)

    async def _check_project_permission(
        self,
        user_id: int,
        project_id: int,
        required_role: str = 'viewer'
    ) -> None:
        """检查项目权限"""
        from app.repositories.project_repository import ProjectRepository
        project_repo = ProjectRepository(self.db)

        has_permission = await project_repo.check_user_permission(
            user_id, project_id, required_role
        )
        if not has_permission:
            raise PermissionError("没有权限访问此项目")

5.6 附件管理服务

# app/services/attachment_service.py
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from fastapi import UploadFile
from app.models.attachment import Attachment
from app.repositories.attachment_repository import AttachmentRepository
from app.core.storage import StorageService
from app.core.exceptions import NotFoundError, ValidationError
import hashlib
import os

class AttachmentService:
    def __init__(self, db: Session):
        self.repository = AttachmentRepository(db)
        self.storage = StorageService()
        self.db = db

    # 允许的文件类型
    ALLOWED_SCRIPT_TYPES = {
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'text/plain',
        'text/markdown'
    }

    async def upload_attachment(
        self,
        user_id: int,
        file: UploadFile,
        category: str = 'other',
        project_id: Optional[int] = None
    ) -> Attachment:
        """上传附件"""
        # 验证文件类型
        if category == 'script' and file.content_type not in self.ALLOWED_SCRIPT_TYPES:
            raise ValidationError(f"不支持的剧本文件类型: {file.content_type}")

        # 读取文件内容
        content = await file.read()
        file_size = len(content)

        # 计算文件校验和
        checksum = hashlib.sha256(content).hexdigest()

        # 检查是否已存在相同文件
        existing = await self.repository.get_by_checksum(checksum)
        if existing:
            # 如果文件已存在,直接返回(去重)
            return existing

        # 生成文件名和路径
        extension = os.path.splitext(file.filename)[1]
        object_name = f"attachments/{category}/{user_id}/{checksum}{extension}"

        # 保存临时文件
        temp_path = f"/tmp/{checksum}{extension}"
        with open(temp_path, "wb") as f:
            f.write(content)

        try:
            # 上传到对象存储
            file_url = await self.storage.upload_file(
                temp_path,
                object_name,
                content_type=file.content_type
            )

            # 创建附件记录
            attachment = Attachment(
                name=file.filename,
                original_name=file.filename,
                file_url=file_url,
                file_size=file_size,
                mime_type=file.content_type,
                category=category,
                extension=extension,
                checksum=checksum,
                project_id=project_id,
                user_id=user_id,
                uploaded_by=user_id,
                storage_provider='minio',  # 或从配置读取
                storage_path=object_name
            )

            return await self.repository.create(attachment)

        finally:
            # 清理临时文件
            if os.path.exists(temp_path):
                os.remove(temp_path)

    async def get_attachment(
        self,
        user_id: int,
        attachment_id: int
    ) -> Attachment:
        """获取附件详情"""
        attachment = await self.repository.get_by_id(attachment_id)
        if not attachment:
            raise NotFoundError("附件不存在")

        # 检查访问权限
        if not attachment.is_public:
            if attachment.project_id:
                # 检查项目权限
                await self._check_project_permission(
                    user_id, attachment.project_id, 'viewer'
                )
            elif attachment.user_id != user_id and attachment.uploaded_by != user_id:
                raise PermissionError("没有权限访问此附件")

        # 增加访问计数
        await self.repository.increment_access_count(attachment_id)

        return attachment

    async def get_download_url(
        self,
        user_id: int,
        attachment_id: int,
        expires: int = 3600
    ) -> str:
        """获取附件下载链接"""
        attachment = await self.get_attachment(user_id, attachment_id)

        # 增加下载计数
        await self.repository.increment_download_count(attachment_id)

        # 生成预签名URL
        return await self.storage.get_presigned_url(
            attachment.storage_path,
            expires=expires
        )

    async def delete_attachment(
        self,
        user_id: int,
        attachment_id: int
    ) -> None:
        """删除附件"""
        attachment = await self.repository.get_by_id(attachment_id)
        if not attachment:
            raise NotFoundError("附件不存在")

        # 检查权限(只有上传者或项目所有者可以删除)
        if attachment.uploaded_by != user_id:
            if attachment.project_id:
                await self._check_project_permission(
                    user_id, attachment.project_id, 'owner'
                )
            else:
                raise PermissionError("没有权限删除此附件")

        # 从对象存储删除文件
        await self.storage.delete_file(attachment.storage_path)

        # 软删除数据库记录
        await self.repository.soft_delete(attachment_id)

    async def _check_project_permission(
        self,
        user_id: int,
        project_id: int,
        required_role: str = 'viewer'
    ) -> None:
        """检查项目权限"""
        from app.repositories.project_repository import ProjectRepository
        project_repo = ProjectRepository(self.db)

        has_permission = await project_repo.check_user_permission(
            user_id, project_id, required_role
        )
        if not has_permission:
            raise PermissionError("没有权限访问此项目")

5.7 视频导出服务

6.1 API 路由结构

# app/api/v1/projects.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.security import get_current_user
from app.schemas.project import ProjectCreate, ProjectUpdate, ProjectResponse
from app.services.project_service import ProjectService

router = APIRouter(prefix="/projects", tags=["projects"])

@router.get("", response_model=List[ProjectResponse])
async def get_projects(
    type: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取项目列表"""
    service = ProjectService(db)
    projects = await service.get_projects(
        user_id=current_user.id,
        project_type=type,
        page=page,
        page_size=page_size
    )
    return projects

@router.post("", response_model=ProjectResponse, status_code=201)
async def create_project(
    project_data: ProjectCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """创建项目"""
    service = ProjectService(db)
    project = await service.create_project(
        user_id=current_user.id,
        project_data=project_data
    )
    return project

@router.get("/{project_id}", response_model=ProjectResponse)
async def get_project(
    project_id: str,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取项目详情"""
    service = ProjectService(db)
    project = await service.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project

6.2 认证授权

# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    # 从数据库获取用户
    user = get_user(user_id)
    if user is None:
        raise credentials_exception
    return user

6. 剧本和附件 API 设计

6.1 剧本相关 API

# app/api/v1/scripts.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any
from app.core.database import get_db
from app.core.security import get_current_user
from app.schemas.script import (
    ScriptCreate, ScriptUpdate, ScriptResponse,
    ScriptCharacterCreate, ScriptSceneCreate
)
from app.services.script_service import ScriptService

router = APIRouter(prefix="/scripts", tags=["scripts"])

@router.get("", response_model=Dict[str, Any])
async def get_scripts(
    project_id: int,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取剧本列表"""
    service = ScriptService(db)
    return await service.get_scripts(
        project_id=project_id,
        user_id=current_user.user_id,
        status=status,
        page=page,
        page_size=page_size
    )

@router.post("", response_model=ScriptResponse, status_code=status.HTTP_201_CREATED)
async def create_script(
    script_data: ScriptCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """创建剧本(支持文本输入或附件上传)"""
    service = ScriptService(db)
    return await service.create_script(
        user_id=current_user.user_id,
        script_data=script_data
    )

@router.get("/{script_id}", response_model=ScriptResponse)
async def get_script(
    script_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取剧本详情"""
    service = ScriptService(db)
    script = await service.repository.get_by_id(script_id)
    if not script:
        raise HTTPException(status_code=404, detail="剧本不存在")

    await service._check_project_permission(
        current_user.user_id, script.project_id, 'viewer'
    )
    return script

@router.put("/{script_id}", response_model=ScriptResponse)
async def update_script(
    script_id: int,
    script_data: ScriptUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """更新剧本(自动创建版本)"""
    service = ScriptService(db)
    return await service.update_script(
        user_id=current_user.user_id,
        script_id=script_id,
        script_data=script_data
    )

@router.post("/{script_id}/approve", response_model=ScriptResponse)
async def approve_script(
    script_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """审批剧本"""
    service = ScriptService(db)
    return await service.approve_script(
        user_id=current_user.user_id,
        script_id=script_id
    )

@router.get("/{script_id}/versions")
async def get_script_versions(
    script_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取剧本版本历史"""
    service = ScriptService(db)
    return await service.get_script_versions(
        user_id=current_user.user_id,
        script_id=script_id
    )

@router.post("/{script_id}/characters")
async def add_character(
    script_id: int,
    character_data: ScriptCharacterCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """添加剧本角色"""
    service = ScriptService(db)
    return await service.add_character(
        user_id=current_user.user_id,
        script_id=script_id,
        character_data=character_data.dict()
    )

@router.post("/{script_id}/scenes")
async def add_scene(
    script_id: int,
    scene_data: ScriptSceneCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """添加剧本场景"""
    service = ScriptService(db)
    return await service.add_scene(
        user_id=current_user.user_id,
        script_id=script_id,
        scene_data=scene_data.dict()
    )

6.2 附件相关 API

# app/api/v1/attachments.py
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from typing import Optional
from app.core.database import get_db
from app.core.security import get_current_user
from app.schemas.attachment import AttachmentResponse
from app.services.attachment_service import AttachmentService

router = APIRouter(prefix="/attachments", tags=["attachments"])

@router.post("", response_model=AttachmentResponse, status_code=201)
async def upload_attachment(
    file: UploadFile = File(...),
    category: str = Form('other'),
    project_id: Optional[int] = Form(None),
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    上传附件

    支持的剧本文件类型:
    - PDF (.pdf)
    - Word (.doc, .docx)
    - 纯文本 (.txt)
    - Markdown (.md)
    """
    service = AttachmentService(db)
    return await service.upload_attachment(
        user_id=current_user.user_id,
        file=file,
        category=category,
        project_id=project_id
    )

@router.get("/{attachment_id}", response_model=AttachmentResponse)
async def get_attachment(
    attachment_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """获取附件详情"""
    service = AttachmentService(db)
    return await service.get_attachment(
        user_id=current_user.user_id,
        attachment_id=attachment_id
    )

@router.get("/{attachment_id}/download")
async def download_attachment(
    attachment_id: int,
    expires: int = 3600,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """
    获取附件下载链接(预签名URL)

    参数:
    - expires: 链接有效期(秒),默认3600秒(1小时)
    """
    service = AttachmentService(db)
    download_url = await service.get_download_url(
        user_id=current_user.user_id,
        attachment_id=attachment_id,
        expires=expires
    )
    return {'download_url': download_url, 'expires_in': expires}

@router.delete("/{attachment_id}", status_code=204)
async def delete_attachment(
    attachment_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """删除附件(软删除)"""
    service = AttachmentService(db)
    await service.delete_attachment(
        user_id=current_user.user_id,
        attachment_id=attachment_id
    )
    return None

6.3 API 路由注册

# app/api/v1/__init__.py
from fastapi import APIRouter
from app.api.v1 import (
    projects,
    folders,
    storyboards,
    resources,
    videos,
    scripts,      # 新增
    attachments,  # 新增
    ai,
    export
)

api_router = APIRouter()

api_router.include_router(projects.router)
api_router.include_router(folders.router)
api_router.include_router(storyboards.router)
api_router.include_router(resources.router)
api_router.include_router(videos.router)
api_router.include_router(scripts.router)      # 新增
api_router.include_router(attachments.router)  # 新增
api_router.include_router(ai.router)
api_router.include_router(export.router)

7. 异步任务处理

7.1 Celery 配置

# app/tasks/celery_app.py
from celery import Celery
from app.config import settings

celery_app = Celery(
    "jointo",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND,
    include=['app.tasks.ai_tasks', 'app.tasks.export_tasks']
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 1 小时
    task_soft_time_limit=3300,  # 55 分钟
)

7.2 AI 生成任务

# app/tasks/ai_tasks.py
from app.tasks.celery_app import celery_app
from app.services.ai_service import AIService
from app.repositories.ai_job_repository import AIJobRepository
from app.core.database import SessionLocal

@celery_app.task(bind=True, max_retries=3)
def generate_image_task(self, prompt: str, model: str, **kwargs):
    """生成图片任务"""
    db = SessionLocal()
    try:
        job_id = self.request.id
        # 更新任务状态
        job_repo = AIJobRepository(db)
        job_repo.update_status(job_id, 'processing', progress=10)

        # 调用 AI 服务
        ai_service = AIService()
        result = ai_service._generate_image_sync(prompt, model, **kwargs)

        # 更新任务完成
        job_repo.update_status(
            job_id,
            'completed',
            progress=100,
            output_data=result
        )

        return result
    except Exception as exc:
        job_repo.update_status(job_id, 'failed', error_message=str(exc))
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()

7.3 视频导出任务

# app/tasks/export_tasks.py
from app.tasks.celery_app import celery_app
from app.services.export_service import ExportService

@celery_app.task(bind=True, max_retries=2)
def export_video_task(
    self,
    project_id: str,
    format: str,
    quality: str,
    **kwargs
):
    """导出视频任务"""
    db = SessionLocal()
    try:
        job_id = self.request.id

        # 更新任务状态
        export_repo = ExportJobRepository(db)
        export_repo.update_status(job_id, 'processing', progress=0)

        # 执行导出
        export_service = ExportService()
        output_url = await export_service.compose_video(
            project_id=project_id,
            format=format,
            quality=quality
        )

        # 更新任务完成
        export_repo.update_status(
            job_id,
            'completed',
            progress=100,
            output_url=output_url
        )

        return {'output_url': output_url}
    except Exception as exc:
        export_repo.update_status(job_id, 'failed', error_message=str(exc))
        raise self.retry(exc=exc, countdown=300)  # 5 分钟后重试
    finally:
        db.close()

8. 文件存储方案

8.1 对象存储配置

# app/core/storage.py
from minio import Minio
from app.config import settings

class StorageService:
    def __init__(self):
        self.client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=settings.MINIO_SECURE
        )
        self.bucket_name = settings.MINIO_BUCKET_NAME

    async def upload_file(
        self,
        file_path: str,
        object_name: str,
        content_type: str = None
    ) -> str:
        """上传文件"""
        self.client.fput_object(
            self.bucket_name,
            object_name,
            file_path,
            content_type=content_type
        )
        return f"{settings.MINIO_PUBLIC_URL}/{self.bucket_name}/{object_name}"

    async def get_presigned_url(
        self,
        object_name: str,
        expires: int = 3600
    ) -> str:
        """获取预签名 URL(用于临时访问)"""
        return self.client.presigned_get_object(
            self.bucket_name,
            object_name,
            expires=timedelta(seconds=expires)
        )

    async def delete_file(self, object_name: str) -> None:
        """删除文件"""
        self.client.remove_object(self.bucket_name, object_name)

8.2 文件上传处理

# app/api/v1/resources.py
from fastapi import UploadFile, File
from app.core.storage import StorageService

@router.post("/upload")
async def upload_resource(
    file: UploadFile = File(...),
    resource_type: str = Form(...),
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    """上传资源文件"""
    # 1. 验证文件类型和大小
    # 2. 保存临时文件
    # 3. 上传到对象存储
    # 4. 保存资源记录到数据库
    # 5. 返回资源信息

    storage = StorageService()
    object_name = f"resources/{resource_type}/{uuid.uuid4()}/{file.filename}"

    # 保存临时文件
    temp_path = f"/tmp/{file.filename}"
    with open(temp_path, "wb") as f:
        f.write(await file.read())

    # 上传到对象存储
    file_url = await storage.upload_file(
        temp_path,
        object_name,
        content_type=file.content_type
    )

    # 保存到数据库
    resource = Resource(
        id=str(uuid.uuid4()),
        name=file.filename,
        type=resource_type,
        file_url=file_url,
        file_size=os.path.getsize(temp_path),
        mime_type=file.content_type,
        created_by=current_user.id
    )
    db.add(resource)
    db.commit()

    # 清理临时文件
    os.remove(temp_path)

    return resource

9. AI 服务集成

9.1 AI 服务抽象层

# app/services/ai_providers/base.py
from abc import ABC, abstractmethod

class AIProvider(ABC):
    @abstractmethod
    async def generate_image(self, prompt: str, **kwargs) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def generate_video(self, prompt: str, **kwargs) -> Dict[str, Any]:
        pass

# app/services/ai_providers/stable_diffusion.py
class StableDiffusionProvider(AIProvider):
    def __init__(self, api_key: str, api_url: str):
        self.api_key = api_key
        self.api_url = api_url

    async def generate_image(self, prompt: str, **kwargs) -> Dict[str, Any]:
        # 调用 Stable Diffusion API
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.api_url}/generate",
                json={"prompt": prompt, **kwargs},
                headers={"Authorization": f"Bearer {self.api_key}"}
            )
            return response.json()

9.2 AI 服务工厂

# app/services/ai_service.py
from app.services.ai_providers.stable_diffusion import StableDiffusionProvider
from app.services.ai_providers.runway import RunwayProvider

class AIServiceFactory:
    @staticmethod
    def create_provider(provider_type: str) -> AIProvider:
        if provider_type == 'stable_diffusion':
            return StableDiffusionProvider(
                api_key=settings.STABLE_DIFFUSION_API_KEY,
                api_url=settings.STABLE_DIFFUSION_API_URL
            )
        elif provider_type == 'runway':
            return RunwayProvider(
                api_key=settings.RUNWAY_API_KEY,
                api_url=settings.RUNWAY_API_URL
            )
        else:
            raise ValueError(f"Unknown provider: {provider_type}")

10. 部署架构

10.1 Docker 容器化

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 运行应用
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

10.2 Docker Compose 配置

# docker-compose.yml
version: "3.8"

services:
  # API 服务
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/jointo
      - REDIS_URL=redis://redis:6379/0
      - MINIO_ENDPOINT=minio:9000
    depends_on:
      - db
      - redis
      - minio

  # Celery Worker
  celery-worker:
    build: .
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/jointo
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis

  # Celery Beat(定时任务)
  celery-beat:
    build: .
    command: celery -A app.tasks.celery_app beat --loglevel=info
    depends_on:
      - db
      - redis

  # PostgreSQL
  db:
    image: postgres:14
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=jointo
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Redis
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  # MinIO(对象存储)
  minio:
    image: minio/minio
    command: server /data
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    volumes:
      - minio_data:/data
    ports:
      - "9000:9000"
      - "9001:9001"

volumes:
  postgres_data:
  redis_data:
  minio_data:

10.3 生产环境部署

推荐架构

  • 负载均衡:Nginx
  • 应用服务器:Gunicorn + Uvicorn Workers
  • 数据库:PostgreSQL(主从复制)
  • 缓存:Redis Cluster
  • 对象存储:AWS S3 或阿里云 OSS
  • 消息队列:RabbitMQ Cluster
  • 监控:Prometheus + Grafana

11. 性能优化

11.1 数据库优化

  • 连接池:使用 SQLAlchemy 连接池
  • 查询优化:使用 select_related、prefetch_related
  • 索引优化:为常用查询创建索引
  • 分页:所有列表接口支持分页

11.2 缓存策略

  • 项目列表:Redis 缓存,TTL 5 分钟
  • 项目详情:Redis 缓存,TTL 10 分钟
  • AI 任务状态:Redis 缓存,实时更新

11.3 异步处理

  • 文件上传:异步处理缩略图生成
  • AI 生成:全部异步,返回任务 ID
  • 视频导出:异步处理,支持进度查询

12. 安全设计

12.1 认证授权

  • JWT Token:用于 API 认证
  • RBAC:基于角色的访问控制(owner、editor、viewer)
  • 项目权限:每个项目独立权限管理

12.2 数据安全

  • 密码加密:使用 bcrypt
  • SQL 注入防护:使用 ORM,避免原生 SQL
  • XSS 防护:输入验证和转义
  • CSRF 防护:使用 CSRF Token

12.3 文件安全

  • 文件类型验证:白名单机制
  • 文件大小限制:防止大文件攻击
  • 预签名 URL:临时访问链接,设置过期时间

总结

技术栈总结

层级 技术选型
Web 框架 FastAPI
ORM SQLAlchemy 2.0
数据库 PostgreSQL 14+
缓存 Redis 7+
任务队列 Celery + RabbitMQ
文件存储 MinIO / AWS S3
视频处理 FFmpeg
认证 JWT
容器化 Docker + Docker Compose

核心优势

  1. 高性能:FastAPI 异步支持,高并发处理
  2. 可扩展:微服务架构,易于水平扩展
  3. 可靠性:异步任务处理,支持重试和错误处理
  4. 易维护:清晰的代码结构,完善的类型提示
  5. 生产就绪:完整的部署方案和监控支持

文档版本:v1.0
最后更新:2025-01-27