Skip to content

集成测试

验证组件间的协作与交互

什么是集成测试?

集成测试验证多个组件或系统之间的交互是否正确。它位于单元测试和端到端测试之间,关注点包括:

  • 🔗 组件交互: 验证模块间的接口
  • 🗄️ 数据库集成: 测试真实的数据持久化
  • 🌐 外部服务: 验证与第三方服务的集成
  • 📡 消息传递: 测试异步通信机制

Spring Boot 集成测试

数据库集成测试

java
/**
 * Integration tests for the user REST API, driven through {@link MockMvc} against an
 * in-memory H2 database whose schema is created and dropped by Hibernate.
 *
 * <p>The class is annotated with {@code @Transactional}, which the Spring test framework
 * rolls back after each test method, so data written by one test is not seen by the next.
 */
@SpringBootTest
@AutoConfigureMockMvc
@Transactional
@TestPropertySource(properties = {
    "spring.datasource.url=jdbc:h2:mem:testdb",
    "spring.jpa.hibernate.ddl-auto=create-drop"
})
public class UserIntegrationTest {

    @Autowired
    private MockMvc mockMvc;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Registers a user, checks it was persisted, logs in, and uses the returned token
     * to read the user back from a protected endpoint.
     *
     * @throws Exception if a request cannot be performed or a response cannot be read
     */
    @Test
    @DisplayName("完整的用户注册流程")
    public void testCompleteUserRegistrationFlow() throws Exception {
        // 1. Register a new user
        UserRegistrationRequest request = UserRegistrationRequest.builder()
            .email("test@example.com")
            .password("SecurePass123!")
            .username("testuser")
            .build();

        MvcResult registration = mockMvc.perform(post("/api/users/register")
                .contentType(MediaType.APPLICATION_JSON)
                .content(objectMapper.writeValueAsString(request)))
                .andExpect(status().isCreated())
                .andExpect(header().exists("Location"))
                .andReturn();

        // The new user's id is the last path segment of the Location header
        // (its presence was asserted above).
        String locationHeader = registration.getResponse().getHeader("Location");
        String userId = locationHeader.substring(locationHeader.lastIndexOf("/") + 1);

        // 2. Verify the user was saved to the database
        User savedUser = userRepository.findById(userId)
            .orElseThrow(() -> new AssertionError("registered user not found: " + userId));
        assertEquals("test@example.com", savedUser.getEmail());

        // 3. Log in once; the same response is both asserted on and used as the token source
        LoginRequest loginRequest = new LoginRequest("test@example.com", "SecurePass123!");

        MvcResult login = mockMvc.perform(post("/api/auth/login")
                .contentType(MediaType.APPLICATION_JSON)
                .content(objectMapper.writeValueAsString(loginRequest)))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.token").exists())
                .andExpect(jsonPath("$.expiresIn").value(3600))
                .andReturn();

        // 4. Use the token to access a protected resource
        String token = extractToken(login);

        mockMvc.perform(get("/api/users/" + userId)
                .header("Authorization", "Bearer " + token))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.email").value("test@example.com"));
    }

    /**
     * Requests the first page of users sorted by creation time (newest first) and checks
     * the paging metadata. The expected counts rely on the rows loaded from
     * {@code /test-data/users.sql}.
     *
     * @throws Exception if the request cannot be performed
     */
    @Test
    @Sql(scripts = "/test-data/users.sql")
    @DisplayName("查询用户列表 - 分页和排序")
    public void testGetUsersWithPaginationAndSorting() throws Exception {
        mockMvc.perform(get("/api/users")
                .param("page", "0")
                .param("size", "10")
                .param("sort", "createdAt,desc"))
                .andExpect(status().isOk())
                .andExpect(jsonPath("$.content").isArray())
                .andExpect(jsonPath("$.content.length()").value(10))
                .andExpect(jsonPath("$.totalElements").value(25))
                .andExpect(jsonPath("$.totalPages").value(3))
                .andExpect(jsonPath("$.number").value(0));
    }

    /**
     * Reads the {@code token} field from the JSON body of a login response.
     *
     * @param loginResult result of a successful {@code POST /api/auth/login}
     * @return the value of the top-level {@code token} property
     * @throws Exception if the response body cannot be read or parsed as JSON
     */
    private String extractToken(MvcResult loginResult) throws Exception {
        String body = loginResult.getResponse().getContentAsString();
        return objectMapper.readTree(body).get("token").asText();
    }
}

使用 TestContainers

java
/**
 * Integration test for {@code OrderService} that runs against PostgreSQL, Redis and Kafka
 * containers started by Testcontainers, with the connection settings injected into the
 * Spring context through {@code @DynamicPropertySource}.
 */
@SpringBootTest
@Testcontainers
@ActiveProfiles("integration-test")
public class OrderServiceIntegrationTest {
    
    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14")
            .withDatabaseName("testdb")
            .withUsername("test")
            .withPassword("test");
            
    @Container
    static GenericContainer<?> redis = new GenericContainer<>("redis:6-alpine")
            .withExposedPorts(6379);
            
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));
    
    /**
     * Registers the containers' connection details as Spring properties. Method references
     * are passed so the values are read from the containers when Spring asks for them.
     */
    @DynamicPropertySource
    static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        
        registry.add("spring.redis.host", redis::getHost);
        registry.add("spring.redis.port", redis::getFirstMappedPort);
        
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
    
    @Autowired
    private OrderService orderService;
    
    // NOTE(review): not referenced by the test shown below.
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * Creates an order through the service and then checks the returned order, the Redis
     * entry stored under {@code "order:" + id}, and the event read from Kafka.
     */
    @Test
    @DisplayName("订单创建完整流程 - 包含缓存和消息发送")
    public void testCompleteOrderCreationFlow() {
        // Create the order
        CreateOrderRequest request = CreateOrderRequest.builder()
            .userId("USER-123")
            .items(List.of(
                new OrderItem("PROD-001", 2, 99.99),
                new OrderItem("PROD-002", 1, 149.99)
            ))
            .build();
            
        Order order = orderService.createOrder(request);
        
        // Verify the order was saved: 2 * 99.99 + 1 * 149.99 = 349.97
        assertNotNull(order.getId());
        assertEquals(OrderStatus.PENDING, order.getStatus());
        assertEquals(349.97, order.getTotalAmount(), 0.01);
        
        // Verify the cache entry
        Order cachedOrder = (Order) redisTemplate.opsForValue()
            .get("order:" + order.getId());
        assertNotNull(cachedOrder);
        assertEquals(order.getId(), cachedOrder.getId());
        
        // Verify the Kafka message
        // NOTE(review): `consumer` is not declared anywhere in this class as shown; a Kafka
        // consumer subscribed to the order-event topic must be created (before the order is
        // placed) for this section to compile and run — TODO confirm topic and deserializer.
        ConsumerRecords<String, OrderEvent> records = KafkaTestUtils.getRecords(consumer);
        assertEquals(1, records.count());
        
        OrderEvent event = records.iterator().next().value();
        assertEquals("ORDER_CREATED", event.getEventType());
        assertEquals(order.getId(), event.getOrderId());
    }
}

Node.js 集成测试

Express + MongoDB 集成测试

typescript
// test/integration/user.integration.test.ts
import request from 'supertest';
import { MongoMemoryServer } from 'mongodb-memory-server';
import mongoose from 'mongoose';
import { app } from '../../src/app';
import { User } from '../../src/models/User';
import { generateToken } from '../../src/utils/auth';

/**
 * Integration tests for the user HTTP API: requests go through the real Express app
 * (via supertest) and data is stored in an in-memory MongoDB instance.
 */
describe('User API Integration Tests', () => {
  let mongoServer: MongoMemoryServer;

  // Start one in-memory MongoDB for the whole suite and connect mongoose to it.
  beforeAll(async () => {
    mongoServer = await MongoMemoryServer.create();
    await mongoose.connect(mongoServer.getUri());
  });

  // Disconnect first, then stop the server.
  afterAll(async () => {
    await mongoose.disconnect();
    await mongoServer.stop();
  });

  // Every test starts with an empty users collection.
  beforeEach(async () => {
    await User.deleteMany({});
  });

  describe('POST /api/users', () => {
    it('应该创建新用户并返回 token', async () => {
      const email = 'test@example.com';
      const password = 'SecurePass123!';
      const name = 'Test User';

      const res = await request(app)
        .post('/api/users')
        .send({ email, password, name })
        .expect(201);

      expect(res.body).toMatchObject({
        user: { email, name },
        token: expect.any(String)
      });

      // The user must exist in the database, and the stored password must differ
      // from the plain-text one that was submitted (it should have been hashed).
      const persisted = await User.findOne({ email });
      expect(persisted).toBeTruthy();
      expect(persisted?.password).not.toBe(password);
    });

    it('应该防止重复注册', async () => {
      const takenEmail = 'existing@example.com';

      // Seed a user that already owns the e-mail address.
      await User.create({
        email: takenEmail,
        password: 'hashed_password',
        name: 'Existing User'
      });

      // Registering again with the same address must be rejected with 409.
      const res = await request(app)
        .post('/api/users')
        .send({
          email: takenEmail,
          password: 'Password123!',
          name: 'Another User'
        })
        .expect(409);

      expect(res.body.error).toBe('Email already registered');
    });
  });

  describe('Protected Routes', () => {
    let authToken: string;
    let userId: string;

    // Path of the seeded user's profile; evaluated lazily because userId changes per test.
    const profilePath = () => `/api/users/${userId}`;

    // Seed a user directly in the database and mint a token for it.
    beforeEach(async () => {
      const seeded = await User.create({
        email: 'auth@example.com',
        password: 'hashed_password',
        name: 'Auth User'
      });

      userId = seeded._id.toString();
      authToken = generateToken(userId);
    });

    it('应该允许认证用户访问个人资料', async () => {
      const res = await request(app)
        .get(profilePath())
        .set('Authorization', `Bearer ${authToken}`)
        .expect(200);

      expect(res.body.email).toBe('auth@example.com');
    });

    it('应该拒绝未认证的请求', async () => {
      await request(app)
        .get(profilePath())
        .expect(401);
    });

    it('应该拒绝无效的 token', async () => {
      await request(app)
        .get(profilePath())
        .set('Authorization', 'Bearer invalid_token')
        .expect(401);
    });
  });
});

使用 Docker Compose 进行集成测试

yaml
# docker-compose.test.yml
# Runs the integration test suite in a container alongside the backing services it talks to.
version: '3.8'

services:
  app:
    build:
      context: .
      dockerfile: Dockerfile.test
    environment:
      - NODE_ENV=test
      - DATABASE_URL=postgresql://test:test@postgres:5432/testdb
      - REDIS_URL=redis://redis:6379
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
    # Wait until every dependency reports healthy. The short list form of depends_on
    # only waits for the containers to start, so the tests could run before the
    # services accept connections. (The long form needs a Compose implementation that
    # follows the Compose Specification, e.g. Docker Compose v2.)
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      rabbitmq:
        condition: service_healthy
    command: npm run test:integration
    volumes:
      - ./test-results:/app/test-results

  postgres:
    image: postgres:14
    environment:
      - POSTGRES_USER=test
      - POSTGRES_PASSWORD=test
      - POSTGRES_DB=testdb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U test -d testdb"]
      interval: 5s
      timeout: 5s
      retries: 10

  redis:
    image: redis:6-alpine
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 10

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 10s
      timeout: 10s
      retries: 10
typescript
// test/integration/order.integration.test.ts
import { Container } from 'typedi';
import { createConnection, Connection } from 'typeorm';
import * as amqp from 'amqplib';
import Redis from 'ioredis';
import { OrderService } from '../../src/services/OrderService';
import { Order } from '../../src/entities/Order';

/**
 * Integration tests for OrderService against real PostgreSQL, RabbitMQ and Redis
 * instances whose URLs are read from DATABASE_URL, RABBITMQ_URL and REDIS_URL.
 */
describe('Order Service Integration Tests', () => {
  let connection: Connection;
  let rabbitConnection: amqp.Connection;
  let redisClient: Redis;
  let orderService: OrderService;

  beforeAll(async () => {
    // Database connection; `synchronize` creates the schema from the entities
    connection = await createConnection({
      type: 'postgres',
      url: process.env.DATABASE_URL,
      entities: [Order],
      synchronize: true
    });

    // RabbitMQ connection and the queue the test listens on
    rabbitConnection = await amqp.connect(process.env.RABBITMQ_URL!);
    const channel = await rabbitConnection.createChannel();
    await channel.assertQueue('order-events');

    // Redis connection
    redisClient = new Redis(process.env.REDIS_URL);

    // Register the dependencies in the container before resolving the service
    Container.set('db.connection', connection);
    Container.set('rabbitmq.channel', channel);
    Container.set('redis.client', redisClient);

    orderService = Container.get(OrderService);
  });

  afterAll(async () => {
    await connection.close();
    await rabbitConnection.close();
    await redisClient.quit();
  });

  it('应该创建订单并发送消息', async () => {
    // Collect every message delivered on the queue while the test runs
    const messages: any[] = [];
    const channel = Container.get('rabbitmq.channel') as amqp.Channel;

    const { consumerTag } = await channel.consume('order-events', (msg) => {
      if (msg) {
        messages.push(JSON.parse(msg.content.toString()));
        channel.ack(msg);
      }
    });

    try {
      // Create the order
      const order = await orderService.createOrder({
        userId: 'USER-123',
        items: [
          { productId: 'PROD-001', quantity: 2, price: 99.99 }
        ]
      });

      // Give the broker time to deliver the message
      await new Promise(resolve => setTimeout(resolve, 1000));

      // Verify the returned order; the total is a float, so compare with a tolerance
      expect(order.id).toBeDefined();
      expect(order.status).toBe('PENDING');
      expect(order.totalAmount).toBeCloseTo(199.98, 2);

      // Verify the database row
      const savedOrder = await connection.getRepository(Order).findOne(order.id);
      expect(savedOrder).toBeDefined();

      // Verify the cache entry. A missing key comes back as null, which
      // toBeDefined() would accept, so check for null explicitly.
      const cachedOrder = await redisClient.get(`order:${order.id}`);
      expect(cachedOrder).not.toBeNull();

      // Verify the published message
      expect(messages).toHaveLength(1);
      expect(messages[0]).toMatchObject({
        eventType: 'OrderCreated',
        orderId: order.id,
        userId: 'USER-123'
      });
    } finally {
      // Stop consuming so this listener does not swallow messages meant for other tests
      await channel.cancel(consumerTag);
    }
  });
});

Python 集成测试

FastAPI + SQLAlchemy 集成测试

python
# test/integration/test_user_api.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.database import Base, get_db
from app.models import User
from app.config import Settings

# Test database setup: a file-backed SQLite database used in place of the application's own
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# check_same_thread=False lets the SQLite connection be used from a thread other than the one that created it
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Dependency override
def override_get_db():
    """Yield a session bound to the test engine and close it once the caller is done.

    The session is created before entering ``try`` so that a failure while creating it
    propagates as-is instead of being masked by an ``UnboundLocalError`` from ``db.close()``.
    """
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

# Make the application resolve get_db to the test session factory above
app.dependency_overrides[get_db] = override_get_db

# Test client shared by the tests in this module
client = TestClient(app)

@pytest.fixture(scope="module")
def setup_database():
    """Create all tables on the test engine once per module and drop them when the module is done."""
    schema = Base.metadata
    schema.create_all(bind=engine)
    yield
    schema.drop_all(bind=engine)

class TestUserAPI:
    """Integration tests for the user API."""

    def test_user_registration_flow(self, setup_database):
        """Register a user, then exercise duplicate registration, login, read, update and anonymous access."""
        email = "test@example.com"
        password = "SecurePass123!"
        full_name = "Test User"
        payload = {"email": email, "password": password, "full_name": full_name}

        # 1. Register a new user
        created = client.post("/api/v1/auth/register", json=payload)
        assert created.status_code == 201

        created_body = created.json()
        assert created_body["email"] == email
        assert created_body["full_name"] == full_name
        assert "id" in created_body

        user_url = f"/api/v1/users/{created_body['id']}"

        # 2. Registering the same e-mail again must be rejected
        duplicate = client.post("/api/v1/auth/register", json=payload)
        assert duplicate.status_code == 400
        assert duplicate.json()["detail"] == "Email already registered"

        # 3. Log in; the e-mail goes in the OAuth2 "username" field and is sent as form data
        login = client.post(
            "/api/v1/auth/login",
            data={"username": email, "password": password},
        )
        assert login.status_code == 200

        tokens = login.json()
        assert "access_token" in tokens
        assert tokens["token_type"] == "bearer"

        auth_headers = {"Authorization": f"Bearer {tokens['access_token']}"}

        # 4. Fetch the user with the token
        profile = client.get(user_url, headers=auth_headers)
        assert profile.status_code == 200
        assert profile.json()["email"] == email

        # 5. Update the user's name
        updated = client.patch(user_url, json={"full_name": "Updated Name"}, headers=auth_headers)
        assert updated.status_code == 200
        assert updated.json()["full_name"] == "Updated Name"

        # 6. The same resource without a token must be refused
        anonymous = client.get(user_url)
        assert anonymous.status_code == 401

使用 testcontainers 进行集成测试

python
# conftest.py
import pytest
import docker
from sqlalchemy import create_engine
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope="session")
def postgres_container():
    """Provide a ``postgres:14`` container for the test session; it is torn down when the ``with`` block exits."""
    with PostgresContainer("postgres:14") as container:
        yield container

@pytest.fixture(scope="session")
def redis_container():
    """Provide a ``redis:6-alpine`` container for the test session; it is torn down when the ``with`` block exits."""
    with RedisContainer("redis:6-alpine") as container:
        yield container

@pytest.fixture(scope="session")
def kafka_container():
    """Provide a ``confluentinc/cp-kafka:7.3.0`` container for the test session; it is torn down when the ``with`` block exits."""
    with KafkaContainer("confluentinc/cp-kafka:7.3.0") as container:
        yield container

@pytest.fixture(scope="session")
def test_app(postgres_container, redis_container, kafka_container):
    """Configure the application to use the test containers and yield it.

    Exports the containers' connection details as environment variables, imports the
    application, creates the database tables, yields the app, and drops the tables
    again on teardown.
    """
    import os
    
    # Point the application at the containers through environment variables
    os.environ["DATABASE_URL"] = postgres_container.get_connection_url()
    os.environ["REDIS_URL"] = f"redis://{redis_container.get_container_host_ip()}:{redis_container.get_exposed_port(6379)}"
    os.environ["KAFKA_BOOTSTRAP_SERVERS"] = kafka_container.get_bootstrap_server()
    
    # Imported only after the environment is set — presumably because the app reads
    # these variables at import time; keep this ordering (TODO confirm).
    from app.main import app
    from app.database import Base, engine
    
    # Create the database tables
    Base.metadata.create_all(bind=engine)
    
    yield app
    
    # Teardown: drop the tables again
    Base.metadata.drop_all(bind=engine)

# test/integration/test_order_flow.py
class TestOrderFlow:
    """Integration tests for the order flow."""

    @pytest.mark.asyncio
    async def test_complete_order_flow(self, test_app):
        """Run one order through stock reservation, payment, confirmation and event publication.

        Only the ``test_app`` fixture is requested: the test body never used a
        ``test_client``, and no fixture of that name is defined alongside ``test_app``.
        """
        from app.services.order_service import OrderService
        from app.services.inventory_service import InventoryService
        from app.services.payment_service import PaymentService
        from app.events.publisher import EventPublisher

        # Instantiate the services under test
        order_service = OrderService()
        inventory_service = InventoryService()
        payment_service = PaymentService()
        event_publisher = EventPublisher()

        # 1. Prepare stock
        await inventory_service.add_stock("PROD-001", 100)

        # 2. Create the order
        order_data = {
            "user_id": "USER-123",
            "items": [
                {"product_id": "PROD-001", "quantity": 2, "price": 99.99}
            ],
            "shipping_address": {
                "street": "123 Main St",
                "city": "New York",
                "zip_code": "10001"
            }
        }

        order = await order_service.create_order(order_data)

        assert order.id is not None
        assert order.status == "PENDING"
        # The total is a float (2 * 99.99), so compare with a tolerance rather than ==
        assert order.total_amount == pytest.approx(199.98)

        # 3. Verify the stock was reserved: 100 added, 2 ordered
        stock = await inventory_service.get_stock("PROD-001")
        assert stock.available == 98
        assert stock.reserved == 2

        # 4. Process the payment
        payment_result = await payment_service.process_payment({
            "order_id": order.id,
            "amount": order.total_amount,
            "payment_method": "credit_card",
            "card_token": "tok_visa"
        })

        assert payment_result.status == "SUCCESS"

        # 5. Confirm the order
        await order_service.confirm_order(order.id, payment_result.transaction_id)

        # 6. Verify the order state
        updated_order = await order_service.get_order(order.id)
        assert updated_order.status == "CONFIRMED"
        assert updated_order.payment_id == payment_result.transaction_id

        # 7. Verify that the creation and confirmation events were published
        events = await event_publisher.get_published_events()
        assert len(events) >= 2

        event_types = [e.event_type for e in events]
        assert "OrderCreated" in event_types
        assert "OrderConfirmed" in event_types

集成测试策略

测试金字塔中的定位

集成测试位于测试金字塔的中间层:数量少于底层的单元测试、多于顶层的端到端测试,运行速度和维护成本也介于两者之间。

集成测试最佳实践

  1. 使用测试容器

    java
    // A static @Container field is started once and shared by every test method in the
    // class; an instance field would start and stop a new container for each test method.
    @Testcontainers
    class DatabaseIntegrationTest {
        @Container
        static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14");
    }
  2. 事务回滚

    python
    @pytest.fixture
    def db_session():
        """Yield a session bound to a connection whose outer transaction is rolled back afterwards.

        Everything the test does through the session happens inside ``transaction``;
        on teardown the session is closed, the transaction is rolled back and the
        connection is released.
        """
        connection = engine.connect()
        transaction = connection.begin()
        session = Session(bind=connection)
        try:
            yield session
        finally:
            # Close the session first so it releases the connection before the rollback
            session.close()
            transaction.rollback()
            connection.close()
  3. 测试数据管理

    typescript
    // Load the fixture data before every test.
    beforeEach(() => seedDatabase());

    // Remove it again after every test.
    afterEach(() => cleanupDatabase());
  4. 并行执行

    yaml
    # jest.config.js
    module.exports = {
        maxWorkers: '50%',
        testTimeout: 30000
    };

常见集成测试场景

1. 数据库事务测试

java
/**
 * Skeleton for a test of transactional behaviour. The method is annotated with
 * {@code @Transactional} and {@code @Rollback}; its body is left empty in this example.
 */
@Test
@Transactional
@Rollback
public void testTransactionalBehavior() {
    // Test transaction isolation and rollback here
}

2. 消息队列集成

python
async def test_message_publishing():
    """Publish a ``user.created`` message and check that the consumer receives it on that topic."""
    # Publish the message
    await publisher.publish("user.created", user_data)

    # Receive it back, passing a timeout of 5 to the consumer
    message = await consumer.receive(timeout=5)
    # Fail with a readable assertion (not an AttributeError) if nothing was received
    assert message is not None, "no message received on user.created"
    assert message.topic == "user.created"

3. 缓存集成

typescript
it('should cache query results', async () => {
    const userId = '123';

    // Look the same user up twice in a row.
    const firstLookup = await service.getUser(userId);
    const secondLookup = await service.getUser(userId);

    // Both lookups return equal data, yet the database was queried only once.
    expect(firstLookup).toEqual(secondLookup);
    expect(mockDb.query).toHaveBeenCalledTimes(1);
});

故障排除

常见问题

  1. 端口冲突

    bash
    # 使用随机端口
    @LocalServerPort
    private int port;
  2. 超时问题

    python
    @pytest.mark.timeout(30)
    def test_slow_integration():
        """Placeholder for a slow integration test that carries a ``timeout(30)`` marker."""
  3. 数据污染

    javascript
    // Clean the database before every test; whatever db.clean() returns
    // (presumably a promise) is handed back to the test runner.
    beforeEach(() => db.clean());

总结

集成测试的价值:

  • 🔍 发现集成问题: 及早发现组件间的不兼容
  • 🛡️ 验证真实场景: 使用真实的数据库和服务
  • 🔄 回归保护: 防止集成点的破坏性变更
  • 📈 信心提升: 增加系统整体运行的信心

通过编写全面的集成测试,确保系统各组件协同工作正常。

SOLO Development Guide