RabbitMQ使用问题及高可用方案
RabbitMQ使用问题及高可用方案
应用问题
消息可靠性保障
如何100%确保消息发送成功
RabbitMQ特性
生产者消息确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18$channel = $connection->channel();
$channel->set_ack_handler(
function (AMQPMessage $message) {
echo 'Message acked with content ' . $message->body . PHP_EOL;
}
);
$channel->set_nack_handler(
function (AMQPMessage $message) {
echo 'Message nacked with content ' . $message->body . PHP_EOL;
}
);
$channel->set_return_listener(
function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
echo 'Message returned with content ' . $message->body . PHP_EOL;
}
);消息持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28$channel = $connection->channel();
/*
name: $queue
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
*/
# 队列持久化
$channel->queue_declare($queue, false, true, false, false);
/*
name: $exchange
type: direct
passive: false
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
# 交换机持久化
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
# 消息持久化
$message = new AMQPMessage(
$messageBody,
array(
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
)
);
$channel->basic_publish($message, $exchange);消费者消息确认
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/**
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
/*
queue: Queue from where to get the messages
consumer_tag: Consumer identifier
no_local: Don't receive messages published by this consumer.
no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
*/
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');消费失败重试机制
失败后返回nack, 尝试重试, 到达重试次数上限后, 投递入死信队列
1
2
3
4
5
6
7
8$channel = $connection->channel();
$channel->queue_declare('test11', false, true, false, false, false,
new AMQPTable(array(
'x-dead-letter-exchange' => 't_test1',
'x-message-ttl' => 15000,
'x-expires' => 16000
)));
消息补偿

消息幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指消费多条相同的消息,得到与消费该消息一次相同的结果。
乐观锁机制:

延迟消息问题
利用TTL结合死信交换机,实现消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
1 | $channel = $connection->channel(); |
消息堆积问题
队列上绑定多个消费者,提高消费速度
使用惰性队列,可以在mq中保存更多消息
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
1
文档: https://www.rabbitmq.com/lazy-queues.html
基于命令行设置lazy-queue:
要设置一个队列为惰性队列,只需要在声明队列时,指定
x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
1
2
3
4
5
6
7
8命令解读:
- rabbitmqctl :RabbitMQ的命令行工具
- set_policy :添加一个策略
- Lazy :策略名称,可以自定义
- "^lazy-queue$" :用正则表达式匹配队列的名字
- '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
- --apply-to queues:策略的作用对象,是所有的队列优点: 基于磁盘存储,消息上限高, 没有间歇性的page-out,性能比较稳定
缺点: 基于磁盘存储,消息时效性会降低, 性能受限于磁盘的IO
高可用问题
RabbitMQ的集群有两种模式:
普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
普通集群,或者叫标准集群(classic cluster),具备下列特征:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
镜像集群:本质是主从模式,具备下面的特征:
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
镜像模式的配置有3种模式:
ha-mode ha-params 效果 准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。 all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。 nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。 1
2
3
4
5
6
7
8
9
10exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- rabbitmqctl set_policy:固定写法
- ha-two:策略名称,自定义
- "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
- '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
- "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
- "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
- "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销1
2
3
4
5
6
7all模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
- ha-all:策略名称,自定义
- "^all\.":匹配所有以all.开头的队列名
- '{"ha-mode":"all"}':策略内容
- "ha-mode":"all":策略模式,此处是all模式,即所有节点都为镜像节点1
2
3
4
5
6
7
8
9nodes模式
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
- rabbitmqctl set_policy:固定写法
- ha-nodes:策略名称,自定义
- "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
- '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
- "ha-mode":"nodes":策略模式,此处是nodes模式
- "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称仲裁队列
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致

集群搭建
集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

单机多实例部署
1 | 官方文档: https://www.rabbitmq.com/clustering.html |
1 | 停止rabbitmq服务 |


集群管理
1 | rabbitmqctl join_cluster {cluster_node} [–ram] |
RabbitMQ镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列。设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
1 | sudo rabbitmqctl -n rabbit1@'ni9nes' set_policy my_ha_proxy "^" '{"ha-mode":"all"}' |

1 | - Name:策略名称 |
负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
安装HAProxy
1 | wget https://www.haproxy.org/download/1.6/src/haproxy-1.6.16.tar.gz |
1 | #logging options |
启动HAproxy负载
1 | /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg |
