解決MQ消(xiao)息丟失問題的(de)5種方案
前言
今天(tian)我們來聊聊一個(ge)讓(rang)很多開發者(zhe)頭疼的話題——MQ消息丟(diu)失問題。
有些小伙伴在工作中(zhong),一提到消息(xi)隊(dui)列就覺得很簡(jian)單(dan),但真(zhen)正(zheng)遇(yu)到線上消息(xi)丟失時,排(pai)查(cha)起來卻讓人抓狂。
其(qi)實(shi),我在實(shi)際工(gong)作中,也遇到過(guo)MQ消息丟失的情況。
今天這(zhe)篇(pian)文(wen)章(zhang),專(zhuan)門跟大(da)家一起聊聊這(zhe)個話題,希望(wang)對你會(hui)有(you)所幫助(zhu)。
一、消息丟失的三大環節
在深入(ru)解決方案之前,我(wo)們先(xian)搞清(qing)楚消息在哪幾(ji)個環(huan)節可能丟失:

1. 生產者發送階段
- 網絡抖動導致發送失敗
- 生產者宕機未發送
- Broker處理失敗未返回確認
2. Broker存儲階段
- 內存消息未持久化,重啟丟失
- 磁盤故障導致數據丟失
- 集群切換時消息丟失
3. 消費者處理階段
- 自動確認模式下處理異常
- 消費者宕機處理中斷
- 手動確認但忘記確認
理(li)解(jie)了問(wen)題根源,接下(xia)來我們看5種實用(yong)的解(jie)決(jue)方案。
二、方案一:生產者確認機制
核心原理
生產者發送消(xiao)息后(hou)等待Broker確認,確保消(xiao)息成功(gong)到(dao)達。
這是(shi)防止(zhi)消(xiao)息丟(diu)失(shi)的(de)第一道防線。

關鍵實現
// RabbitMQ生產者確認配置
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息成功到達Broker
messageStatusService.markConfirmed(correlationData.getId());
} else {
// 發送失敗,觸發重試
retryService.scheduleRetry(correlationData.getId());
}
});
return template;
}
// 可靠發送方法
public void sendReliable(String exchange, String routingKey, Object message) {
String messageId = generateId();
// 先落庫保存發送狀態
messageStatusService.saveSendingStatus(messageId, message);
// 發送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setMessageId(messageId);
return msg;
}, new CorrelationData(messageId));
}
適用場景
- 對消息可靠性要求高的業務
- 金融交易、訂單處理等關鍵業務
- 需要精確知道消息發送結果的場景
三、方案二:消息持久化機制
核心原理
將消息(xi)保存到磁盤,確保Broker重(zhong)啟后消息(xi)不丟(diu)失(shi)。
這是防(fang)止Broker端消息丟失的關鍵。

關鍵實現
// 持久化隊列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue") // 隊列持久化
.deadLetterExchange("order.dlx") // 死信交換機
.build();
}
// 發送持久化消息
public void sendPersistentMessage(Object message) {
rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
return msg;
});
}
// Kafka持久化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本確認
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重試次數
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 冪等性
return new DefaultKafkaProducerFactory<>(props);
}
優缺點
優點:
- 有效防止Broker重啟導致的消息丟失
- 配置簡單,效果明顯
缺點:
- 磁盤IO影響性能
- 需要足夠的磁盤空間
四、方案三:消費者確認機制
核心原理
消費者處理(li)完(wan)消息后手動(dong)向Broker發送確認,Broker收到確認后才刪除消息。
這是保證消息(xi)不丟(diu)失的最后一道(dao)防線。

關鍵實現
// 手動確認消費者
@RabbitListener(queues = "order.queue")
public void handleMessage(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 業務處理
orderService.processOrder(order);
// 手動確認
channel.basicAck(deliveryTag, false);
log.info("消息處理完成: {}", order.getOrderId());
} catch (Exception e) {
log.error("消息處理失敗: {}", order.getOrderId(), e);
// 處理失敗,重新入隊
channel.basicNack(deliveryTag, false, true);
}
}
// 消費者容器配置
@Bean
public SimpleRabbitListenerContainerFactory containerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手動確認
factory.setPrefetchCount(10); // 預取數量
factory.setConcurrentConsumers(3); // 并發消費者
return factory;
}
注意事項
- 確保業務處理完成后再確認
- 合理設置預取數量,避免內存溢出
- 處理異常時要正確使用NACK
五、方案四:事務消息機制
核心原理
通過事務(wu)(wu)保證本地(di)業務(wu)(wu)操(cao)作和消息發送的(de)原子性,要么都成功,要么都失(shi)敗。

關鍵實現
// 本地事務表方案
@Transactional
public void createOrder(Order order) {
// 1. 保存訂單到數據庫
orderRepository.save(order);
// 2. 保存消息到本地消息表
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessId(order.getOrderId());
localMessage.setContent(JSON.toJSONString(order));
localMessage.setStatus(MessageStatus.PENDING);
localMessageRepository.save(localMessage);
// 3. 事務提交,本地業務和消息存儲保持一致性
}
// 定時任務掃描并發送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
for (LocalMessage message : pendingMessages) {
try {
// 發送消息到MQ
rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
// 更新消息狀態為已發送
message.setStatus(MessageStatus.SENT);
localMessageRepository.save(message);
} catch (Exception e) {
log.error("發送消息失敗: {}", message.getId(), e);
}
}
}
// RocketMQ事務消息
public void sendTransactionMessage(Order order) {
TransactionMQProducer producer = new TransactionMQProducer("order_producer");
// 發送事務消息
Message msg = new Message("order_topic", "create",
JSON.toJSONBytes(order));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("事務消息提交成功");
}
}
適用場景
- 需要嚴格保證業務和消息一致性的場景
- 分布式事務場景
- 金融、電商等對數據一致性要求高的業務
六、方案五:消息重試與死信隊列
核心原理
通過重試機制處(chu)理臨時故(gu)障,通過死信(xin)隊列(lie)處(chu)理最終(zhong)無法消費的消息。

關鍵實現
// 重試隊列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交換機
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 60000) // 60秒后進入死信
.build();
}
// 死信隊列配置
@Bean
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}
// 消費者重試邏輯
@RabbitListener(queues = "order.queue")
public void handleMessageWithRetry(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.processOrder(order);
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) {
// 臨時異常,重新入隊重試
channel.basicNack(deliveryTag, false, true);
} catch (PermanentException e) {
// 永久異常,直接確認進入死信隊列
channel.basicAck(deliveryTag, false);
log.error("消息進入死信隊列: {}", order.getOrderId(), e);
}
}
// 死信隊列消費者
@RabbitListener(queues = "order.dead.queue")
public void handleDeadLetterMessage(Order order) {
log.warn("處理死信消息: {}", order.getOrderId());
// 發送告警、記錄日志、人工處理等
alertService.sendAlert("死信消息告警", order.toString());
}
重試策略建議
- 指數退避:1s, 5s, 15s, 30s
- 最大重試次數:3-5次
- 死信處理:人工介入或特殊處理流程
七、方案對比與選型指南
為了幫(bang)助大(da)家選擇(ze)合適的方案,我整理了詳細(xi)的對比表(biao):
| 方案 | 可靠性 | 性能影響 | 復雜度 | 適用場景 |
|---|---|---|---|---|
| 生產者確認 | 高 | 中 | 低 | 所有需要可靠發送的場景 |
| 消息持久化 | 中 | 中 | 低 | Broker重啟保護 |
| 消費者確認 | 高 | 低 | 中 | 確保消息被成功處理 |
| 事務消息 | 最高 | 高 | 高 | 強一致性要求的業務 |
| 重試+死信 | 高 | 低 | 中 | 處理臨時故障和最終死信 |
選型建議
初創項目/簡單業務:
- 生產者確認 + 消息持久化 + 消費者確認
- 滿足大部分場景,實現簡單
電商/交易系統:
- 生產者確認 + 事務消息 + 重試機制
- 保證數據一致性,處理復雜業務
大數據/日志處理:
- 消息持久化 + 消費者確認
- 允許少量丟失,追求吞吐量
金融/支付系統:
- 全方案組合使用
- 最高可靠性要求
總結
消(xiao)(xiao)息丟失問題是(shi)消(xiao)(xiao)息隊列(lie)使用中的常見挑戰,通過今(jin)天介(jie)紹(shao)的5種方案,我們可以構(gou)建(jian)一個可靠(kao)的消(xiao)(xiao)息系統(tong):
- 生產者確認機制 - 保證消息成功發送到Broker
- 消息持久化機制 - 防止Broker重啟導致消息丟失
- 消費者確認機制 - 確保消息被成功處理
- 事務消息機制 - 保證業務和消息的一致性
- 重試與死信隊列 - 處理異常情況和最終死信
有些小伙伴可能(neng)會問:"我需要全部(bu)使用這些方案嗎(ma)?
"我的建議是:根據業務需求選擇合適的組合。
對于關鍵(jian)業(ye)務,建(jian)議至少(shao)使用前三(san)種方(fang)案;對于普(pu)通業(ye)務,可以根據實際情況(kuang)適當(dang)簡化(hua)。
記(ji)住(zhu),沒(mei)有完美的方案,只(zhi)有最適合的方案。
最后說一句(求關注,別白嫖我)
如果這篇文(wen)章對您有(you)所幫助,或(huo)者有(you)所啟發的(de)(de)話(hua),幫忙關注一下我的(de)(de)同(tong)名公眾(zhong)號:蘇(su)三說技術,您的(de)(de)支持(chi)是(shi)我堅持(chi)寫作最(zui)大的(de)(de)動力。
求一鍵三連(lian):點贊、轉發、在看。
關(guan)注公眾號:【蘇(su)三說技術】,在(zai)公眾號中回復(fu):進大廠(chang),可(ke)以免費獲取(qu)我最(zui)近整(zheng)理的10萬字的面試寶典(dian),好多小(xiao)伙伴靠這(zhe)個寶典(dian)拿到(dao)了多家大廠(chang)的offer。
更(geng)多項目實戰在(zai)我的技(ji)術網站:
