(三)rabbitmq之延迟队列

1. 延迟队列

 延迟队列是指进入队列后的元素不会被立即消费,而是达到一定时间后才会被消费。比如我的工作中需要实现这样一个功能。支付常常会因为一些原因导致支付报错,比如DNS解析失败等,这种情况一般客户会再次点击支付。为了避免每次报错都人工排查原因,需要实现一个功能,如果dns解析失败报错,则1分钟后主动查询订单是否支付成功,成功则不管,失败则手动排查。这时候可以采用延迟队列实现,每次报错把订单号放入延迟队列中,1分钟后取出订单号查询订单状态。这样可以大大减少手动排查错误的几率,提升工作效率。

 工作中常常会使用到延迟队列,我们可以采用多种方式实现,常用的实现延迟队列的方式有以下几种:

  • 使用JDK自带的DelayQueue实现,它的本质是封装了一个PriorityQueue(优先队列)。这种方式实现简单,但是只能在本地使用,容易发生单点故障。
  • 使用redis的zset按照过期时间排列,然后轮询zset获取到期的key。缺点是无法保证数据的可靠性,易丢失。
  • 可以使用定时任务框架,比如Quartz、xxl-job等工具。本质是定时任务,可以定时扫描进行处理,如果少而无规律的任务,会造成大量的扫描而不执行,不便。
  • 时间轮算法。比如kafka、netty都基于该算法的实现了延时队列。
  • 使用rabbitmq的死信队列实现延迟队列。

 本文演示了如何使用rabbitmq实现延迟队列。

2. 几个概念

  • 过期时间:每个队列都可以设置一个过期时间(ttl),过期时间和路由key一样,都是队列的属性。
  • 死信交换机:死信交换机可以和队列进行绑定。当队列中的元素超过前面设置的过期时间还未被消费,则会被转发到死信交换机中。
  • 死信队列:死信队列和死信交换机绑定,转发到死信交换机中的元素会进入死信队列。

 总结:死信交换机/队列只是一个概念而已,其实他们和普通交换机/队列没什么区别。生产者发送元素到普通交换机A,A和设置了ttl的队列B进行绑定,B和死信交换机进行转发绑定,B不设置任何消费者,则B中的元素一定不会被消费,超时后元素会被转发到死信交换机C,C和死信队列D进行绑定,元素会进入队列D,消费者消费D,这样就实现了延迟消费。从网上找了张图,大概如下:

延迟队列的实现

注意:一定不要消费普通队列,否则无法实现延迟。

3. 代码实现

 代码基于spring boot + rabbitmq实现。

 引入rabbitmq包

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>

 rabbitmq配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
//死信交换机名
private String toQueryExchange = "normal_exchange";
//死信队列名
private String toQueryQueue = "normal_queue";
//交换机名
private String fromQueryExchange = "dlx_exchange";
//队列名
private String fromQueryQueue = "dlx_queue";
//超时时间毫秒
private Long checkDelayTime = 10000;//10s
//交换机到队列的路由键
private String delayRouterKey = "delayQuery";
//死信交换机到死信队列的路由键
private String routerKey = "query";


/**
* ===========================================查询延迟消息配置==============================================
*/

/**
* 定义死信交换机
* 交换机的类型自定,我这里采用Topic
* @return
*/
@Bean
public TopicExchange queryDelayExchange() {
return new TopicExchange(fromQueryExchange, true, false);
}

/**
* 定义死信队列
* 消费者从这里消费
*
* @return
*/
@Bean
public Queue queryDelayQueue() {
return new Queue(fromQueryQueue, true, false, false);
}

/**
* 绑定死信交换器和死信队列
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindingDlxExchangeQueue(@Qualifier("queryDelayQueue") Queue queue,
@Qualifier("queryDelayExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routerKey);
}


/**
* 定义普通交换机
*
* @return
*/
@Bean
public TopicExchange queryExchange() {
return new TopicExchange(toQueryExchange, true, false);
}

/**
* 定义延迟队列10s
*
* @return
*/
@Bean
public Queue queryQueue() {
Map<String, Object> arguments = new HashMap<>();
//设置超时时间
arguments.put("x-message-ttl", checkDelayTime);
//设置转发交换机
arguments.put("x-dead-letter-exchange", fromQueryExchange);
//设置转发路由键,信息会被死信交换机发送到指定路由的队列上
arguments.put("x-dead-letter-routing-key", routerKey);
return new Queue(toQueryQueue, true, false, false, arguments);
}


/**
* 绑定普通交换机和延迟队列
*
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindingExchangeQueue(@Qualifier("queryQueue") Queue queue,
@Qualifier("queryExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(delayRouterKey);
}
}

  rabbitmq配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
rabbitmq:
addresses: 127.0.0.1:5672
username: admin
password: admin
virtual-host: my_vhost
publisher-confirms: true
publisher-returns: true
connection-timeout: 15000
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual

 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class Sender {

@Resource
private RabbitTemplate rabbitTemplate;

private String toQueryExchange = "normal_exchange";;//同前config定义一致

public void send(String orderId) {
log.info("rabbit sender:order_id = {}", orderId);
rabbitTemplate.convertAndSend(toQueryExchange, "delayQuery", orderId, message -> {
//设置持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
log.info("sender done!order_id={}", orderId);
}
}

 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* Receiver
*/
@Slf4j
@Component
public class Receiver {

@RabbitHandler
@RabbitListener(queues = "dlx_queue") //同前config配置名称一致
public void query(String orderId, Channel channel, Message message) {
log.info("consumer: order_id={}", orderId);
//查询支付是否完成
//具体的业务逻辑
//消息的消费确认
basicAck(orderId, channel, message);
log.info("consumer done!order_id={}", orderId);
}

//消费成功回复
private void basicAck(String orderId, Channel channel, Message message) {
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
log.info("payment auto query basicAck fail msg:{},", orderId);
}
}
//消费失败回复
private void basicNack(String orderId, Channel channel, Message message) {
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException e) {
log.info("payment auto query basicNack fail msg:{},", orderId);
}
}

}