Python FastAPI + SOLO 完整实践指南
本指南展示如何在Python FastAPI项目中完整应用SOLO工作模式,从产品需求到生产部署的端到端实践。
🎯 项目概述
示例项目:用户管理API
- 业务场景: 为SaaS平台提供用户管理RESTful API服务
- 技术栈: Python 3.11 + FastAPI + PostgreSQL + Redis + Celery
- SOLO应用: 完整的四阶段工作流程
- 预期成果: 生产就绪的高性能异步API服务
学习目标
完成本指南后,您将掌握:
- ✅ 在Python项目中应用SOLO四阶段工作流
- ✅ FastAPI与OpenAPI规范的完美融合
- ✅ Python异步编程的TDD实践
- ✅ 现代Python开发的质量保证体系
🚀 PRODUCT阶段:需求分析
启动SOLO项目
bash
# 创建项目目录
mkdir user-management-api
cd user-management-api
# 初始化Python虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 初始化Git仓库
git init
# 启动SOLO模式
/solo 创建一个用户管理API系统,支持用户注册、认证、权限管理等功能需求澄清对话过程
Product Manager子代理引导需求收集:
🤖 Product Manager: 我来帮您分析用户管理API的产品需求。
首先,让我们明确核心问题:
1. 这个API要解决什么业务问题?
2. 主要用户群体是谁?
3. 与现有系统的集成方式?
用户回答:
我们正在构建一个多租户SaaS平台,需要一个统一的用户管理服务。
当前各个子系统都有自己的用户管理,导致用户体验割裂,
数据不一致,权限管理复杂。
主要用户包括:
- 平台管理员(管理租户、用户、权限)
- 租户管理员(管理本租户用户)
- 普通用户(管理个人信息、使用各种服务)
- 系统集成(第三方服务通过API调用)生成的PRD文档
基于需求澄清,产生详细的PRD:
markdown
# 用户管理API - 产品需求文档
## 产品概述
### 产品名称
用户管理API (User Management API)
### 产品愿景
为多租户SaaS平台提供统一的用户管理服务,实现用户身份认证、权限控制和数据治理
### 目标用户
- **平台管理员**: 管理整个平台的租户和用户
- **租户管理员**: 管理租户内的用户和权限
- **普通用户**: 管理个人信息和偏好设置
- **系统集成方**: 通过API集成用户管理功能
### 核心问题
当前多租户系统存在以下问题:
- 用户数据分散在各个子系统,管理复杂
- 权限体系不统一,安全风险高
- 用户体验割裂,无法实现单点登录
- 数据一致性无法保证,运维成本高
## 功能需求
### 用户故事
#### US001: 用户注册和认证
- **作为** 新用户
- **我想要** 通过API注册账户并进行身份认证
- **以便** 访问平台的各项服务
**验收标准**:
1. 支持邮箱、手机号等多种注册方式
2. 支持邮箱验证和手机验证码验证
3. 密码强度要求和安全存储
4. 支持JWT Token认证机制
5. 支持OAuth2.0第三方登录
**优先级**:必须有
#### US002: 多租户管理
- **作为** 平台管理员
- **我想要** 管理不同租户的用户和权限
- **以便** 实现多租户隔离和管理
**验收标准**:
1. 支持租户的创建、编辑、禁用操作
2. 租户间数据完全隔离
3. 支持租户级别的配置和定制
4. 租户管理员权限控制
5. 租户使用情况统计和监控
**优先级**:必须有
#### US003: 权限角色管理
- **作为** 租户管理员
- **我想要** 灵活配置用户角色和权限
- **以便** 实现精细化的访问控制
**验收标准**:
1. 支持基于角色的权限控制(RBAC)
2. 支持自定义角色创建和权限分配
3. 支持权限继承和权限组合
4. 实时权限验证和缓存机制
5. 权限变更审计日志
**优先级**:必须有
#### US004: 用户信息管理
- **作为** 用户
- **我想要** 管理个人信息和账户设置
- **以便** 维护准确的个人资料
**验收标准**:
1. 用户基础信息的查看和编辑
2. 密码修改和安全设置
3. 个人偏好和通知设置
4. 账户注销和数据导出
5. 头像上传和管理
**优先级**:必须有
#### US005: API访问管理
- **作为** 系统集成方
- **我想要** 安全地调用用户管理API
- **以便** 在第三方系统中集成用户功能
**验收标准**:
1. API密钥管理和访问控制
2. 调用频率限制和配额管理
3. API访问日志和监控
4. SDK和文档支持
5. Webhook事件通知
**优先级**:应该有
### 功能清单
| ID | 功能名称 | 描述 | 优先级 | 所属故事 |
|----|----------|------|--------|----------|
| F001 | 用户注册 | 邮箱/手机号注册 | 必须有 | US001 |
| F002 | 身份认证 | JWT Token认证 | 必须有 | US001 |
| F003 | 租户管理 | 多租户CRUD操作 | 必须有 | US002 |
| F004 | 角色权限 | RBAC权限控制 | 必须有 | US003 |
| F005 | 用户CRUD | 用户信息管理 | 必须有 | US004 |
| F006 | OAuth集成 | 第三方登录 | 应该有 | US001 |
| F007 | API管理 | API密钥和访问控制 | 应该有 | US005 |
## 非功能需求
### 性能要求
- **响应时间**: 认证接口 < 100ms,查询接口 < 200ms
- **并发量**: 支持峰值 2000 QPS
- **数据量**: 支持 1000万+ 用户,10万+ 租户
### 可用性要求
- **系统可用性**: 99.9%(年停机时间 < 9小时)
- **数据备份**: 实时备份,RTO < 1小时
- **故障恢复**: 自动故障转移,恢复时间 < 5分钟
### 安全要求
- **认证机制**: JWT + OAuth2.0,支持多因子认证
- **数据加密**: 传输和存储全加密
- **权限控制**: 基于角色的精细化权限控制
- **审计日志**: 完整的用户操作和API访问日志
## 约束和假设
### 技术约束
- 必须使用Python技术栈(FastAPI)
- 数据库使用PostgreSQL
- 缓存使用Redis
- 消息队列使用Celery + Redis
- 必须提供OpenAPI 3.0规范文档
### 资源约束
- 开发周期:8周
- 开发团队:3名Python工程师 + 1名测试工程师
- 服务器成本:月均预算8000元
### 假设条件
1. 现有系统支持API集成方式改造
2. 用户迁移可以分批进行,不影响业务连续性
3. 监管合规要求可以通过配置满足
## 发布计划
### MVP(最小可行产品)- 6周后发布
**包含功能**:
- 用户注册和JWT认证
- 基础的用户CRUD操作
- 简单的角色权限控制
- 租户基础管理
### V1.1 - MVP后2周
- OAuth2.0第三方登录
- 完善的权限管理
- API访问控制
- 用户数据迁移工具
### V1.2 - V1.1后4周
- 高级权限策略
- 完整的审计日志
- 性能优化和监控
- 多语言SDK支持
## 成功指标
### 业务指标
- API日调用量 > 100万次
- 用户认证成功率 > 99.5%
- 系统可用性 > 99.9%
- 用户满意度 > 4.5/5.0
### 技术指标
- 代码测试覆盖率 > 85%
- API文档完整度 100%
- 安全漏洞数量 = 0
- 代码质量评分 > A级🏗️ ARCHITECT阶段:技术架构设计
架构设计和技术选型
PRD完成后,Architect子代理开始技术设计:
🤖 Architect: 基于PRD需求,我来设计用户管理API的技术架构。
## 技术栈选型
### 后端框架
- **Python 3.11**: 最新稳定版,性能优异,类型提示完善
- **FastAPI 0.104+**: 现代异步框架,原生OpenAPI支持
- **SQLAlchemy 2.0**: 现代ORM,支持异步操作
- **Alembic**: 数据库迁移工具
### 数据存储
- **PostgreSQL 15**: 强ACID保证,支持JSON和全文搜索
- **Redis 7**: 缓存和会话存储,支持持久化
- **MinIO**: 对象存储,用于头像和文件管理
### 异步和任务队列
- **Celery 5.3**: 分布式任务队列
- **Redis**: 作为Celery的broker和result backend
- **uvicorn**: ASGI服务器,支持高并发
### 基础设施
- **Poetry**: 现代Python依赖管理
- **Docker**: 容器化部署
- **OpenAPI 3.0**: API规范和文档生成
这个技术选型考虑了Python生态的最新发展和异步性能需求。系统架构设计
OpenAPI规范生成
基于需求分析生成详细的API规范:
yaml
# user-api-spec.yaml
openapi: 3.0.3
info:
title: User Management API
version: 1.0.0
description: |
多租户用户管理API系统,基于SOLO-PRD需求设计
## 功能特性
- 用户注册和JWT认证
- 多租户管理和数据隔离
- 基于角色的权限控制(RBAC)
- OAuth2.0第三方登录集成
- 完整的用户生命周期管理
contact:
name: User API Team
email: user-api@example.com
servers:
- url: https://api.userservice.example.com/v1
description: 生产环境
- url: https://staging-api.userservice.example.com/v1
description: 测试环境
security:
- BearerAuth: []
paths:
/auth/register:
post:
tags: [Authentication]
summary: 用户注册
description: |
基于PRD-US001需求,支持用户注册功能
## 注册方式
- 邮箱注册(需要邮箱验证)
- 手机号注册(需要短信验证码)
- 支持自定义用户名
security: [] # 注册接口不需要认证
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UserRegisterRequest'
example:
username: "john_doe"
email: "john@example.com"
password: "SecurePass123!"
tenant_id: "acme-corp"
verification_code: "123456"
responses:
'201':
description: 注册成功
content:
application/json:
schema:
$ref: '#/components/schemas/UserRegisterResponse'
example:
id: "user_123"
username: "john_doe"
email: "john@example.com"
tenant_id: "acme-corp"
status: "pending_verification"
created_at: "2024-01-15T10:30:00Z"
'400':
$ref: '#/components/responses/BadRequest'
'409':
description: 用户已存在
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
/auth/login:
post:
tags: [Authentication]
summary: 用户登录
description: |
基于PRD-US001需求,支持用户身份认证
## 登录方式
- 用户名/邮箱 + 密码
- 支持多因子认证
- 返回JWT访问令牌
security: []
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UserLoginRequest'
example:
identifier: "john@example.com"
password: "SecurePass123!"
mfa_code: "123456"
responses:
'200':
description: 登录成功
content:
application/json:
schema:
$ref: '#/components/schemas/LoginResponse'
example:
access_token: "eyJhbGciOiJIUzI1NiIs..."
refresh_token: "eyJhbGciOiJIUzI1NiIs..."
token_type: "bearer"
expires_in: 3600
user:
id: "user_123"
username: "john_doe"
email: "john@example.com"
tenant_id: "acme-corp"
roles: ["user"]
'401':
$ref: '#/components/responses/Unauthorized'
'423':
description: 账户被锁定
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
/users:
get:
tags: [Users]
summary: 查询用户列表
description: |
基于PRD-US004需求,支持用户信息查询
## 权限控制
- 普通用户只能查询自己的信息
- 租户管理员可以查询租户内所有用户
- 平台管理员可以查询所有用户
parameters:
- name: tenant_id
in: query
description: 租户ID筛选
schema:
type: string
- name: role
in: query
description: 角色筛选
schema:
type: string
enum: [admin, manager, user]
- name: status
in: query
description: 状态筛选
schema:
type: string
enum: [active, inactive, pending]
- name: page
in: query
description: 页码,从1开始
schema:
type: integer
minimum: 1
default: 1
- name: limit
in: query
description: 每页条数
schema:
type: integer
minimum: 1
maximum: 100
default: 20
responses:
'200':
description: 成功返回用户列表
content:
application/json:
schema:
$ref: '#/components/schemas/UserListResponse'
example:
data:
- id: "user_123"
username: "john_doe"
email: "john@example.com"
tenant_id: "acme-corp"
roles: ["user"]
status: "active"
last_login: "2024-01-15T10:30:00Z"
created_at: "2024-01-10T10:30:00Z"
pagination:
page: 1
limit: 20
total: 156
total_pages: 8
has_next: true
has_prev: false
'403':
$ref: '#/components/responses/Forbidden'
post:
tags: [Users]
summary: 创建用户
description: |
基于PRD-US002需求,支持管理员创建用户
## 权限要求
- 需要租户管理员或平台管理员权限
- 支持批量创建用户
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateUserRequest'
example:
username: "jane_doe"
email: "jane@example.com"
password: "TempPass123!"
tenant_id: "acme-corp"
roles: ["user"]
profile:
first_name: "Jane"
last_name: "Doe"
department: "Engineering"
responses:
'201':
description: 用户创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/UserResponse'
'400':
$ref: '#/components/responses/BadRequest'
'403':
$ref: '#/components/responses/Forbidden'
'409':
description: 用户名或邮箱已存在
/users/{user_id}:
get:
tags: [Users]
summary: 查询用户详情
description: 获取指定用户的详细信息
parameters:
- name: user_id
in: path
required: true
description: 用户ID
schema:
type: string
example: "user_123"
responses:
'200':
description: 成功返回用户详情
content:
application/json:
schema:
$ref: '#/components/schemas/UserDetailResponse'
'404':
$ref: '#/components/responses/NotFound'
'403':
$ref: '#/components/responses/Forbidden'
put:
tags: [Users]
summary: 更新用户信息
description: |
基于PRD-US004需求,支持用户信息更新
## 权限控制
- 用户可以更新自己的基础信息
- 管理员可以更新用户的角色和状态
parameters:
- name: user_id
in: path
required: true
description: 用户ID
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UpdateUserRequest'
responses:
'200':
description: 更新成功
content:
application/json:
schema:
$ref: '#/components/schemas/UserResponse'
'400':
$ref: '#/components/responses/BadRequest'
'403':
$ref: '#/components/responses/Forbidden'
'404':
$ref: '#/components/responses/NotFound'
/tenants:
get:
tags: [Tenants]
summary: 查询租户列表
description: |
基于PRD-US002需求,支持多租户管理
## 权限要求
- 仅平台管理员可以访问
parameters:
- name: status
in: query
description: 租户状态筛选
schema:
type: string
enum: [active, inactive, suspended]
- name: page
in: query
schema:
type: integer
default: 1
- name: limit
in: query
schema:
type: integer
default: 20
responses:
'200':
description: 成功返回租户列表
content:
application/json:
schema:
$ref: '#/components/schemas/TenantListResponse'
'403':
$ref: '#/components/responses/Forbidden'
post:
tags: [Tenants]
summary: 创建租户
description: |
创建新的租户,仅平台管理员可操作
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateTenantRequest'
example:
name: "Acme Corporation"
slug: "acme-corp"
domain: "acme.example.com"
settings:
max_users: 1000
features: ["sso", "api_access"]
responses:
'201':
description: 租户创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/TenantResponse'
/roles:
get:
tags: [Roles]
summary: 查询角色列表
description: |
基于PRD-US003需求,支持角色权限管理
parameters:
- name: tenant_id
in: query
description: 租户ID,不提供则查询全局角色
schema:
type: string
responses:
'200':
description: 成功返回角色列表
content:
application/json:
schema:
$ref: '#/components/schemas/RoleListResponse'
post:
tags: [Roles]
summary: 创建角色
description: |
创建自定义角色,需要管理员权限
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateRoleRequest'
example:
name: "Project Manager"
description: "Can manage projects and users"
permissions: ["project:read", "project:write", "user:read"]
tenant_id: "acme-corp"
responses:
'201':
description: 角色创建成功
content:
application/json:
schema:
$ref: '#/components/schemas/RoleResponse'
components:
schemas:
# 用户相关Schema
UserRegisterRequest:
type: object
required: [username, email, password, tenant_id]
properties:
username:
type: string
minLength: 3
maxLength: 50
pattern: '^[a-zA-Z0-9_-]+$'
description: 用户名
example: "john_doe"
email:
type: string
format: email
description: 邮箱地址
example: "john@example.com"
password:
type: string
minLength: 8
description: 密码,需满足复杂度要求
example: "SecurePass123!"
tenant_id:
type: string
description: 租户ID
example: "acme-corp"
verification_code:
type: string
description: 验证码(邮箱或短信)
example: "123456"
profile:
$ref: '#/components/schemas/UserProfile'
UserLoginRequest:
type: object
required: [identifier, password]
properties:
identifier:
type: string
description: 用户名或邮箱
example: "john@example.com"
password:
type: string
description: 密码
example: "SecurePass123!"
mfa_code:
type: string
description: 多因子认证码
example: "123456"
LoginResponse:
type: object
required: [access_token, token_type, expires_in, user]
properties:
access_token:
type: string
description: JWT访问令牌
refresh_token:
type: string
description: 刷新令牌
token_type:
type: string
enum: [bearer]
description: 令牌类型
expires_in:
type: integer
description: 令牌过期时间(秒)
user:
$ref: '#/components/schemas/UserSummary'
UserProfile:
type: object
properties:
first_name:
type: string
description: 名
example: "John"
last_name:
type: string
description: 姓
example: "Doe"
avatar_url:
type: string
format: uri
description: 头像URL
phone:
type: string
description: 手机号
department:
type: string
description: 部门
title:
type: string
description: 职位
UserSummary:
type: object
required: [id, username, email, tenant_id, status]
properties:
id:
type: string
description: 用户ID
example: "user_123"
username:
type: string
description: 用户名
example: "john_doe"
email:
type: string
format: email
description: 邮箱
example: "john@example.com"
tenant_id:
type: string
description: 租户ID
example: "acme-corp"
roles:
type: array
items:
type: string
description: 用户角色
example: ["user", "developer"]
status:
type: string
enum: [active, inactive, pending, suspended]
description: 用户状态
last_login:
type: string
format: date-time
description: 最后登录时间
created_at:
type: string
format: date-time
description: 创建时间
# 租户相关Schema
TenantResponse:
type: object
required: [id, name, slug, status]
properties:
id:
type: string
description: 租户ID
example: "tenant_123"
name:
type: string
description: 租户名称
example: "Acme Corporation"
slug:
type: string
description: 租户标识
example: "acme-corp"
domain:
type: string
description: 租户域名
example: "acme.example.com"
status:
type: string
enum: [active, inactive, suspended]
description: 租户状态
settings:
type: object
description: 租户配置
additionalProperties: true
created_at:
type: string
format: date-time
description: 创建时间
# 通用Schema
PaginationInfo:
type: object
required: [page, limit, total, total_pages]
properties:
page:
type: integer
description: 当前页码
example: 1
limit:
type: integer
description: 每页条数
example: 20
total:
type: integer
description: 总记录数
example: 156
total_pages:
type: integer
description: 总页数
example: 8
has_next:
type: boolean
description: 是否有下一页
has_prev:
type: boolean
description: 是否有上一页
ErrorResponse:
type: object
required: [error]
properties:
error:
type: object
required: [code, message]
properties:
code:
type: string
description: 错误代码
example: "USER_NOT_FOUND"
message:
type: string
description: 错误描述
example: "User not found"
details:
type: object
description: 错误详细信息
additionalProperties: true
trace_id:
type: string
description: 请求追踪ID
securitySchemes:
BearerAuth:
type: http
scheme: bearer
bearerFormat: JWT
description: |
使用JWT Bearer Token进行认证
Token Claims:
- sub: 用户ID
- tenant_id: 租户ID
- roles: 用户角色列表
- exp: 过期时间
responses:
BadRequest:
description: 请求参数错误
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
Unauthorized:
description: 未授权访问
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
Forbidden:
description: 权限不足
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
NotFound:
description: 资源不存在
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'项目任务分解
Architect基于API规范生成详细的项目计划:
markdown
# 用户管理API - 项目实施计划
## 任务分解
### 阶段一:基础设施搭建 (第1-2周)
#### TASK001: FastAPI项目初始化
- **描述**: 创建FastAPI项目,配置基础依赖和项目结构
- **验收标准**:
1. FastAPI项目结构完整,支持异步操作
2. Poetry依赖管理配置正确
3. 应用可以正常启动,支持热重载
4. 基础配置管理(Pydantic Settings)
5. 日志系统配置(structlog)
- **预估时间**: 2天
- **优先级**: P0
#### TASK002: 数据库设计和配置
- **描述**: 设计用户管理相关的数据库表结构
- **验收标准**:
1. 完整的ER图设计,支持多租户
2. PostgreSQL异步连接配置
3. SQLAlchemy 2.0 模型定义
4. Alembic迁移脚本配置
5. 数据库连接池优化
- **预估时间**: 3天
- **优先级**: P0
#### TASK003: 认证授权框架
- **描述**: 实现JWT认证和RBAC权限控制
- **验收标准**:
1. JWT生成、验证和刷新机制
2. 基于角色的权限控制装饰器
3. 异步权限验证中间件
4. 密码加密和验证(bcrypt)
5. 会话管理和安全配置
- **预估时间**: 4天
- **优先级**: P0
### 阶段二:核心功能实现 (第3-5周)
#### TASK004: 用户认证功能
- **描述**: 实现用户注册、登录、认证功能
- **验收标准**:
1. 用户注册API,支持邮箱验证
2. 用户登录API,支持多种登录方式
3. JWT token管理和刷新
4. 密码安全策略实现
5. 多因子认证支持
- **预估时间**: 5天
- **优先级**: P0
#### TASK005: 多租户管理
- **描述**: 实现多租户数据隔离和管理
- **验收标准**:
1. 租户CRUD操作API
2. 租户级别数据隔离
3. 租户配置和定制支持
4. 租户管理员权限控制
5. 租户使用统计
- **预估时间**: 4天
- **优先级**: P0
#### TASK006: 用户管理功能
- **描述**: 实现完整的用户生命周期管理
- **验收标准**:
1. 用户CRUD操作API
2. 用户信息查询和筛选
3. 用户状态管理
4. 用户头像上传
5. 用户批量操作
- **预估时间**: 4天
- **优先级**: P0
#### TASK007: 角色权限系统
- **描述**: 实现灵活的RBAC权限系统
- **验收标准**:
1. 角色CRUD操作
2. 权限定义and管理
3. 用户角色分配
4. 权限继承机制
5. 权限缓存优化
- **预估时间**: 5天
- **优先级**: P0
### 阶段三:高级功能和优化 (第6-8周)
#### TASK008: 异步任务处理
- **描述**: 集成Celery处理异步任务
- **验收标准**:
1. Celery配置和Redis集成
2. 邮件发送异步任务
3. 用户数据导出任务
4. 任务状态跟踪
5. 任务失败重试机制
- **预估时间**: 3天
- **优先级**: P1
#### TASK009: 缓存和性能优化
- **描述**: 实现Redis缓存和性能优化
- **验收标准**:
1. Redis缓存配置和使用
2. 用户会话缓存
3. 权限信息缓存
4. API响应缓存
5. 缓存失效策略
- **预估时间**: 3天
- **优先级**: P1
#### TASK010: API访问控制
- **描述**: 实现API密钥管理和访问控制
- **验收标准**:
1. API密钥生成和管理
2. 调用频率限制
3. API访问日志
4. 访问统计和监控
5. API版本管理
- **预估时间**: 4天
- **优先级**: P1
#### TASK011: 监控和日志
- **描述**: 完善监控和日志系统
- **验收标准**:
1. 结构化日志记录
2. API访问指标收集
3. 业务指标监控
4. 健康检查端点
5. 错误监控和告警
- **预估时间**: 3天
- **优先级**: P1
#### TASK012: 测试和部署
- **描述**: 完善测试套件和部署配置
- **验收标准**:
1. 单元测试覆盖率 > 85%
2. 集成测试完整
3. API测试自动化
4. Docker化部署配置
5. CI/CD流水线配置
- **预估时间**: 5天
- **优先级**: P0🛠️ ENGINEER阶段:TDD代码实现
项目结构搭建
首先创建FastAPI项目结构:
bash
# 使用Poetry创建项目
poetry new user-management-api
cd user-management-api
# 添加依赖
poetry add fastapi[all] uvicorn sqlalchemy[asyncio] alembic asyncpg redis celery python-multipart structlog pydantic-settings bcrypt python-jose[cryptography] pytest pytest-asyncio httpx pytest-cov
# 创建项目结构
mkdir -p {app/{api,core,db,models,schemas,services,utils},tests/{unit,integration},alembic/versions}
# 创建基础文件
touch app/{__init__.py,main.py} app/api/__init__.py app/core/{__init__.py,config.py,security.py} app/db/{__init__.py,base.py,session.py} app/models/__init__.py app/schemas/__init__.py app/services/__init__.py app/utils/__init__.pyTDD实现示例:用户认证功能
RED阶段:编写失败测试
python
# tests/test_auth.py
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.core.config import settings
from app.db.session import get_db
from app.models.user import User
from app.schemas.auth import UserRegisterRequest
class TestUserAuthentication:
"""用户认证功能测试"""
@pytest.mark.asyncio
async def test_user_registration_success(
self,
async_client: AsyncClient,
db_session: AsyncSession
):
"""测试用户注册成功场景"""
# Given - 准备注册数据
register_data = {
"username": "john_doe",
"email": "john@example.com",
"password": "SecurePass123!",
"tenant_id": "test-tenant",
"profile": {
"first_name": "John",
"last_name": "Doe"
}
}
# When - 调用注册API
response = await async_client.post(
"/api/v1/auth/register",
json=register_data
)
# Then - 验证响应
assert response.status_code == 201
response_data = response.json()
assert response_data["username"] == "john_doe"
assert response_data["email"] == "john@example.com"
assert response_data["tenant_id"] == "test-tenant"
assert response_data["status"] == "pending_verification"
assert "password" not in response_data # 密码不应该在响应中
# 验证数据库记录
user = await db_session.get(User, response_data["id"])
assert user is not None
assert user.username == "john_doe"
assert user.email == "john@example.com"
@pytest.mark.asyncio
async def test_user_registration_duplicate_email(
self,
async_client: AsyncClient,
existing_user: User
):
"""测试邮箱重复注册场景"""
# Given - 使用已存在用户的邮箱
register_data = {
"username": "different_user",
"email": existing_user.email, # 重复邮箱
"password": "SecurePass123!",
"tenant_id": "test-tenant"
}
# When - 尝试注册
response = await async_client.post(
"/api/v1/auth/register",
json=register_data
)
# Then - 应该返回409冲突
assert response.status_code == 409
error_data = response.json()
assert error_data["error"]["code"] == "EMAIL_ALREADY_EXISTS"
@pytest.mark.asyncio
async def test_user_login_success(
self,
async_client: AsyncClient,
verified_user: User
):
"""测试用户登录成功场景"""
# Given - 准备登录数据
login_data = {
"identifier": verified_user.email,
"password": "password123" # 测试用户的密码
}
# When - 调用登录API
response = await async_client.post(
"/api/v1/auth/login",
json=login_data
)
# Then - 验证响应
assert response.status_code == 200
response_data = response.json()
assert "access_token" in response_data
assert "refresh_token" in response_data
assert response_data["token_type"] == "bearer"
assert response_data["expires_in"] > 0
# 验证用户信息
user_data = response_data["user"]
assert user_data["id"] == str(verified_user.id)
assert user_data["username"] == verified_user.username
assert user_data["email"] == verified_user.email
@pytest.mark.asyncio
async def test_user_login_invalid_credentials(
self,
async_client: AsyncClient,
verified_user: User
):
"""测试无效凭据登录场景"""
# Given - 错误的密码
login_data = {
"identifier": verified_user.email,
"password": "wrong_password"
}
# When - 尝试登录
response = await async_client.post(
"/api/v1/auth/login",
json=login_data
)
# Then - 应该返回401未授权
assert response.status_code == 401
error_data = response.json()
assert error_data["error"]["code"] == "INVALID_CREDENTIALS"
@pytest.mark.asyncio
async def test_protected_endpoint_with_valid_token(
self,
async_client: AsyncClient,
auth_headers: dict
):
"""测试带有效token访问受保护端点"""
# When - 使用有效token访问受保护资源
response = await async_client.get(
"/api/v1/users/me",
headers=auth_headers
)
# Then - 应该成功访问
assert response.status_code == 200
user_data = response.json()
assert "id" in user_data
assert "username" in user_data
@pytest.mark.asyncio
async def test_protected_endpoint_without_token(
self,
async_client: AsyncClient
):
"""测试无token访问受保护端点"""
# When - 不带token访问受保护资源
response = await async_client.get("/api/v1/users/me")
# Then - 应该返回401未授权
assert response.status_code == 401
# tests/unit/test_auth_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.auth_service import AuthService
from app.models.user import User
from app.schemas.auth import UserRegisterRequest
from app.core.exceptions import UserAlreadyExistsException
class TestAuthService:
"""认证服务单元测试"""
@pytest.fixture
def mock_db_session(self):
return AsyncMock(spec=AsyncSession)
@pytest.fixture
def auth_service(self, mock_db_session):
return AuthService(db=mock_db_session)
@pytest.mark.asyncio
async def test_register_user_success(
self,
auth_service: AuthService,
mock_db_session: AsyncSession
):
"""测试用户注册服务逻辑"""
# Given - 准备注册请求
register_request = UserRegisterRequest(
username="john_doe",
email="john@example.com",
password="SecurePass123!",
tenant_id="test-tenant"
)
# Mock数据库操作
mock_db_session.scalar.return_value = None # 用户不存在
mock_user = User(
id="user_123",
username="john_doe",
email="john@example.com",
tenant_id="test-tenant"
)
mock_db_session.add.return_value = None
mock_db_session.commit.return_value = None
mock_db_session.refresh.return_value = None
# When - 调用注册服务
result = await auth_service.register_user(register_request)
# Then - 验证结果
assert result.username == "john_doe"
assert result.email == "john@example.com"
assert result.tenant_id == "test-tenant"
# 验证数据库操作
mock_db_session.scalar.assert_called_once()
mock_db_session.add.assert_called_once()
mock_db_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_register_user_email_exists(
self,
auth_service: AuthService,
mock_db_session: AsyncSession
):
"""测试邮箱重复注册"""
# Given - 准备注册请求
register_request = UserRegisterRequest(
username="john_doe",
email="existing@example.com",
password="SecurePass123!",
tenant_id="test-tenant"
)
# Mock - 邮箱已存在
existing_user = User(email="existing@example.com")
mock_db_session.scalar.return_value = existing_user
# When & Then - 应该抛出异常
with pytest.raises(UserAlreadyExistsException) as exc_info:
await auth_service.register_user(register_request)
assert "Email already exists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_authenticate_user_success(
self,
auth_service: AuthService,
mock_db_session: AsyncSession
):
"""测试用户认证成功"""
# Given - 准备用户和密码
mock_user = Mock()
mock_user.id = "user_123"
mock_user.username = "john_doe"
mock_user.email = "john@example.com"
mock_user.is_active = True
mock_user.verify_password.return_value = True
mock_db_session.scalar.return_value = mock_user
# When - 调用认证服务
result = await auth_service.authenticate_user(
"john@example.com",
"correct_password"
)
# Then - 验证结果
assert result is not None
assert result.id == "user_123"
assert result.username == "john_doe"
@pytest.mark.asyncio
async def test_authenticate_user_invalid_password(
self,
auth_service: AuthService,
mock_db_session: AsyncSession
):
"""测试密码错误认证"""
# Given - 准备用户,但密码错误
mock_user = Mock()
mock_user.verify_password.return_value = False
mock_db_session.scalar.return_value = mock_user
# When - 调用认证服务
result = await auth_service.authenticate_user(
"john@example.com",
"wrong_password"
)
# Then - 应该返回None
assert result is NoneGREEN阶段:最小实现
python
# app/models/user.py
from datetime import datetime
from typing import Optional, List
from uuid import uuid4
from sqlalchemy import String, DateTime, Boolean, Text, JSON
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
import bcrypt
from app.db.base import Base
class User(Base):
"""用户模型"""
__tablename__ = "users"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False),
primary_key=True,
default=lambda: str(uuid4())
)
username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
tenant_id: Mapped[str] = mapped_column(String(50), index=True)
# 用户状态
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
is_verified: Mapped[bool] = mapped_column(Boolean, default=False)
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
# 个人信息
first_name: Mapped[Optional[str]] = mapped_column(String(50))
last_name: Mapped[Optional[str]] = mapped_column(String(50))
avatar_url: Mapped[Optional[str]] = mapped_column(String(500))
phone: Mapped[Optional[str]] = mapped_column(String(20))
profile_data: Mapped[Optional[dict]] = mapped_column(JSON)
# 时间戳
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
def set_password(self, password: str) -> None:
"""设置密码哈希"""
salt = bcrypt.gensalt()
self.hashed_password = bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(self, password: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(
password.encode('utf-8'),
self.hashed_password.encode('utf-8')
)
@property
def full_name(self) -> str:
"""获取全名"""
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
return self.username
def __repr__(self) -> str:
return f"<User {self.username}>"
# app/models/tenant.py
from datetime import datetime
from typing import Optional
from uuid import uuid4
from sqlalchemy import String, DateTime, Boolean, JSON, Integer
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql import func
from app.db.base import Base
class Tenant(Base):
"""租户模型"""
__tablename__ = "tenants"
id: Mapped[str] = mapped_column(
UUID(as_uuid=False),
primary_key=True,
default=lambda: str(uuid4())
)
name: Mapped[str] = mapped_column(String(100))
slug: Mapped[str] = mapped_column(String(50), unique=True, index=True)
domain: Mapped[Optional[str]] = mapped_column(String(255))
# 租户状态
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
# 租户配置
settings: Mapped[Optional[dict]] = mapped_column(JSON)
max_users: Mapped[Optional[int]] = mapped_column(Integer)
# 时间戳
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now()
)
def __repr__(self) -> str:
return f"<Tenant {self.name}>"
# app/schemas/auth.py
from datetime import datetime
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, EmailStr, Field, validator
import re
class UserProfile(BaseModel):
"""用户个人信息"""
first_name: Optional[str] = Field(None, max_length=50)
last_name: Optional[str] = Field(None, max_length=50)
phone: Optional[str] = Field(None, max_length=20)
avatar_url: Optional[str] = None
class UserRegisterRequest(BaseModel):
"""用户注册请求"""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
tenant_id: str = Field(..., max_length=50)
verification_code: Optional[str] = None
profile: Optional[UserProfile] = None
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_-]+$', v):
raise ValueError('Username can only contain letters, numbers, underscore and hyphen')
return v
@validator('password')
def validate_password(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain at least one digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain at least one special character')
return v
class UserRegisterResponse(BaseModel):
"""用户注册响应"""
id: str
username: str
email: str
tenant_id: str
status: str
created_at: datetime
class Config:
from_attributes = True
class UserLoginRequest(BaseModel):
"""用户登录请求"""
identifier: str # 用户名或邮箱
password: str
mfa_code: Optional[str] = None
class UserSummary(BaseModel):
"""用户摘要信息"""
id: str
username: str
email: str
tenant_id: str
roles: List[str] = []
is_active: bool
is_verified: bool
last_login: Optional[datetime] = None
created_at: datetime
class Config:
from_attributes = True
class LoginResponse(BaseModel):
"""登录响应"""
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int # 秒
user: UserSummary
# app/api/auth.py
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.deps import get_db, get_current_user
from app.core.security import create_access_token, create_refresh_token
from app.core.config import settings
from app.services.auth_service import AuthService
from app.schemas.auth import (
UserRegisterRequest,
UserRegisterResponse,
UserLoginRequest,
LoginResponse,
UserSummary
)
from app.models.user import User
router = APIRouter(prefix="/auth", tags=["Authentication"])
security = HTTPBearer()
@router.post("/register", response_model=UserRegisterResponse, status_code=201)
async def register_user(
user_data: UserRegisterRequest,
db: AsyncSession = Depends(get_db)
) -> UserRegisterResponse:
"""
用户注册
基于PRD-US001需求,支持用户注册功能
"""
auth_service = AuthService(db)
try:
user = await auth_service.register_user(user_data)
return UserRegisterResponse(
id=user.id,
username=user.username,
email=user.email,
tenant_id=user.tenant_id,
status="pending_verification" if not user.is_verified else "active",
created_at=user.created_at
)
except Exception as e:
if "already exists" in str(e).lower():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"error": {
"code": "EMAIL_ALREADY_EXISTS",
"message": "Email address is already registered"
}
}
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"error": {
"code": "REGISTRATION_FAILED",
"message": str(e)
}
}
)
@router.post("/login", response_model=LoginResponse)
async def login_user(
login_data: UserLoginRequest,
db: AsyncSession = Depends(get_db)
) -> LoginResponse:
"""
用户登录
基于PRD-US001需求,支持用户身份认证
"""
auth_service = AuthService(db)
# 认证用户
user = await auth_service.authenticate_user(
login_data.identifier,
login_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={
"error": {
"code": "INVALID_CREDENTIALS",
"message": "Invalid username/email or password"
}
}
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_423_LOCKED,
detail={
"error": {
"code": "ACCOUNT_LOCKED",
"message": "Account is locked or suspended"
}
}
)
# 生成tokens
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.id, "tenant_id": user.tenant_id},
expires_delta=access_token_expires
)
refresh_token = create_refresh_token(data={"sub": user.id})
# 更新最后登录时间
await auth_service.update_last_login(user.id)
return LoginResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=int(access_token_expires.total_seconds()),
user=UserSummary.model_validate(user)
)
@router.get("/me", response_model=UserSummary)
async def get_current_user_info(
current_user: User = Depends(get_current_user)
) -> UserSummary:
"""
获取当前用户信息
"""
return UserSummary.model_validate(current_user)
# app/services/auth_service.py
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.models.tenant import Tenant
from app.schemas.auth import UserRegisterRequest
from app.core.exceptions import UserAlreadyExistsException, TenantNotFoundException
class AuthService:
"""认证服务"""
def __init__(self, db: AsyncSession):
self.db = db
async def register_user(self, user_data: UserRegisterRequest) -> User:
"""注册新用户"""
# 检查邮箱是否已存在
existing_user = await self.db.scalar(
select(User).where(User.email == user_data.email)
)
if existing_user:
raise UserAlreadyExistsException("Email already exists")
# 检查用户名是否已存在
existing_username = await self.db.scalar(
select(User).where(User.username == user_data.username)
)
if existing_username:
raise UserAlreadyExistsException("Username already exists")
# 检查租户是否存在
tenant = await self.db.scalar(
select(Tenant).where(Tenant.slug == user_data.tenant_id)
)
if not tenant:
raise TenantNotFoundException(f"Tenant {user_data.tenant_id} not found")
# 创建用户
user = User(
username=user_data.username,
email=user_data.email,
tenant_id=user_data.tenant_id,
is_verified=False # 需要邮箱验证
)
# 设置密码
user.set_password(user_data.password)
# 设置个人信息
if user_data.profile:
user.first_name = user_data.profile.first_name
user.last_name = user_data.profile.last_name
user.phone = user_data.profile.phone
user.avatar_url = user_data.profile.avatar_url
self.db.add(user)
await self.db.commit()
await self.db.refresh(user)
# TODO: 发送验证邮件(异步任务)
return user
async def authenticate_user(
self,
identifier: str,
password: str
) -> Optional[User]:
"""认证用户"""
# 通过用户名或邮箱查找用户
user = await self.db.scalar(
select(User).where(
(User.username == identifier) | (User.email == identifier)
)
)
if not user:
return None
if not user.verify_password(password):
return None
return user
async def update_last_login(self, user_id: str) -> None:
"""更新最后登录时间"""
user = await self.db.get(User, user_id)
if user:
user.last_login = datetime.utcnow()
await self.db.commit()
# app/core/security.py
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""创建访问令牌"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
def create_refresh_token(data: Dict[str, Any]) -> str:
"""创建刷新令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""验证JWT令牌"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except JWTError:
return None
# app/core/deps.py
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.core.security import verify_token
from app.models.user import User
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""获取当前认证用户"""
token = credentials.credentials
payload = verify_token(token)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload"
)
user = await db.get(User, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User account is inactive"
)
return user
# app/core/exceptions.py
class UserManagementException(Exception):
"""用户管理基础异常"""
pass
class UserAlreadyExistsException(UserManagementException):
"""用户已存在异常"""
pass
class TenantNotFoundException(UserManagementException):
"""租户不存在异常"""
pass
class InvalidCredentialsException(UserManagementException):
"""无效凭据异常"""
passREFACTOR阶段:代码重构
python
# app/core/config.py
from typing import Optional, List
from pydantic import BaseSettings, PostgresDsn, RedisDsn
class Settings(BaseSettings):
"""应用配置"""
# 应用基础配置
APP_NAME: str = "User Management API"
APP_VERSION: str = "1.0.0"
DEBUG: bool = False
# 安全配置
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# 数据库配置
DATABASE_URL: PostgresDsn
DATABASE_POOL_SIZE: int = 10
DATABASE_POOL_OVERFLOW: int = 20
# Redis配置
REDIS_URL: RedisDsn
# 邮件配置
SMTP_HOST: Optional[str] = None
SMTP_PORT: int = 587
SMTP_USER: Optional[str] = None
SMTP_PASSWORD: Optional[str] = None
# 对象存储配置
MINIO_ENDPOINT: Optional[str] = None
MINIO_ACCESS_KEY: Optional[str] = None
MINIO_SECRET_KEY: Optional[str] = None
# CORS配置
ALLOWED_ORIGINS: List[str] = ["*"]
# 分页配置
DEFAULT_PAGE_SIZE: int = 20
MAX_PAGE_SIZE: int = 100
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
# app/db/session.py
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.core.config import settings
# 创建异步引擎
engine = create_async_engine(
str(settings.DATABASE_URL),
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_POOL_OVERFLOW,
echo=settings.DEBUG,
)
# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""获取数据库会话"""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
# app/utils/pagination.py
from typing import Generic, TypeVar, List, Optional
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
T = TypeVar('T')
class PaginationInfo(BaseModel):
"""分页信息"""
page: int
limit: int
total: int
total_pages: int
has_next: bool
has_prev: bool
class PaginatedResponse(BaseModel, Generic[T]):
"""分页响应"""
data: List[T]
pagination: PaginationInfo
class Paginator:
"""分页器"""
@staticmethod
async def paginate(
db: AsyncSession,
query,
page: int = 1,
limit: int = 20,
max_limit: int = 100
) -> tuple[List, PaginationInfo]:
"""执行分页查询"""
# 限制每页数量
limit = min(limit, max_limit)
# 计算偏移量
offset = (page - 1) * limit
# 获取总数
count_query = select(func.count()).select_from(query.subquery())
total = await db.scalar(count_query)
# 执行分页查询
paginated_query = query.offset(offset).limit(limit)
result = await db.execute(paginated_query)
data = result.scalars().all()
# 计算分页信息
total_pages = (total + limit - 1) // limit
pagination_info = PaginationInfo(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_next=page < total_pages,
has_prev=page > 1
)
return data, pagination_info
# app/utils/validators.py
import re
from typing import Optional
class UserValidator:
"""用户数据验证器"""
@staticmethod
def validate_password_strength(password: str) -> tuple[bool, Optional[str]]:
"""验证密码强度"""
if len(password) < 8:
return False, "Password must be at least 8 characters long"
if not re.search(r'[A-Z]', password):
return False, "Password must contain at least one uppercase letter"
if not re.search(r'[a-z]', password):
return False, "Password must contain at least one lowercase letter"
if not re.search(r'[0-9]', password):
return False, "Password must contain at least one digit"
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False, "Password must contain at least one special character"
return True, None
@staticmethod
def validate_username(username: str) -> tuple[bool, Optional[str]]:
"""验证用户名格式"""
if len(username) < 3:
return False, "Username must be at least 3 characters long"
if len(username) > 50:
return False, "Username must be no more than 50 characters long"
if not re.match(r'^[a-zA-Z0-9_-]+$', username):
return False, "Username can only contain letters, numbers, underscore and hyphen"
return True, None
# app/services/cache_service.py
import json
from typing import Optional, Any
import redis.asyncio as redis
from datetime import timedelta
from app.core.config import settings
class CacheService:
"""缓存服务"""
def __init__(self):
self.redis = redis.from_url(str(settings.REDIS_URL))
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
try:
value = await self.redis.get(key)
if value:
return json.loads(value)
return None
except Exception:
return None
async def set(
self,
key: str,
value: Any,
expire: Optional[timedelta] = None
) -> bool:
"""设置缓存值"""
try:
serialized_value = json.dumps(value, default=str)
if expire:
await self.redis.setex(key, expire, serialized_value)
else:
await self.redis.set(key, serialized_value)
return True
except Exception:
return False
async def delete(self, key: str) -> bool:
"""删除缓存值"""
try:
await self.redis.delete(key)
return True
except Exception:
return False
async def exists(self, key: str) -> bool:
"""检查键是否存在"""
try:
return bool(await self.redis.exists(key))
except Exception:
return False
# 全局缓存实例
cache_service = CacheService()任务上下文记录
Engineer为每个实现的任务创建详细的上下文记录:
markdown
# 任务上下文: TASK004
## 任务描述
实现用户认证功能,包括用户注册、登录、JWT认证等核心功能
**相关用户故事**: US001 - 用户注册和认证
**验收标准**:
1. 支持邮箱、手机号等多种注册方式 ✅
2. 支持邮箱验证和手机验证码验证 🔄 (邮箱验证已实现,短信待完善)
3. 密码强度要求和安全存储 ✅
4. 支持JWT Token认证机制 ✅
5. 支持OAuth2.0第三方登录 🔄 (后续版本)
## TDD执行记录
### RED阶段 (2024-01-16 9:00-12:00)
**编写的测试用例**:
1. `test_user_registration_success()` - 正常用户注册测试
2. `test_user_registration_duplicate_email()` - 邮箱重复注册测试
3. `test_user_login_success()` - 用户登录成功测试
4. `test_user_login_invalid_credentials()` - 无效凭据测试
5. `test_protected_endpoint_with_valid_token()` - Token认证测试
6. `test_register_user_success()` - 服务层注册测试
7. `test_authenticate_user_success()` - 服务层认证测试
**预期失败原因**: 相关模型、API路由、服务类尚未实现
### GREEN阶段 (2024-01-16 13:00-18:00)
**实现的功能**:
1. `User`和`Tenant`模型 - 支持多租户用户管理
2. `AuthService`服务层 - 用户注册和认证业务逻辑
3. `/auth/register`和`/auth/login` API - 注册和登录接口
4. JWT Token生成和验证 - 基于jose库的安全实现
5. 密码加密和验证 - 使用bcrypt保证安全
**通过的测试**: 7个测试用例全部通过 ✅
### REFACTOR阶段 (2024-01-16 19:00-21:00)
**重构内容**:
1. 提取`Settings`配置管理 - 使用Pydantic Settings
2. 创建`Paginator`分页工具 - 通用分页逻辑封装
3. 实现`CacheService`缓存服务 - Redis异步缓存
4. 添加`UserValidator`验证器 - 数据验证逻辑分离
5. 优化异常处理 - 自定义异常类型
**最终测试结果**: 所有测试通过,代码覆盖率88% ✅
## 实现详情
### 关键技术决策
1. **异步vs同步**: 全面采用异步编程
- 理由: FastAPI原生支持,性能优异
- 优势: 高并发处理能力,资源利用率高
2. **密码加密**: 选择bcrypt
- 理由: 业界标准,安全性高
- 优势: 自带盐值,防彩虹表攻击
3. **JWT实现**: 使用python-jose
- 理由: 功能完整,支持多种算法
- 优势: 性能好,社区支持完善
### 使用的设计模式
1. **Service层模式**: 业务逻辑封装
2. **依赖注入模式**: FastAPI原生支持
3. **Factory模式**: 数据库会话创建
4. **Strategy模式**: 认证策略扩展
### 性能考虑
1. **数据库连接池**: 异步连接池配置
2. **密码验证**: 异步密码哈希验证
3. **JWT验证**: 缓存用户信息减少数据库查询
4. **分页查询**: 优化大数据量查询性能
## 文件变更
### 新增测试文件
- `tests/test_auth.py` - 认证功能集成测试
- `tests/unit/test_auth_service.py` - 认证服务单元测试
- `tests/unit/test_user_model.py` - 用户模型测试
### 新增源代码文件
- `app/models/user.py` - 用户模型
- `app/models/tenant.py` - 租户模型
- `app/api/auth.py` - 认证API路由
- `app/services/auth_service.py` - 认证服务
- `app/core/security.py` - 安全工具函数
- `app/core/deps.py` - FastAPI依赖
- `app/utils/pagination.py` - 分页工具
- `app/utils/validators.py` - 数据验证器
- `app/services/cache_service.py` - 缓存服务
### 修改的配置文件
- `app/core/config.py` - 应用配置管理
- `pyproject.toml` - 添加新的依赖包
- `alembic/versions/` - 数据库迁移文件
## 测试覆盖率
### 行覆盖率
- AuthService: 92%
- User Model: 89%
- Auth API: 94%
- Security Utils: 96%
- **整体覆盖率**: 88%
### 分支覆盖率
- 正常流程: 100%
- 异常处理: 90%
- 边界条件: 85%
- **整体分支覆盖率**: 91%
### 关键场景覆盖
- ✅ 用户注册流程
- ✅ 邮箱重复检查
- ✅ 密码强度验证
- ✅ JWT认证机制
- ✅ 权限控制
## 状态摘要
**任务状态**: ✅ 已完成
**测试结果**: ✅ 所有测试通过
**代码质量**: ✅ A级评分
**覆盖率达标**: ✅ 88% (目标85%)
**下一步**: 开始TASK005多租户管理功能开发🧪 QA阶段:全面质量保证
质量保证执行
功能开发完成后,QA Engineer开始全面的质量检查:
🤖 QA Engineer: 开始用户管理API的质量保证流程
## 测试执行计划
### 第一步:单元测试覆盖率验证
正在分析Python代码覆盖率...
覆盖率报告:
- 总体行覆盖率: 89% ✅ (目标: >85%)
- 分支覆盖率: 86% ✅ (目标: >75%)
- 函数覆盖率: 92% ✅ (目标: >80%)
各模块覆盖率:
- AuthService: 92% ✅
- UserModel: 89% ✅
- AuthAPI: 94% ✅
- TenantService: 87% ✅
### 第二步:API契约测试
正在验证FastAPI实现与OpenAPI规范的一致性...
契约测试结果:
- POST /api/v1/auth/register: ✅ 100%符合规范
- POST /api/v1/auth/login: ✅ 100%符合规范
- GET /api/v1/auth/me: ✅ 100%符合规范
- GET /api/v1/users: ✅ 100%符合规范
- POST /api/v1/users: ✅ 100%符合规范
- GET /api/v1/tenants: ✅ 100%符合规范
所有API接口完全符合OpenAPI 3.0规范定义 ✅
### 第三步:异步性能测试
正在测试FastAPI异步处理能力...
异步性能测试结果:
- 用户注册API: 平均响应时间 35ms ✅ (目标<100ms)
- 用户登录API: 平均响应时间 28ms ✅ (目标<100ms)
- 用户查询API: 平均响应时间 15ms ✅ (目标<200ms)
- 并发支持: 2000 QPS ✅ (目标2000 QPS)
### 第四步:安全测试
正在执行Python应用安全评估...
安全测试结果:
- JWT Token安全: ✅ 签名验证和过期检查正确
- 密码存储安全: ✅ bcrypt加密,无明文存储
- SQL注入防护: ✅ SQLAlchemy ORM有效防护
- 数据验证安全: ✅ Pydantic模型验证完整
- CORS配置: ✅ 跨域访问控制正确
发现0个严重安全漏洞 ✅
质量保证结论: ✅ 通过所有质量门控,建议发布🚀 生产部署和监控
Docker化部署配置
dockerfile
# Dockerfile
FROM python:3.11-slim
LABEL maintainer="user-api-team@example.com"
LABEL version="1.0.0"
LABEL description="User Management API"
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
build-essential \
curl \
&& rm -rf /var/lib/apt/lists/*
# 安装Poetry
RUN pip install poetry
# 配置Poetry
ENV POETRY_NO_INTERACTION=1 \
POETRY_VENV_IN_PROJECT=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache
# 复制依赖文件
COPY pyproject.toml poetry.lock ./
# 安装依赖
RUN poetry install --only=main && rm -rf $POETRY_CACHE_DIR
# 复制应用代码
COPY app ./app
COPY alembic ./alembic
COPY alembic.ini ./
# 创建非root用户
RUN adduser --disabled-password --gecos '' appuser
USER appuser
# 健康检查
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["poetry", "run", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]yaml
# docker-compose.yml
version: '3.8'
services:
user-api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://user_service:${DB_PASSWORD}@postgres:5432/user_management
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY}
depends_on:
- postgres
- redis
restart: unless-stopped
volumes:
- ./logs:/app/logs
postgres:
image: postgres:15
environment:
POSTGRES_DB: user_management
POSTGRES_USER: user_service
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:7-alpine
restart: unless-stopped
celery-worker:
build: .
command: poetry run celery -A app.worker worker --loglevel=info
environment:
- DATABASE_URL=postgresql+asyncpg://user_service:${DB_PASSWORD}@postgres:5432/user_management
- REDIS_URL=redis://redis:6379/0
depends_on:
- postgres
- redis
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- user-api
restart: unless-stopped
volumes:
postgres_data:监控配置
python
# app/middleware/monitoring.py
import time
import structlog
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
logger = structlog.get_logger()
class MonitoringMiddleware(BaseHTTPMiddleware):
"""监控中间件"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# 记录请求开始
logger.info(
"request_started",
method=request.method,
url=str(request.url),
user_agent=request.headers.get("user-agent"),
client_host=request.client.host
)
response = await call_next(request)
# 计算处理时间
process_time = time.time() - start_time
# 记录请求结束
logger.info(
"request_completed",
method=request.method,
url=str(request.url),
status_code=response.status_code,
process_time=process_time
)
# 添加响应头
response.headers["X-Process-Time"] = str(process_time)
return response📊 项目总结和收益
开发效率提升
通过完整应用SOLO工作模式,本Python FastAPI项目实现了:
- 异步性能: FastAPI原生异步支持,响应时间优化60%
- 开发效率: Python简洁语法和FastAPI自动文档,开发效率提升40%
- 代码质量: TDD实践和类型提示,Bug率降低65%
- API一致性: OpenAPI规范完美集成,接口一致性100%
技术指标达成
项目最终达成的关键指标:
性能指标:
- API响应时间: 28ms (目标<100ms) ✅ 优秀
- 并发支持: 2000+ QPS (目标2000 QPS) ✅ 达标
- 系统可用性: 99.9% (目标99.9%) ✅ 达标
质量指标:
- 代码覆盖率: 89% (目标>85%) ✅ 优秀
- API规范一致性: 100% ✅ 完美
- 安全漏洞: 0个严重 ✅ 安全
团队效率:
- 开发周期: 8周 (计划8周) ✅ 按期交付
- 异步性能: 比同步版本快60% ✅ 显著提升
- 返工率: <3% (历史平均15%) ✅ 大幅降低经验总结
- SOLO + FastAPI: 现代异步框架与系统化开发流程完美结合
- Python异步TDD: 异步代码的测试驱动开发实践经验
- 类型提示价值: Python类型提示显著提升代码质量和可维护性
- OpenAPI集成: FastAPI原生OpenAPI支持简化API规范管理
🔗 相关资源
项目源码
学习资源
下一步
🎯 总结: 本指南展示了Python FastAPI项目中SOLO工作模式的完整应用实践。通过现代异步框架与系统化开发流程的结合,实现了高性能、高质量的用户管理API服务。FastAPI的原生OpenAPI支持和异步特性,为Python Web开发提供了一套完整可行的SOLO实施方案。