跳转至

Spring 消息驱动架构深度解析


概述

消息驱动架构是现代微服务架构的核心模式,Spring 提供了完整的消息处理解决方案。本文深度解析 Spring 消息驱动的核心组件和高级用法。

graph TB
    A[Spring 消息驱动] --> B[消息队列集成]
    A --> C[事件驱动架构]
    A --> D[异步处理模式]
    A --> E[事务消息]
    A --> F[监控与运维]

    B --> B1[RabbitMQ]
    B --> B2[Kafka]
    B --> B3[RocketMQ]
    B --> B4[ActiveMQ]

    C --> C1[Spring Events]
    C --> C2[Domain Events]
    C --> C3[Event Sourcing]
    C --> C4[CQRS]

    D --> D1["@Async"]
    D --> D2["@EventListener"]
    D --> D3[Reactive Streams]
    D --> D4[Batch Processing]

    E --> E1[本地事务]
    E --> E2[分布式事务]
    E --> E3[最终一致性]
    E --> E4[消息重试]

    F --> F1[消息追踪]
    F --> F2[监控指标]
    F --> F3[死信队列]
    F --> F4[性能调优]

Spring 消息队列集成

1. RabbitMQ 深度集成

高级配置和用法

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setChannelCacheSize(25);
        factory.setConnectionTimeout(30000);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        template.setConfirmCallback(confirmCallback());
        template.setReturnsCallback(returnsCallback());
        template.setMandatory(true); // 确保消息路由失败时返回
        return template;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功: " + correlationData);
            } else {
                System.err.println("消息发送失败: " + cause);
            }
        };
    }

    @Bean
    public ReturnsCallback returnsCallback() {
        return returned -> {
            System.err.println("消息路由失败: " + returned.getMessage() + 
                ", 路由键: " + returned.getRoutingKey());
        };
    }

    // 声明交换机和队列
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange("order.exchange", true, false);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order.queue")
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .withArgument("x-dead-letter-routing-key", "order.dlq")
            .withArgument("x-message-ttl", 60000) // 1分钟TTL
            .build();
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.#");
    }

    // 死信队列配置
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }

    @Bean
    public Queue dlqQueue() {
        return new Queue("order.dlq", true);
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlqQueue())
            .to(dlxExchange())
            .with("order.dlq");
    }
}

// 高级消息生产者
@Service
public class AdvancedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 发送可靠消息
    public void sendReliableMessage(OrderMessage message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        MessageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        properties.setMessageId(UUID.randomUUID().toString());
        properties.setTimestamp(new Date());
        properties.setHeader("retryCount", 0);

        Message rabbitMessage = new Message(
            new ObjectMapper().writeValueAsBytes(message), 
            properties
        );

        rabbitTemplate.convertAndSend(
            "order.exchange", 
            "order.created", 
            rabbitMessage, 
            correlationData
        );
    }

    // 延迟消息
    public void sendDelayedMessage(OrderMessage message, long delayMs) {
        MessageProperties properties = new MessageProperties();
        properties.setDelay((int) delayMs);

        Message rabbitMessage = new Message(
            new ObjectMapper().writeValueAsBytes(message), 
            properties
        );

        rabbitTemplate.convertAndSend(
            "delayed.exchange", 
            "order.delayed", 
            rabbitMessage
        );
    }

    // 批量发送
    public void sendBatchMessages(List<OrderMessage> messages) {
        List<Message> rabbitMessages = messages.stream()
            .map(message -> {
                try {
                    MessageProperties properties = new MessageProperties();
                    properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
                    return new Message(
                        new ObjectMapper().writeValueAsBytes(message), 
                        properties
                    );
                } catch (Exception e) {
                    throw new RuntimeException("消息序列化失败", e);
                }
            })
            .collect(Collectors.toList());

        rabbitMessages.forEach(msg -> 
            rabbitTemplate.convertAndSend("order.exchange", "order.batch", msg)
        );
    }
}

// 高级消息消费者
@Service
public class AdvancedMessageConsumer {

    @RabbitListener(queues = "order.queue")
    @RabbitHandler
    public void handleOrderMessage(OrderMessage message, 
                                 Channel channel, 
                                 @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 业务处理逻辑
            processOrder(message);

            // 手动确认消息
            channel.basicAck(deliveryTag, false);

        } catch (BusinessException e) {
            // 业务异常,重试
            System.err.println("业务异常,消息将重试: " + e.getMessage());
            channel.basicNack(deliveryTag, false, true);

        } catch (Exception e) {
            // 系统异常,拒绝消息
            System.err.println("系统异常,消息将被拒绝: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }

    // 死信队列处理
    @RabbitListener(queues = "order.dlq")
    public void handleDeadLetter(OrderMessage message, 
                                @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                                Channel channel) {
        try {
            // 死信消息处理逻辑
            handleDeadLetterMessage(message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 死信消息处理失败,记录日志
            System.err.println("死信消息处理失败: " + e.getMessage());
            channel.basicNack(deliveryTag, false, false);
        }
    }

    // 批量消费
    @RabbitListener(queues = "order.batch.queue", 
                   containerFactory = "batchContainerFactory")
    public void handleBatchMessages(List<OrderMessage> messages) {
        // 批量处理逻辑
        batchProcessOrders(messages);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory batchContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);
        factory.setBatchSize(10);
        factory.setConsumerBatchEnabled(true);
        factory.setReceiveTimeout(10000L);
        return factory;
    }

    private void processOrder(OrderMessage message) {
        // 订单处理逻辑
    }

    private void handleDeadLetterMessage(OrderMessage message) {
        // 死信消息处理逻辑
    }

    private void batchProcessOrders(List<OrderMessage> messages) {
        // 批量处理逻辑
    }
}

2. Kafka 深度集成

Spring Kafka 高级配置

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic orderTopic() {
        return new NewTopic("order-topic", 3, (short) 1); // 3个分区,1个副本
    }

    @Bean
    public NewTopic deadLetterTopic() {
        return new NewTopic("order-dlt", 1, (short) 1); // 死信主题
    }

    @Bean
    public ProducerFactory<String, OrderMessage> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 延迟发送
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, OrderMessage> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, OrderMessage> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // 每次拉取最大记录数
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.messages");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderMessage> 
        kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // 并发消费者数量
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()), 3)); // 重试3次后进入死信队列

        return factory;
    }
}

// 高级 Kafka 生产者
@Service
public class AdvancedKafkaProducer {

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    // 发送可靠消息
    public void sendReliableMessage(String key, OrderMessage message) {
        ListenableFuture<SendResult<String, OrderMessage>> future = 
            kafkaTemplate.send("order-topic", key, message);

        future.addCallback(
            result -> {
                System.out.println("消息发送成功: " + 
                    result.getRecordMetadata().topic() + "-" + 
                    result.getRecordMetadata().partition() + "-" + 
                    result.getRecordMetadata().offset());
            },
            ex -> {
                System.err.println("消息发送失败: " + ex.getMessage());
                // 失败重试或记录日志
                handleSendFailure(key, message, ex);
            }
        );
    }

    // 批量发送
    public void sendBatchMessages(Map<String, OrderMessage> messages) {
        List<ListenableFuture<SendResult<String, OrderMessage>>> futures = 
            new ArrayList<>();

        messages.forEach((key, message) -> {
            futures.add(kafkaTemplate.send("order-topic", key, message));
        });

        // 等待所有发送完成
        CompletableFuture.allOf(
            futures.stream()
                .map(future -> future.completable())
                .toArray(CompletableFuture[]::new)
        ).join();
    }

    // 事务消息
    @Transactional
    public void sendTransactionalMessage(OrderMessage message) {
        // 数据库操作
        orderRepository.save(message.toOrder());

        // Kafka 事务消息
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("order-topic", message.getOrderId(), message);
            return null;
        });
    }

    private void handleSendFailure(String key, OrderMessage message, Throwable ex) {
        // 失败处理逻辑
    }
}

// 高级 Kafka 消费者
@Service
public class AdvancedKafkaConsumer {

    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrderMessage(OrderMessage message,
                                  Acknowledgment ack,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                  @Header(KafkaHeaders.OFFSET) long offset) {
        try {
            // 业务处理逻辑
            processOrder(message);

            // 手动提交偏移量
            ack.acknowledge();

        } catch (BusinessException e) {
            // 业务异常,记录日志但不提交偏移量(会重试)
            System.err.println("业务异常,消息将重试: " + e.getMessage());

        } catch (Exception e) {
            // 系统异常,记录日志并提交偏移量(避免无限重试)
            System.err.println("系统异常,消息将被跳过: " + e.getMessage());
            ack.acknowledge();
        }
    }

    // 批量消费
    @KafkaListener(topics = "order-batch-topic", groupId = "order-batch-group")
    public void consumeBatchMessages(List<OrderMessage> messages,
                                   Acknowledgment ack) {
        try {
            // 批量处理逻辑
            batchProcessOrders(messages);
            ack.acknowledge();
        } catch (Exception e) {
            System.err.println("批量处理失败: " + e.getMessage());
        }
    }

    // 死信队列处理
    @KafkaListener(topics = "order-topic.DLT", groupId = "order-dlt-group")
    public void consumeDeadLetter(OrderMessage message) {
        // 死信消息处理逻辑
        handleDeadLetterMessage(message);
    }

    private void processOrder(OrderMessage message) {
        // 订单处理逻辑
    }

    private void batchProcessOrders(List<OrderMessage> messages) {
        // 批量处理逻辑
    }

    private void handleDeadLetterMessage(OrderMessage message) {
        // 死信消息处理逻辑
    }
}

事件驱动架构

1. Spring Events 深度使用

自定义事件和监听器

// 自定义领域事件
public abstract class DomainEvent {
    private final String eventId;
    private final LocalDateTime timestamp;
    private final String aggregateId;

    public DomainEvent(String aggregateId) {
        this.eventId = UUID.randomUUID().toString();
        this.timestamp = LocalDateTime.now();
        this.aggregateId = aggregateId;
    }

    // getters...
}

// 订单创建事件
public class OrderCreatedEvent extends DomainEvent {
    private final Order order;

    public OrderCreatedEvent(Order order) {
        super(order.getId().toString());
        this.order = order;
    }

    // getter...
}

// 订单支付事件
public class OrderPaidEvent extends DomainEvent {
    private final String orderId;
    private final BigDecimal amount;

    public OrderPaidEvent(String orderId, BigDecimal amount) {
        super(orderId);
        this.orderId = orderId;
        this.amount = amount;
    }

    // getters...
}

// 事件发布器
@Component
public class DomainEventPublisher {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    public void publishOrderCreated(Order order) {
        OrderCreatedEvent event = new OrderCreatedEvent(order);
        eventPublisher.publishEvent(event);
    }

    public void publishOrderPaid(String orderId, BigDecimal amount) {
        OrderPaidEvent event = new OrderPaidEvent(orderId, amount);
        eventPublisher.publishEvent(event);
    }
}

// 事件监听器
@Component
public class OrderEventListener {

    private static final Logger logger = LoggerFactory.getLogger(OrderEventListener.class);

    // 异步处理订单创建事件
    @Async("eventExecutor")
    @EventListener
    @Order(1) // 执行顺序
    public void handleOrderCreated(OrderCreatedEvent event) {
        logger.info("处理订单创建事件: {}", event.getAggregateId());

        // 发送通知
        notificationService.sendOrderCreatedNotification(event.getOrder());

        // 更新缓存
        cacheService.updateOrderCache(event.getOrder());
    }

    // 条件监听器
    @EventListener(condition = "#event.amount > 1000")
    public void handleLargeOrderPaid(OrderPaidEvent event) {
        logger.info("处理大额订单支付事件: {}", event.getOrderId());

        // 大额订单特殊处理
        riskService.checkLargeOrder(event.getOrderId(), event.getAmount());
    }

    // 事务后事件监听器
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleAfterCommit(OrderCreatedEvent event) {
        logger.info("事务提交后处理订单创建事件: {}", event.getAggregateId());

        // 事务提交后的处理逻辑
        auditService.logOrderCreation(event.getOrder());
    }

    // 事务失败事件监听器
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void handleAfterRollback(OrderCreatedEvent event) {
        logger.warn("事务回滚,清理订单创建事件相关数据: {}", event.getAggregateId());

        // 事务回滚后的清理逻辑
        cleanupService.cleanupFailedOrder(event.getOrder());
    }
}

// 事件执行器配置
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("eventExecutor")
    public Executor eventExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("event-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

异步处理模式

1. @Async 深度使用

高级异步配置

@Configuration
@EnableAsync
public class AdvancedAsyncConfig {

    // 计算密集型任务执行器
    @Bean("cpuExecutor")
    public Executor cpuExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("cpu-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    // I/O 密集型任务执行器
    @Bean("ioExecutor")
    public Executor ioExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("io-executor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }

    // 定时任务执行器
    @Bean("scheduledExecutor")
    public Executor scheduledExecutor() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        return new DelegatingExecutor(executor);
    }
}

// 高级异步服务
@Service
public class AdvancedAsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AdvancedAsyncService.class);

    // 计算密集型异步任务
    @Async("cpuExecutor")
    public CompletableFuture<BigDecimal> calculateComplexFormula(ComplexData data) {
        logger.info("开始计算复杂公式,线程: {}", Thread.currentThread().getName());

        try {
            // 模拟复杂计算
            Thread.sleep(1000);
            BigDecimal result = performComplexCalculation(data);

            return CompletableFuture.completedFuture(result);
        } catch (Exception e) {
            logger.error("计算失败", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    // I/O 密集型异步任务
    @Async("ioExecutor")
    public CompletableFuture<String> processFileUpload(MultipartFile file) {
        logger.info("开始处理文件上传,线程: {}", Thread.currentThread().getName());

        try {
            // 模拟文件处理
            String filePath = saveFile(file);
            processFileContent(filePath);

            return CompletableFuture.completedFuture(filePath);
        } catch (Exception e) {
            logger.error("文件处理失败", e);
            return CompletableFuture.failedFuture(e);
        }
    }

    // 异步任务链
    @Async("ioExecutor")
    public CompletableFuture<OrderResult> processOrderAsync(OrderRequest request) {
        return validateOrder(request)
            .thenCompose(this::checkInventory)
            .thenCompose(this::calculatePrice)
            .thenCompose(this::createOrder)
            .thenApply(this::sendConfirmation);
    }

    // 异步超时控制
    @Async("ioExecutor")
    public CompletableFuture<ApiResponse> callExternalApiWithTimeout(String url) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 模拟外部API调用
                return callApi(url);
            } catch (Exception e) {
                throw new RuntimeException("API调用失败", e);
            }
        }).orTimeout(30, TimeUnit.SECONDS); // 30秒超时
    }

    // 异步异常处理
    @Async("ioExecutor")
    public CompletableFuture<ProcessResult> processWithFallback(ProcessRequest request) {
        return CompletableFuture.supplyAsync(() -> primaryProcess(request))
            .exceptionally(throwable -> {
                logger.warn("主处理失败,使用备用方案", throwable);
                return fallbackProcess(request);
            });
    }

    // 异步任务组合
    @Async("cpuExecutor")
    public CompletableFuture<CombinedResult> combineAsyncTasks(TaskData data) {
        CompletableFuture<ResultA> futureA = processTaskA(data);
        CompletableFuture<ResultB> futureB = processTaskB(data);
        CompletableFuture<ResultC> futureC = processTaskC(data);

        return CompletableFuture.allOf(futureA, futureB, futureC)
            .thenApply(v -> {
                ResultA a = futureA.join();
                ResultB b = futureB.join();
                ResultC c = futureC.join();
                return new CombinedResult(a, b, c);
            });
    }

    private CompletableFuture<ValidatedOrder> validateOrder(OrderRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            // 验证逻辑
            return new ValidatedOrder(request);
        });
    }

    private CompletableFuture<InventoryChecked> checkInventory(ValidatedOrder order) {
        return CompletableFuture.supplyAsync(() -> {
            // 库存检查逻辑
            return new InventoryChecked(order);
        });
    }

    // 其他辅助方法...
    private BigDecimal performComplexCalculation(ComplexData data) {
        // 复杂计算逻辑
        return BigDecimal.ZERO;
    }

    private String saveFile(MultipartFile file) {
        // 文件保存逻辑
        return "file-path";
    }

    private void processFileContent(String filePath) {
        // 文件内容处理逻辑
    }

    private ApiResponse callApi(String url) {
        // API调用逻辑
        return new ApiResponse();
    }

    private ProcessResult primaryProcess(ProcessRequest request) {
        // 主处理逻辑
        return new ProcessResult();
    }

    private ProcessResult fallbackProcess(ProcessRequest request) {
        // 备用处理逻辑
        return new ProcessResult();
    }
}

事务消息处理

1. 本地事务与消息一致性

事务消息模式实现

@Service
@Transactional
public class TransactionalMessageService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private KafkaTemplate<String, OrderMessage> kafkaTemplate;

    // 本地事务 + 消息发送(可能不一致)
    public void createOrderWithMessage(OrderRequest request) {
        // 1. 保存订单到数据库
        Order order = createOrder(request);
        orderRepository.save(order);

        // 2. 发送消息(如果这里失败,数据库已提交)
        OrderMessage message = convertToMessage(order);
        rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
    }

    // 事务消息模式(最终一致性)
    public void createOrderWithTransactionMessage(OrderRequest request) {
        // 1. 保存订单和消息到数据库(同一事务)
        Order order = createOrder(request);
        OutboxMessage outboxMessage = createOutboxMessage(order);

        orderRepository.save(order);
        outboxRepository.save(outboxMessage);

        // 事务提交后,定时任务会发送消息
    }

    // 使用事务监听器确保消息发送
    @Transactional
    public void createOrderWithTransactionalEvent(OrderRequest request) {
        // 1. 保存订单
        Order order = createOrder(request);
        orderRepository.save(order);

        // 2. 发布事件(事务提交后发送)
        applicationEventPublisher.publishEvent(new OrderCreatedEvent(order));
    }

    // Kafka 事务消息
    @Transactional
    public void createOrderWithKafkaTransaction(OrderRequest request) {
        // 1. 保存订单
        Order order = createOrder(request);
        orderRepository.save(order);

        // 2. 发送 Kafka 事务消息
        kafkaTemplate.executeInTransaction(operations -> {
            OrderMessage message = convertToMessage(order);
            operations.send("order-topic", order.getId().toString(), message);
            return null;
        });
    }

    // 最大努力通知模式
    @Transactional
    public void createOrderWithBestEffort(OrderRequest request) {
        // 1. 保存订单
        Order order = createOrder(request);
        orderRepository.save(order);

        // 2. 异步发送消息(可能失败,但有重试机制)
        asyncSendMessageWithRetry(order);
    }

    @Async
    public void asyncSendMessageWithRetry(Order order) {
        int maxRetries = 3;
        int retryCount = 0;

        while (retryCount < maxRetries) {
            try {
                OrderMessage message = convertToMessage(order);
                rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
                break; // 发送成功,退出循环
            } catch (Exception e) {
                retryCount++;
                if (retryCount == maxRetries) {
                    // 记录失败日志,人工干预
                    logFailedMessage(order, e);
                } else {
                    // 等待后重试
                    try {
                        Thread.sleep(1000 * retryCount);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // TCC 模式实现
    @Transactional
    public void createOrderWithTCC(OrderRequest request) {
        // 1. Try 阶段:预留资源
        reserveInventory(request);
        reserveCoupon(request);

        // 2. Confirm 阶段:确认操作
        Order order = createOrder(request);
        orderRepository.save(order);

        confirmInventory(request);
        confirmCoupon(request);

        // 3. 发送消息
        sendOrderCreatedMessage(order);
    }

    // Saga 模式实现
    @Transactional
    public void createOrderWithSaga(OrderRequest request) {
        // 1. 开始 Saga 事务
        String sagaId = startSagaTransaction();

        try {
            // 2. 执行各个步骤
            step1ReserveInventory(request, sagaId);
            step2CreateOrder(request, sagaId);
            step3DeductBalance(request, sagaId);

            // 3. 完成 Saga
            completeSaga(sagaId);

        } catch (Exception e) {
            // 4. 补偿操作
            compensateSaga(sagaId);
            throw e;
        }
    }

    private Order createOrder(OrderRequest request) {
        // 创建订单逻辑
        return new Order();
    }

    private OutboxMessage createOutboxMessage(Order order) {
        // 创建出站消息逻辑
        return new OutboxMessage();
    }

    private OrderMessage convertToMessage(Order order) {
        // 转换消息逻辑
        return new OrderMessage();
    }

    private void logFailedMessage(Order order, Exception e) {
        // 记录失败消息逻辑
    }

    private void reserveInventory(OrderRequest request) {
        // 预留库存逻辑
    }

    private void reserveCoupon(OrderRequest request) {
        // 预留优惠券逻辑
    }

    private void confirmInventory(OrderRequest request) {
        // 确认库存逻辑
    }

    private void confirmCoupon(OrderRequest request) {
        // 确认优惠券逻辑
    }

    private String startSagaTransaction() {
        // 开始 Saga 事务逻辑
        return UUID.randomUUID().toString();
    }

    private void step1ReserveInventory(OrderRequest request, String sagaId) {
        // Saga 步骤1逻辑
    }

    private void step2CreateOrder(OrderRequest request, String sagaId) {
        // Saga 步骤2逻辑
    }

    private void step3DeductBalance(OrderRequest request, String sagaId) {
        // Saga 步骤3逻辑
    }

    private void completeSaga(String sagaId) {
        // 完成 Saga 逻辑
    }

    private void compensateSaga(String sagaId) {
        // 补偿 Saga 逻辑
    }
}

监控与运维

1. 消息系统监控

Spring Boot Actuator 集成

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,info,beans,env
  endpoint:
    health:
      show-details: always
      show-components: always
    metrics:
      enabled: true
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ${spring.application.name}
      environment: ${spring.profiles.active:default}

# RabbitMQ 健康检查配置
spring:
  rabbitmq:
    health-check:
      enabled: true
      timeout: 10s

# Kafka 健康检查配置
spring:
  kafka:
    health-check:
      enabled: true
      timeout: 10s

自定义监控指标

@Component
public class MessageMetrics {

    private final MeterRegistry meterRegistry;

    // 消息发送指标
    private final Counter messagesSent;
    private final Timer messageSendTimer;

    // 消息消费指标
    private final Counter messagesConsumed;
    private final Timer messageProcessTimer;

    // 错误指标
    private final Counter messageErrors;
    private final Counter retryCount;

    public MessageMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        this.messagesSent = Counter.builder("message.sent")
            .description("Number of messages sent")
            .tag("type", "total")
            .register(meterRegistry);

        this.messageSendTimer = Timer.builder("message.send.time")
            .description("Time taken to send messages")
            .register(meterRegistry);

        this.messagesConsumed = Counter.builder("message.consumed")
            .description("Number of messages consumed")
            .tag("type", "total")
            .register(meterRegistry);

        this.messageProcessTimer = Timer.builder("message.process.time")
            .description("Time taken to process messages")
            .register(meterRegistry);

        this.messageErrors = Counter.builder("message.errors")
            .description("Number of message processing errors")
            .register(meterRegistry);

        this.retryCount = Counter.builder("message.retries")
            .description("Number of message retries")
            .register(meterRegistry);
    }

    public void recordMessageSent(String queue, long duration) {
        messagesSent.increment();
        meterRegistry.counter("message.sent", "queue", queue).increment();
        messageSendTimer.record(duration, TimeUnit.MILLISECONDS);
    }

    public void recordMessageConsumed(String queue, long duration) {
        messagesConsumed.increment();
        meterRegistry.counter("message.consumed", "queue", queue).increment();
        messageProcessTimer.record(duration, TimeUnit.MILLISECONDS);
    }

    public void recordMessageError(String queue, String errorType) {
        messageErrors.increment();
        meterRegistry.counter("message.errors", 
            "queue", queue, "error", errorType).increment();
    }

    public void recordRetry(String queue) {
        retryCount.increment();
        meterRegistry.counter("message.retries", "queue", queue).increment();
    }

    // 获取消息队列积压情况
    public void monitorQueueBacklog(String queueName, long backlogSize) {
        Gauge.builder("queue.backlog", backlogSize, Number::longValue)
            .description("Queue backlog size")
            .tag("queue", queueName)
            .register(meterRegistry);
    }
}

// 监控增强的消息生产者
@Service
public class MonitoredMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageMetrics messageMetrics;

    public void sendMonitoredMessage(String exchange, String routingKey, Object message) {
        long startTime = System.currentTimeMillis();

        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message);

            long duration = System.currentTimeMillis() - startTime;
            messageMetrics.recordMessageSent(routingKey, duration);

        } catch (Exception e) {
            messageMetrics.recordMessageError(routingKey, e.getClass().getSimpleName());
            throw e;
        }
    }
}

// 监控增强的消息消费者
@Service
public class MonitoredMessageConsumer {

    @Autowired
    private MessageMetrics messageMetrics;

    @RabbitListener(queues = "order.queue")
    public void handleMonitoredMessage(OrderMessage message, Channel channel, 
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        long startTime = System.currentTimeMillis();

        try {
            processMessage(message);
            channel.basicAck(deliveryTag, false);

            long duration = System.currentTimeMillis() - startTime;
            messageMetrics.recordMessageConsumed("order.queue", duration);

        } catch (BusinessException e) {
            // 业务异常,重试
            channel.basicNack(deliveryTag, false, true);
            messageMetrics.recordRetry("order.queue");

        } catch (Exception e) {
            // 系统异常,拒绝
            channel.basicNack(deliveryTag, false, false);
            messageMetrics.recordMessageError("order.queue", e.getClass().getSimpleName());
        }
    }

    private void processMessage(OrderMessage message) {
        // 消息处理逻辑
    }
}

2. 运维最佳实践

健康检查端点

@Component
public class MessageHealthIndicator implements HealthIndicator {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Override
    public Health health() {
        Map<String, Object> details = new HashMap<>();

        // RabbitMQ 健康检查
        try {
            Connection connection = rabbitConnectionFactory.createConnection();
            Channel channel = connection.createChannel(false);

            details.put("rabbitmq", "UP");
            details.put("rabbitmq.connection", connection.isOpen());
            details.put("rabbitmq.channel", channel.isOpen());

            channel.close();
            connection.close();
        } catch (Exception e) {
            details.put("rabbitmq", "DOWN");
            details.put("rabbitmq.error", e.getMessage());
        }

        // Kafka 健康检查
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            DescribeClusterResult clusterResult = adminClient.describeCluster();
            details.put("kafka", "UP");
            details.put("kafka.brokers", clusterResult.nodes().get().size());
        } catch (Exception e) {
            details.put("kafka", "DOWN");
            details.put("kafka.error", e.getMessage());
        }

        boolean isHealthy = details.get("rabbitmq").equals("UP") && 
                           details.get("kafka").equals("UP");

        return isHealthy ? Health.up().withDetails(details).build() 
                        : Health.down().withDetails(details).build();
    }
}

配置管理

@ConfigurationProperties(prefix = "message")
@Component
@Data
public class MessageProperties {

    // RabbitMQ 配置
    private Rabbit rabbit = new Rabbit();

    // Kafka 配置
    private Kafka kafka = new Kafka();

    // 重试配置
    private Retry retry = new Retry();

    // 监控配置
    private Monitor monitor = new Monitor();

    @Data
    public static class Rabbit {
        private String host = "localhost";
        private int port = 5672;
        private String username = "guest";
        private String password = "guest";
        private String virtualHost = "/";
        private int connectionTimeout = 30000;
        private int channelCacheSize = 25;
    }

    @Data
    public static class Kafka {
        private String bootstrapServers = "localhost:9092";
        private String groupId = "default-group";
        private int retries = 3;
        private int batchSize = 16384;
        private int lingerMs = 10;
        private int bufferMemory = 33554432;
    }

    @Data
    public static class Retry {
        private int maxAttempts = 3;
        private long backoffDelay = 1000;
        private double backoffMultiplier = 2.0;
        private long maxBackoffDelay = 10000;
    }

    @Data
    public static class Monitor {
        private boolean enabled = true;
        private long metricsInterval = 60000; // 1分钟
        private int queueBacklogThreshold = 1000;
        private int errorRateThreshold = 10; // 10%
    }
}

总结

Spring 消息驱动架构提供了完整的异步处理解决方案:

核心优势

  1. 解耦系统组件:通过消息队列实现系统间松耦合
  2. 提高系统吞吐量:异步处理避免阻塞,提升并发能力
  3. 增强系统可靠性:重试机制、死信队列保证消息不丢失
  4. 支持水平扩展:消费者可以水平扩展处理能力

关键技术点

  1. 消息队列集成:RabbitMQ、Kafka 深度集成
  2. 事件驱动架构:Spring Events、领域事件、事务事件
  3. 异步处理模式:@Async、CompletableFuture、响应式编程
  4. 事务消息处理:本地事务、分布式事务、最终一致性
  5. 监控与运维:健康检查、指标监控、配置管理

最佳实践

  1. 合理选择消息队列:根据业务场景选择 RabbitMQ(复杂路由)或 Kafka(高吞吐)
  2. 设计幂等消费:确保消息重复消费不会产生副作用
  3. 实现死信处理:为重要消息设置死信队列和告警
  4. 监控关键指标:消息积压、处理延迟、错误率
  5. 配置合理重试:避免无限重试,设置最大重试次数和退避策略

通过深度掌握 Spring 消息驱动架构,可以构建高可用、高并发的分布式系统。