RabbitMQ进阶之路:基于Keepalived+Haproxy+Quonum构建Rabbitmq高可用队列_rabbitmq+haproxy+keepalived
引言
在分布式系统中,消息队列作为解耦、流量削峰的关键中间件,确保消息队列的高可用至关重要。基于Keepalived+Haproxy+Quonum队列构建的高可用架构可以为应用提供可靠、高效的消息传递解决方案,极大提升系统稳定性与容错能力。
一、核心术语解释
1.1 Haproxy
- 定义:是一款开源、速度极快且可靠的反向代理软件,可为基于 TCP 和 HTTP 的应用程序提供高可用负载均衡和代理服务。
- 作用:将客户端请求依据多种算法分配到多个后端服务器,实现负载均衡。通过健康检查机制保障应用高可用性。
1.2 Keepalived
- 定义:基于VRRP(Virtual Router Redundancy Protocol)协议的高可用解决方案,用于管理VIP的故障切换。
- 作用:监控 Haproxy节点状态,主节点故障时自动将VIP切换到备用节点,实现无缝切换。
1.3 Quonum Queue
- 定义:仲裁队列是一种现代队列类型,它基于Raft共识算法实现了一个持久、高可用队列。
- 作用:保障数据安全,节点故障自动转移,提升系统容错性。
二、RabbitMQ高可用部署架构图
三、Haproxy安装与配置
在192.168.197.128,192.168.197.129上分别安装haproxy。
3.1 下载Haproxy安装包
官网:https://www.haproxy.org/
3.2 解压tar包
cd /usr/localtar -zvxf haproxy-3.2.3.tar.gz
3.3 安装依赖包
yum install -y gcc make pcre-devel openssl-devel zlib-devel
3.4 安装Haproxy
cd /usr/local/haproxy-3.2.3make TARGET=linux-glibc USE_OPENSSL=1 USE_PCRE=1 USE_ZLIB=1make install PREFIX=/usr/local/haproxy
3.5 创建用户和组
groupadd -r haproxyuseradd -r -g haproxy -s /sbin/nologin -M haproxy
3.6 创建Haproxy配置文件
vi /etc/haproxy/haproxy.cfg
配置RabbitMQ
global log /dev/log local0 log /dev/log local1 notice chroot /var/lib/haproxy stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners stats timeout 30s user haproxy group haproxy daemondefaults log global mode tcp option httplog option dontlognull timeout connect 5s timeout client 50s timeout server 50s#定义客户端请求入口frontend rabbitmq_front_amqp bind *:1672 mode tcp default_backend rabbitmq_back_amqp#定义客户端请求入口frontend rabbitmq_front_web bind *:11672 mode http default_backend rabbitmq_back_web#后端负载均衡配置backend rabbitmq_back_amqp mode tcp balance roundrobin option tcp-check tcp-check send PROTOCOL\\ 1 tcp-check expect string AMQP server ndoe1 192.168.197.128:5672 check inter 5000 rise 2 fall 3 weight 10 server ndoe2 192.168.197.129:5672 check inter 5000 rise 2 fall 3 weight 10 server ndoe3 192.168.197.130:5672 check inter 5000 rise 2 fall 3 weight 10 backup#后端负载均衡配置backend rabbitmq_back_web mode http balance roundrobin http-check expect status 200 server node1 192.168.197.128:15672 check inter 5000 rise 2 fall 3 weight 10 server node2 192.168.197.129:15672 check inter 5000 rise 2 fall 3 weight 10 server node3 192.168.197.130:15672 check inter 5000 rise 2 fall 3 weight 10 backup#配置haproxy监控界面 listen stats bind *:9000 mode http stats enable stats uri /monitor stats realm \"HAProxy Statistics\" stats auth admin:123456 stats refresh 5s stats hide-version stats show-legends stats show-desc
3.7 创建Haproxy相关目录并授权
mkdir -p /run/haproxymkdir -p /var/lib/haproxychown -R haproxy:haproxy /run/haproxy /var/lib/haproxychmod 750 /var/lib/haproxy /run/haproxy
3.8 授予 Haproxy 二进制文件 chroot 能力
sudo setcap \'cap_sys_chroot=+ep\' /usr/local/haproxy/sbin/haproxy
3.9 配置服务管理
vi /etc/systemd/system/haproxy.service
将以下内容复制到文件中
[Unit]Description=HAProxy Load BalancerAfter=network.target[Service]RuntimeDirectory=haproxyExecStart=/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg -p /run/haproxy/haproxy.pidExecReload=/bin/kill -USR2 $MAINPIDRestart=alwaysType=forkingUser=haproxyGroup=haproxyRestartSec=5sStartLimitInterval=5sStartLimitBurst=3# 确保有权限绑定低端口AmbientCapabilities=CAP_NET_BIND_SERVICE[Install]WantedBy=multi-user.target
3.10 启动Haproxy
systemctl daemon-reloadsystemctl restart haproxy
四、Keepalived安装与配置
在192.168。197.128,192.168.197.129上分别安装keepalived。
4.1 Keepalived安装
详见:https://blog.csdn.net/liangjiyong/article/details/147521410
4.2 Keepalived配置
主节点192.168.197.128配置(/etc/keepalived/keepalived.conf)
global_defs { router_id LVS_MASTER}vrrp_script chk_haproxy { script \"/usr/bin/pgrep haproxy\" # 检查 haproxy 进程是否存在 interval 2 # 检查间隔(秒) weight -20 # 检查失败时,节点优先级降低 20 fall 3 # 连续 3 次失败触发状态切换 rise 2}vrrp_instance VI_1 { state MASTER interface ens33 # 网卡名称 virtual_router_id 51 priority 100 # 主服务器优先级更高 advert_int 1 authentication { auth_type PASS auth_pass 1111 } virtual_ipaddress { 192.168.197.200/24 } track_script { chk_haproxy }}
从节点192.168.197.129配置(/etc/keepalived/keepalived.conf)
global_defs { router_id LVS_BACKUP}vrrp_script chk_haproxy { script \"/usr/bin/pgrep haproxy\" # 检查 haproxy 进程是否存在 interval 2 # 检查间隔(秒) weight -20 # 检查失败时,节点优先级降低 20 fall 3 # 连续 3 次失败触发状态切换 rise 2}vrrp_instance VI_1 { state BACKUP interface ens33 # 网卡名称 virtual_router_id 51 priority 90 # 主服务器优先级更高 advert_int 1 authentication { auth_type PASS auth_pass 1111 } virtual_ipaddress { 192.168.197.200/24 } track_script { chk_haproxy }}
五、RabbitMQ安装与配置
在192.168.197.128,192.168.197.129,192.168.197.130上安装rabbitmq
5.1 下载RabbitMQ,erlang安装包
rabbitmq下载地址:https://github.com/rabbitmq/rabbitmq-server/releases
erlang下载地址:https://github.com/rabbitmq/erlang-rpm/releases
5.2 安装erlang
rpm -ivh erlang-27.3.4-1.el7.x86_64.rpm
5.3 安装RabbitMQ
rpm -ivh rabbitmq-server-4.1.0-1.el8.noarch.rpm
5.4 开启RabbitMQ管理界面
rabbitmq-plugins enable rabbitmq_management
5.5 同步.erlang.cookie文件
同步128cookie文件到129,130
scp /var/lib/rabbitmq/.erlang.cookie root@192.168.197.129:/var/lib/rabbitmq/scp /var/lib/rabbitmq/.erlang.cookie root@192.168.197.130:/var/lib/rabbitmq/
5.6 配置主机名解析
# 分别在128,129,130三台服务器上执行vi /etc/hosts
192.168.197.128 node1192.168.197.130 node3192.168.197.129 node2
5.7 启动所有节点MQ服务
# 所有节点执行systemctl restart rabbitmq-server
5.8 将从节点加入主节点
在从节点上执行以下命令
# 停止 RabbitMQ 应用(保持 Erlang 进程运行)rabbitmqctl stop_app# 加入主节点集群rabbitmqctl join_cluster rabbit@node1# 启动 RabbitMQ 应用rabbitmqctl start_app
5.9 访问RabbitMQ管理界面
六、Springboot集成Quonum Queue
6.1 添加pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope></dependency><dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope></dependency>
6.2 添加RabbitMQ配置
spring.application.name=springboot-rabbitmq-quonumspring.rabbitmq.host=192.168.197.200spring.rabbitmq.port=1672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/
6.3 定义Quorum队列配置
package com.example.rabbitmqquonum.config;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitQuonumConfig { // 声明交换机名称 public static final String QUORUM_EXCHANGE = \"quorum.exchange\"; // 声明队列名称 public static final String QUORUM_QUEUE = \"quorum.queue\"; // 声明路由键 public static final String QUORUM_ROUTING_KEY = \"quorum.routing.key\"; /** * 声明Quorum队列 */ @Bean public Queue quorumQueue() { // 使用QueueBuilder构建Quorum队列 return QueueBuilder.durable(QUORUM_QUEUE) .quorum() // 指定为Quorum队列 .build(); } /** * 声明交换机 */ @Bean public DirectExchange quorumExchange() { return ExchangeBuilder.directExchange(QUORUM_EXCHANGE) .durable(true) .build(); } /** * 绑定队列和交换机 */ @Bean public Binding quorumBinding(Queue quorumQueue, DirectExchange quorumExchange) { return BindingBuilder.bind(quorumQueue) .to(quorumExchange) .with(QUORUM_ROUTING_KEY); }}
6.4 创建生产者
package com.example.rabbitmqquonum.producer;import com.example.rabbitmqquonum.config.RabbitQuonumConfig;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class QuorumMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息到Quorum队列 */ public void sendMessage(String message) { rabbitTemplate.convertAndSend( RabbitQuonumConfig.QUORUM_EXCHANGE, RabbitQuonumConfig.QUORUM_ROUTING_KEY, message ); }}
6.5 创建消费者
package com.example.rabbitmqquonum.consumer;import com.example.rabbitmqquonum.config.RabbitQuonumConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class QuorumMessageConsumer { /** * 监听Quorum队列 */ @RabbitListener(queues = RabbitQuonumConfig.QUORUM_QUEUE) public void receiveMessage(String message) { System.out.println(\"Received message from quorum queue: \" + message); }}
6.6 测试代码
package com.example.rabbitmqquonum;import com.example.rabbitmqquonum.producer.QuorumMessageProducer;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass SpringbootRabbitmqQuonumApplicationTests { @Autowired private QuorumMessageProducer producer; @Test public void testQuorumQueue() { // 发送测试消息 producer.sendMessage(\"Hello, Quorum Queue!\"); }}
七、总结
RabbitMQ仲裁队列基于Raft共识算法实现了一个持久、可复制的高可用队列。利用Haproxy统一入口简化客户端配置,并通过健康检查机制,自动剔除故障节点避免客户端反复连接已宕机的节点。结合Keepalived主备切换机制,确保VIP始终可用。通过SpringBoot的自动配置与spring-boot-starter-amqp依赖,可快速集成并发挥其优势。