RabbitMQ使用问题及高可用方案

RabbitMQ使用问题及高可用方案

应用问题

消息可靠性保障

如何100%确保消息发送成功

  • RabbitMQ特性

    • 生产者消息确认

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      $channel = $connection->channel();
      $channel->set_ack_handler(
      function (AMQPMessage $message) {
      echo 'Message acked with content ' . $message->body . PHP_EOL;
      }
      );

      $channel->set_nack_handler(
      function (AMQPMessage $message) {
      echo 'Message nacked with content ' . $message->body . PHP_EOL;
      }
      );

      $channel->set_return_listener(
      function ($replyCode, $replyText, $exchange, $routingKey, AMQPMessage $message) {
      echo 'Message returned with content ' . $message->body . PHP_EOL;
      }
      );
    • 消息持久化

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      $channel = $connection->channel();
      /*
      name: $queue
      passive: false
      durable: true // the queue will survive server restarts
      exclusive: false // the queue can be accessed in other channels
      auto_delete: false //the queue won't be deleted once the channel is closed.
      */
      # 队列持久化
      $channel->queue_declare($queue, false, true, false, false);
      /*
      name: $exchange
      type: direct
      passive: false
      durable: true // the exchange will survive server restarts
      auto_delete: false //the exchange won't be deleted once the channel is closed.
      */
      # 交换机持久化
      $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
      # 消息持久化
      $message = new AMQPMessage(
      $messageBody,
      array(
      'content_type' => 'text/plain',
      'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
      )
      );
      $channel->basic_publish($message, $exchange);
    • 消费者消息确认

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      /**
      * @param \PhpAmqpLib\Message\AMQPMessage $message
      */
      function process_message($message)
      {
      echo "\n--------\n";
      echo $message->body;
      echo "\n--------\n";

      $message->ack();

      // Send a message with the string "quit" to cancel the consumer.
      if ($message->body === 'quit') {
      $message->getChannel()->basic_cancel($message->getConsumerTag());
      }
      }
      /*
      queue: Queue from where to get the messages
      consumer_tag: Consumer identifier
      no_local: Don't receive messages published by this consumer.
      no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
      exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
      nowait:
      callback: A PHP Callback
      */

      $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
    • 消费失败重试机制

      失败后返回nack, 尝试重试, 到达重试次数上限后, 投递入死信队列

      1
      2
      3
      4
      5
      6
      7
      8
      $channel = $connection->channel();

      $channel->queue_declare('test11', false, true, false, false, false,
      new AMQPTable(array(
      'x-dead-letter-exchange' => 't_test1',
      'x-message-ttl' => 15000,
      'x-expires' => 16000
      )));
  • 消息补偿

image-20220712141419805

消息幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指消费多条相同的消息,得到与消费该消息一次相同的结果。

乐观锁机制:

image-20220712141500430

延迟消息问题

利用TTL结合死信交换机,实现消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$channel = $connection->channel();

$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(array(
'x-delayed-type' => AMQPExchangeType::FANOUT
)));

$channel->queue_declare('delayed_queue', false, false, false, false, false, new AMQPTable(array(
'x-dead-letter-exchange' => 'delayed'
)));

$headers = new AMQPTable(array('x-delay' => 7000));
$message = new AMQPMessage('hello', array('delivery_mode' => 2));
$message->set('application_headers', $headers);
$channel->basic_publish($message, 'delayed_exchange');

消息堆积问题

  • 队列上绑定多个消费者,提高消费速度

  • 使用惰性队列,可以在mq中保存更多消息

    从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储
    1
    文档: https://www.rabbitmq.com/lazy-queues.html

    基于命令行设置lazy-queue:

    要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

    1
    rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
    1
    2
    3
    4
    5
    6
    7
    8
    命令解读:
    - rabbitmqctl :RabbitMQ的命令行工具
    - set_policy :添加一个策略
    - Lazy :策略名称,可以自定义
    - "^lazy-queue$" :用正则表达式匹配队列的名字
    - '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
    - --apply-to queues:策略的作用对象,是所有的队列

    优点: 基于磁盘存储,消息上限高, 没有间歇性的page-out,性能比较稳定

    缺点: 基于磁盘存储,消息时效性会降低, 性能受限于磁盘的IO

高可用问题

RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

    普通集群,或者叫标准集群(classic cluster),具备下列特征:

    • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

    镜像集群:本质是主从模式,具备下面的特征:

    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主

    镜像模式的配置有3种模式:

    ha-mode ha-params 效果
    准确模式exactly 队列的副本量count 集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
    all (none) 队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
    nodes node names 指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    exactly模式
    rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

    - rabbitmqctl set_policy:固定写法
    - ha-two:策略名称,自定义
    - "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
    - '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    - "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    - "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    - "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
    1
    2
    3
    4
    5
    6
    7
    all模式
    rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'

    - ha-all:策略名称,自定义
    - "^all\.":匹配所有以all.开头的队列名
    - '{"ha-mode":"all"}':策略内容
    - "ha-mode":"all":策略模式,此处是all模式,即所有节点都为镜像节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    nodes模式
    rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

    - rabbitmqctl set_policy:固定写法
    - ha-nodes:策略名称,自定义
    - "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
    - '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    - "ha-mode":"nodes":策略模式,此处是nodes模式
    - "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称
  • 仲裁队列

    镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

    具备下列特征:

    • 与镜像队列一样,都是主从模式,支持主从数据同步
    • 使用非常简单,没有复杂的配置
    • 主从同步基于Raft协议,强一致

    image-20210717234329640

集群搭建

集群方案的原理

RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

1565245219265

单机多实例部署

1
官方文档: https://www.rabbitmq.com/clustering.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 停止rabbitmq服务
service rabbitmq-server stop
# 启动第一个节点:
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
# 启动第二个节点:
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start

# rabbit1操作作为主节点
rabbitmqctl -n rabbit1 stop_app
rabbitmqctl -n rabbit1 reset
rabbitmqctl -n rabbit1 start_app

# rabbit2操作为从节点:
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster rabbit1@'ni9nes' # ''内是主机名
rabbitmqctl -n rabbit2 start_app

# 查看集群状态:
rabbitmqctl cluster_status -n rabbit1

image-20220712142716093

image-20220712142727735

集群管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

rabbitmqctl cluster_status
显示集群的状态。

rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。

rabbitmqctl update_cluster_nodes {clusternode}
在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

RabbitMQ镜像集群配置

上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列

镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列。设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

1
sudo rabbitmqctl -n rabbit1@'ni9nes'  set_policy my_ha_proxy "^" '{"ha-mode":"all"}'

image-20220712143050345

1
2
3
- Name:策略名称
- Pattern:匹配的规则,如果是匹配所有的队列,是^.
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。

负载均衡-HAProxy

HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。

安装HAProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
wget https://www.haproxy.org/download/1.6/src/haproxy-1.6.16.tar.gz
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
# 进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=custom PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
# 赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
# 创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#logging options
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid

defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 60s
srvtimeout 15s
#front-end IP for consumers and producters

listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
#balance url_param userid
#balance url_param session_id check_post 64
#balance hdr(User-Agent)
#balance hdr(host)
#balance hdr(Host) use_domain_only
#balance rdp-cookie
#balance leastconn
#balance source //ip
balance roundrobin
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2

listen stats
bind 192.168.253.141:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s

启动HAproxy负载

1
2
3
4
5
6
7
8
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
# 查看haproxy进程状态
ps -ef | grep haproxy

#访问如下地址对mq节点进行监控
#http://192.168.253.141:8100/rabbitmq-stats

# 代码中访问mq集群地址,则变为访问haproxy地址:5672

image-20220712144941078