Skip to content

API 开发最佳实践

构建高质量、可维护、可扩展 API 的综合指南

设计原则

API 设计最佳实践

1. RESTful 设计规范

yaml
# ✅ 好的示例
GET    /api/v1/users              # 获取用户列表
GET    /api/v1/users/123          # 获取特定用户
POST   /api/v1/users              # 创建用户
PUT    /api/v1/users/123          # 完整更新用户
PATCH  /api/v1/users/123          # 部分更新用户
DELETE /api/v1/users/123          # 删除用户

# 子资源
GET    /api/v1/users/123/orders   # 获取用户订单
POST   /api/v1/users/123/orders   # 创建用户订单

# 行为/操作
POST   /api/v1/users/123/activate # 激活用户
POST   /api/v1/orders/456/cancel  # 取消订单

# ❌ 避免的示例
GET    /api/v1/getUsers           # 动词在 URL 中
POST   /api/v1/users/create       # 冗余的动作
GET    /api/v1/user_list          # 不一致的命名

2. 命名约定

typescript
// API 路径命名
const API_CONVENTIONS = {
  // 使用复数名词
  resources: '/products',      // ✅
  notResource: '/product',     // ❌
  
  // 使用连字符分隔
  multiWord: '/user-profiles', // ✅
  notCamelCase: '/userProfiles', // ❌
  notSnakeCase: '/user_profiles', // ❌
  
  // 使用小写
  lowercase: '/api/v1/products', // ✅
  notMixedCase: '/API/V1/Products', // ❌
};

// 请求/响应字段命名
interface ProductResponse {
  // 使用 camelCase
  productId: string;        // ✅
  productName: string;      // ✅
  createdAt: string;        // ✅
  
  // 避免
  product_id: string;       // ❌ snake_case
  ProductName: string;      // ❌ PascalCase
  created_at: string;       // ❌ snake_case
}

3. 状态码使用

python
from http import HTTPStatus

class APIResponse:
    """正确使用 HTTP 状态码"""
    
    # 2xx 成功
    @staticmethod
    def success(data):
        return {"data": data}, HTTPStatus.OK  # 200
    
    @staticmethod
    def created(data, location):
        return {"data": data}, HTTPStatus.CREATED, {"Location": location}  # 201
    
    @staticmethod
    def accepted(task_id):
        return {"taskId": task_id, "status": "processing"}, HTTPStatus.ACCEPTED  # 202
    
    @staticmethod
    def no_content():
        return "", HTTPStatus.NO_CONTENT  # 204
    
    # 4xx 客户端错误
    @staticmethod
    def bad_request(message, errors=None):
        return {"error": {"message": message, "details": errors}}, HTTPStatus.BAD_REQUEST  # 400
    
    @staticmethod
    def unauthorized(message="Authentication required"):
        return {"error": {"message": message}}, HTTPStatus.UNAUTHORIZED  # 401
    
    @staticmethod
    def forbidden(message="Permission denied"):
        return {"error": {"message": message}}, HTTPStatus.FORBIDDEN  # 403
    
    @staticmethod
    def not_found(resource, id):
        return {"error": {"message": f"{resource} {id} not found"}}, HTTPStatus.NOT_FOUND  # 404
    
    @staticmethod
    def conflict(message):
        return {"error": {"message": message}}, HTTPStatus.CONFLICT  # 409
    
    @staticmethod
    def unprocessable_entity(message, errors=None):
        return {"error": {"message": message, "details": errors}}, HTTPStatus.UNPROCESSABLE_ENTITY  # 422
    
    @staticmethod
    def too_many_requests(retry_after):
        return {"error": {"message": "Rate limit exceeded"}}, HTTPStatus.TOO_MANY_REQUESTS, {"Retry-After": str(retry_after)}  # 429
    
    # 5xx 服务器错误
    @staticmethod
    def internal_error(request_id):
        return {"error": {"message": "Internal server error", "requestId": request_id}}, HTTPStatus.INTERNAL_SERVER_ERROR  # 500
    
    @staticmethod
    def service_unavailable(retry_after=None):
        headers = {"Retry-After": str(retry_after)} if retry_after else {}
        return {"error": {"message": "Service temporarily unavailable"}}, HTTPStatus.SERVICE_UNAVAILABLE, headers  # 503

数据验证最佳实践

输入验证

java
// Spring Boot 验证示例
@RestController
@RequestMapping("/api/v1/users")
@Validated
public class UserController {
    
    @PostMapping
    public ResponseEntity<User> createUser(
            @Valid @RequestBody CreateUserRequest request) {
        // 验证通过后的处理
        User user = userService.createUser(request);
        return ResponseEntity.created(URI.create("/api/v1/users/" + user.getId()))
                .body(user);
    }
}

@Data
public class CreateUserRequest {
    @NotBlank(message = "用户名不能为空")
    @Size(min = 3, max = 50, message = "用户名长度必须在3-50个字符之间")
    @Pattern(regexp = "^[a-zA-Z0-9_]+$", message = "用户名只能包含字母、数字和下划线")
    private String username;
    
    @NotBlank(message = "邮箱不能为空")
    @Email(message = "邮箱格式不正确")
    private String email;
    
    @NotBlank(message = "密码不能为空")
    @Size(min = 8, message = "密码长度至少8位")
    @Pattern(
        regexp = "^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)(?=.*[@$!%*?&])[A-Za-z\\d@$!%*?&]+$",
        message = "密码必须包含大小写字母、数字和特殊字符"
    )
    private String password;
    
    @NotNull(message = "年龄不能为空")
    @Min(value = 18, message = "年龄必须大于等于18")
    @Max(value = 120, message = "年龄必须小于等于120")
    private Integer age;
    
    @Valid
    @NotNull(message = "地址信息不能为空")
    private Address address;
}

// 自定义验证器
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = UniqueUsernameValidator.class)
public @interface UniqueUsername {
    String message() default "用户名已存在";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}

@Component
public class UniqueUsernameValidator implements ConstraintValidator<UniqueUsername, String> {
    @Autowired
    private UserRepository userRepository;
    
    @Override
    public boolean isValid(String username, ConstraintValidatorContext context) {
        if (username == null) {
            return true; // 让 @NotNull 处理
        }
        return !userRepository.existsByUsername(username);
    }
}

输出过滤

typescript
// 敏感信息过滤
export class ResponseSerializer {
    private static readonly SENSITIVE_FIELDS = [
        'password',
        'creditCard',
        'ssn',
        'apiKey',
        'secret'
    ];
    
    static serialize(data: any, fields?: string[]): any {
        if (Array.isArray(data)) {
            return data.map(item => this.serialize(item, fields));
        }
        
        if (typeof data !== 'object' || data === null) {
            return data;
        }
        
        const result: any = {};
        
        for (const [key, value] of Object.entries(data)) {
            // 过滤敏感字段
            if (this.SENSITIVE_FIELDS.includes(key)) {
                continue;
            }
            
            // 字段选择
            if (fields && !fields.includes(key)) {
                continue;
            }
            
            // 递归处理嵌套对象
            if (typeof value === 'object' && value !== null) {
                result[key] = this.serialize(value, fields);
            } else {
                result[key] = value;
            }
        }
        
        return result;
    }
    
    // 数据脱敏
    static maskSensitiveData(data: any): any {
        if (typeof data !== 'object' || data === null) {
            return data;
        }
        
        const masked = { ...data };
        
        // 邮箱脱敏
        if (masked.email) {
            masked.email = this.maskEmail(masked.email);
        }
        
        // 手机号脱敏
        if (masked.phone) {
            masked.phone = this.maskPhone(masked.phone);
        }
        
        // 身份证脱敏
        if (masked.idCard) {
            masked.idCard = this.maskIdCard(masked.idCard);
        }
        
        return masked;
    }
    
    private static maskEmail(email: string): string {
        const [name, domain] = email.split('@');
        const maskedName = name.charAt(0) + '*'.repeat(name.length - 2) + name.charAt(name.length - 1);
        return `${maskedName}@${domain}`;
    }
    
    private static maskPhone(phone: string): string {
        return phone.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2');
    }
    
    private static maskIdCard(idCard: string): string {
        return idCard.replace(/(\d{6})\d{8}(\d{4})/, '$1********$2');
    }
}

安全最佳实践

认证和授权

python
from functools import wraps
from typing import List, Optional
import jwt
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

class JWTBearer:
    """JWT 认证"""
    
    def __init__(self, required_roles: Optional[List[str]] = None):
        self.required_roles = required_roles or []
    
    async def __call__(self, credentials: HTTPAuthorizationCredentials = Security(security)):
        token = credentials.credentials
        
        try:
            # 验证 token
            payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
            user_id = payload.get("sub")
            user_roles = payload.get("roles", [])
            
            if not user_id:
                raise HTTPException(status_code=403, detail="Invalid authentication credentials")
            
            # 检查角色权限
            if self.required_roles:
                if not any(role in user_roles for role in self.required_roles):
                    raise HTTPException(status_code=403, detail="Insufficient permissions")
            
            return {"user_id": user_id, "roles": user_roles}
            
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=403, detail="Token has expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=403, detail="Invalid token")

# 使用示例
@app.get("/api/v1/admin/users")
async def get_all_users(auth: dict = Depends(JWTBearer(required_roles=["admin"]))):
    """需要 admin 角色才能访问"""
    return await user_service.get_all_users()

# 基于资源的授权
class ResourceAuthorization:
    """资源级别的授权"""
    
    @staticmethod
    async def can_access_resource(user_id: str, resource_type: str, resource_id: str, action: str) -> bool:
        # 检查用户是否有权限访问特定资源
        permission = await db.query(Permission).filter(
            Permission.user_id == user_id,
            Permission.resource_type == resource_type,
            Permission.resource_id == resource_id,
            Permission.action == action
        ).first()
        
        return permission is not None
    
    @staticmethod
    def require_resource_permission(resource_type: str, action: str):
        """装饰器:检查资源权限"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # 从请求中获取用户和资源信息
                user_id = kwargs.get("current_user", {}).get("user_id")
                resource_id = kwargs.get("resource_id") or kwargs.get("id")
                
                if not await ResourceAuthorization.can_access_resource(
                    user_id, resource_type, resource_id, action
                ):
                    raise HTTPException(status_code=403, detail="Access denied")
                
                return await func(*args, **kwargs)
            return wrapper
        return decorator

API 密钥管理

java
@Component
public class ApiKeyAuthenticationFilter extends OncePerRequestFilter {
    
    private final ApiKeyService apiKeyService;
    private final RateLimiter rateLimiter;
    
    @Override
    protected void doFilterInternal(HttpServletRequest request, 
                                  HttpServletResponse response, 
                                  FilterChain filterChain) throws ServletException, IOException {
        String apiKey = extractApiKey(request);
        
        if (apiKey != null) {
            try {
                // 验证 API 密钥
                ApiKeyDetails details = apiKeyService.validateApiKey(apiKey);
                
                // 检查速率限制
                if (!rateLimiter.allowRequest(apiKey)) {
                    response.setStatus(HttpServletResponse.SC_TOO_MANY_REQUESTS);
                    response.getWriter().write("{\"error\": \"Rate limit exceeded\"}");
                    return;
                }
                
                // 记录使用情况
                apiKeyService.recordUsage(apiKey, request);
                
                // 设置认证信息
                SecurityContextHolder.getContext().setAuthentication(
                    new ApiKeyAuthentication(details)
                );
                
            } catch (InvalidApiKeyException e) {
                response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
                response.getWriter().write("{\"error\": \"Invalid API key\"}");
                return;
            }
        }
        
        filterChain.doFilter(request, response);
    }
    
    private String extractApiKey(HttpServletRequest request) {
        // 从 header 获取
        String apiKey = request.getHeader("X-API-Key");
        if (apiKey != null) {
            return apiKey;
        }
        
        // 从查询参数获取(不推荐)
        return request.getParameter("api_key");
    }
}

// API 密钥服务
@Service
@Transactional
public class ApiKeyService {
    
    public ApiKey generateApiKey(String userId, String name, Set<String> scopes) {
        // 生成安全的 API 密钥
        String key = generateSecureKey();
        String hashedKey = hashApiKey(key);
        
        ApiKey apiKey = ApiKey.builder()
            .id(UUID.randomUUID())
            .userId(userId)
            .name(name)
            .keyHash(hashedKey)
            .keyPrefix(key.substring(0, 8))  // 用于识别
            .scopes(scopes)
            .createdAt(LocalDateTime.now())
            .lastUsedAt(null)
            .expiresAt(LocalDateTime.now().plusYears(1))
            .build();
            
        apiKeyRepository.save(apiKey);
        
        // 只返回一次明文密钥
        apiKey.setKey(key);
        return apiKey;
    }
    
    private String generateSecureKey() {
        byte[] random = new byte[32];
        new SecureRandom().nextBytes(random);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(random);
    }
    
    private String hashApiKey(String key) {
        return BCrypt.hashpw(key, BCrypt.gensalt(12));
    }
}

CORS 配置

typescript
// Express CORS 配置
import cors from 'cors';

const corsOptions: cors.CorsOptions = {
    // 允许的源
    origin: (origin, callback) => {
        const allowedOrigins = [
            'https://app.example.com',
            'https://admin.example.com',
            'http://localhost:3000' // 开发环境
        ];
        
        // 允许没有 origin 的请求(如 Postman)
        if (!origin) {
            return callback(null, true);
        }
        
        if (allowedOrigins.includes(origin)) {
            callback(null, true);
        } else {
            callback(new Error('Not allowed by CORS'));
        }
    },
    
    // 允许的方法
    methods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
    
    // 允许的头部
    allowedHeaders: [
        'Content-Type',
        'Authorization',
        'X-API-Key',
        'X-Request-ID'
    ],
    
    // 暴露的头部
    exposedHeaders: [
        'X-Total-Count',
        'X-Page-Count',
        'X-Request-ID'
    ],
    
    // 是否允许凭证
    credentials: true,
    
    // 预检请求缓存时间
    maxAge: 86400 // 24小时
};

app.use(cors(corsOptions));

// 处理预检请求
app.options('*', cors(corsOptions));

文档最佳实践

OpenAPI 文档编写

yaml
openapi: 3.0.3
info:
  title: API Service
  description: |
    # API 服务文档
    
    ## 概述
    这是我们的 RESTful API 服务,提供产品管理、用户认证等功能。
    
    ## 认证
    API 使用 Bearer Token 认证。在请求头中添加:
    ```
    Authorization: Bearer <your-token>
    ```
    
    ## 错误处理
    所有错误响应都遵循统一格式:
    ```json
    {
      "error": {
        "code": "ERROR_CODE",
        "message": "错误描述",
        "details": {}
      }
    }
    ```
    
    ## 速率限制
    - 认证用户:1000 请求/小时
    - 未认证用户:100 请求/小时
    
  version: 1.0.0
  contact:
    name: API Support
    email: api-support@example.com
  license:
    name: MIT
    url: https://opensource.org/licenses/MIT

servers:
  - url: https://api.example.com
    description: 生产环境
  - url: https://staging-api.example.com
    description: 测试环境
  - url: http://localhost:8080
    description: 本地开发

tags:
  - name: Products
    description: 产品管理相关接口
  - name: Users
    description: 用户管理相关接口
  - name: Auth
    description: 认证相关接口

paths:
  /api/v1/products:
    get:
      tags: [Products]
      summary: 获取产品列表
      description: |
        获取产品列表,支持分页、搜索和过滤。
        
        ### 使用示例
        ```bash
        curl -X GET "https://api.example.com/api/v1/products?page=1&limit=20&category=electronics"
        ```
      operationId: getProducts
      parameters:
        - $ref: '#/components/parameters/PageParam'
        - $ref: '#/components/parameters/LimitParam'
        - name: search
          in: query
          description: 搜索关键词
          schema:
            type: string
            example: "iPhone"
        - name: category
          in: query
          description: 产品分类
          schema:
            type: string
            enum: [electronics, clothing, food, books]
        - name: minPrice
          in: query
          description: 最低价格
          schema:
            type: number
            minimum: 0
        - name: maxPrice
          in: query
          description: 最高价格
          schema:
            type: number
            minimum: 0
      responses:
        '200':
          description: 成功获取产品列表
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ProductListResponse'
              examples:
                success:
                  $ref: '#/components/examples/ProductListExample'
        '400':
          $ref: '#/components/responses/BadRequest'
        '401':
          $ref: '#/components/responses/Unauthorized'
        '500':
          $ref: '#/components/responses/InternalError'

API 变更日志

markdown
# API 变更日志

## [2.0.0] - 2024-01-15

### 破坏性变更
- **产品价格结构变更**`price` 字段从数字类型改为对象
  ```json
  // 旧版本
  "price": 99.99
  
  // 新版本
  "pricing": {
    "amount": 99.99,
    "currency": "USD",
    "tax_included": true
  }
  • 移除废弃端点GET /api/v1/products/search 已移除,请使用 GET /api/v1/products?search=keyword

新增功能

  • 添加批量操作端点:POST /api/v1/products/batch
  • 支持 WebSocket 实时更新:ws://api.example.com/ws
  • 新增字段过滤功能:GET /api/v1/products?fields=id,name,price

改进

  • 优化分页性能,支持游标分页
  • 增强错误信息,添加 suggestion 字段
  • 扩展搜索功能,支持全文搜索

修复

  • 修复特殊字符搜索导致的错误
  • 修复并发更新时的数据竞争问题

废弃通知

  • GET /api/v1/products/{id}/related 将在 v3.0.0 中移除
  • X-API-Version 头部将在 v3.0.0 中移除,请使用 URL 版本

## 测试最佳实践

### 测试金字塔

```python
# 单元测试示例
import pytest
from unittest.mock import Mock, patch

class TestProductService:
    """产品服务单元测试"""
    
    @pytest.fixture
    def product_service(self):
        mock_repo = Mock()
        mock_cache = Mock()
        return ProductService(mock_repo, mock_cache)
    
    @pytest.mark.asyncio
    async def test_get_product_from_cache(self, product_service):
        # 准备测试数据
        product_id = "123"
        cached_product = {"id": product_id, "name": "Test Product"}
        
        # 设置 mock 行为
        product_service.cache.get.return_value = cached_product
        
        # 执行测试
        result = await product_service.get_product(product_id)
        
        # 断言
        assert result == cached_product
        product_service.cache.get.assert_called_once_with(f"product:{product_id}")
        product_service.repository.find_by_id.assert_not_called()
    
    @pytest.mark.asyncio
    async def test_get_product_from_database(self, product_service):
        # 缓存未命中的情况
        product_service.cache.get.return_value = None
        
        db_product = {"id": "123", "name": "Test Product"}
        product_service.repository.find_by_id.return_value = db_product
        
        result = await product_service.get_product("123")
        
        assert result == db_product
        product_service.cache.set.assert_called_once()

# 集成测试示例
@pytest.mark.integration
class TestProductAPI:
    """产品 API 集成测试"""
    
    @pytest.fixture
    async def client(self):
        async with AsyncClient(app=app, base_url="http://test") as ac:
            yield ac
    
    @pytest.fixture
    async def auth_headers(self, client):
        # 创建测试用户并获取 token
        response = await client.post("/api/v1/auth/register", json={
            "username": "testuser",
            "email": "test@example.com",
            "password": "Test123!"
        })
        token = response.json()["token"]
        return {"Authorization": f"Bearer {token}"}
    
    @pytest.mark.asyncio
    async def test_create_product_flow(self, client, auth_headers):
        # 创建产品
        create_response = await client.post(
            "/api/v1/products",
            json={
                "name": "Integration Test Product",
                "price": 99.99,
                "category": "electronics"
            },
            headers=auth_headers
        )
        
        assert create_response.status_code == 201
        product_id = create_response.json()["id"]
        
        # 获取创建的产品
        get_response = await client.get(f"/api/v1/products/{product_id}")
        assert get_response.status_code == 200
        assert get_response.json()["name"] == "Integration Test Product"
        
        # 更新产品
        update_response = await client.patch(
            f"/api/v1/products/{product_id}",
            json={"price": 149.99},
            headers=auth_headers
        )
        
        assert update_response.status_code == 200
        assert update_response.json()["price"] == 149.99
        
        # 删除产品
        delete_response = await client.delete(
            f"/api/v1/products/{product_id}",
            headers=auth_headers
        )
        
        assert delete_response.status_code == 204
        
        # 验证删除
        get_deleted = await client.get(f"/api/v1/products/{product_id}")
        assert get_deleted.status_code == 404

部署最佳实践

容器化

dockerfile
# 多阶段构建 Dockerfile
FROM node:18-alpine AS builder

# 安装构建依赖
RUN apk add --no-cache python3 make g++

WORKDIR /app

# 缓存依赖
COPY package*.json ./
RUN npm ci --only=production

# 复制源代码
COPY . .

# 构建应用
RUN npm run build

# 生产镜像
FROM node:18-alpine

# 安全:创建非 root 用户
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nodejs -u 1001

# 安装运行时依赖
RUN apk add --no-cache tini

WORKDIR /app

# 复制构建产物
COPY --from=builder --chown=nodejs:nodejs /app/dist ./dist
COPY --from=builder --chown=nodejs:nodejs /app/node_modules ./node_modules
COPY --from=builder --chown=nodejs:nodejs /app/package*.json ./

# 切换用户
USER nodejs

# 暴露端口
EXPOSE 3000

# 使用 tini 作为 PID 1
ENTRYPOINT ["/sbin/tini", "--"]

# 启动应用
CMD ["node", "dist/server.js"]

健康检查

java
@RestController
@RequestMapping("/health")
public class HealthController {
    
    private final HealthCheckService healthCheckService;
    
    @GetMapping
    public ResponseEntity<HealthResponse> health() {
        HealthResponse response = new HealthResponse();
        response.setStatus("UP");
        response.setTimestamp(Instant.now());
        response.setVersion(buildProperties.getVersion());
        
        return ResponseEntity.ok(response);
    }
    
    @GetMapping("/live")
    public ResponseEntity<Map<String, String>> liveness() {
        // 简单的存活检查
        return ResponseEntity.ok(Map.of("status", "alive"));
    }
    
    @GetMapping("/ready")
    public ResponseEntity<ReadinessResponse> readiness() {
        ReadinessResponse response = new ReadinessResponse();
        
        // 检查各个依赖
        response.setDatabase(healthCheckService.checkDatabase());
        response.setRedis(healthCheckService.checkRedis());
        response.setExternalApi(healthCheckService.checkExternalApi());
        
        boolean isReady = response.getDatabase().isHealthy() 
            && response.getRedis().isHealthy();
        
        return ResponseEntity
            .status(isReady ? HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE)
            .body(response);
    }
    
    @GetMapping("/metrics")
    public ResponseEntity<MetricsResponse> metrics() {
        MetricsResponse response = new MetricsResponse();
        response.setUptime(ManagementFactory.getRuntimeMXBean().getUptime());
        response.setMemoryUsage(getMemoryUsage());
        response.setThreadCount(Thread.activeCount());
        response.setRequestCount(metricsService.getRequestCount());
        response.setErrorRate(metricsService.getErrorRate());
        
        return ResponseEntity.ok(response);
    }
}

监控和日志最佳实践

结构化日志

typescript
// 日志中间件
export const loggingMiddleware = (req: Request, res: Response, next: NextFunction) => {
    const start = Date.now();
    const requestId = req.headers['x-request-id'] || uuidv4();
    
    // 添加请求ID到请求对象
    req.requestId = requestId;
    res.setHeader('X-Request-ID', requestId);
    
    // 记录请求
    logger.info('Incoming request', {
        requestId,
        method: req.method,
        path: req.path,
        query: req.query,
        ip: req.ip,
        userAgent: req.get('user-agent')
    });
    
    // 拦截响应
    const originalSend = res.send;
    res.send = function(data) {
        res.send = originalSend;
        
        const duration = Date.now() - start;
        
        // 记录响应
        logger.info('Outgoing response', {
            requestId,
            statusCode: res.statusCode,
            duration,
            contentLength: res.get('content-length')
        });
        
        // 记录慢请求
        if (duration > 1000) {
            logger.warn('Slow request detected', {
                requestId,
                duration,
                path: req.path
            });
        }
        
        return res.send(data);
    };
    
    next();
};

// 审计日志
export class AuditLogger {
    static async logAction(
        userId: string,
        action: string,
        resource: string,
        resourceId: string,
        changes?: any
    ) {
        const auditLog = {
            timestamp: new Date().toISOString(),
            userId,
            action,
            resource,
            resourceId,
            changes,
            ip: getCurrentUserIp(),
            userAgent: getCurrentUserAgent()
        };
        
        // 异步写入审计日志
        await auditLogRepository.create(auditLog);
        
        // 发送到集中日志系统
        logger.audit('User action', auditLog);
    }
}

性能监控

python
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps

# 定义指标
request_count = Counter('api_requests_total', 'Total API requests', ['method', 'endpoint', 'status'])
request_duration = Histogram('api_request_duration_seconds', 'API request duration', ['method', 'endpoint'])
active_connections = Gauge('api_active_connections', 'Active API connections')
error_rate = Counter('api_errors_total', 'Total API errors', ['type', 'endpoint'])

def monitor_performance(endpoint_name: str):
    """性能监控装饰器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 增加活跃连接数
            active_connections.inc()
            start_time = time.time()
            
            try:
                # 执行函数
                result = await func(*args, **kwargs)
                
                # 记录成功请求
                request_count.labels(
                    method=request.method,
                    endpoint=endpoint_name,
                    status='success'
                ).inc()
                
                return result
                
            except Exception as e:
                # 记录错误
                error_rate.labels(
                    type=type(e).__name__,
                    endpoint=endpoint_name
                ).inc()
                
                request_count.labels(
                    method=request.method,
                    endpoint=endpoint_name,
                    status='error'
                ).inc()
                
                raise
                
            finally:
                # 记录请求时长
                duration = time.time() - start_time
                request_duration.labels(
                    method=request.method,
                    endpoint=endpoint_name
                ).observe(duration)
                
                # 减少活跃连接数
                active_connections.dec()
        
        return wrapper
    return decorator

总结清单

API 设计清单

  • [ ] 遵循 RESTful 原则
  • [ ] 使用一致的命名约定
  • [ ] 正确使用 HTTP 状态码
  • [ ] 实现版本控制策略
  • [ ] 提供清晰的错误消息
  • [ ] 支持分页和过滤
  • [ ] 实现幂等性操作
  • [ ] 考虑向后兼容性

安全清单

  • [ ] 实施认证和授权
  • [ ] 使用 HTTPS
  • [ ] 验证所有输入
  • [ ] 防止 SQL 注入
  • [ ] 实施速率限制
  • [ ] 保护敏感数据
  • [ ] 使用安全的密钥管理
  • [ ] 定期安全审计

性能清单

  • [ ] 优化数据库查询
  • [ ] 实施缓存策略
  • [ ] 使用连接池
  • [ ] 支持批量操作
  • [ ] 启用响应压缩
  • [ ] 实施分页
  • [ ] 监控性能指标
  • [ ] 进行负载测试

文档清单

  • [ ] 编写 OpenAPI 规范
  • [ ] 提供使用示例
  • [ ] 记录错误代码
  • [ ] 维护变更日志
  • [ ] 编写快速开始指南
  • [ ] 提供 SDK 和代码示例
  • [ ] 记录认证方式
  • [ ] 说明速率限制

测试清单

  • [ ] 编写单元测试(>80% 覆盖率)
  • [ ] 实施集成测试
  • [ ] 进行契约测试
  • [ ] 执行端到端测试
  • [ ] 进行性能测试
  • [ ] 实施安全测试
  • [ ] 自动化测试流程
  • [ ] 测试错误场景

相关资源

SOLO Development Guide