集成测试
验证组件间的协作与交互
什么是集成测试?
集成测试验证多个组件或系统之间的交互是否正确。它位于单元测试和端到端测试之间,关注点包括:
- 🔗 组件交互: 验证模块间的接口
- 🗄️ 数据库集成: 测试真实的数据持久化
- 🌐 外部服务: 验证与第三方服务的集成
- 📡 消息传递: 测试异步通信机制
Spring Boot 集成测试
数据库集成测试
java
@SpringBootTest
@AutoConfigureMockMvc
@Transactional
@TestPropertySource(properties = {
"spring.datasource.url=jdbc:h2:mem:testdb",
"spring.jpa.hibernate.ddl-auto=create-drop"
})
public class UserIntegrationTest {
@Autowired
private MockMvc mockMvc;
@Autowired
private UserRepository userRepository;
@Autowired
private ObjectMapper objectMapper;
@Test
@DisplayName("完整的用户注册流程")
public void testCompleteUserRegistrationFlow() throws Exception {
// 1. 注册新用户
UserRegistrationRequest request = UserRegistrationRequest.builder()
.email("test@example.com")
.password("SecurePass123!")
.username("testuser")
.build();
MvcResult result = mockMvc.perform(post("/api/users/register")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(request)))
.andExpect(status().isCreated())
.andExpect(header().exists("Location"))
.andReturn();
String locationHeader = result.getResponse().getHeader("Location");
String userId = locationHeader.substring(locationHeader.lastIndexOf("/") + 1);
// 2. 验证用户已保存到数据库
Optional<User> savedUser = userRepository.findById(userId);
assertTrue(savedUser.isPresent());
assertEquals("test@example.com", savedUser.get().getEmail());
// 3. 测试登录
LoginRequest loginRequest = new LoginRequest("test@example.com", "SecurePass123!");
mockMvc.perform(post("/api/auth/login")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(loginRequest)))
.andExpect(status().isOk())
.andExpect(jsonPath("$.token").exists())
.andExpect(jsonPath("$.expiresIn").value(3600));
// 4. 使用 token 访问受保护资源
String token = extractToken(mockMvc.perform(post("/api/auth/login")
.contentType(MediaType.APPLICATION_JSON)
.content(objectMapper.writeValueAsString(loginRequest)))
.andReturn());
mockMvc.perform(get("/api/users/" + userId)
.header("Authorization", "Bearer " + token))
.andExpect(status().isOk())
.andExpect(jsonPath("$.email").value("test@example.com"));
}
@Test
@Sql(scripts = "/test-data/users.sql")
@DisplayName("查询用户列表 - 分页和排序")
public void testGetUsersWithPaginationAndSorting() throws Exception {
mockMvc.perform(get("/api/users")
.param("page", "0")
.param("size", "10")
.param("sort", "createdAt,desc"))
.andExpect(status().isOk())
.andExpect(jsonPath("$.content").isArray())
.andExpect(jsonPath("$.content.length()").value(10))
.andExpect(jsonPath("$.totalElements").value(25))
.andExpect(jsonPath("$.totalPages").value(3))
.andExpect(jsonPath("$.number").value(0));
}
}使用 TestContainers
java
@SpringBootTest
@Testcontainers
@ActiveProfiles("integration-test")
public class OrderServiceIntegrationTest {
@Container
static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14")
.withDatabaseName("testdb")
.withUsername("test")
.withPassword("test");
@Container
static GenericContainer<?> redis = new GenericContainer<>("redis:6-alpine")
.withExposedPorts(6379);
@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
@DynamicPropertySource
static void properties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
registry.add("spring.datasource.username", postgres::getUsername);
registry.add("spring.datasource.password", postgres::getPassword);
registry.add("spring.redis.host", redis::getHost);
registry.add("spring.redis.port", redis::getFirstMappedPort);
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired
private OrderService orderService;
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Test
@DisplayName("订单创建完整流程 - 包含缓存和消息发送")
public void testCompleteOrderCreationFlow() {
// 创建订单
CreateOrderRequest request = CreateOrderRequest.builder()
.userId("USER-123")
.items(List.of(
new OrderItem("PROD-001", 2, 99.99),
new OrderItem("PROD-002", 1, 149.99)
))
.build();
Order order = orderService.createOrder(request);
// 验证订单已保存
assertNotNull(order.getId());
assertEquals(OrderStatus.PENDING, order.getStatus());
assertEquals(349.97, order.getTotalAmount(), 0.01);
// 验证缓存
Order cachedOrder = (Order) redisTemplate.opsForValue()
.get("order:" + order.getId());
assertNotNull(cachedOrder);
assertEquals(order.getId(), cachedOrder.getId());
// 验证 Kafka 消息
ConsumerRecords<String, OrderEvent> records = KafkaTestUtils.getRecords(consumer);
assertEquals(1, records.count());
OrderEvent event = records.iterator().next().value();
assertEquals("ORDER_CREATED", event.getEventType());
assertEquals(order.getId(), event.getOrderId());
}
}Node.js 集成测试
Express + MongoDB 集成测试
typescript
// test/integration/user.integration.test.ts
import request from 'supertest';
import { MongoMemoryServer } from 'mongodb-memory-server';
import mongoose from 'mongoose';
import { app } from '../../src/app';
import { User } from '../../src/models/User';
import { generateToken } from '../../src/utils/auth';
describe('User API Integration Tests', () => {
let mongoServer: MongoMemoryServer;
beforeAll(async () => {
// 启动内存 MongoDB
mongoServer = await MongoMemoryServer.create();
const mongoUri = mongoServer.getUri();
await mongoose.connect(mongoUri);
});
afterAll(async () => {
await mongoose.disconnect();
await mongoServer.stop();
});
beforeEach(async () => {
// 清理数据库
await User.deleteMany({});
});
describe('POST /api/users', () => {
it('应该创建新用户并返回 token', async () => {
const userData = {
email: 'test@example.com',
password: 'SecurePass123!',
name: 'Test User'
};
const response = await request(app)
.post('/api/users')
.send(userData)
.expect(201);
expect(response.body).toMatchObject({
user: {
email: userData.email,
name: userData.name
},
token: expect.any(String)
});
// 验证用户已保存到数据库
const savedUser = await User.findOne({ email: userData.email });
expect(savedUser).toBeTruthy();
expect(savedUser?.password).not.toBe(userData.password); // 密码应该被哈希
});
it('应该防止重复注册', async () => {
// 创建第一个用户
await User.create({
email: 'existing@example.com',
password: 'hashed_password',
name: 'Existing User'
});
// 尝试使用相同邮箱注册
const response = await request(app)
.post('/api/users')
.send({
email: 'existing@example.com',
password: 'Password123!',
name: 'Another User'
})
.expect(409);
expect(response.body.error).toBe('Email already registered');
});
});
describe('Protected Routes', () => {
let authToken: string;
let userId: string;
beforeEach(async () => {
// 创建测试用户并生成 token
const user = await User.create({
email: 'auth@example.com',
password: 'hashed_password',
name: 'Auth User'
});
userId = user._id.toString();
authToken = generateToken(userId);
});
it('应该允许认证用户访问个人资料', async () => {
const response = await request(app)
.get(`/api/users/${userId}`)
.set('Authorization', `Bearer ${authToken}`)
.expect(200);
expect(response.body.email).toBe('auth@example.com');
});
it('应该拒绝未认证的请求', async () => {
await request(app)
.get(`/api/users/${userId}`)
.expect(401);
});
it('应该拒绝无效的 token', async () => {
await request(app)
.get(`/api/users/${userId}`)
.set('Authorization', 'Bearer invalid_token')
.expect(401);
});
});
});使用 Docker Compose 进行集成测试
yaml
# docker-compose.test.yml
version: '3.8'
services:
app:
build:
context: .
dockerfile: Dockerfile.test
environment:
- NODE_ENV=test
- DATABASE_URL=postgresql://test:test@postgres:5432/testdb
- REDIS_URL=redis://redis:6379
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
depends_on:
- postgres
- redis
- rabbitmq
command: npm run test:integration
volumes:
- ./test-results:/app/test-results
postgres:
image: postgres:14
environment:
- POSTGRES_USER=test
- POSTGRES_PASSWORD=test
- POSTGRES_DB=testdb
redis:
image: redis:6-alpine
rabbitmq:
image: rabbitmq:3-management
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guesttypescript
// test/integration/order.integration.test.ts
import { Container } from 'typedi';
import { createConnection, Connection } from 'typeorm';
import * as amqp from 'amqplib';
import Redis from 'ioredis';
import { OrderService } from '../../src/services/OrderService';
import { Order } from '../../src/entities/Order';
describe('Order Service Integration Tests', () => {
let connection: Connection;
let rabbitConnection: amqp.Connection;
let redisClient: Redis;
let orderService: OrderService;
beforeAll(async () => {
// 数据库连接
connection = await createConnection({
type: 'postgres',
url: process.env.DATABASE_URL,
entities: [Order],
synchronize: true
});
// RabbitMQ 连接
rabbitConnection = await amqp.connect(process.env.RABBITMQ_URL!);
const channel = await rabbitConnection.createChannel();
await channel.assertQueue('order-events');
// Redis 连接
redisClient = new Redis(process.env.REDIS_URL);
// 注入依赖
Container.set('db.connection', connection);
Container.set('rabbitmq.channel', channel);
Container.set('redis.client', redisClient);
orderService = Container.get(OrderService);
});
afterAll(async () => {
await connection.close();
await rabbitConnection.close();
await redisClient.quit();
});
it('应该创建订单并发送消息', async () => {
// 监听消息队列
const messages: any[] = [];
const channel = Container.get('rabbitmq.channel') as amqp.Channel;
await channel.consume('order-events', (msg) => {
if (msg) {
messages.push(JSON.parse(msg.content.toString()));
channel.ack(msg);
}
});
// 创建订单
const order = await orderService.createOrder({
userId: 'USER-123',
items: [
{ productId: 'PROD-001', quantity: 2, price: 99.99 }
]
});
// 等待消息处理
await new Promise(resolve => setTimeout(resolve, 1000));
// 验证订单
expect(order.id).toBeDefined();
expect(order.status).toBe('PENDING');
expect(order.totalAmount).toBe(199.98);
// 验证数据库
const savedOrder = await connection.getRepository(Order).findOne(order.id);
expect(savedOrder).toBeDefined();
// 验证缓存
const cachedOrder = await redisClient.get(`order:${order.id}`);
expect(cachedOrder).toBeDefined();
// 验证消息
expect(messages).toHaveLength(1);
expect(messages[0]).toMatchObject({
eventType: 'OrderCreated',
orderId: order.id,
userId: 'USER-123'
});
});
});Python 集成测试
FastAPI + SQLAlchemy 集成测试
python
# test/integration/test_user_api.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base, get_db
from app.models import User
from app.config import Settings
# 测试数据库设置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 覆盖依赖
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
# 创建测试客户端
client = TestClient(app)
@pytest.fixture(scope="module")
def setup_database():
"""设置测试数据库"""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
class TestUserAPI:
"""用户 API 集成测试"""
def test_user_registration_flow(self, setup_database):
"""测试完整的用户注册流程"""
# 1. 注册新用户
registration_data = {
"email": "test@example.com",
"password": "SecurePass123!",
"full_name": "Test User"
}
response = client.post("/api/v1/auth/register", json=registration_data)
assert response.status_code == 201
data = response.json()
assert data["email"] == registration_data["email"]
assert data["full_name"] == registration_data["full_name"]
assert "id" in data
user_id = data["id"]
# 2. 尝试重复注册
response = client.post("/api/v1/auth/register", json=registration_data)
assert response.status_code == 400
assert response.json()["detail"] == "Email already registered"
# 3. 登录
login_data = {
"username": registration_data["email"], # OAuth2 使用 username 字段
"password": registration_data["password"]
}
response = client.post("/api/v1/auth/login", data=login_data)
assert response.status_code == 200
token_data = response.json()
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
access_token = token_data["access_token"]
# 4. 使用 token 获取用户信息
headers = {"Authorization": f"Bearer {access_token}"}
response = client.get(f"/api/v1/users/{user_id}", headers=headers)
assert response.status_code == 200
user_data = response.json()
assert user_data["email"] == registration_data["email"]
# 5. 更新用户信息
update_data = {"full_name": "Updated Name"}
response = client.patch(
f"/api/v1/users/{user_id}",
json=update_data,
headers=headers
)
assert response.status_code == 200
assert response.json()["full_name"] == "Updated Name"
# 6. 验证未授权访问
response = client.get(f"/api/v1/users/{user_id}")
assert response.status_code == 401使用 pytest-docker 进行集成测试
python
# conftest.py
import pytest
import docker
from sqlalchemy import create_engine
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
@pytest.fixture(scope="session")
def postgres_container():
"""PostgreSQL 容器"""
with PostgresContainer("postgres:14") as postgres:
yield postgres
@pytest.fixture(scope="session")
def redis_container():
"""Redis 容器"""
with RedisContainer("redis:6-alpine") as redis:
yield redis
@pytest.fixture(scope="session")
def kafka_container():
"""Kafka 容器"""
with KafkaContainer("confluentinc/cp-kafka:7.3.0") as kafka:
yield kafka
@pytest.fixture(scope="session")
def test_app(postgres_container, redis_container, kafka_container):
"""配置测试应用"""
import os
# 设置环境变量
os.environ["DATABASE_URL"] = postgres_container.get_connection_url()
os.environ["REDIS_URL"] = f"redis://{redis_container.get_container_host_ip()}:{redis_container.get_exposed_port(6379)}"
os.environ["KAFKA_BOOTSTRAP_SERVERS"] = kafka_container.get_bootstrap_server()
from app.main import app
from app.database import Base, engine
# 创建数据库表
Base.metadata.create_all(bind=engine)
yield app
# 清理
Base.metadata.drop_all(bind=engine)
# test/integration/test_order_flow.py
class TestOrderFlow:
"""订单流程集成测试"""
@pytest.mark.asyncio
async def test_complete_order_flow(self, test_app, test_client):
"""测试完整的订单流程"""
from app.services.order_service import OrderService
from app.services.inventory_service import InventoryService
from app.services.payment_service import PaymentService
from app.events.publisher import EventPublisher
# 初始化服务
order_service = OrderService()
inventory_service = InventoryService()
payment_service = PaymentService()
event_publisher = EventPublisher()
# 1. 准备库存
await inventory_service.add_stock("PROD-001", 100)
# 2. 创建订单
order_data = {
"user_id": "USER-123",
"items": [
{"product_id": "PROD-001", "quantity": 2, "price": 99.99}
],
"shipping_address": {
"street": "123 Main St",
"city": "New York",
"zip_code": "10001"
}
}
order = await order_service.create_order(order_data)
assert order.id is not None
assert order.status == "PENDING"
assert order.total_amount == 199.98
# 3. 验证库存已预留
stock = await inventory_service.get_stock("PROD-001")
assert stock.available == 98
assert stock.reserved == 2
# 4. 处理支付
payment_result = await payment_service.process_payment({
"order_id": order.id,
"amount": order.total_amount,
"payment_method": "credit_card",
"card_token": "tok_visa"
})
assert payment_result.status == "SUCCESS"
# 5. 确认订单
await order_service.confirm_order(order.id, payment_result.transaction_id)
# 6. 验证订单状态
updated_order = await order_service.get_order(order.id)
assert updated_order.status == "CONFIRMED"
assert updated_order.payment_id == payment_result.transaction_id
# 7. 验证事件发布
events = await event_publisher.get_published_events()
assert len(events) >= 2
event_types = [e.event_type for e in events]
assert "OrderCreated" in event_types
assert "OrderConfirmed" in event_types集成测试策略
测试金字塔中的定位
集成测试最佳实践
使用测试容器
java@Testcontainers class DatabaseIntegrationTest { @Container PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14"); }事务回滚
python@pytest.fixture def db_session(): connection = engine.connect() transaction = connection.begin() session = Session(bind=connection) yield session transaction.rollback() connection.close()测试数据管理
typescriptbeforeEach(async () => { await seedDatabase(); }); afterEach(async () => { await cleanupDatabase(); });并行执行
yaml# jest.config.js module.exports = { maxWorkers: '50%', testTimeout: 30000 };
常见集成测试场景
1. 数据库事务测试
java
@Test
@Transactional
@Rollback
public void testTransactionalBehavior() {
// 测试事务隔离和回滚
}2. 消息队列集成
python
async def test_message_publishing():
# 发布消息
await publisher.publish("user.created", user_data)
# 验证消息
message = await consumer.receive(timeout=5)
assert message.topic == "user.created"3. 缓存集成
typescript
it('should cache query results', async () => {
// 第一次查询 - 从数据库
const result1 = await service.getUser('123');
// 第二次查询 - 从缓存
const result2 = await service.getUser('123');
expect(result1).toEqual(result2);
expect(mockDb.query).toHaveBeenCalledTimes(1);
});故障排除
常见问题
端口冲突
bash# 使用随机端口 @LocalServerPort private int port;超时问题
python@pytest.mark.timeout(30) def test_slow_integration(): pass数据污染
javascriptbeforeEach(() => { return db.clean(); });
总结
集成测试的价值:
- 🔍 发现集成问题: 及早发现组件间的不兼容
- 🛡️ 验证真实场景: 使用真实的数据库和服务
- 🔄 回归保护: 防止集成点的破坏性变更
- 📈 信心提升: 增加系统整体运行的信心
通过编写全面的集成测试,确保系统各组件协同工作正常。