前面文章《消息队列中间件选型》我们了解了消息队列技术选型,本文我们来学习开源消息队列RabbitMQ。

目录

  1. 1. RabbitMQ简介
  2. 2. RabbitMQ工作流程
  3. 3. RabbitMQ基本概念
    1. 3.1 Producer(生产者)
    2. 3.2 Consumer(消费者)
    3. 3.3 Connection(连接)
    4. 3.4 Channel(信道)
    5. 3.5 Exchange(交换机)
    6. 3.6 Exchange类型
    7. 3.7 Queue(队列)
    8. 3.8 Binding(绑定)
    9. 3.9 Routing Key(路由键)
    10. 3.10 Binding Key(绑定键)
    11. 3.11 Virtual Hosts
  4. 4. RabbitMQ队列模式
    1. 4.1 简单队列模式
    2. 4.2 工作队列模式
    3. 4.3 发布/订阅模式
    4. 4.4 路由模式
    5. 4.5 通配符模式
    6. 4.6 RPC模式
  5. 5. RabbitMQ安装
    1. 5.1 Docker安装
    2. 5.2 本地安装

1. RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

2. RabbitMQ工作流程

RabbitMQ架构图

发布者P-Clients(Publisher)发布消息(Message),经由交换机X(Exchange)
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)
最后RabbitMQ代理(Broker)会将消息投递(Push)给订阅了此队列的消费者(Consumer),或者消费者按照需求自行获取(Pull)

深入理解:

(1)发布者、交换机、队列、消费者都可以有多个,因为RabbitMQ是基于网络协议AMQP,所以这个过程中发布者,交换机,队列,消费者可以处于不同的设备上。

(2)消息发布者可以给消息指定各种属性(Message Meta-data),一些属性可能被消息代理(Broker)使用,一些只能被消费者使用。消息分两部分:payload(有效载荷,传输的数据)和label(标签,exchange的名字或者说是一个tag,它描述了payload),RabbitMQ通过label决定把Message发给哪个消费者。AMQP协议仅仅描述了label,RabbitMQ决定了如何使用label的规则。消费者在收到消息时只有payload部分,label已经被删掉,对于消费者而言是不知道谁发送的消息。

(3)从安全和可靠性角度,RabbitMQ很好实现了AMQP协议的消息确认(Message Acknowledgements)机制,当一个消息投递给消费者后,不会立即删除,直到它收到来自消费者的确认回执(Acknowledgements)后,才完成从队列里删除。

3. RabbitMQ基本概念

我们先来看一张RabbitMQ管理界面截图

这个界面包含了RabbitMQ很关键的几个概念(不是全部)

3.1 Producer(生产者)

消息生产者

3.2 Consumer(消费者)

消息消费者

3.3 Connection(连接)

无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP,当不要连接时,需要优雅释放掉RabbitMQ连接,而不是直接将TCP连接关闭。后面我们可以知道使用RabbitMQ程序开头时,都是先建立TCP连接。

3.4 Channel(信道)

一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,Rabbit处理的每条指令都是通过Channel完成的。一般情况程序起始建立TCP连接,第二步就是建立Channel。

为什么要用Channel,而不直接用TCP连接?

试想如果一个场景,如果一个应用程序有很多个线程需要从RabbitMQ消费或生产消息,那么必然会创建很多个TCP连接,我们知道建立一个TCP连接需要3次握手,关闭一个TCP连接需4次握手,对于系统来说频繁建立和关闭TCP连接对于系统性能有很大影响,而且TCP连接数也有限制,也限制了系统处理高并发的能力。
Rabbitmq采用类似NIO(也称非阻塞 I/O,包含三大核心部分:Channel信道、Buffer缓冲区和 Selector选择器)的做法,选择TCP连接复用,每个线程把持一个信道,信道复用了Connection的TCP连接.当每个信道的流量不是很大时,复用单一connection可以有效节省TCP连接资源,但如果信道流量很大,复用单一connection,connection的带宽会限制消息传输。此时需创建多个connection,将信道分摊到connection中。

3.5 Exchange(交换机)

消息交换机指定消息按什么规则,路由到哪个队列去。

那为什么需要Exchange而不是直接将消息发送至队列呢?

回到与RabbitM关系紧密的AMQP协议,AMQP协议核心思想就是生产者和消费者解耦,生产者不直接将消息发送给队列。生产者不知道消息会被发到哪个队列,它只将消息发给交换机,交换机接收到消息后按照特定的规则转发到队列进行存储。

在实际应用中我们只需要定义好Exchange的路由策略,而生产者不需要关心消息发送到哪个队列或被谁消费。消费者不需要关心谁生产,只需面向队列消费消息。

Exchange定义了消息路由到Queue的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。

创建一个新的Exchange

  • Virtual host:属于哪个Virtual host。
  • Name:名字,同一个Virtual host里面的Name不能重复。
  • Durability: 是否持久化,Durable:持久化。Transient:不持久化。
  • Auto delete:当最后一个绑定(队列或者exchange)被unbind之后,该exchange自动被删除。
  • Internal: 是否是内部专用exchange,是的话,就意味着我们不能往该exchange里面发消息。
  • Arguments: 参数,是AMQP协议留给AMQP实现做扩展使用的。
  • alternate_exchange配置的时候,exchange根据路由路由不到对应的队列的时候,这时候消息被路由到指定的alternate_exchange的value值配置的exchange上。

3.6 Exchange类型

(1).Direct exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。

(2).Topic Exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。

(3).Fanout Exchange

直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key,所以fanout交换机也是转消息最快的。

(4).Headers Exchange

将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

3.7 Queue(队列)

消息队列载体,用于存储消息,每个消息都会被投入到一个或多个队列。消息消费者就是通过订阅队列来获取消息,多个消费者可订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有消息。

3.8 Binding(绑定)

绑定,它的作用就是把Exchange和Queue按照路由规则绑定起来。

3.9 Routing Key(路由键)

消息发送给Exchange(交换机)时,消息将拥有一个路由键(默认为空),Exchange(交换机)根据这个路由键将消息发送到匹配的队列中。

3.10 Binding Key(绑定键)

3.11 Virtual Hosts

虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

4. RabbitMQ队列模式

所有实例采用PHP实现,引用包php-amqplib,引用类:

1
2
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

完整代码:https://github.com/heyuan110/laravel-rabbitmq

4.1 简单队列模式

功能:一个生产者P发送消息到队列,一个消费者C消费

生产者P代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//https://www.rabbitmq.com/tutorials/tutorial-one-php.html
public function publishSimpleMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();
//declare a queue for us to send to; then we can publish a message to the queue
$channel->queue_declare('hello',false,false,false,false);

//create a message, publish it to the queue
$msg = new AMQPMessage("hello world");
$channel->basic_publish($msg,'','hello');
echo " [x] Sent 'Hello World!'\n";
Log::info(" [x] Sent 'Hello World!'\n");

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建队列
(4). 创建消息
(5). 使用信道向队列发送消息
(6). 关闭信道,关闭连接

消费者C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public function consumeSimpleMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();
//declare a queue for us to send to; then we can publish a message to the queue
$channel->queue_declare('hello',false,false,false,false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg){
echo '[x] Received ' . $msg->body . '\n';
Log::info('[x] Received ' . $msg->body . '\n');
};
$channel->basic_consume('hello','',false,true,false,false,$callback);
while($channel->is_consuming()){
$channel->wait();
}

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建队列
(4). 创建消费者监听队列,从队列中读取消息
(5). 关闭信道,关闭连接

4.2 工作队列模式

功能:一个生产者P发送消息到队列,多个消费者C消费,每个消费者获取到的消息唯一,多个消费者只有一个队列。

工作队列:设计思路是为了避免资源密集型任务立即执行,并必须等待它完成,相反安排任务稍后执行。将任务封装成消息,后台运行的消费者进程将获取任务并最终执行,多个消费者运行时,它们之间共享任务。

生产者P代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//https://www.rabbitmq.com/tutorials/tutorial-two-php.html
public function produceWorkMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();
//declare a queue for us to send to; then we can publish a message to the queue
//set queue durable is ture,make sure that messages aren't lost
$channel->queue_declare('hello',false,$durable=true,false,false);

//create a message, publish it to the queue
$data = $this->option('msg');
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($msg,'','hello');
echo " [x] Sent '".$data."'\n";
Log::info(" [x] Sent '".$data."'\n");

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建队列,设置队列durable参数为true,确保在rabbitmq server停止时不丢失队列
(4). 创建消息,消息内容来自输入参数msg,设置delivery_mode为2,确保在rabbitmq server停止时不丢失消息
(5). 使用信道向队列发送消息
(6). 关闭信道,关闭连接

与simple队列相比,我们增加了队列和消息的持久化,确保rabbitmq server服务器停止时,队列和消息都不会丢失

消费者C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public function consumeWorkMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();
//declare a queue for us to send to; then we can publish a message to the queue
//set queue durable is ture,make sure that messages aren't lost
$channel->queue_declare('hello',false,$durable=true,false,false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg){
echo '[x] Received ' . $msg->body . '\n';
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
//ack notification
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
//open ack
$channel->basic_consume('hello','',false,$no_ack=false,false,false,$callback);
while($channel->is_consuming()){
$channel->wait();
}

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建队列
(4). 设置basic_qos值为1,告诉RabbitMQ在处理并确认前一个消息之前,不要向消费者发送新消息
(5). 创建消费者监听队列,从队列中读取消息,为了确保没有消息丢失,开启了ack机制(basic_consume方法第四个参数),在消息处理完成后回调basic_ack通知rabbitmq。
(6). 关闭信道,关闭连接

与simple队列相比,增加了消息处理完成ack确认,避免由于消费者意外中断导致消息未正确执行完成而丢失。多个消费者C时可能出现一些消费者特别忙,一些特别闲,但是这种情况RabbitMQ并不知道,还是均匀的发送消息(原因:This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.),这样明显是不合理的,基于调度公平原则,设置basic_qos参数为1,直到Rabbitmq收到上次消息完成的确认再推送新消息给此消费者。

4.3 发布/订阅模式

功能:一个生产者P发送的消息会被多个消费者C消费。一个生产者,一个交换机,多个队列,多个消费者。

RabbitMQ种消息传递模型的核心思想是生产者永远不会将消息直接发送到队列,生产者甚至都不知道消息是否被发送到了队列。在消息传递模型中,生产者只能向交换机发送消息,交换机必须要准确知道消息将如果转发,这个转发规则类型有:direct,topic,headers,fanout,在上面的exchange type里已经详细解释。

4.1和4.2里我们没有提到交换机,为什么也可以给队列发消息?因为basic_publish在发送消息时其实用了默认的交换机(第二个参数设置为’’),设置为默认交换机时,消息会被路由到routing_key的队列(如果队列存在)。

如果消息发送到没有绑定队列的交换机,消息将会丢失,交换机本身不存储消息,消息存储在队列中.

生产者P代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// https://www.rabbitmq.com/tutorials/tutorial-three-php.html
public function publishMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:fanout
$channel->exchange_declare('logs', 'fanout', false, false, false);

//create a message, publish it to the queue
$data = $this->option('msg');
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(date("Y-m-d H:i:s ").$data);

//send a message to exchange logs
$channel->basic_publish($msg,'logs');

echo " [x] Sent '".$data."'\n";

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为fanout(交换机接收的所有消息无差别转发到绑定队列)
(4). 创建消息,消息内容来自输入参数msg
(5). 使用信道向交换机发送消息,注意在这里我们没有设置routing_key,也没有创建任何队列
(6). 关闭信道,关闭连接

消费者C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public function subscribekMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:fanout
$channel->exchange_declare('logs', 'fanout', false, false, false);

//declare a temporary queue for us to send to; the temporary queue is auto delete when the consumer disconnect
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//bind queue to exchange logs
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};

//create consume
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while($channel->is_consuming()){
$channel->wait();
}

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为fanout(交换机接收的所有消息无差别转发到绑定队列)
(4). 使用信道创建临时队列,队列名字设置为空,服务器会自动创建随机队列名称(大概类似:amq.gen-RAkn-INGuKex4JMNmoTZDA),一旦订阅消费者端口,临时队列会自动删除。返回队列名称$queue_name备用。
(5). 绑定队列交换机,让发送到交换机的消息都转发到队列
(6). 创建消费者监听队列,从队列中读取消息。
(7). 关闭信道,关闭连接

这个DEMO可以很好帮我们理解发布订阅模式,在Console里多启动几个订阅消费者,观察rabbitmq里队列会发现多了几个临时队列,如果终止它们随即会删除。

4.4 路由模式

功能:生产者P发送的消息到交换机并指定routing_key,消费者将队列绑定到交换机时需要指定路由key。

生产者P代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//https://www.rabbitmq.com/tutorials/tutorial-four-php.html
public function publishMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:direct
$exchange_name = 'direct_logs';
$echange_type = 'direct';
$channel->exchange_declare($exchange_name, $echange_type, false, false, false);

//get routing key
$routing_key = $this->option('level');
if (empty($routing_key)) {
$routing_key = "info";
}

//create a message, publish it to the queue
$data = $this->option('msg');
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(date("Y-m-d H:i:s ").$data);

//send a message to exchange logs
$channel->basic_publish($msg,$exchange_name,$routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为direct(根据routing_key转发消息)
(4). 创建消息,消息内容来自输入参数msg
(5). 使用信道向交换机发送消息,注意设置了routing_key
(6). 关闭信道,关闭连接

消费者C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public function subscribekMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:fanout
$exchange_name = 'direct_logs';
$echange_type = 'direct';
$channel->exchange_declare($exchange_name, $echange_type, false, false, false);

//declare a temporary queue for us to send to; the temporary queue is auto delete when the consumer disconnect
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$level = $this->option('level');
$levels = [];
if (empty($level)) {
$levels = ['info'];
}else{
$levels = explode(',',$level);
}
//bind queue to exchange logs
foreach ($levels as $routing_key) {
$channel->queue_bind($queue_name,$exchange_name, $routing_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

//create consume
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while($channel->is_consuming()){
$channel->wait();
}

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为direct(根据routing_key转发消息)
(4). 使用信道创建临时队列,队列名字设置为空,服务器会自动创建随机队列名称(大概类似:amq.gen-RAkn-INGuKex4JMNmoTZDA),一旦订阅消费者端口,临时队列会自动删除。返回队列名称$queue_name备用。
(5). 根据输入的routing_key绑定队列交换机,让发送到交换机的消息根据routing_key转发到队列
(6). 创建消费者监听队列,从队列中读取消息。
(7). 关闭信道,关闭连接

试着运行php artisan consume:routing_mq --level="info,error,warnning"php artisan consume:routing_mq --level="info",然后在生产者端输入不同的level看看,例如:
php artisan produce:routing_mq --level=error --msg="error:test data"php artisan produce:routing_mq --level=info --msg="info:test data".

4.5 通配符模式

功能:生产者P发送的消息到交换机并指定routing_key,并设置类型为topic,消费者将队列绑定到交换机时根据routing_key的值进行通配符匹配。

生产者P代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//https://www.rabbitmq.com/tutorials/tutorial-five-php.html
public function publishMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:topic
$exchange_name = 'topic_logs';
$echange_type = 'topic';
$channel->exchange_declare($exchange_name, $echange_type, false, false, false);

//get routing key
$routing_key = $this->option('routing_key');
if (empty($routing_key)) {
$routing_key = "anonymous.info";
}

//create a message, publish it to the queue
$data = $this->option('msg');
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(date("Y-m-d H:i:s ").$data);

//send a message to exchange logs
$channel->basic_publish($msg,$exchange_name,$routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为topic(根据routing_key转发消息),注意和4.4里类型不一样。
(4). 创建消息,消息内容来自输入参数msg
(5). 使用信道向交换机发送消息,注意设置了routing_key
(6). 关闭信道,关闭连接

消费者C代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public function subscribekMQ()
{
//create a connection
$connection = new AMQPStreamConnection('172.16.166.130','5672','guest','guest','/');
//create a channel
$channel = $connection->channel();

//create a exchange logs, type:topic
$exchange_name = 'topic_logs';
$echange_type = 'topic';
$channel->exchange_declare($exchange_name, $echange_type, false, false, false);

//declare a temporary queue for us to send to; the temporary queue is auto delete when the consumer disconnect
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_key = $this->option('binding_key');
$binding_keys = [];
if (empty($binding_key)) {
$binding_keys = ['info'];
}else{
$binding_keys = explode(',',$binding_key);
}
//bind queue to exchange logs
foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name,$exchange_name, $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

//create consume
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while($channel->is_consuming()){
$channel->wait();
}

//close channel, close connection
$channel->close();
$connection->close();
}

过程:
(1). 创建连接,设置服务地址,端口,账号,vhost
(2). 使用连接创建信道
(3). 使用信道创建交换机,设置交换机类型为topic(根据routing_key转发消息)
(4). 使用信道创建临时队列,队列名字设置为空,服务器会自动创建随机队列名称(大概类似:amq.gen-RAkn-INGuKex4JMNmoTZDA),一旦订阅消费者端口,临时队列会自动删除。返回队列名称$queue_name备用。
(5). 根据输入的routing_key规则绑定队列交换机,让发送到交换机的消息根据routing_key通配符规则转发到队列
(6). 创建消费者监听队列,从队列中读取消息。
(7). 关闭信道,关闭连接

通配符模式下生产者和消费者的routing_key通配符必须是.号分隔的单词列表,例如生产者的routing_key设置为lazy.white.rabbit,与消费者routing_key设置为lazy.*.rabbit的队列匹配。

替换规则(注意代替的是单词,不是字符):

  • *(星号)代替一个单词,
  • #(井号)代替零个或多个单词

当topic的routing_key设置为一个”#”时,会接收所有消息,类似fanout交换机类型。
当topic的routing_key不包含通配符*或#时,类似direct交换机类型。

先运行消费者:

1
2
3
4
5
6
php artisan consume:topic_mq --binding_key="*.white.dog"
php artisan consume:topic_mq --binding_key="test-key"
php artisan consume:topic_mq --binding_key="lazy.*.rabbit"
php artisan consume:topic_mq --binding_key="lazy.#"
php artisan consume:topic_mq --binding_key="*"
php artisan consume:topic_mq --binding_key="#"

随后运行生产者观察消费者端输出

1
2
3
4
5
6
php artisan produce:topic_mq --routing_key="quick.red.fox" --msg="test1"
php artisan produce:topic_mq --routing_key="lazy.red.fox" --msg="test2"
php artisan produce:topic_mq --routing_key="lazy.red.fox.test" --msg="test3"
php artisan produce:topic_mq --routing_key="quick.white.dog" --msg="test4"
php artisan produce:topic_mq --routing_key="test-key" --msg="test5"
php artisan produce:topic_mq --routing_key="test" --msg="test6"

通过上面生产和消费消息演示,可以快速加深理解topic通配符模式。

4.6 RPC模式

RPC工作过程:

  • 当客户端启动时,创建一个匿名的独占回调队列
  • 对于RPC请求,客户端发送带有两个属性的消息,reply_to设置为回调队列,correlation_id设置为每个请求的唯一值。
  • 请求发送到rpc_queue队列
  • RPC服务端获取到rpc_queue队列上的请求后,执行完成,将结果通过reply_to设置的队列返回给客户端
  • 客户端收到消息后,检查消息的correlation_id属性,如果和请求中的值匹配,则将响应返回给应用程序。

在AMQP0-9-1协议中消息带有14个属性,大多数很少使用,但下面几个需要了解:

  1. delivery_mode:设置为2表示消息持久化,设置为1表示临时消息
  2. content_type: 消息类型,例如json格式的可设置为:application / json
  3. reply_to: 通常用于命名回调队列
  4. correlation_id: 将RPC的请求和响应关联起来

本节参考:https://www.rabbitmq.com/tutorials/tutorial-six-php.html

ClientServer代码查看。

5. RabbitMQ安装

5.1 Docker安装

5.2 本地安装