-
RabbitMQ集群包含四种架构模式
-
主备模式 Warren
-
镜像模式 Mirror
-
远程模式 Shovel
-
多活模式 Federation
-
主备模式(一主一备):实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常的简单且好用,主备模式也称为Warren模式。一台干活,一台闲着,只有当主服务器挂掉的时候,备份服务器才会被启用,因此会有严重的负载不均衡的问题。
-
镜像模式:集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中也是用的最多的。并且实现集群非常简单,一般会联网大厂都会构建这种镜像集群模式。在主备基础上进行了扩展,集群中所有节点的数据和配置信息都是一样的,在底层同时进行工作,集群的前面有一个负载均衡器(Nginx、Haproxy),来进行负载均衡,当其中的某个节点挂掉的时候,不会影响整个集群对外提供服务。
-
远程模式:远程模式可以实现双活的一种模式(容灾机制),简称Shovel模式,Shovel就是我们可以把消息进行不同数据中心的复制工作,可以跨地域的让两个MQ集群进行互联。(要求较高:所有节点的MQ的版本是一致的, 配置相对复杂,早起的版本不支持,已经被淘汰了)
-
多活模式: 是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般实现异地集群都是使用这种双活或者多活模型来实现的。这种模型需要依赖RabbitMQ和Federation插件,可以实现持续的可靠的AMQP数据通信,多活模式在实际配置与应用非常简单。
-
Mirror集群环境的搭建
-
首先需要准备两台服务器(192.168.18.177和192.168.18.178),并且都安装了RabbitMQ
-
修改hostname,在192.168.18.177 /etc/hostname 的内容做如下的修改
将localhost.localdomain 修改为 m1
-
修改hostname,在192.168.18.178 /etc/hostname 的内容做如下的修改
将localhost.localdomain 修改为 m2
-
修改hosts,上面两台主机的 /etc/hosts 文件的最下面分辨添加如下的配置
192.168.18.177 m1
192.168.18.178 m2
-
开放上面两台主机的的4个端口号,分别是4369、5672、15672、25672
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --relead
为了保险起见,最好重启主机,执行reboot命令
-
复制.erlang.cookie:.erlang.cookie是erlang分布式的token文件,集群内所有的节点要持有相同的.erlang.cookie文件,才允许彼此通信。
find / -name *.cookie
scp /var/lib/rabbitmq/.erlang.cookie 192.168.18.178:/var/lib/rabbitmq/
chmod 400 /var/lib/rabbitmq/.erlang.cookie
-
在m2下执行下面的命令
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@m1
rabbitmqctl start_app
rabbitmqctl cluster_status
-
查看集群状态是出现下面的信息表示集群配置成功
-
此时登录后端管理页面的时候会看到集群中所有节点的信息
-
此时若在m1中新增一个交换机,就会自动同步在m2的节点上
-
Haproxy配置MQ集群负载均衡
-
Haproxy是一款提供高可用性、负载均衡以及基于TCP(第四层)和HTTP(第七层)应用的代理软件,支持虚拟主机,它是免费的、快速并且可靠的一种解决方案。TCP代理服务器。
-
RabbitMQ集群镜像模式中,Haproxy用于做TCP代理,提供节点负载均衡,(LB-LoadBalance)与故障发现。
-
Haproxy工作示意图
-
安装Haproxy
yum install haproxy
rpm -ql haproxy
haproxy
find / -name haproxy.cfg
-
修改haproxy.cfg
-
删除下面的内容
#---------------------------------------------------------------------
# main frontend which proxys to the backends
#---------------------------------------------------------------------
frontend main *:5000
acl url_static path_beg -i /static /images /javascript /stylesheets
acl url_static path_end -i .jpg .gif .png .css .js
use_backend static if url_static
default_backend app
#---------------------------------------------------------------------
# static backend for serving up images, stylesheets and such
#---------------------------------------------------------------------
backend static
balance roundrobin
server static 127.0.0.1:4331 check
#---------------------------------------------------------------------
# round robin balancing between the various backends
#---------------------------------------------------------------------
backend app
balance roundrobin
server app1 127.0.0.1:5001 check
server app2 127.0.0.1:5002 check
server app3 127.0.0.1:5003 check
server app4 127.0.0.1:5004 check
-
添加如下的配置项
#对MQ集群进行监听
listen rabbitmq_cluster
bind 0.0.0.0:5673 #通过5673对m1和m2进行映射
option tcplog #记录TCP连接状态和时间
mode tcp #四层协议代理,即对TCP进行转发
option clitcpka #开启TCP的Keep Alive(长连接模式)
timeout connect 1s #haproxy与mq建立连接的超时时间
timeout client 10s #客户端与haproxy最大空闲时间
timeout server 10s #服务器与haproxy最大空闲时间
balance roundrobin #采用轮询转发消息
#每5秒发送一次心跳包,如果连续两次有响应则代表状态良好
#如果连续3次没有响应,则视为服务故障,该节点将被剔除
server node1 192.168.18.177:5672 check inter 5s rise 2 fall 3
server node2 192.168.18.178:5672 check inter 5s rise 2 fall 3
#开启监控服务
listen http_front
bind 0.0.0.0:1080 #监听端口
stats refresh 30s #每30秒刷新一次
stats uri /haproxy?stats #统计页面uri
stats auth admin:admin #统计页面用户名和密码设置
-
启动haproxy
haproxy -f /etc/haproxy/haproxy.cfg
-
此时访问 http://192.168.18.177:1080/haproxy?stats 输入用户名和密码(admin:admin),就会出现下面的界面
-
此时查看haproxy开放的端口号
netstat -tulpn | grep haproxy
会出现下面的端口,其中5673是代理服务器对外提供的端口,1080是监控服务的端口
-
客户端访问MQ集群
-
由于guest用户只能在本机访问,通过代理服务器不能使用guest用户进行消息发送,所以需要在某一个节点上面新建一个用户,并且为该用户分配权限及虚拟机(集群下的其他节点会自动同步新增的用户数据);另外,在代理服务器的防火墙中需要开放代理端口号和监控服务的端口号,否则程序无法连接到代理服务器上面。
-
RabbitMQUtil的代码
package com.kangswx.rabbitmq.mirror;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtil {
private static ConnectionFactory connectionFactory;
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.18.177");
connectionFactory.setPort(5673);
connectionFactory.setUsername("swkang");
connectionFactory.setPassword("swkang");
connectionFactory.setVirtualHost("/");
public static Connection getConnection() {
Connection connection = null;
try {
connection = connectionFactory.newConnection();
} catch (Exception e) {
throw new RuntimeException(e);
return connection;
-
生产者代码
package com.kangswx.rabbitmq.mirror;
import com.kangswx.rabbitmq.utils.RabbitMQConsts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Procuder {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null);
String content = "Hello World bb!";
channel.basicPublish("", RabbitMQConsts.QUEUE_HELLO, null, content.getBytes());
channel.close();
connection.close();
System.out.println("数据发送成功");
-
消费者代码
package com.kangswx.rabbitmq.mirror;
import com.kangswx.rabbitmq.utils.RabbitMQConsts;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQConsts.QUEUE_HELLO, false, false, false, null);
channel.basicConsume(RabbitMQConsts.QUEUE_HELLO, false, new Receiver(channel));
class Receiver extends DefaultConsumer{
private Channel channel;
public Receiver(Channel channel) {
super(channel);
this.channel = channel;
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messageBody = new String(body);
System.out.println("消费者接收到: " + messageBody);
channel.basicAck(envelope.getDeliveryTag(), false);
-
上面示例的代码见 Java客户端访问RabbitMQ代码示例
RabbitMQ集群包含四种架构模式主备模式 Warren镜像模式 Mirror远程模式 Shovel多活模式 Federation主备模式(一主一备):实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常的简单且好用,主备模式也称为Warren模式。一台干活,一台闲着,只有当主服务器挂掉的时候,备份服务器才会被启用,因此会有严重的负...
RabbitMQ简介
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(
单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。之前我们一直在用
普通模式:默认模式,以两个节点(A、B)为例来进行说明
当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临时通道进行消息传输,把A中的消息实体取出并经过通过交给B发送给consumer
当A故障后,B就无法取到A节点中未消费的消息实体
如果做了消息持久化,.
rabbitmq采用的是erlang语音开发的,因此必须要有erlang的支持才可以进行运用,因此我们首先要去安装erlang环境,在Linux中可以通过下面的命令获取到erlang
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/tag/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
但是恼火的是这样下载的时候很慢,可能是自己网络的问题,最后自己去手动下载,然后在上传到Linux上面进行安装,在进行下载erlang之前,我们还必须了解erl
RabbitMQ-Java-09-集群搭建
官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
》集群搭建说明
集群搭建是为了解决什么问题呢?
大量消息情况下性能问题
注意点有哪些呢?
每个node(机器)需要设置正确的host(hostname、hosts),保证通过host能互相连通。
所有加入集群的node地位相等,新加入node只需要连接任意一个node
1、安装Erlang
路径:https://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.6/erlang-23.2.6-1.el7.x86_64.rpmhttps://github.com/rabbitmq/erlang-rpm/releases/download/v23.2.6/erlang-23.2.6-1.el7.x86_64.rpm
rpm -ivh erlang-23.2.6-1.el7.x86_64..
创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
添加依赖:
注:用rabbitMQ官方提供的java client测试,目的是对RabbitMQ的交互过程 有个清晰的认识
创建生产者
package ...
主要解决,异步处理,应用解耦,流量削峰等问题
从而实现高性能,高可用,可伸缩和最终一致性的架构
使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等
1.1.1 异步处理
用户注册后,需要发送验证邮箱和手机验证码;
将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
1.1.2 应用解耦
修改每台服务上的hosts文件(路径:/etc/hosts),设置成如下:
192.168.40.130 rabbitmq01
192.168.40.131 rabbitmq02
1.1 在线安装
yum install esl-erlang_17.3-1~centos~6_amd64.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
1.2 离线安装
1.下载好 erlang 安装包以后,上传至虚拟机。
2.依次执行命令:
1)rpm -ivh esl-erlang-17.3-1.x86_64.rpm --force --nodeps
2)rpm -ivh esl-erlang_17.3-1~centos~6_amd64.rpm --force --nodeps
为什么搭建rabbitmq集群?rabbitmq集群有那些模式?如何搭建Rabbitmq集群?rabbitmq镜像高可用策略有那些?
1、首先这款产品本身的优点众多,大家最看好的便是他的异步化提高系统抗峰值能力,然后便是系统及功能结构解耦,那么照此两点来说,他的在整个系统中的作用还是至关重要的,那么如此重要,当然要考虑他的高可用性,那么便有啦第一个问题的解答。
2、rabbitmq有3...
sudo rabbitmqctl add_user username password
sudo rabbitmqctl set_user_tags username administrator
sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
其中,username和password为自定义的用户名和密码。
6. 访问RabbitMQ管理界面
在浏览器中输入以下地址,即可访问RabbitMQ管理界面:
http://your_server_ip:15672/
其中,your_server_ip为服务器的IP地址。
以上就是CentOS 7安装RabbitMQ的步骤。
SpringBoot整合Swagger2实现自动生成API(含源码)
qq_23129387:
SpringBoot整合RabbitMQ(包含生产者和消费者)
旺仔OO糖: