(四)rabbitmq消息堆积、顺序消费

 rabbitmq问的最多的问题是使用场景、消息的可靠性保证、消息的重复消费、消息堆积的处理以及如何实现顺序消费。

 使用场景:异步、消峰、限流。可靠性保证前面有讲过了。重复消费也好解决,消费端做好幂等校验即可。今天写一写关于rabbitmq的消息堆积和顺序消费问题。

1. 消息堆积

 顾名思义,消息堆积是指生产者发送消息的速度远大于消费者消费消息的速度,比如发送者1s钟发送一个消息,而消费者1分钟才处理完一个,这样会使得大量的消息堆积在队列中。

 解决消息堆积的核心在于使生产者生产消息速度和消费者消费消息速度相当,有两种处理方式:

  1. 减缓生产者生产消息的速度,比如发送完毕后暂停一段时间 再发送下一条消息,这种方法往往不适用,因为发送消息跟业务相关的,比如前面提到的支付报错再查询处理,只要用户碰到了就得发消息,我们无法控制时机。
  2. 消息发送到多个队列,每个队列被各自的消费者订阅。这种方式不能使用fanout广播模式,否则会出现重复消费的问题,使用worker队列、direct、topic模式都可以实现。但是topic要注意,多个队列的路由key需要互相不匹配,否则也会出现重复消费问题。至于生产者怎么发送,可以自行实现,比如按照计数、随机选择路由等。
  3. 增加消费者,提高消息的处理速度。前面将消息模式例子有演示过,一个队列多个订阅者。这里演示如何在spring boot中添加多个消费者
1
2
3
4
5
//直接监听短信队列,并开10个并发
@RabbitListener(queues = "queue_sms", concurrency = "10")
public void recv_sms(String msg) {
log.info("[短信服务] received:{}", msg);
}

也可以指定工厂:https://blog.csdn.net/weixin_45711444/article/details/102111658?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.vipsorttest&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.vipsorttest

2. 顺序消费

 生产者生产的消息,往往可能由于网络等原因,可能导致他们到达队列中的顺序和实际发出顺序不一致,那么如何解决顺序消费的问题呢?以下方法是我个人的总结。

  1. 在本地把消息按照序号存储在有序链表中,消费的时候记录上一次消费的序号n,每次从链表头开始消费,如果表头不是n+1,则暂停一会再继续消费。缺点是:只能单机消费,且出故障数据会丢失。
  2. 把消息按照序号存放到redis的zset中。每次消费也需要记录上次消费序号,记住上次消费序号不能存储在本机中。且需要做好并发操作可能导致的问题。缺点:redis出故障数据丢失,需要做好备份和同步,以及解决数据不一致问题。

 基本上都是解决排序的问题,关键就在存在哪里排,如果数据下一个序号迟迟不到,该怎么办?可以等待一定时间,如果依旧获取不到,则记录日志报警等。问题只要涉及到分布式,肯定会出现AP、CP的矛盾问题,怎么解决看业务的具体需求。面试官并不是要你回答一个完美的答案,因为很多问题没有完美答案,他是要看你的解决问题的能力,所以需要一步步分析。


 可能是源码是erlang的原因,面试过程中几乎没有问rabbitmq具体实现细节的问题,也可能是他们不用,所以不问。不像kafka这种,八股文全是问细节问实现的,比如持久化怎么实现的,集群之间的同步、备份等,等我有机会学学kafka。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!