1. AMQP协议
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
AMQP有以下几个概念:
- Broker:接收和分发消息的应用,就是处理消息的服务器,RabbitMQ Server就是Message Broker。
- Virtual host:类似于namespace,通过它可以在同一个mq服务器上划分不同项目/功能的amqp实体组。
- Connection:消息发布者/订阅者和服务器之间的TCP连接。服务器不会主动断开连接,只能在客户端断开。
- Channel:一个channel其实就是一次连接,客户端并不会在每次发送/消费消息时都新建连接,而是复用channel,类似长连接。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
- Exchange:消息达到mq服务器的第一站,通过交换机转发到各个队列中去。和生产者直接交互。
- Queue:存储消息的队列。会被消费者取走消息。
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。用于消息的分发。
AMQP的工作模型:
注意:图中没画出来,一个broker可以包含多个虚拟主机,虚拟主机可包含多个交换机和队列。
生产者发送消息流程:
- 生产者和Broker建立TCP连接。
- 生产者和Broker建立通道channel。
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
- Exchange将消息转发到指定的Queue(队列)。
消费者接收消息流程:
- 消费者和Broker建立TCP连接。
- 消费者和Broker建立通道channel。
- 消费者监听指定的Queue(队列)
- 当有消息到达Queue时Broker默认将消息推送给消费者。
- 消费者接收到消息。
- ack回复。
2. RabbitMQ消息模型
rabbitmq是amqp的实现,因此以上amqp的概念也适用于rabbitmq。
2.1 交换机类型
不同的交换机类型对应着不同的消息模式,rabbitmq交换机的类型有以下几种:
- direct:路由模式下交换机会用到。
- fanout:广播模型。发布/订阅模式交换机用到。
- topic:通过通配符匹配多个路由key。通配符模式交换机会用到。
- headers:通过
2.2 6种消息模型
本节参考自官网https://www.rabbitmq.com/getstarted.html
以下所有代码需要导入rabbitmq相关的包以及建立连接。
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.1</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ConnectionUtil {
public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("my_vhost"); factory.setUsername("admin"); factory.setPassword("admin"); return factory.newConnection(); } }
|
简单消息模型不需要交换机的参与,生产者发送消息到队列中,消费者监听队列进行消费。代码如下:
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class SimpleSender { private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [生产者者]sender message: '" + message + "'"); channel.close(); connection.close(); } }
|
消费者消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeUnit;
public class SimpleReceiver { private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("等待消息消费。Ctrl+C可退出:");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(" [消费者] received : " + msg + "!"); } };
channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
worker模型下一个队列可以被多个消费者消费,但是一个消息只能被一个消费者消费。这种模式适用于大量消息堆积,需要增加消费者快速出消费的场景。代码如下:
生产者循环发送50条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet;
public class DSend {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes()); System.out.println("第" + i + "条消息已发送"); Thread.sleep(200); } channel.close(); connection.close(); } }
|
多个消费者消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeUnit; public class DRecv1 { private final static String QUEUE_NAME = "work_queue";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println(" [消费者1] received : " + msg + "!");
try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }
|
注意:通过 BasicQos 方法设置prefetchCount = 1
。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。
发布订阅模型是指凡是和fanout交换机绑定的队列,都会收到元素,会被对应的消费者消费。又叫广播模式,交换机类型为fanout
。代码如下:
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class PSPublisher {
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "注册成功!!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生产者] Sent '" + message + "'"); channel.close(); connection.close(); } }
|
多个队列和多个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import com.rabbitmq.client.*;
import java.io.IOException;
public class PSSubscriber1 { private final static String QUEUE_NAME = "fanout_exchange_queue_sms";
private final static String EXCHANGE_NAME = "test_fanout_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println(" [短信服务] received : " + msg + "!"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
路由模型下,生产者生产的消息到达交换机之后,会根据路由key发送到不同的队列中,交换机类型为direct
。代码如下:
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class RouterSender { private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); String message = "注册成功!请短信回复[T]退订"; channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
channel.close(); connection.close(); } }
|
消费者消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import com.rabbitmq.client.*; public class RouterRecv1 { private final static String QUEUE_NAME = "direct_exchange_queue_emial"; private final static String EXCHANGE_NAME = "test_direct_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body); System.out.println(" [邮件服务] received : " + msg + "!");
} }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
注意:以上只有Receiver2才能消费消息,因为消息发送方指定了只有sms
留有key的队列才能消费。路由模式适用于带条件的广播
路由模式虽然可是实现带条件的广播,但是如果要求匹配多个条件呢,这时候通配符模式就登场了。交换机类型为topic,支持正则匹配。代码如下:
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class TopicSender { private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); String message = "这是一只行动迅速的橙色的兔子"; channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes()); System.out.println(" [动物描述:] Sent '" + message + "'");
channel.close(); connection.close(); } }
|
消费者消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import com.rabbitmq.client.*; import java.io.IOException;
public class TopicRecv1 { private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
private final static String EXCHANGE_NAME = "test_topic_exchange";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String msg = new String(body); System.out.println(" [消费者1] received : " + msg + "!"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
注意:以上消息两个队列都能接收到,发送路由key为quick.orange.rabbit
,Q1队列绑定路由为*
和*.orange
,第一个路由键可匹配到。Q2队列绑定路由为*.*.rabbit
和lazy.#
第一个路由可匹配到。因此都能接收。
direct模式适合单条件广播的场景,topic适合多条件广播的场景
以上我们提到的模型都是生产者发送消息,消费者处理消息。那么如果我们需要客户端异步发送大量请求到服务器,然后服务端处理完后把处理结果返回给客户端,如何实现呢?因为是大量异步,所以直接使用异步发送到服务器肯定会出现请求过多导致服务器宕机的问题,这时候需要对请求进行削峰处理,rpc模型就派上用场啦。
基本概念:
callback queue 回调队列:客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
correlation_id 关联标识:客户端可能会发送多个请求给服务器,当服务器处理完后无法辨别哪一个结果对应哪一个请求,因此在发送请求时会附带一个correlation_id属性,这样客户端可根据该属性进行结果处理。
流程说明:
当客户端启动的时候,它创建一个匿名独享的回调队列。
在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
将请求发送到一个 rpc_queue 队列中。
服务器等待请求发送到 rpc_queue 队列中来。当请求出现的时候,服务器进行处理并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。
客户端发起请求,并处理回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue;
@Slf4j public class RPCClient {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String replyQueueName = channel.queueDeclare().getQueue(); String corId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(corId) .replyTo(replyQueueName) .build(); for (int i = 0; i < 10; i++) { String message = "RPC MESSAGE" + i; channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8")); log.info("rpc client, requesting: {}", message); }
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(10);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { if (properties.getCorrelationId().equals(corId)) { response.offer(new String(body, StandardCharsets.UTF_8)); } }
});
while (true) { if (!response.isEmpty()) { log.info("rpc client result: {}", response.take()); } }
} }
|
服务端处理请求并返回结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.nio.charset.StandardCharsets;
@Slf4j public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); log.info("waiting for rpc request:");
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = generateResponse(body); channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8)); channel.basicAck(envelope.getDeliveryTag(), false); synchronized (this) { this.notify(); } } };
channel.basicConsume(RPC_QUEUE_NAME, false, consumer); }
private static String generateResponse(byte[] body) { System.out.println(" rpc server, receive request: " + new String(body)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "response:" + new String(body) + "-" + System.currentTimeMillis(); } }
|
3. RabbitMQ服务器安装
3.1 直接下载安装
需要安装erlang和rabbitmq。
https://www.rabbitmq.com/download.html
3.2 docker安装
https://hub.docker.com/_/rabbitmq
https://www.cnblogs.com/sentangle/p/13201127.html
1 2 3 4 5 6 7 8 9 10 11 12
| docker pull rabbitmq
docker images
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 7471fb821b97
docker exec -it rabbitmq
docker run -d --hostname my-rabbit -p 5672:5672 -v `pwd`:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name my-rabbit rabbitmq:3 docker run -d --hostname my-rabbit --name my-rabbit-manage -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
|