API 开发最佳实践
构建高质量、可维护、可扩展 API 的综合指南
设计原则
API 设计最佳实践
1. RESTful 设计规范
yaml
# ✅ 好的示例
GET /api/v1/users # 获取用户列表
GET /api/v1/users/123 # 获取特定用户
POST /api/v1/users # 创建用户
PUT /api/v1/users/123 # 完整更新用户
PATCH /api/v1/users/123 # 部分更新用户
DELETE /api/v1/users/123 # 删除用户
# 子资源
GET /api/v1/users/123/orders # 获取用户订单
POST /api/v1/users/123/orders # 创建用户订单
# 行为/操作
POST /api/v1/users/123/activate # 激活用户
POST /api/v1/orders/456/cancel # 取消订单
# ❌ 避免的示例
GET /api/v1/getUsers # 动词在 URL 中
POST /api/v1/users/create # 冗余的动作
GET /api/v1/user_list # 不一致的命名2. 命名约定
typescript
// API 路径命名
const API_CONVENTIONS = {
// 使用复数名词
resources: '/products', // ✅
notResource: '/product', // ❌
// 使用连字符分隔
multiWord: '/user-profiles', // ✅
notCamelCase: '/userProfiles', // ❌
notSnakeCase: '/user_profiles', // ❌
// 使用小写
lowercase: '/api/v1/products', // ✅
notMixedCase: '/API/V1/Products', // ❌
};
// 请求/响应字段命名
interface ProductResponse {
// 使用 camelCase
productId: string; // ✅
productName: string; // ✅
createdAt: string; // ✅
// 避免
product_id: string; // ❌ snake_case
ProductName: string; // ❌ PascalCase
created_at: string; // ❌ snake_case
}3. 状态码使用
python
from http import HTTPStatus
class APIResponse:
"""正确使用 HTTP 状态码"""
# 2xx 成功
@staticmethod
def success(data):
return {"data": data}, HTTPStatus.OK # 200
@staticmethod
def created(data, location):
return {"data": data}, HTTPStatus.CREATED, {"Location": location} # 201
@staticmethod
def accepted(task_id):
return {"taskId": task_id, "status": "processing"}, HTTPStatus.ACCEPTED # 202
@staticmethod
def no_content():
return "", HTTPStatus.NO_CONTENT # 204
# 4xx 客户端错误
@staticmethod
def bad_request(message, errors=None):
return {"error": {"message": message, "details": errors}}, HTTPStatus.BAD_REQUEST # 400
@staticmethod
def unauthorized(message="Authentication required"):
return {"error": {"message": message}}, HTTPStatus.UNAUTHORIZED # 401
@staticmethod
def forbidden(message="Permission denied"):
return {"error": {"message": message}}, HTTPStatus.FORBIDDEN # 403
@staticmethod
def not_found(resource, id):
return {"error": {"message": f"{resource} {id} not found"}}, HTTPStatus.NOT_FOUND # 404
@staticmethod
def conflict(message):
return {"error": {"message": message}}, HTTPStatus.CONFLICT # 409
@staticmethod
def unprocessable_entity(message, errors=None):
return {"error": {"message": message, "details": errors}}, HTTPStatus.UNPROCESSABLE_ENTITY # 422
@staticmethod
def too_many_requests(retry_after):
return {"error": {"message": "Rate limit exceeded"}}, HTTPStatus.TOO_MANY_REQUESTS, {"Retry-After": str(retry_after)} # 429
# 5xx 服务器错误
@staticmethod
def internal_error(request_id):
return {"error": {"message": "Internal server error", "requestId": request_id}}, HTTPStatus.INTERNAL_SERVER_ERROR # 500
@staticmethod
def service_unavailable(retry_after=None):
headers = {"Retry-After": str(retry_after)} if retry_after else {}
return {"error": {"message": "Service temporarily unavailable"}}, HTTPStatus.SERVICE_UNAVAILABLE, headers # 503数据验证最佳实践
输入验证
java
// Spring Boot 验证示例
@RestController
@RequestMapping("/api/v1/users")
@Validated
public class UserController {
@PostMapping
public ResponseEntity<User> createUser(
@Valid @RequestBody CreateUserRequest request) {
// 验证通过后的处理
User user = userService.createUser(request);
return ResponseEntity.created(URI.create("/api/v1/users/" + user.getId()))
.body(user);
}
}
@Data
public class CreateUserRequest {
@NotBlank(message = "用户名不能为空")
@Size(min = 3, max = 50, message = "用户名长度必须在3-50个字符之间")
@Pattern(regexp = "^[a-zA-Z0-9_]+$", message = "用户名只能包含字母、数字和下划线")
private String username;
@NotBlank(message = "邮箱不能为空")
@Email(message = "邮箱格式不正确")
private String email;
@NotBlank(message = "密码不能为空")
@Size(min = 8, message = "密码长度至少8位")
@Pattern(
regexp = "^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)(?=.*[@$!%*?&])[A-Za-z\\d@$!%*?&]+$",
message = "密码必须包含大小写字母、数字和特殊字符"
)
private String password;
@NotNull(message = "年龄不能为空")
@Min(value = 18, message = "年龄必须大于等于18")
@Max(value = 120, message = "年龄必须小于等于120")
private Integer age;
@Valid
@NotNull(message = "地址信息不能为空")
private Address address;
}
// 自定义验证器
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Constraint(validatedBy = UniqueUsernameValidator.class)
public @interface UniqueUsername {
String message() default "用户名已存在";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
}
@Component
public class UniqueUsernameValidator implements ConstraintValidator<UniqueUsername, String> {
@Autowired
private UserRepository userRepository;
@Override
public boolean isValid(String username, ConstraintValidatorContext context) {
if (username == null) {
return true; // 让 @NotNull 处理
}
return !userRepository.existsByUsername(username);
}
}输出过滤
typescript
// 敏感信息过滤
export class ResponseSerializer {
private static readonly SENSITIVE_FIELDS = [
'password',
'creditCard',
'ssn',
'apiKey',
'secret'
];
static serialize(data: any, fields?: string[]): any {
if (Array.isArray(data)) {
return data.map(item => this.serialize(item, fields));
}
if (typeof data !== 'object' || data === null) {
return data;
}
const result: any = {};
for (const [key, value] of Object.entries(data)) {
// 过滤敏感字段
if (this.SENSITIVE_FIELDS.includes(key)) {
continue;
}
// 字段选择
if (fields && !fields.includes(key)) {
continue;
}
// 递归处理嵌套对象
if (typeof value === 'object' && value !== null) {
result[key] = this.serialize(value, fields);
} else {
result[key] = value;
}
}
return result;
}
// 数据脱敏
static maskSensitiveData(data: any): any {
if (typeof data !== 'object' || data === null) {
return data;
}
const masked = { ...data };
// 邮箱脱敏
if (masked.email) {
masked.email = this.maskEmail(masked.email);
}
// 手机号脱敏
if (masked.phone) {
masked.phone = this.maskPhone(masked.phone);
}
// 身份证脱敏
if (masked.idCard) {
masked.idCard = this.maskIdCard(masked.idCard);
}
return masked;
}
private static maskEmail(email: string): string {
const [name, domain] = email.split('@');
const maskedName = name.charAt(0) + '*'.repeat(name.length - 2) + name.charAt(name.length - 1);
return `${maskedName}@${domain}`;
}
private static maskPhone(phone: string): string {
return phone.replace(/(\d{3})\d{4}(\d{4})/, '$1****$2');
}
private static maskIdCard(idCard: string): string {
return idCard.replace(/(\d{6})\d{8}(\d{4})/, '$1********$2');
}
}安全最佳实践
认证和授权
python
from functools import wraps
from typing import List, Optional
import jwt
from fastapi import HTTPException, Security, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
class JWTBearer:
"""JWT 认证"""
def __init__(self, required_roles: Optional[List[str]] = None):
self.required_roles = required_roles or []
async def __call__(self, credentials: HTTPAuthorizationCredentials = Security(security)):
token = credentials.credentials
try:
# 验证 token
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
user_id = payload.get("sub")
user_roles = payload.get("roles", [])
if not user_id:
raise HTTPException(status_code=403, detail="Invalid authentication credentials")
# 检查角色权限
if self.required_roles:
if not any(role in user_roles for role in self.required_roles):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return {"user_id": user_id, "roles": user_roles}
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=403, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=403, detail="Invalid token")
# 使用示例
@app.get("/api/v1/admin/users")
async def get_all_users(auth: dict = Depends(JWTBearer(required_roles=["admin"]))):
"""需要 admin 角色才能访问"""
return await user_service.get_all_users()
# 基于资源的授权
class ResourceAuthorization:
"""资源级别的授权"""
@staticmethod
async def can_access_resource(user_id: str, resource_type: str, resource_id: str, action: str) -> bool:
# 检查用户是否有权限访问特定资源
permission = await db.query(Permission).filter(
Permission.user_id == user_id,
Permission.resource_type == resource_type,
Permission.resource_id == resource_id,
Permission.action == action
).first()
return permission is not None
@staticmethod
def require_resource_permission(resource_type: str, action: str):
"""装饰器:检查资源权限"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 从请求中获取用户和资源信息
user_id = kwargs.get("current_user", {}).get("user_id")
resource_id = kwargs.get("resource_id") or kwargs.get("id")
if not await ResourceAuthorization.can_access_resource(
user_id, resource_type, resource_id, action
):
raise HTTPException(status_code=403, detail="Access denied")
return await func(*args, **kwargs)
return wrapper
return decoratorAPI 密钥管理
java
@Component
public class ApiKeyAuthenticationFilter extends OncePerRequestFilter {
private final ApiKeyService apiKeyService;
private final RateLimiter rateLimiter;
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
String apiKey = extractApiKey(request);
if (apiKey != null) {
try {
// 验证 API 密钥
ApiKeyDetails details = apiKeyService.validateApiKey(apiKey);
// 检查速率限制
if (!rateLimiter.allowRequest(apiKey)) {
response.setStatus(HttpServletResponse.SC_TOO_MANY_REQUESTS);
response.getWriter().write("{\"error\": \"Rate limit exceeded\"}");
return;
}
// 记录使用情况
apiKeyService.recordUsage(apiKey, request);
// 设置认证信息
SecurityContextHolder.getContext().setAuthentication(
new ApiKeyAuthentication(details)
);
} catch (InvalidApiKeyException e) {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.getWriter().write("{\"error\": \"Invalid API key\"}");
return;
}
}
filterChain.doFilter(request, response);
}
private String extractApiKey(HttpServletRequest request) {
// 从 header 获取
String apiKey = request.getHeader("X-API-Key");
if (apiKey != null) {
return apiKey;
}
// 从查询参数获取(不推荐)
return request.getParameter("api_key");
}
}
// API 密钥服务
@Service
@Transactional
public class ApiKeyService {
public ApiKey generateApiKey(String userId, String name, Set<String> scopes) {
// 生成安全的 API 密钥
String key = generateSecureKey();
String hashedKey = hashApiKey(key);
ApiKey apiKey = ApiKey.builder()
.id(UUID.randomUUID())
.userId(userId)
.name(name)
.keyHash(hashedKey)
.keyPrefix(key.substring(0, 8)) // 用于识别
.scopes(scopes)
.createdAt(LocalDateTime.now())
.lastUsedAt(null)
.expiresAt(LocalDateTime.now().plusYears(1))
.build();
apiKeyRepository.save(apiKey);
// 只返回一次明文密钥
apiKey.setKey(key);
return apiKey;
}
private String generateSecureKey() {
byte[] random = new byte[32];
new SecureRandom().nextBytes(random);
return Base64.getUrlEncoder().withoutPadding().encodeToString(random);
}
private String hashApiKey(String key) {
return BCrypt.hashpw(key, BCrypt.gensalt(12));
}
}CORS 配置
typescript
// Express CORS 配置
import cors from 'cors';
const corsOptions: cors.CorsOptions = {
// 允许的源
origin: (origin, callback) => {
const allowedOrigins = [
'https://app.example.com',
'https://admin.example.com',
'http://localhost:3000' // 开发环境
];
// 允许没有 origin 的请求(如 Postman)
if (!origin) {
return callback(null, true);
}
if (allowedOrigins.includes(origin)) {
callback(null, true);
} else {
callback(new Error('Not allowed by CORS'));
}
},
// 允许的方法
methods: ['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
// 允许的头部
allowedHeaders: [
'Content-Type',
'Authorization',
'X-API-Key',
'X-Request-ID'
],
// 暴露的头部
exposedHeaders: [
'X-Total-Count',
'X-Page-Count',
'X-Request-ID'
],
// 是否允许凭证
credentials: true,
// 预检请求缓存时间
maxAge: 86400 // 24小时
};
app.use(cors(corsOptions));
// 处理预检请求
app.options('*', cors(corsOptions));文档最佳实践
OpenAPI 文档编写
yaml
openapi: 3.0.3
info:
title: API Service
description: |
# API 服务文档
## 概述
这是我们的 RESTful API 服务,提供产品管理、用户认证等功能。
## 认证
API 使用 Bearer Token 认证。在请求头中添加:
```
Authorization: Bearer <your-token>
```
## 错误处理
所有错误响应都遵循统一格式:
```json
{
"error": {
"code": "ERROR_CODE",
"message": "错误描述",
"details": {}
}
}
```
## 速率限制
- 认证用户:1000 请求/小时
- 未认证用户:100 请求/小时
version: 1.0.0
contact:
name: API Support
email: api-support@example.com
license:
name: MIT
url: https://opensource.org/licenses/MIT
servers:
- url: https://api.example.com
description: 生产环境
- url: https://staging-api.example.com
description: 测试环境
- url: http://localhost:8080
description: 本地开发
tags:
- name: Products
description: 产品管理相关接口
- name: Users
description: 用户管理相关接口
- name: Auth
description: 认证相关接口
paths:
/api/v1/products:
get:
tags: [Products]
summary: 获取产品列表
description: |
获取产品列表,支持分页、搜索和过滤。
### 使用示例
```bash
curl -X GET "https://api.example.com/api/v1/products?page=1&limit=20&category=electronics"
```
operationId: getProducts
parameters:
- $ref: '#/components/parameters/PageParam'
- $ref: '#/components/parameters/LimitParam'
- name: search
in: query
description: 搜索关键词
schema:
type: string
example: "iPhone"
- name: category
in: query
description: 产品分类
schema:
type: string
enum: [electronics, clothing, food, books]
- name: minPrice
in: query
description: 最低价格
schema:
type: number
minimum: 0
- name: maxPrice
in: query
description: 最高价格
schema:
type: number
minimum: 0
responses:
'200':
description: 成功获取产品列表
content:
application/json:
schema:
$ref: '#/components/schemas/ProductListResponse'
examples:
success:
$ref: '#/components/examples/ProductListExample'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthorized'
'500':
$ref: '#/components/responses/InternalError'API 变更日志
markdown
# API 变更日志
## [2.0.0] - 2024-01-15
### 破坏性变更
- **产品价格结构变更**:`price` 字段从数字类型改为对象
```json
// 旧版本
"price": 99.99
// 新版本
"pricing": {
"amount": 99.99,
"currency": "USD",
"tax_included": true
}- 移除废弃端点:
GET /api/v1/products/search已移除,请使用GET /api/v1/products?search=keyword
新增功能
- 添加批量操作端点:
POST /api/v1/products/batch - 支持 WebSocket 实时更新:
ws://api.example.com/ws - 新增字段过滤功能:
GET /api/v1/products?fields=id,name,price
改进
- 优化分页性能,支持游标分页
- 增强错误信息,添加
suggestion字段 - 扩展搜索功能,支持全文搜索
修复
- 修复特殊字符搜索导致的错误
- 修复并发更新时的数据竞争问题
废弃通知
GET /api/v1/products/{id}/related将在 v3.0.0 中移除X-API-Version头部将在 v3.0.0 中移除,请使用 URL 版本
## 测试最佳实践
### 测试金字塔
```python
# 单元测试示例
import pytest
from unittest.mock import Mock, patch
class TestProductService:
"""产品服务单元测试"""
@pytest.fixture
def product_service(self):
mock_repo = Mock()
mock_cache = Mock()
return ProductService(mock_repo, mock_cache)
@pytest.mark.asyncio
async def test_get_product_from_cache(self, product_service):
# 准备测试数据
product_id = "123"
cached_product = {"id": product_id, "name": "Test Product"}
# 设置 mock 行为
product_service.cache.get.return_value = cached_product
# 执行测试
result = await product_service.get_product(product_id)
# 断言
assert result == cached_product
product_service.cache.get.assert_called_once_with(f"product:{product_id}")
product_service.repository.find_by_id.assert_not_called()
@pytest.mark.asyncio
async def test_get_product_from_database(self, product_service):
# 缓存未命中的情况
product_service.cache.get.return_value = None
db_product = {"id": "123", "name": "Test Product"}
product_service.repository.find_by_id.return_value = db_product
result = await product_service.get_product("123")
assert result == db_product
product_service.cache.set.assert_called_once()
# 集成测试示例
@pytest.mark.integration
class TestProductAPI:
"""产品 API 集成测试"""
@pytest.fixture
async def client(self):
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
@pytest.fixture
async def auth_headers(self, client):
# 创建测试用户并获取 token
response = await client.post("/api/v1/auth/register", json={
"username": "testuser",
"email": "test@example.com",
"password": "Test123!"
})
token = response.json()["token"]
return {"Authorization": f"Bearer {token}"}
@pytest.mark.asyncio
async def test_create_product_flow(self, client, auth_headers):
# 创建产品
create_response = await client.post(
"/api/v1/products",
json={
"name": "Integration Test Product",
"price": 99.99,
"category": "electronics"
},
headers=auth_headers
)
assert create_response.status_code == 201
product_id = create_response.json()["id"]
# 获取创建的产品
get_response = await client.get(f"/api/v1/products/{product_id}")
assert get_response.status_code == 200
assert get_response.json()["name"] == "Integration Test Product"
# 更新产品
update_response = await client.patch(
f"/api/v1/products/{product_id}",
json={"price": 149.99},
headers=auth_headers
)
assert update_response.status_code == 200
assert update_response.json()["price"] == 149.99
# 删除产品
delete_response = await client.delete(
f"/api/v1/products/{product_id}",
headers=auth_headers
)
assert delete_response.status_code == 204
# 验证删除
get_deleted = await client.get(f"/api/v1/products/{product_id}")
assert get_deleted.status_code == 404部署最佳实践
容器化
dockerfile
# 多阶段构建 Dockerfile
FROM node:18-alpine AS builder
# 安装构建依赖
RUN apk add --no-cache python3 make g++
WORKDIR /app
# 缓存依赖
COPY package*.json ./
RUN npm ci --only=production
# 复制源代码
COPY . .
# 构建应用
RUN npm run build
# 生产镜像
FROM node:18-alpine
# 安全:创建非 root 用户
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nodejs -u 1001
# 安装运行时依赖
RUN apk add --no-cache tini
WORKDIR /app
# 复制构建产物
COPY --from=builder --chown=nodejs:nodejs /app/dist ./dist
COPY --from=builder --chown=nodejs:nodejs /app/node_modules ./node_modules
COPY --from=builder --chown=nodejs:nodejs /app/package*.json ./
# 切换用户
USER nodejs
# 暴露端口
EXPOSE 3000
# 使用 tini 作为 PID 1
ENTRYPOINT ["/sbin/tini", "--"]
# 启动应用
CMD ["node", "dist/server.js"]健康检查
java
@RestController
@RequestMapping("/health")
public class HealthController {
private final HealthCheckService healthCheckService;
@GetMapping
public ResponseEntity<HealthResponse> health() {
HealthResponse response = new HealthResponse();
response.setStatus("UP");
response.setTimestamp(Instant.now());
response.setVersion(buildProperties.getVersion());
return ResponseEntity.ok(response);
}
@GetMapping("/live")
public ResponseEntity<Map<String, String>> liveness() {
// 简单的存活检查
return ResponseEntity.ok(Map.of("status", "alive"));
}
@GetMapping("/ready")
public ResponseEntity<ReadinessResponse> readiness() {
ReadinessResponse response = new ReadinessResponse();
// 检查各个依赖
response.setDatabase(healthCheckService.checkDatabase());
response.setRedis(healthCheckService.checkRedis());
response.setExternalApi(healthCheckService.checkExternalApi());
boolean isReady = response.getDatabase().isHealthy()
&& response.getRedis().isHealthy();
return ResponseEntity
.status(isReady ? HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE)
.body(response);
}
@GetMapping("/metrics")
public ResponseEntity<MetricsResponse> metrics() {
MetricsResponse response = new MetricsResponse();
response.setUptime(ManagementFactory.getRuntimeMXBean().getUptime());
response.setMemoryUsage(getMemoryUsage());
response.setThreadCount(Thread.activeCount());
response.setRequestCount(metricsService.getRequestCount());
response.setErrorRate(metricsService.getErrorRate());
return ResponseEntity.ok(response);
}
}监控和日志最佳实践
结构化日志
typescript
// 日志中间件
export const loggingMiddleware = (req: Request, res: Response, next: NextFunction) => {
const start = Date.now();
const requestId = req.headers['x-request-id'] || uuidv4();
// 添加请求ID到请求对象
req.requestId = requestId;
res.setHeader('X-Request-ID', requestId);
// 记录请求
logger.info('Incoming request', {
requestId,
method: req.method,
path: req.path,
query: req.query,
ip: req.ip,
userAgent: req.get('user-agent')
});
// 拦截响应
const originalSend = res.send;
res.send = function(data) {
res.send = originalSend;
const duration = Date.now() - start;
// 记录响应
logger.info('Outgoing response', {
requestId,
statusCode: res.statusCode,
duration,
contentLength: res.get('content-length')
});
// 记录慢请求
if (duration > 1000) {
logger.warn('Slow request detected', {
requestId,
duration,
path: req.path
});
}
return res.send(data);
};
next();
};
// 审计日志
export class AuditLogger {
static async logAction(
userId: string,
action: string,
resource: string,
resourceId: string,
changes?: any
) {
const auditLog = {
timestamp: new Date().toISOString(),
userId,
action,
resource,
resourceId,
changes,
ip: getCurrentUserIp(),
userAgent: getCurrentUserAgent()
};
// 异步写入审计日志
await auditLogRepository.create(auditLog);
// 发送到集中日志系统
logger.audit('User action', auditLog);
}
}性能监控
python
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps
# 定义指标
request_count = Counter('api_requests_total', 'Total API requests', ['method', 'endpoint', 'status'])
request_duration = Histogram('api_request_duration_seconds', 'API request duration', ['method', 'endpoint'])
active_connections = Gauge('api_active_connections', 'Active API connections')
error_rate = Counter('api_errors_total', 'Total API errors', ['type', 'endpoint'])
def monitor_performance(endpoint_name: str):
"""性能监控装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 增加活跃连接数
active_connections.inc()
start_time = time.time()
try:
# 执行函数
result = await func(*args, **kwargs)
# 记录成功请求
request_count.labels(
method=request.method,
endpoint=endpoint_name,
status='success'
).inc()
return result
except Exception as e:
# 记录错误
error_rate.labels(
type=type(e).__name__,
endpoint=endpoint_name
).inc()
request_count.labels(
method=request.method,
endpoint=endpoint_name,
status='error'
).inc()
raise
finally:
# 记录请求时长
duration = time.time() - start_time
request_duration.labels(
method=request.method,
endpoint=endpoint_name
).observe(duration)
# 减少活跃连接数
active_connections.dec()
return wrapper
return decorator总结清单
API 设计清单
- [ ] 遵循 RESTful 原则
- [ ] 使用一致的命名约定
- [ ] 正确使用 HTTP 状态码
- [ ] 实现版本控制策略
- [ ] 提供清晰的错误消息
- [ ] 支持分页和过滤
- [ ] 实现幂等性操作
- [ ] 考虑向后兼容性
安全清单
- [ ] 实施认证和授权
- [ ] 使用 HTTPS
- [ ] 验证所有输入
- [ ] 防止 SQL 注入
- [ ] 实施速率限制
- [ ] 保护敏感数据
- [ ] 使用安全的密钥管理
- [ ] 定期安全审计
性能清单
- [ ] 优化数据库查询
- [ ] 实施缓存策略
- [ ] 使用连接池
- [ ] 支持批量操作
- [ ] 启用响应压缩
- [ ] 实施分页
- [ ] 监控性能指标
- [ ] 进行负载测试
文档清单
- [ ] 编写 OpenAPI 规范
- [ ] 提供使用示例
- [ ] 记录错误代码
- [ ] 维护变更日志
- [ ] 编写快速开始指南
- [ ] 提供 SDK 和代码示例
- [ ] 记录认证方式
- [ ] 说明速率限制
测试清单
- [ ] 编写单元测试(>80% 覆盖率)
- [ ] 实施集成测试
- [ ] 进行契约测试
- [ ] 执行端到端测试
- [ ] 进行性能测试
- [ ] 实施安全测试
- [ ] 自动化测试流程
- [ ] 测试错误场景