(二)rabbitmq之消息的可靠性保证

 这是一个面试中经常会被问到的问题,我之前的面试中,几乎每一家都问到了这个问题,因此在这里来做个总结。

 可靠性主要是指保证消息在传递过程中,不会出现数据丢失的问题。整个消息传递过程中,消息主要经过三个节点,发送端、mq服务器、消费端,出现数据丢失可能有以下几方面:

  1. 发送端数据可能因为自身服务故障或者网络传输问题,导致消息未正确发送到mq
  2. mq收到了发送端发的消息,但是还未来得及被消费掉,服务器出现故障,导致消息丢失。或者mq服务器在给消费端推送消息的途中因为网络传输问题,丢失数据。
  3. 消费端接收到消息但还未处理完业务服务出故障了,也会导致数据丢失。

 基于以上三点,要保证消息的可靠性,必须在生产者、服务器、消费者三方面都做保证。

1.发送方可靠性保证

 当mq收到消息后给发送方回复一个确认回执,发送方即可确认消息已成功发送。有事务和confirm两种实现机制。

方案一:开启RabbitMQ事务

 事务机制主要有三个方法:TxSelect(),TxCommit(),TxRollback()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 消息内容
String message = "这是一只行动迅速的橙色的兔子";
try {
//1. 开启事务,事务提交一定会成功
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
//2、 成功提交
channel.txCommit();
} catch (Exception e) {
//3. 失败回滚
channel.txRollback();
System.out.println("发送失败,需要处理");
// 这里处理失败消息,比如重发
//resend()
}

 缺点:同步模式,大量请求时会发生阻塞,吞吐量低。

方案二:开启confirm机制

  confirm机制有三种实现策略[1]

  1. 单次确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//开启确认
channel.confirmSelect();
// 消息内容
String message = "注册成功!请短信回复[T]退订";
// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 等待5s确认
try {
//waitForConfirms等待确认,waitForConfirmsOrDie只等待指定时间,超时或者收到nack会报错
channel.waitForConfirmsOrDie(5_000);//channel.waitForConfirms();
} catch (Exception e) {
//处理未确认消息
}

  1. 批量确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//批量确认
channel.confirmSelect();
int batchSize = 500;//每发送100条确认一次
int cnt = 0;
for (int i = 0; i < 10000; i++) {
String message = "注册成功!请短信回复[T]退订";
channel.basicPublish(EXCHANGE_NAME, "s%", null, message.getBytes());
cnt++;
if (cnt == batchSize) {
channel.waitForConfirms(5_000);
cnt = 0;
}
}
if (cnt > 0) {
channel.waitForConfirmsOrDie(5_000);
}
  1. 异步确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
       // 异步确认
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
//添加确认监听
channel.addConfirmListener(new ConfirmListener() {
//ack代表确认成功
@Override
public void handleAck(long deliveryTag, boolean multiple) {
if (multiple) {
System.out.println("第" + deliveryTag + "条消息发送失败");
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
System.out.println("第" + deliveryTag + "条消息发送成功");
confirmSet.remove(deliveryTag);
}
}
//nack代表确认失败
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
System.out.println("第" + deliveryTag + "条消息发送失败");
confirmSet.headSet(deliveryTag + 1L).clear();
} else {
System.out.println("第" + deliveryTag + "条消息发送成功");
confirmSet.remove(deliveryTag);
}
}
});

// 循环发布任务,向指定队列发布消息
for (int i = 0; i < 50; i++) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());
System.out.println("第" + i + "条消息已发送");
confirmSet.add(nextSeqNo);
Thread.sleep(200);
}

 三种方式对比:

  • 单次确认和事务一样,都是同步的,使用简单但也存在吞吐量低的问题。
  • 批量确认吞吐量较之单次确认提升了不少,但是如果出现问题,很难排查,必须查找每一次publish的日志。
  • 异步确认机制不论是吞吐量还是排查上都很方便,推荐使用。

2.MQ服务器本身可靠性保证

 mq服务器本身保证消息不丢失,可以把消息持久化。开启持久化需要满足交换机持久化、队列持久化、以及消息传送模式设置为持久化模式。

1
2
3
4
5
6
7
// 交换机持久化,第三个参数为true
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 队列持久化,第二个参数为true
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息开启持久化模式
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", prop, message.getBytes());

 注意:面试时,面试官会问开启持久化就OK了吗,如果磁盘坏了呢,所以还得继续回答。mq最好使用集群和备份。

3.消费方可靠性保证

 消费方的可靠性和发送方一样,也是处理完毕向服务器发送确认即可。默认是自动确认,收到消息后会立刻确认,为了防止业务未处理完服务就出现故障的问题,业务中最好关闭自动确认,服务处理完毕后手动发送确认(akc/nack)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println(" [消费者1] received : " + msg + "!");

// 手动进行ACK
/*
* void basicAck(long deliveryTag, boolean multiple) throws IOException;
* deliveryTag:用来标识消息的id
* multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
*/
channel.basicAck(envelope.getDeliveryTag(), false); //手动ack
channel.basicNack(envelope.getDeliveryTag(), true, true);//处理失败,最后一个参数代表是否重新消费
channel.basicReject(envelope.getDeliveryTag(), true); //拒绝处理这条消息
channel.basicRecover();//路由不成功的消息可以使用recovery重新发送到队列中
}
};
//false代表关闭自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);

4.总结

丢失原因

解决方案