RabbitMQ队列使用解析

MQ基本概念

MQ概述

消息队列(Message Queue), 是在消息的传输过程中保存消息的容器. 多用于分布式系统之间的通信

image-20220709125842485

发送方称为生产者,接收方称为消费者

分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信

MQ 的优势和劣势

MQ 的优势

  • 应用解耦

    系统的耦合性越高,容错性就越低,可维护性就越低。

    使用 MQ 使得应用间解耦,提升容错性和可维护性。

  • 异步提速

    提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

    image-20220709130316032

  • 削峰填谷

    image-20220709130427441

    使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了.

    但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

MQ 的劣势

image-20220709130614372

  • 系统可用性降低

    系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

  • 系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

  • 一致性问题

    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

使用 MQ 需要满足的条件

  • 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  • 容许短暂的不一致性。
  • 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

常见的 MQ 产品

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP, STOMP OpenWire,STOMP, REST,XMPP,AMQP 自定义 自定义协议,社区封装了http协议支持
客户端支持语言 官方支持Erlang,Java, Ruby等,社区产出多种API,几乎支持所有语言 Java,C,C++, Python,PHP, Perl,.net等 Java,C++(不成熟 官方支持Java,社区产出 多种API,如PHP, Python等

RabbitMQ 简介

1
官网: https://www.rabbitmq.com

AMQP协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

基础架构

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。
Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构如下图:

image-20220709131308174

1
2
3
4
5
6
7
8
9
10
11
12
13
14
相关概念:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

- Connection:publisher/consumer 和 broker 之间的 TCP 连接

- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销

- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

- Queue:消息最终被送到这里等待 consumer 取走

- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

工作模式

RabbitMQ 提供了 6 种工作模式

简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html

image-20220709131840455

RabbitMQ 的安装和配置

ubuntu 18.04安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/sh

sudo apt-get install curl gnupg apt-transport-https -y

## Team RabbitMQ's main signing key
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
## Launchpad PPA that provides modern Erlang releases
curl -1sLf "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xf77f1eda57ebb1cc" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg > /dev/null
## PackageCloud RabbitMQ repository
curl -1sLf "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/io.packagecloud.rabbitmq.gpg > /dev/null

## Add apt repositories maintained by Team RabbitMQ
sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
## Provides modern Erlang/OTP releases
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main
deb-src [signed-by=/usr/share/keyrings/net.launchpad.ppa.rabbitmq.erlang.gpg] http://ppa.launchpad.net/rabbitmq/rabbitmq-erlang/ubuntu bionic main

## Provides RabbitMQ
##
## "bionic" as distribution name should work for any reasonably recent Ubuntu or Debian release.
## See the release to distribution mapping table in RabbitMQ doc guides to learn more.
deb [signed-by=/usr/share/keyrings/io.packagecloud.rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
deb-src [signed-by=/usr/share/keyrings/io.packagecloud.rabbitmq.gpg] https://packagecloud.io/rabbitmq/rabbitmq-server/ubuntu/ bionic main
EOF

## Update package indices
sudo apt-get update -y

## Install Erlang packages
sudo apt-get install -y erlang-base \
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \
erlang-runtime-tools erlang-snmp erlang-ssl \
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

## Install rabbitmq-server and its dependencies
sudo apt-get install rabbitmq-server -y --fix-missing

下面是应该与 RabbitMQ apt 存储库一起使用的操作系统版本和分发名称表

Release Distribution
Ubuntu 20.04 focal
Ubuntu 18.04 bionic
Debian Buster buster
Debian Bullseye bullseye
Debian Sid bullseye

开启管理界面及配置

1
2
3
4
5
6
# 开启管理界面
$ sudo rabbitmq-plugins enable rabbitmq_management
# 修改默认配置信息
$ sudo vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
$ sudo vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.5/plugins/rabbit-3.10.5/ebin/rabbit.app
# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

控制

1
2
3
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务

配置文件

1
$ cp /usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

登陆管理页面

默认账密: guest guest

image-20220709152518724

RabbitMQ 快速入门

window_wamp开发环境搭建

  • 下载扩展

    1
    https://pecl.php.net/package/amqp
  • 移动扩展文件

    1
    2
    复制文件 php_amqp.dll + php_amqp.pdb 到 D:\wamp64\bin\php\php7.3.21\ext\
    复制文件 rabbitmq.4.dll + rabbitmq.4.pdb 到 D:\wamp64\bin\php\php7.3.21
  • 修改配置文件

    1
    2
    3
    4
    5
    D:\wamp64\bin\php\php7.3.21\phpForApache.ini
    添加行 : extension=php_amqp.dll

    D:\wamp64\bin\apache\apache2.4.46\conf\httpd.conf
    添加行: LoadFile "D:/wamp64/bin/php/php7.3.21/rabbitmq.4.dll"
  • 重启服务

    image-20220709154449199

测试用例

1
2
3
官方文档: 使用php-amqplib/php-amqplib
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
https://github.com/php-amqplib/php-amqplib/tree/master/demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
# producer.php
$conn_args = [
'host' => '192.168.253.141',
'port' => 5672,
'login' => 'ni9ne',
'password' => 'ni9ne',
'vhost' => '/ni9ne'
];

$conn = new AMQPConnection($conn_args);
try {
if (!$conn->connect()) die('failed to connect to AMQP');
} catch (AMQPConnectionException $e) {
die($e->getMessage());
}

$exchange_name = 'e_ni9ne';
// $queue_name = 'q_ni9ne';
$route_key = 'key_1';

$channel = new AMQPChannel($conn);
// 创建交换机
$exchange = new AMQPExchange($channel);
$exchange->setName($exchange_name);
date_default_timezone_set('Asia/Shanghai');

for ($i = 0; $i < 100; $i ++){
usleep(100);
$msg = "This is A Message From Producer!" . date("H:i:s");
$result = $exchange->publish($msg, $route_key);
print("Publish Message: ". $result. '; time: ' . $i);
print('<br/>');
}

$conn->disconnect();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php
# consumer.php
$conn_args = [
'host' => '192.168.253.141',
'port' => 5672,
'login' => 'ni9ne',
'password' => 'ni9ne',
'vhost' => '/ni9ne'
];

$conn = new AMQPConnection($conn_args);

try {
if (!$conn->connect()) die('failed to connect to AMQP');
} catch (AMQPConnectionException $e) {
die($e->getMessage());
}

$exchange_name = 'e_ni9ne';
$queue_name = 'q_ni9ne';
$route_key = 'key_1';

$channel = new AMQPChannel($conn);

// 创建交换机
$exchange = new AMQPExchange($channel);
$exchange->setName($exchange_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
print("Exchange Status: " . $exchange->declareExchange());
print('<br/>');


// 创建队列
$queue = new AMQPQueue($channel);
$queue->setName($queue_name);
$queue->setFlags(AMQP_DURABLE);
print("Queue Message Total: " . $queue->declareQueue());
print('<br/>');


// 绑定交换机与队列, 并指定路由键
$bind = $queue->bind($exchange_name, $route_key);
print("Queue Bind: ". $bind);
print('<br/>');


// 阻塞模式接收消息处理
print("Messages: ");

$queue->consume('handleMessage');
# 自动应答
# $queue->consume('handleMessage',AMQP_AUTOACK);
$conn->disconnect();

function handleMessage(AMQPEnvelope $envelope, AMQPQueue $queue){
$msg = $envelope->getBody();
print($msg);
print('<br/>');
file_put_contents('msg.txt', "消息体: " . $msg . PHP_EOL, FILE_APPEND);
$queue->ack($envelope->getDeliveryTag());
}

image-20220712082828313

RabbitMQ 的工作模式

Work queues 工作队列模式

image-20220712095617691

支持多个消费者, 共同消费同一个队列中的消息。对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
# worker.php 生产者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done\n";
$msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
$channel->wait();
}

$channel->close();
$connection->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
# new_task.php 消费者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage(
$data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

Pub/Sub 订阅模式

image-20220712100007813

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

  • C:消费者,消息的接收者,会一直等待消息到来

  • Queue:消息队列,接收消息、缓存消息

  • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,指导如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。如何操作,取决于Exchange的类型。

Exchange有常见以下3种类型

  • Fanout:广播,将消息交给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key 的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列Queue与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
# emit_log.php 生产者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
# receive_logs.php 消费者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
$channel->wait();
}

$channel->close();
$connection->close();

Routing 路由模式

  • 队列与交换机的绑定,不是任意绑定,而是要指定一个 RoutingKey(路由key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

image-20220712105401217

1
2
3
4
P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
# emit_log_direct.php 生产者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo ' [x] Sent ', $severity, ':', $data, "\n";

$channel->close();
$connection->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
# receive_logs_direct.php 消费者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
exit(1);
}

foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
$channel->wait();
}

$channel->close();
$connection->close();

Topics 通配符模式

  • Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符

  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

  • 通配符规则:# 匹配0个或多个单词,* 匹配1个单词,

    例如:item.# 能够匹配 item.insert.abc, 或者 item.insert.

    item.* 只能匹配 item.insert

image-20220712110100820

1
2
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
# emit_log_topic.php 生产者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo ' [x] Sent ', $routing_key, ':', $data, "\n";

$channel->close();
$connection->close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
# receive_logs_topic.php 消费者
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
exit(1);
}

foreach ($binding_keys as $binding_key) {
$channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while ($channel->is_open()) {
$channel->wait();
}

$channel->close();
$connection->close();

工作模式总结

  • 简单模式 HelloWorld

    一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

  • 工作队列模式 Work Queue

    一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

  • 发布订阅模式 Publish/subscribe

    需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

  • 路由模式 Routing

    需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

  • 通配符模式 Topic

    需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

高级特性

消息可靠性投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 提供两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

    此模式是作用在生产者的,开启了这个模式就可以知道消息有没有发送到 Exchange上。不管有没有发送到都会触发回调方法。

    • 消息成功投递到Exchange, 返回ack
    • 消息未投递到Exchange, 返回nack
  • return 退回模式

    此模式同样是作用在生产者端的,这个模式就是为了知道消息有没有发送到对应的 Queue上。如果发送到了对应的队列不会触发回调方法,如果没有发送到对应的队列才会触发回调方法。

    即消息投递到交换机了,但是没有路由到队列,返回ACK,及路由失败原因。

1
2
3
4
5
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
- 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递

Consumer ACK

AckAcknowledge确认。 表示消费端收到消息后的确认方式。有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到 (AMQP_AUTOACK),并将相应 message 从 RabbitMQ 的消息缓存中移除。

但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ack(),手动签收。

如果出现异常,则调用nack()方法,让其自动重新发送消息。

1
2
3
4
5
6
7
8
即要达到消息的可靠性, 需要以下方面配合:
- 持久化
• Exchange要持久化
• Queue要持久化
• message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用

消费端限流

消息队列中囤积了大量的消息, 或者某些时刻生产的消息远远大于消费者处理能力的时候, 这个时候如果消费者一次取出大量的消息, 但是客户端又无法处理, 就会出现问题, 甚至可能导致服务崩溃, 所以需要对消费端进行限流

RabbitMQ提供了一种**qos(服务质量保证)**功能, 即在非自动确认消息的前提下, 如果一定数目的消息(通过consumer或者channel设置qos的值)未被确认前, 不进行消费新的消息

1
2
3
消费端的确认模式一定为手动确认。acknowledge="manual"
channel.setPrefetchSize : 消息大小限制, 一般设置为0, 消费端不做限制
channel.setPrefetchCount : 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息, 即一旦有N个消息还没有ack, 则该consumer将block(阻塞), 直到有消息ack

TTL

TTL 全称 Time To Live存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ 可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

1
2
3
4
5
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

死信队列

死信队列,英文缩写:DLXDead Letter Exchange死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)

image-20220712135131320

消息成为死信的三种情况:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,Nack/Reject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

1
2
3
如需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候。

可以使用 TTL+死信队列 组合实现延迟队列的效果。

image-20220712135435128

也可以通过使用DelayExchange插件实现

1
2
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 文档: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

DelayExchange插件对官方原生的Exchange做了功能升级:

  • 将DelayExchange中接收到的消息暂存在内存中
  • 在DelayExchange中计时, 超时后才投递消息到队列

管理平台声明一个DelayExchange:

image-20220712185348243

日志与监控

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。

也可以通过web控制台/命令行监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 查看队列
rabbitmqctl list_queues

# 查看exchanges
rabbitmqctl list_exchanges

# 查看用户
rabbitmqctl list_users

# 查看连接
rabbitmqctl list_connections

# 查看消费者信息
rabbitmqctl list_consumers

# 查看环境变量
rabbitmqctl environment

# 查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged

# 查看单个队列的内存使用
rabbitmqctl list_queues name memory

# 查看准备就绪的队列
rabbitmqctl list_queues name messages_ready

消息追踪

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。

对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用 Firehoserabbitmq_tracing插件 功能来实现消息追踪。

  • Firehose

    firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息 按照指定的格式发送到默认的exchange上。

    这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。

    发送到这个exchange上的消息的routing keypublish.exchangenamedeliver.queuename。其中exchangenamequeuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息

    image-20220712140547006

    1
    2
    3
    rabbitmqctl trace_on:开启Firehose命令
    rabbitmqctl trace_off:关闭Firehose命令
    注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
  • rabbitmq_tracing

    rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一
    层GUI的包装,更容易使用和管理。

    1
    启用插件:rabbitmq-plugins enable rabbitmq_tracing

    image-20220712141212675