这是一个面试中经常会被问到的问题,我之前的面试中,几乎每一家都问到了这个问题,因此在这里来做个总结。
可靠性主要是指保证消息在传递过程中,不会出现数据丢失的问题。整个消息传递过程中,消息主要经过三个节点,发送端、mq服务器、消费端,出现数据丢失可能有以下几方面:
- 发送端数据可能因为自身服务故障或者网络传输问题,导致消息未正确发送到mq
- mq收到了发送端发的消息,但是还未来得及被消费掉,服务器出现故障,导致消息丢失。或者mq服务器在给消费端推送消息的途中因为网络传输问题,丢失数据。
- 消费端接收到消息但还未处理完业务服务出故障了,也会导致数据丢失。
基于以上三点,要保证消息的可靠性,必须在生产者、服务器、消费者三方面都做保证。
1.发送方可靠性保证
当mq收到消息后给发送方回复一个确认回执,发送方即可确认消息已成功发送。有事务和confirm两种实现机制。
方案一:开启RabbitMQ事务
事务机制主要有三个方法:TxSelect(),TxCommit(),TxRollback()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| String message = "这是一只行动迅速的橙色的兔子"; try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes()); channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("发送失败,需要处理"); }
|
缺点:同步模式,大量请求时会发生阻塞,吞吐量低。
方案二:开启confirm机制
confirm机制有三种实现策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| channel.confirmSelect();
String message = "注册成功!请短信回复[T]退订";
channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
try { channel.waitForConfirmsOrDie(5_000); } catch (Exception e) { }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| channel.confirmSelect(); int batchSize = 500; int cnt = 0; for (int i = 0; i < 10000; i++) { String message = "注册成功!请短信回复[T]退订"; channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes()); cnt++; if (cnt == batchSize) { channel.waitForConfirms(5_000); cnt = 0; } } if (cnt > 0) { channel.waitForConfirmsOrDie(5_000); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { if (multiple) { System.out.println("第" + deliveryTag + "条消息发送失败"); confirmSet.headSet(deliveryTag + 1L).clear(); } else { System.out.println("第" + deliveryTag + "条消息发送成功"); confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { System.out.println("第" + deliveryTag + "条消息发送失败"); confirmSet.headSet(deliveryTag + 1L).clear(); } else { System.out.println("第" + deliveryTag + "条消息发送成功"); confirmSet.remove(deliveryTag); } } });
for (int i = 0; i < 50; i++) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes()); System.out.println("第" + i + "条消息已发送"); confirmSet.add(nextSeqNo); Thread.sleep(200); }
|
三种方式对比:
- 单次确认和事务一样,都是同步的,使用简单但也存在吞吐量低的问题。
- 批量确认吞吐量较之单次确认提升了不少,但是如果出现问题,很难排查,必须查找每一次publish的日志。
- 异步确认机制不论是吞吐量还是排查上都很方便,推荐使用。
2.MQ服务器本身可靠性保证
mq服务器本身保证消息不丢失,可以把消息持久化。开启持久化需要满足交换机持久化、队列持久化、以及消息传送模式设置为持久化模式。
1 2 3 4 5 6 7
| channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().deliveryMode(2).build(); channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", prop, message.getBytes());
|
注意:面试时,面试官会问开启持久化就OK了吗,如果磁盘坏了呢,所以还得继续回答。mq最好使用集群和备份。
3.消费方可靠性保证
消费方的可靠性和发送方一样,也是处理完毕向服务器发送确认即可。默认是自动确认,收到消息后会立刻确认,为了防止业务未处理完服务就出现故障的问题,业务中最好关闭自动确认,服务处理完毕后手动发送确认(akc/nack)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange(); long deliveryTag = envelope.getDeliveryTag(); String msg = new String(body, "utf-8"); System.out.println(" [消费者1] received : " + msg + "!");
channel.basicAck(envelope.getDeliveryTag(), false); channel.basicNack(envelope.getDeliveryTag(), true, true); channel.basicReject(envelope.getDeliveryTag(), true); channel.basicRecover(); } };
channel.basicConsume(QUEUE_NAME, false, consumer);
|
4.总结