Spring 消息驱动架构深度解析¶
概述¶
消息驱动架构是现代微服务架构的核心模式,Spring 提供了完整的消息处理解决方案。本文深度解析 Spring 消息驱动的核心组件和高级用法。
graph TB
A[Spring 消息驱动] --> B[消息队列集成]
A --> C[事件驱动架构]
A --> D[异步处理模式]
A --> E[事务消息]
A --> F[监控与运维]
B --> B1[RabbitMQ]
B --> B2[Kafka]
B --> B3[RocketMQ]
B --> B4[ActiveMQ]
C --> C1[Spring Events]
C --> C2[Domain Events]
C --> C3[Event Sourcing]
C --> C4[CQRS]
D --> D1["@Async"]
D --> D2["@EventListener"]
D --> D3[Reactive Streams]
D --> D4[Batch Processing]
E --> E1[本地事务]
E --> E2[分布式事务]
E --> E3[最终一致性]
E --> E4[消息重试]
F --> F1[消息追踪]
F --> F2[监控指标]
F --> F3[死信队列]
F --> F4[性能调优] Spring 消息队列集成¶
1. RabbitMQ 深度集成¶
高级配置和用法¶
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setChannelCacheSize(25);
factory.setConnectionTimeout(30000);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
template.setConfirmCallback(confirmCallback());
template.setReturnsCallback(returnsCallback());
template.setMandatory(true); // 确保消息路由失败时返回
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功: " + correlationData);
} else {
System.err.println("消息发送失败: " + cause);
}
};
}
@Bean
public ReturnsCallback returnsCallback() {
return returned -> {
System.err.println("消息路由失败: " + returned.getMessage() +
", 路由键: " + returned.getRoutingKey());
};
}
// 声明交换机和队列
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "order.dlq")
.withArgument("x-message-ttl", 60000) // 1分钟TTL
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.#");
}
// 死信队列配置
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue dlqQueue() {
return new Queue("order.dlq", true);
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue())
.to(dlxExchange())
.with("order.dlq");
}
}
// 高级消息生产者
@Service
public class AdvancedMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送可靠消息
public void sendReliableMessage(OrderMessage message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
properties.setMessageId(UUID.randomUUID().toString());
properties.setTimestamp(new Date());
properties.setHeader("retryCount", 0);
Message rabbitMessage = new Message(
new ObjectMapper().writeValueAsBytes(message),
properties
);
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
rabbitMessage,
correlationData
);
}
// 延迟消息
public void sendDelayedMessage(OrderMessage message, long delayMs) {
MessageProperties properties = new MessageProperties();
properties.setDelay((int) delayMs);
Message rabbitMessage = new Message(
new ObjectMapper().writeValueAsBytes(message),
properties
);
rabbitTemplate.convertAndSend(
"delayed.exchange",
"order.delayed",
rabbitMessage
);
}
// 批量发送
public void sendBatchMessages(List<OrderMessage> messages) {
List<Message> rabbitMessages = messages.stream()
.map(message -> {
try {
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
return new Message(
new ObjectMapper().writeValueAsBytes(message),
properties
);
} catch (Exception e) {
throw new RuntimeException("消息序列化失败", e);
}
})
.collect(Collectors.toList());
rabbitMessages.forEach(msg ->
rabbitTemplate.convertAndSend("order.exchange", "order.batch", msg)
);
}
}
// 高级消息消费者
@Service
public class AdvancedMessageConsumer {
@RabbitListener(queues = "order.queue")
@RabbitHandler
public void handleOrderMessage(OrderMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 业务处理逻辑
processOrder(message);
// 手动确认消息
channel.basicAck(deliveryTag, false);
} catch (BusinessException e) {
// 业务异常,重试
System.err.println("业务异常,消息将重试: " + e.getMessage());
channel.basicNack(deliveryTag, false, true);
} catch (Exception e) {
// 系统异常,拒绝消息
System.err.println("系统异常,消息将被拒绝: " + e.getMessage());
channel.basicNack(deliveryTag, false, false);
}
}
// 死信队列处理
@RabbitListener(queues = "order.dlq")
public void handleDeadLetter(OrderMessage message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) {
try {
// 死信消息处理逻辑
handleDeadLetterMessage(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 死信消息处理失败,记录日志
System.err.println("死信消息处理失败: " + e.getMessage());
channel.basicNack(deliveryTag, false, false);
}
}
// 批量消费
@RabbitListener(queues = "order.batch.queue",
containerFactory = "batchContainerFactory")
public void handleBatchMessages(List<OrderMessage> messages) {
// 批量处理逻辑
batchProcessOrders(messages);
}
@Bean
public SimpleRabbitListenerContainerFactory batchContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBatchListener(true);
factory.setBatchSize(10);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(10000L);
return factory;
}
private void processOrder(OrderMessage message) {
// 订单处理逻辑
}
private void handleDeadLetterMessage(OrderMessage message) {
// 死信消息处理逻辑
}
private void batchProcessOrders(List<OrderMessage> messages) {
// 批量处理逻辑
}
}
2. Kafka 深度集成¶
Spring Kafka 高级配置¶
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic orderTopic() {
return new NewTopic("order-topic", 3, (short) 1); // 3个分区,1个副本
}
@Bean
public NewTopic deadLetterTopic() {
return new NewTopic("order-dlt", 1, (short) 1); // 死信主题
}
@Bean
public ProducerFactory<String, OrderMessage> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 延迟发送
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, OrderMessage> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, OrderMessage> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // 每次拉取最大记录数
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.messages");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, OrderMessage>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 并发消费者数量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()), 3)); // 重试3次后进入死信队列
return factory;
}
}
// 高级 Kafka 生产者
@Service
public class AdvancedKafkaProducer {
@Autowired
private KafkaTemplate<String, OrderMessage> kafkaTemplate;
// 发送可靠消息
public void sendReliableMessage(String key, OrderMessage message) {
ListenableFuture<SendResult<String, OrderMessage>> future =
kafkaTemplate.send("order-topic", key, message);
future.addCallback(
result -> {
System.out.println("消息发送成功: " +
result.getRecordMetadata().topic() + "-" +
result.getRecordMetadata().partition() + "-" +
result.getRecordMetadata().offset());
},
ex -> {
System.err.println("消息发送失败: " + ex.getMessage());
// 失败重试或记录日志
handleSendFailure(key, message, ex);
}
);
}
// 批量发送
public void sendBatchMessages(Map<String, OrderMessage> messages) {
List<ListenableFuture<SendResult<String, OrderMessage>>> futures =
new ArrayList<>();
messages.forEach((key, message) -> {
futures.add(kafkaTemplate.send("order-topic", key, message));
});
// 等待所有发送完成
CompletableFuture.allOf(
futures.stream()
.map(future -> future.completable())
.toArray(CompletableFuture[]::new)
).join();
}
// 事务消息
@Transactional
public void sendTransactionalMessage(OrderMessage message) {
// 数据库操作
orderRepository.save(message.toOrder());
// Kafka 事务消息
kafkaTemplate.executeInTransaction(operations -> {
operations.send("order-topic", message.getOrderId(), message);
return null;
});
}
private void handleSendFailure(String key, OrderMessage message, Throwable ex) {
// 失败处理逻辑
}
}
// 高级 Kafka 消费者
@Service
public class AdvancedKafkaConsumer {
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consumeOrderMessage(OrderMessage message,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset) {
try {
// 业务处理逻辑
processOrder(message);
// 手动提交偏移量
ack.acknowledge();
} catch (BusinessException e) {
// 业务异常,记录日志但不提交偏移量(会重试)
System.err.println("业务异常,消息将重试: " + e.getMessage());
} catch (Exception e) {
// 系统异常,记录日志并提交偏移量(避免无限重试)
System.err.println("系统异常,消息将被跳过: " + e.getMessage());
ack.acknowledge();
}
}
// 批量消费
@KafkaListener(topics = "order-batch-topic", groupId = "order-batch-group")
public void consumeBatchMessages(List<OrderMessage> messages,
Acknowledgment ack) {
try {
// 批量处理逻辑
batchProcessOrders(messages);
ack.acknowledge();
} catch (Exception e) {
System.err.println("批量处理失败: " + e.getMessage());
}
}
// 死信队列处理
@KafkaListener(topics = "order-topic.DLT", groupId = "order-dlt-group")
public void consumeDeadLetter(OrderMessage message) {
// 死信消息处理逻辑
handleDeadLetterMessage(message);
}
private void processOrder(OrderMessage message) {
// 订单处理逻辑
}
private void batchProcessOrders(List<OrderMessage> messages) {
// 批量处理逻辑
}
private void handleDeadLetterMessage(OrderMessage message) {
// 死信消息处理逻辑
}
}
事件驱动架构¶
1. Spring Events 深度使用¶
自定义事件和监听器¶
// 自定义领域事件
public abstract class DomainEvent {
private final String eventId;
private final LocalDateTime timestamp;
private final String aggregateId;
public DomainEvent(String aggregateId) {
this.eventId = UUID.randomUUID().toString();
this.timestamp = LocalDateTime.now();
this.aggregateId = aggregateId;
}
// getters...
}
// 订单创建事件
public class OrderCreatedEvent extends DomainEvent {
private final Order order;
public OrderCreatedEvent(Order order) {
super(order.getId().toString());
this.order = order;
}
// getter...
}
// 订单支付事件
public class OrderPaidEvent extends DomainEvent {
private final String orderId;
private final BigDecimal amount;
public OrderPaidEvent(String orderId, BigDecimal amount) {
super(orderId);
this.orderId = orderId;
this.amount = amount;
}
// getters...
}
// 事件发布器
@Component
public class DomainEventPublisher {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void publishOrderCreated(Order order) {
OrderCreatedEvent event = new OrderCreatedEvent(order);
eventPublisher.publishEvent(event);
}
public void publishOrderPaid(String orderId, BigDecimal amount) {
OrderPaidEvent event = new OrderPaidEvent(orderId, amount);
eventPublisher.publishEvent(event);
}
}
// 事件监听器
@Component
public class OrderEventListener {
private static final Logger logger = LoggerFactory.getLogger(OrderEventListener.class);
// 异步处理订单创建事件
@Async("eventExecutor")
@EventListener
@Order(1) // 执行顺序
public void handleOrderCreated(OrderCreatedEvent event) {
logger.info("处理订单创建事件: {}", event.getAggregateId());
// 发送通知
notificationService.sendOrderCreatedNotification(event.getOrder());
// 更新缓存
cacheService.updateOrderCache(event.getOrder());
}
// 条件监听器
@EventListener(condition = "#event.amount > 1000")
public void handleLargeOrderPaid(OrderPaidEvent event) {
logger.info("处理大额订单支付事件: {}", event.getOrderId());
// 大额订单特殊处理
riskService.checkLargeOrder(event.getOrderId(), event.getAmount());
}
// 事务后事件监听器
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleAfterCommit(OrderCreatedEvent event) {
logger.info("事务提交后处理订单创建事件: {}", event.getAggregateId());
// 事务提交后的处理逻辑
auditService.logOrderCreation(event.getOrder());
}
// 事务失败事件监听器
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void handleAfterRollback(OrderCreatedEvent event) {
logger.warn("事务回滚,清理订单创建事件相关数据: {}", event.getAggregateId());
// 事务回滚后的清理逻辑
cleanupService.cleanupFailedOrder(event.getOrder());
}
}
// 事件执行器配置
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("eventExecutor")
public Executor eventExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("event-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
异步处理模式¶
1. @Async 深度使用¶
高级异步配置¶
@Configuration
@EnableAsync
public class AdvancedAsyncConfig {
// 计算密集型任务执行器
@Bean("cpuExecutor")
public Executor cpuExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("cpu-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
// I/O 密集型任务执行器
@Bean("ioExecutor")
public Executor ioExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("io-executor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
// 定时任务执行器
@Bean("scheduledExecutor")
public Executor scheduledExecutor() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
return new DelegatingExecutor(executor);
}
}
// 高级异步服务
@Service
public class AdvancedAsyncService {
private static final Logger logger = LoggerFactory.getLogger(AdvancedAsyncService.class);
// 计算密集型异步任务
@Async("cpuExecutor")
public CompletableFuture<BigDecimal> calculateComplexFormula(ComplexData data) {
logger.info("开始计算复杂公式,线程: {}", Thread.currentThread().getName());
try {
// 模拟复杂计算
Thread.sleep(1000);
BigDecimal result = performComplexCalculation(data);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
logger.error("计算失败", e);
return CompletableFuture.failedFuture(e);
}
}
// I/O 密集型异步任务
@Async("ioExecutor")
public CompletableFuture<String> processFileUpload(MultipartFile file) {
logger.info("开始处理文件上传,线程: {}", Thread.currentThread().getName());
try {
// 模拟文件处理
String filePath = saveFile(file);
processFileContent(filePath);
return CompletableFuture.completedFuture(filePath);
} catch (Exception e) {
logger.error("文件处理失败", e);
return CompletableFuture.failedFuture(e);
}
}
// 异步任务链
@Async("ioExecutor")
public CompletableFuture<OrderResult> processOrderAsync(OrderRequest request) {
return validateOrder(request)
.thenCompose(this::checkInventory)
.thenCompose(this::calculatePrice)
.thenCompose(this::createOrder)
.thenApply(this::sendConfirmation);
}
// 异步超时控制
@Async("ioExecutor")
public CompletableFuture<ApiResponse> callExternalApiWithTimeout(String url) {
return CompletableFuture.supplyAsync(() -> {
try {
// 模拟外部API调用
return callApi(url);
} catch (Exception e) {
throw new RuntimeException("API调用失败", e);
}
}).orTimeout(30, TimeUnit.SECONDS); // 30秒超时
}
// 异步异常处理
@Async("ioExecutor")
public CompletableFuture<ProcessResult> processWithFallback(ProcessRequest request) {
return CompletableFuture.supplyAsync(() -> primaryProcess(request))
.exceptionally(throwable -> {
logger.warn("主处理失败,使用备用方案", throwable);
return fallbackProcess(request);
});
}
// 异步任务组合
@Async("cpuExecutor")
public CompletableFuture<CombinedResult> combineAsyncTasks(TaskData data) {
CompletableFuture<ResultA> futureA = processTaskA(data);
CompletableFuture<ResultB> futureB = processTaskB(data);
CompletableFuture<ResultC> futureC = processTaskC(data);
return CompletableFuture.allOf(futureA, futureB, futureC)
.thenApply(v -> {
ResultA a = futureA.join();
ResultB b = futureB.join();
ResultC c = futureC.join();
return new CombinedResult(a, b, c);
});
}
private CompletableFuture<ValidatedOrder> validateOrder(OrderRequest request) {
return CompletableFuture.supplyAsync(() -> {
// 验证逻辑
return new ValidatedOrder(request);
});
}
private CompletableFuture<InventoryChecked> checkInventory(ValidatedOrder order) {
return CompletableFuture.supplyAsync(() -> {
// 库存检查逻辑
return new InventoryChecked(order);
});
}
// 其他辅助方法...
private BigDecimal performComplexCalculation(ComplexData data) {
// 复杂计算逻辑
return BigDecimal.ZERO;
}
private String saveFile(MultipartFile file) {
// 文件保存逻辑
return "file-path";
}
private void processFileContent(String filePath) {
// 文件内容处理逻辑
}
private ApiResponse callApi(String url) {
// API调用逻辑
return new ApiResponse();
}
private ProcessResult primaryProcess(ProcessRequest request) {
// 主处理逻辑
return new ProcessResult();
}
private ProcessResult fallbackProcess(ProcessRequest request) {
// 备用处理逻辑
return new ProcessResult();
}
}
事务消息处理¶
1. 本地事务与消息一致性¶
事务消息模式实现¶
@Service
@Transactional
public class TransactionalMessageService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private KafkaTemplate<String, OrderMessage> kafkaTemplate;
// 本地事务 + 消息发送(可能不一致)
public void createOrderWithMessage(OrderRequest request) {
// 1. 保存订单到数据库
Order order = createOrder(request);
orderRepository.save(order);
// 2. 发送消息(如果这里失败,数据库已提交)
OrderMessage message = convertToMessage(order);
rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
}
// 事务消息模式(最终一致性)
public void createOrderWithTransactionMessage(OrderRequest request) {
// 1. 保存订单和消息到数据库(同一事务)
Order order = createOrder(request);
OutboxMessage outboxMessage = createOutboxMessage(order);
orderRepository.save(order);
outboxRepository.save(outboxMessage);
// 事务提交后,定时任务会发送消息
}
// 使用事务监听器确保消息发送
@Transactional
public void createOrderWithTransactionalEvent(OrderRequest request) {
// 1. 保存订单
Order order = createOrder(request);
orderRepository.save(order);
// 2. 发布事件(事务提交后发送)
applicationEventPublisher.publishEvent(new OrderCreatedEvent(order));
}
// Kafka 事务消息
@Transactional
public void createOrderWithKafkaTransaction(OrderRequest request) {
// 1. 保存订单
Order order = createOrder(request);
orderRepository.save(order);
// 2. 发送 Kafka 事务消息
kafkaTemplate.executeInTransaction(operations -> {
OrderMessage message = convertToMessage(order);
operations.send("order-topic", order.getId().toString(), message);
return null;
});
}
// 最大努力通知模式
@Transactional
public void createOrderWithBestEffort(OrderRequest request) {
// 1. 保存订单
Order order = createOrder(request);
orderRepository.save(order);
// 2. 异步发送消息(可能失败,但有重试机制)
asyncSendMessageWithRetry(order);
}
@Async
public void asyncSendMessageWithRetry(Order order) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
OrderMessage message = convertToMessage(order);
rabbitTemplate.convertAndSend("order.exchange", "order.created", message);
break; // 发送成功,退出循环
} catch (Exception e) {
retryCount++;
if (retryCount == maxRetries) {
// 记录失败日志,人工干预
logFailedMessage(order, e);
} else {
// 等待后重试
try {
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// TCC 模式实现
@Transactional
public void createOrderWithTCC(OrderRequest request) {
// 1. Try 阶段:预留资源
reserveInventory(request);
reserveCoupon(request);
// 2. Confirm 阶段:确认操作
Order order = createOrder(request);
orderRepository.save(order);
confirmInventory(request);
confirmCoupon(request);
// 3. 发送消息
sendOrderCreatedMessage(order);
}
// Saga 模式实现
@Transactional
public void createOrderWithSaga(OrderRequest request) {
// 1. 开始 Saga 事务
String sagaId = startSagaTransaction();
try {
// 2. 执行各个步骤
step1ReserveInventory(request, sagaId);
step2CreateOrder(request, sagaId);
step3DeductBalance(request, sagaId);
// 3. 完成 Saga
completeSaga(sagaId);
} catch (Exception e) {
// 4. 补偿操作
compensateSaga(sagaId);
throw e;
}
}
private Order createOrder(OrderRequest request) {
// 创建订单逻辑
return new Order();
}
private OutboxMessage createOutboxMessage(Order order) {
// 创建出站消息逻辑
return new OutboxMessage();
}
private OrderMessage convertToMessage(Order order) {
// 转换消息逻辑
return new OrderMessage();
}
private void logFailedMessage(Order order, Exception e) {
// 记录失败消息逻辑
}
private void reserveInventory(OrderRequest request) {
// 预留库存逻辑
}
private void reserveCoupon(OrderRequest request) {
// 预留优惠券逻辑
}
private void confirmInventory(OrderRequest request) {
// 确认库存逻辑
}
private void confirmCoupon(OrderRequest request) {
// 确认优惠券逻辑
}
private String startSagaTransaction() {
// 开始 Saga 事务逻辑
return UUID.randomUUID().toString();
}
private void step1ReserveInventory(OrderRequest request, String sagaId) {
// Saga 步骤1逻辑
}
private void step2CreateOrder(OrderRequest request, String sagaId) {
// Saga 步骤2逻辑
}
private void step3DeductBalance(OrderRequest request, String sagaId) {
// Saga 步骤3逻辑
}
private void completeSaga(String sagaId) {
// 完成 Saga 逻辑
}
private void compensateSaga(String sagaId) {
// 补偿 Saga 逻辑
}
}
监控与运维¶
1. 消息系统监控¶
Spring Boot Actuator 集成¶
# application.yml
management:
endpoints:
web:
exposure:
include: health,metrics,info,beans,env
endpoint:
health:
show-details: always
show-components: always
metrics:
enabled: true
metrics:
export:
prometheus:
enabled: true
tags:
application: ${spring.application.name}
environment: ${spring.profiles.active:default}
# RabbitMQ 健康检查配置
spring:
rabbitmq:
health-check:
enabled: true
timeout: 10s
# Kafka 健康检查配置
spring:
kafka:
health-check:
enabled: true
timeout: 10s
自定义监控指标¶
@Component
public class MessageMetrics {
private final MeterRegistry meterRegistry;
// 消息发送指标
private final Counter messagesSent;
private final Timer messageSendTimer;
// 消息消费指标
private final Counter messagesConsumed;
private final Timer messageProcessTimer;
// 错误指标
private final Counter messageErrors;
private final Counter retryCount;
public MessageMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.messagesSent = Counter.builder("message.sent")
.description("Number of messages sent")
.tag("type", "total")
.register(meterRegistry);
this.messageSendTimer = Timer.builder("message.send.time")
.description("Time taken to send messages")
.register(meterRegistry);
this.messagesConsumed = Counter.builder("message.consumed")
.description("Number of messages consumed")
.tag("type", "total")
.register(meterRegistry);
this.messageProcessTimer = Timer.builder("message.process.time")
.description("Time taken to process messages")
.register(meterRegistry);
this.messageErrors = Counter.builder("message.errors")
.description("Number of message processing errors")
.register(meterRegistry);
this.retryCount = Counter.builder("message.retries")
.description("Number of message retries")
.register(meterRegistry);
}
public void recordMessageSent(String queue, long duration) {
messagesSent.increment();
meterRegistry.counter("message.sent", "queue", queue).increment();
messageSendTimer.record(duration, TimeUnit.MILLISECONDS);
}
public void recordMessageConsumed(String queue, long duration) {
messagesConsumed.increment();
meterRegistry.counter("message.consumed", "queue", queue).increment();
messageProcessTimer.record(duration, TimeUnit.MILLISECONDS);
}
public void recordMessageError(String queue, String errorType) {
messageErrors.increment();
meterRegistry.counter("message.errors",
"queue", queue, "error", errorType).increment();
}
public void recordRetry(String queue) {
retryCount.increment();
meterRegistry.counter("message.retries", "queue", queue).increment();
}
// 获取消息队列积压情况
public void monitorQueueBacklog(String queueName, long backlogSize) {
Gauge.builder("queue.backlog", backlogSize, Number::longValue)
.description("Queue backlog size")
.tag("queue", queueName)
.register(meterRegistry);
}
}
// 监控增强的消息生产者
@Service
public class MonitoredMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageMetrics messageMetrics;
public void sendMonitoredMessage(String exchange, String routingKey, Object message) {
long startTime = System.currentTimeMillis();
try {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
long duration = System.currentTimeMillis() - startTime;
messageMetrics.recordMessageSent(routingKey, duration);
} catch (Exception e) {
messageMetrics.recordMessageError(routingKey, e.getClass().getSimpleName());
throw e;
}
}
}
// 监控增强的消息消费者
@Service
public class MonitoredMessageConsumer {
@Autowired
private MessageMetrics messageMetrics;
@RabbitListener(queues = "order.queue")
public void handleMonitoredMessage(OrderMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
long startTime = System.currentTimeMillis();
try {
processMessage(message);
channel.basicAck(deliveryTag, false);
long duration = System.currentTimeMillis() - startTime;
messageMetrics.recordMessageConsumed("order.queue", duration);
} catch (BusinessException e) {
// 业务异常,重试
channel.basicNack(deliveryTag, false, true);
messageMetrics.recordRetry("order.queue");
} catch (Exception e) {
// 系统异常,拒绝
channel.basicNack(deliveryTag, false, false);
messageMetrics.recordMessageError("order.queue", e.getClass().getSimpleName());
}
}
private void processMessage(OrderMessage message) {
// 消息处理逻辑
}
}
2. 运维最佳实践¶
健康检查端点¶
@Component
public class MessageHealthIndicator implements HealthIndicator {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private KafkaAdmin kafkaAdmin;
@Override
public Health health() {
Map<String, Object> details = new HashMap<>();
// RabbitMQ 健康检查
try {
Connection connection = rabbitConnectionFactory.createConnection();
Channel channel = connection.createChannel(false);
details.put("rabbitmq", "UP");
details.put("rabbitmq.connection", connection.isOpen());
details.put("rabbitmq.channel", channel.isOpen());
channel.close();
connection.close();
} catch (Exception e) {
details.put("rabbitmq", "DOWN");
details.put("rabbitmq.error", e.getMessage());
}
// Kafka 健康检查
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
DescribeClusterResult clusterResult = adminClient.describeCluster();
details.put("kafka", "UP");
details.put("kafka.brokers", clusterResult.nodes().get().size());
} catch (Exception e) {
details.put("kafka", "DOWN");
details.put("kafka.error", e.getMessage());
}
boolean isHealthy = details.get("rabbitmq").equals("UP") &&
details.get("kafka").equals("UP");
return isHealthy ? Health.up().withDetails(details).build()
: Health.down().withDetails(details).build();
}
}
配置管理¶
@ConfigurationProperties(prefix = "message")
@Component
@Data
public class MessageProperties {
// RabbitMQ 配置
private Rabbit rabbit = new Rabbit();
// Kafka 配置
private Kafka kafka = new Kafka();
// 重试配置
private Retry retry = new Retry();
// 监控配置
private Monitor monitor = new Monitor();
@Data
public static class Rabbit {
private String host = "localhost";
private int port = 5672;
private String username = "guest";
private String password = "guest";
private String virtualHost = "/";
private int connectionTimeout = 30000;
private int channelCacheSize = 25;
}
@Data
public static class Kafka {
private String bootstrapServers = "localhost:9092";
private String groupId = "default-group";
private int retries = 3;
private int batchSize = 16384;
private int lingerMs = 10;
private int bufferMemory = 33554432;
}
@Data
public static class Retry {
private int maxAttempts = 3;
private long backoffDelay = 1000;
private double backoffMultiplier = 2.0;
private long maxBackoffDelay = 10000;
}
@Data
public static class Monitor {
private boolean enabled = true;
private long metricsInterval = 60000; // 1分钟
private int queueBacklogThreshold = 1000;
private int errorRateThreshold = 10; // 10%
}
}
总结¶
Spring 消息驱动架构提供了完整的异步处理解决方案:
核心优势¶
- 解耦系统组件:通过消息队列实现系统间松耦合
- 提高系统吞吐量:异步处理避免阻塞,提升并发能力
- 增强系统可靠性:重试机制、死信队列保证消息不丢失
- 支持水平扩展:消费者可以水平扩展处理能力
关键技术点¶
- 消息队列集成:RabbitMQ、Kafka 深度集成
- 事件驱动架构:Spring Events、领域事件、事务事件
- 异步处理模式:@Async、CompletableFuture、响应式编程
- 事务消息处理:本地事务、分布式事务、最终一致性
- 监控与运维:健康检查、指标监控、配置管理
最佳实践¶
- 合理选择消息队列:根据业务场景选择 RabbitMQ(复杂路由)或 Kafka(高吞吐)
- 设计幂等消费:确保消息重复消费不会产生副作用
- 实现死信处理:为重要消息设置死信队列和告警
- 监控关键指标:消息积压、处理延迟、错误率
- 配置合理重试:避免无限重试,设置最大重试次数和退避策略
通过深度掌握 Spring 消息驱动架构,可以构建高可用、高并发的分布式系统。