这是一个面试中经常会被问到的问题,我之前的面试中,几乎每一家都问到了这个问题,因此在这里来做个总结。
 可靠性主要是指保证消息在传递过程中,不会出现数据丢失的问题。整个消息传递过程中,消息主要经过三个节点,发送端、mq服务器、消费端,出现数据丢失可能有以下几方面:
- 发送端数据可能因为自身服务故障或者网络传输问题,导致消息未正确发送到mq
- mq收到了发送端发的消息,但是还未来得及被消费掉,服务器出现故障,导致消息丢失。或者mq服务器在给消费端推送消息的途中因为网络传输问题,丢失数据。
- 消费端接收到消息但还未处理完业务服务出故障了,也会导致数据丢失。
 基于以上三点,要保证消息的可靠性,必须在生产者、服务器、消费者三方面都做保证。
1.发送方可靠性保证
 当mq收到消息后给发送方回复一个确认回执,发送方即可确认消息已成功发送。有事务和confirm两种实现机制。
方案一:开启RabbitMQ事务
 事务机制主要有三个方法:TxSelect(),TxCommit(),TxRollback()。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | String message = "这是一只行动迅速的橙色的兔子";
 try {
 
 channel.txSelect();
 channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
 
 channel.txCommit();
 } catch (Exception e) {
 
 channel.txRollback();
 System.out.println("发送失败,需要处理");
 
 
 }
 
 | 
 缺点:同步模式,大量请求时会发生阻塞,吞吐量低。
方案二:开启confirm机制
  confirm机制有三种实现策略。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | channel.confirmSelect();
 
 String message = "注册成功!请短信回复[T]退订";
 
 channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes());
 System.out.println(" [x] Sent '" + message + "'");
 
 try {
 
 channel.waitForConfirmsOrDie(5_000);
 } catch (Exception e) {
 
 }
 
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | channel.confirmSelect();
 int batchSize = 500;
 int cnt = 0;
 for (int i = 0; i < 10000; i++) {
 String message = "注册成功!请短信回复[T]退订";
 channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes());
 cnt++;
 if (cnt == batchSize) {
 channel.waitForConfirms(5_000);
 cnt = 0;
 }
 }
 if (cnt > 0) {
 channel.waitForConfirmsOrDie(5_000);
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 
 |        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
 channel.confirmSelect();
 
 channel.addConfirmListener(new ConfirmListener() {
 
 @Override
 public void handleAck(long deliveryTag, boolean multiple) {
 if (multiple) {
 System.out.println("第" + deliveryTag + "条消息发送失败");
 confirmSet.headSet(deliveryTag + 1L).clear();
 } else {
 System.out.println("第" + deliveryTag + "条消息发送成功");
 confirmSet.remove(deliveryTag);
 }
 }
 
 @Override
 public void handleNack(long deliveryTag, boolean multiple) {
 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
 if (multiple) {
 System.out.println("第" + deliveryTag + "条消息发送失败");
 confirmSet.headSet(deliveryTag + 1L).clear();
 } else {
 System.out.println("第" + deliveryTag + "条消息发送成功");
 confirmSet.remove(deliveryTag);
 }
 }
 });
 
 
 for (int i = 0; i < 50; i++) {
 long nextSeqNo = channel.getNextPublishSeqNo();
 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());
 System.out.println("第" + i + "条消息已发送");
 confirmSet.add(nextSeqNo);
 Thread.sleep(200);
 }
 
 | 
 三种方式对比:
- 单次确认和事务一样,都是同步的,使用简单但也存在吞吐量低的问题。
- 批量确认吞吐量较之单次确认提升了不少,但是如果出现问题,很难排查,必须查找每一次publish的日志。
- 异步确认机制不论是吞吐量还是排查上都很方便,推荐使用。
2.MQ服务器本身可靠性保证
 mq服务器本身保证消息不丢失,可以把消息持久化。开启持久化需要满足交换机持久化、队列持久化、以及消息传送模式设置为持久化模式。
| 12
 3
 4
 5
 6
 7
 
 | channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
 
 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
 
 AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().deliveryMode(2).build();
 channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", prop, message.getBytes());
 
 | 
 注意:面试时,面试官会问开启持久化就OK了吗,如果磁盘坏了呢,所以还得继续回答。mq最好使用集群和备份。
3.消费方可靠性保证
 消费方的可靠性和发送方一样,也是处理完毕向服务器发送确认即可。默认是自动确认,收到消息后会立刻确认,为了防止业务未处理完服务就出现故障的问题,业务中最好关闭自动确认,服务处理完毕后手动发送确认(akc/nack)。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 
 | DefaultConsumer consumer = new DefaultConsumer(channel) {
 
 
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
 
 String exchange = envelope.getExchange();
 
 long deliveryTag = envelope.getDeliveryTag();
 
 String msg = new String(body, "utf-8");
 System.out.println(" [消费者1] received : " + msg + "!");
 
 
 
 
 
 
 
 channel.basicAck(envelope.getDeliveryTag(), false);
 channel.basicNack(envelope.getDeliveryTag(), true, true);
 channel.basicReject(envelope.getDeliveryTag(), true);
 channel.basicRecover();
 }
 };
 
 channel.basicConsume(QUEUE_NAME, false, consumer);
 
 | 
4.总结

