(一)rabbitmq简介及其消息模型

1. AMQP协议

 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议

 AMQP有以下几个概念:

  • Broker:接收和分发消息的应用,就是处理消息的服务器,RabbitMQ Server就是Message Broker。
  • Virtual host:类似于namespace,通过它可以在同一个mq服务器上划分不同项目/功能的amqp实体组。
  • Connection:消息发布者/订阅者和服务器之间的TCP连接。服务器不会主动断开连接,只能在客户端断开。
  • Channel:一个channel其实就是一次连接,客户端并不会在每次发送/消费消息时都新建连接,而是复用channel,类似长连接。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
  • Exchange:消息达到mq服务器的第一站,通过交换机转发到各个队列中去。和生产者直接交互。
  • Queue:存储消息的队列。会被消费者取走消息。
  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key。用于消息的分发。

 AMQP的工作模型:

amqp工作模型

 注意:图中没画出来,一个broker可以包含多个虚拟主机,虚拟主机可包含多个交换机和队列。

生产者发送消息流程:

  1. 生产者和Broker建立TCP连接。
  2. 生产者和Broker建立通道channel。
  3. 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
  4. Exchange将消息转发到指定的Queue(队列)。

消费者接收消息流程:

  1. 消费者和Broker建立TCP连接。
  2. 消费者和Broker建立通道channel。
  3. 消费者监听指定的Queue(队列)
  4. 当有消息到达Queue时Broker默认将消息推送给消费者。
  5. 消费者接收到消息。
  6. ack回复。

2. RabbitMQ消息模型

 rabbitmq是amqp的实现,因此以上amqp的概念也适用于rabbitmq。

2.1 交换机类型

[1]不同的交换机类型对应着不同的消息模式,rabbitmq交换机的类型有以下几种:

  • direct:路由模式下交换机会用到。
  • fanout:广播模型。发布/订阅模式交换机用到。
  • topic:通过通配符匹配多个路由key。通配符模式交换机会用到。
  • headers:通过

2.2 6种消息模型

 本节参考自官网https://www.rabbitmq.com/getstarted.html
 以下所有代码需要导入rabbitmq相关的包以及建立连接。

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
*
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("my_vhost");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工厂获取连接
return factory.newConnection();
}
}
  1. 简单消息模型

简单消息模型

 简单消息模型不需要交换机的参与,生产者发送消息到队列中,消费者监听队列进行消费。代码如下:

 生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SimpleSender {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 3、申明队列
/**
* 参数说明:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 4、发送消息
/**
* 参数说明:
* 1、exchange 交换机名称,简单队列不需要
* 2、routingKey 路由键,不通过交换机则路由键为队列名称
* 3、props 消息的相关属性,比如deliveryMode等
* 4、body 具体的消息,byte数组
*/
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [生产者者]sender message: '" + message + "'");
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
channel.close();
connection.close();
}
}

 消费者消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleReceiver {
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("等待消息消费。Ctrl+C可退出:");

//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println(" [消费者] received : " + msg + "!");
}
};

/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,如果设置为false要通过编程实现回复。为了保证消息的可靠性,最好关闭自动回复,手动回复,后面会讲到。
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

  1. 工作队列模型

工作队列模型

  worker模型下一个队列可以被多个消费者消费,但是一个消息只能被一个消费者消费。这种模式适用于大量消息堆积,需要增加消费者快速出消费的场景。代码如下:
 生产者循环发送50条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class DSend {

private final static String QUEUE_NAME = "work_queue";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 任务发布有可能失败,为了保证可靠性,可使用确认机制,后续会讲到。
// 循环发布任务,向指定队列发布消息
for (int i = 0; i < 50; i++) {
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "条消息").getBytes());
System.out.println("第" + i + "条消息已发送");
Thread.sleep(200);
}
channel.close();
connection.close();
}
}

 多个消费者消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class DRecv1 {
private final static String QUEUE_NAME = "work_queue";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 设置每个消费者同时只能处理一条消息,在手动ack下才生效
// 如果不设置这个,会平均分配消息给消费者,这样处理较快的消费者大量时间会浪费掉
channel.basicQos(1);

//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(" [消费者1] received : " + msg + "!");

//模拟任务耗时1s
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
//手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//关闭自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}

 注意:通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

  1. 发布订阅模型

发布订阅模型

 发布订阅模型是指凡是和fanout交换机绑定的队列,都会收到元素,会被对应的消费者消费。又叫广播模式,交换机类型为fanout。代码如下:

 生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class PSPublisher {

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

// 消息内容
String message = "注册成功!!";
// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}

 多个队列和多个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.rabbitmq.client.*;

import java.io.IOException;

public class PSSubscriber1 {
private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列
//private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列

private final static String EXCHANGE_NAME = "test_fanout_exchange";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [短信服务] received : " + msg + "!");
//System.out.println(" [邮件服务] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
  1. 路由模型

路由模型

 路由模型下,生产者生产的消息到达交换机之后,会根据路由key发送到不同的队列中,交换机类型为direct。代码如下:

 生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class RouterSender {
private final static String EXCHANGE_NAME = "test_direct_exchange";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);//第三个参数为durable的类型,需要和交换机一致
String message = "注册成功!请短信回复[T]退订";
// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
}

 消费者消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.rabbitmq.client.*;
public class RouterRecv1 {
private final static String QUEUE_NAME = "direct_exchange_queue_emial";//邮件队列
private final static String EXCHANGE_NAME = "test_direct_exchange";
// private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
// body 即消息体
String msg = new String(body);
System.out.println(" [邮件服务] received : " + msg + "!");
// System.out.println(" [短信服务] received : " + msg + "!");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

 注意:以上只有Receiver2才能消费消息,因为消息发送方指定了只有sms留有key的队列才能消费。路由模式适用于带条件的广播

  1. 通配符模型

topic模型

  路由模式虽然可是实现带条件的广播,但是如果要求匹配多个条件呢,这时候通配符模式就登场了。交换机类型为topic,支持正则匹配。代码如下:

 生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class TopicSender {
private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
// 消息内容
String message = "这是一只行动迅速的橙色的兔子";
// 发送消息,并且指定routing key为:quick.orange.rabbit
channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
System.out.println(" [动物描述:] Sent '" + message + "'");

channel.close();
connection.close();
}
}

 消费者消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.rabbitmq.client.*;
import java.io.IOException;

public class TopicRecv1 {
private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
// private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
private final static String EXCHANGE_NAME = "test_topic_exchange";

public static void main(String[] argv) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key(可多个)。订阅所有的橙色动物
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

 注意:以上消息两个队列都能接收到,发送路由key为quick.orange.rabbit,Q1队列绑定路由为**.orange,第一个路由键可匹配到。Q2队列绑定路由为*.*.rabbitlazy.#第一个路由可匹配到。因此都能接收。
direct模式适合单条件广播的场景,topic适合多条件广播的场景

  1. RPC模型(请求/响应模型)

RPC模型

  以上我们提到的模型都是生产者发送消息,消费者处理消息。那么如果我们需要客户端异步发送大量请求到服务器,然后服务端处理完后把处理结果返回给客户端,如何实现呢?因为是大量异步,所以直接使用异步发送到服务器肯定会出现请求过多导致服务器宕机的问题,这时候需要对请求进行削峰处理,rpc模型就派上用场啦。

 基本概念:

  • callback queue 回调队列:客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
  • correlation_id 关联标识:客户端可能会发送多个请求给服务器,当服务器处理完后无法辨别哪一个结果对应哪一个请求,因此在发送请求时会附带一个correlation_id属性,这样客户端可根据该属性进行结果处理。
  •  流程说明:

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到 rpc_queue 队列中来。当请求出现的时候,服务器进行处理并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。

  •   客户端发起请求,并处理回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    import com.rabbitmq.client.*;
    import lombok.extern.slf4j.Slf4j;

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    @Slf4j
    public class RPCClient {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    // 定义临时队列,并返回生成的队列名称
    String replyQueueName = channel.queueDeclare().getQueue();
    // 唯一标志本次请求
    String corId = UUID.randomUUID().toString();
    AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
    .correlationId(corId)
    .replyTo(replyQueueName)
    .build();
    //发送消息到默认交换机
    for (int i = 0; i < 10; i++) {
    String message = "RPC MESSAGE" + i;
    channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
    log.info("rpc client, requesting: {}", message);
    }

    // 阻塞队列,用于存储回调结果
    final BlockingQueue<String> response = new ArrayBlockingQueue<String>(10);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
    if (properties.getCorrelationId().equals(corId)) {
    //将结果存入阻塞队列
    response.offer(new String(body, StandardCharsets.UTF_8));
    }
    }

    });

    // 从阻塞队列中获取回调的结果
    while (true) {
    if (!response.isEmpty()) {
    log.info("rpc client result: {}", response.take());
    }
    }

    }
    }

      服务端处理请求并返回结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    import com.rabbitmq.client.*;
    import lombok.extern.slf4j.Slf4j;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    @Slf4j
    public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    // 设置同时最多只能获取一个消息
    channel.basicQos(1);
    log.info("waiting for rpc request:");

    // 定义消息的回调处理类
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
    AMQP.BasicProperties properties, byte[] body) throws IOException {
    // 生成返回的结果,关键是设置correlationId值
    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
    .Builder()
    .correlationId(properties.getCorrelationId())
    .build();
    // 生成返回
    String response = generateResponse(body);
    // 回复消息,通知已经收到请求,讲response放入replyTo队列
    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
    // 对消息进行应答
    channel.basicAck(envelope.getDeliveryTag(), false);
    // 唤醒正在消费者所有的线程
    synchronized (this) {
    this.notify();
    }
    }
    };

    // 消息消费
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    }


    /**
    * 暂停1s,并返回结果。模拟服务器处理流程
    *
    * @param body
    * @return
    */
    private static String generateResponse(byte[] body) {
    System.out.println(" rpc server, receive request: " + new String(body));
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return "response:" + new String(body) + "-" + System.currentTimeMillis();
    }
    }

    3. RabbitMQ服务器安装

    3.1 直接下载安装

     需要安装erlang和rabbitmq。
    https://www.rabbitmq.com/download.html

    3.2 docker安装

    https://hub.docker.com/_/rabbitmq
    https://www.cnblogs.com/sentangle/p/13201127.html

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //拉取镜像
    docker pull rabbitmq
    //查看镜像
    docker images
    //安装rabbitmq
    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 7471fb821b97
    //启动rabbitmq_management
    docker exec -it rabbitmq

    //以上方式管理界面有点问题,后来去官网找到了,以下即可
    docker run -d --hostname my-rabbit -p 5672:5672 -v `pwd`:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name my-rabbit rabbitmq:3
    docker run -d --hostname my-rabbit --name my-rabbit-manage -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
    1. 可在这里http://tryrabbitmq.com/ 模拟不同交换机的消息转发过程