1. 延迟队列
延迟队列是指进入队列后的元素不会被立即消费,而是达到一定时间后才会被消费。比如我的工作中需要实现这样一个功能。支付常常会因为一些原因导致支付报错,比如DNS解析失败等,这种情况一般客户会再次点击支付。为了避免每次报错都人工排查原因,需要实现一个功能,如果dns解析失败报错,则1分钟后主动查询订单是否支付成功,成功则不管,失败则手动排查。这时候可以采用延迟队列实现,每次报错把订单号放入延迟队列中,1分钟后取出订单号查询订单状态。这样可以大大减少手动排查错误的几率,提升工作效率。
工作中常常会使用到延迟队列,我们可以采用多种方式实现,常用的实现延迟队列的方式有以下几种:
- 使用JDK自带的DelayQueue实现,它的本质是封装了一个PriorityQueue(优先队列)。这种方式实现简单,但是只能在本地使用,容易发生单点故障。
- 使用redis的zset按照过期时间排列,然后轮询zset获取到期的key。缺点是无法保证数据的可靠性,易丢失。
- 可以使用定时任务框架,比如Quartz、xxl-job等工具。本质是定时任务,可以定时扫描进行处理,如果少而无规律的任务,会造成大量的扫描而不执行,不便。
- 时间轮算法。比如kafka、netty都基于该算法的实现了延时队列。
- 使用rabbitmq的死信队列实现延迟队列。
本文演示了如何使用rabbitmq实现延迟队列。
2. 几个概念
- 过期时间:每个队列都可以设置一个过期时间(ttl),过期时间和路由key一样,都是队列的属性。
- 死信交换机:死信交换机可以和队列进行绑定。当队列中的元素超过前面设置的过期时间还未被消费,则会被转发到死信交换机中。
- 死信队列:死信队列和死信交换机绑定,转发到死信交换机中的元素会进入死信队列。
总结:死信交换机/队列只是一个概念而已,其实他们和普通交换机/队列没什么区别。生产者发送元素到普通交换机A,A和设置了ttl的队列B进行绑定,B和死信交换机进行转发绑定,B不设置任何消费者,则B中的元素一定不会被消费,超时后元素会被转发到死信交换机C,C和死信队列D进行绑定,元素会进入队列D,消费者消费D,这样就实现了延迟消费。从网上找了张图,大概如下:
注意:一定不要消费普通队列,否则无法实现延迟。
3. 代码实现
代码基于spring boot + rabbitmq实现。
引入rabbitmq包
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>
|
rabbitmq配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class RabbitMQConfig { private String toQueryExchange = "normal_exchange"; private String toQueryQueue = "normal_queue"; private String fromQueryExchange = "dlx_exchange"; private String fromQueryQueue = "dlx_queue"; private Long checkDelayTime = 10000; private String delayRouterKey = "delayQuery"; private String routerKey = "query";
@Bean public TopicExchange queryDelayExchange() { return new TopicExchange(fromQueryExchange, true, false); }
@Bean public Queue queryDelayQueue() { return new Queue(fromQueryQueue, true, false, false); }
@Bean public Binding bindingDlxExchangeQueue(@Qualifier("queryDelayQueue") Queue queue, @Qualifier("queryDelayExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routerKey); }
@Bean public TopicExchange queryExchange() { return new TopicExchange(toQueryExchange, true, false); }
@Bean public Queue queryQueue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", checkDelayTime); arguments.put("x-dead-letter-exchange", fromQueryExchange); arguments.put("x-dead-letter-routing-key", routerKey); return new Queue(toQueryQueue, true, false, false, arguments); }
@Bean public Binding bindingExchangeQueue(@Qualifier("queryQueue") Queue queue, @Qualifier("queryExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(delayRouterKey); } }
|
rabbitmq配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: rabbitmq: addresses: 127.0.0.1:5672 username: admin password: admin virtual-host: my_vhost publisher-confirms: true publisher-returns: true connection-timeout: 15000 template: mandatory: true listener: simple: acknowledge-mode: manual
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j @Component public class Sender {
@Resource private RabbitTemplate rabbitTemplate;
private String toQueryExchange = "normal_exchange";;
public void send(String orderId) { log.info("rabbit sender:order_id = {}", orderId); rabbitTemplate.convertAndSend(toQueryExchange, "delayQuery", orderId, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }); log.info("sender done!order_id={}", orderId); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import com.alibaba.fastjson.JSONObject; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
@Slf4j @Component public class Receiver {
@RabbitHandler @RabbitListener(queues = "dlx_queue") public void query(String orderId, Channel channel, Message message) { log.info("consumer: order_id={}", orderId); basicAck(orderId, channel, message); log.info("consumer done!order_id={}", orderId); }
private void basicAck(String orderId, Channel channel, Message message) { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.info("payment auto query basicAck fail msg:{},", orderId); } } private void basicNack(String orderId, Channel channel, Message message) { try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { log.info("payment auto query basicNack fail msg:{},", orderId); } }
}
|