在技术领域,不断学习和探索是保持竞争力的关键。最近,我重新开始了对RabbitMQ的研究,这个过程让我又收获了许多新的知识和技能,我觉得有必要记录下来,以便将来回顾和分享。
RabbitMQ是一款开源的消息队列中间件,它提供了高效、可靠、灵活的通信机制,广泛应用于分布式系统中。然而,有时候在使用RabbitMQ时会遇到连接断开的问题,这可能会导致消息传递中断和应用程序的不可用性。
当使用RabbitMQ时,可能会遇到以下几种情况导致连接断开的问题:
1.网络问题:网络中断、防火墙设置等问题可能导致RabbitMQ连接断开。
2.长时间空闲:如果连接在一段时间内没有进行任何通信,RabbitMQ可能会自动关闭连接。
3.RabbitMQ服务器问题:RabbitMQ服务器可能会因为负载过高或其他原因主动关闭连接。
虽然RabbitMQ.Client库,有心跳机制,有断线重连机制,但是在网络断开的时候,并不能重连。
下面这段代码经过本人验证有效,可以解决上面的问题。
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
using RabbitMQDemo.Shared;
using System.Collections.Concurrent;
using RabbitMQ.Client.Exceptions;
namespace RabbitMQConsumerDemo
public class RabbitMQRpcClientHandler
/// <summary>
/// 定义一个静态变量来保存类的实列
/// </summary>
private static RabbitMQRpcClientHandler? _uniqueInstance;
/// <summary>
/// 定义一个标识确保线程同步
/// </summary>
private static readonly object _locker = new();
/// <summary>
/// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances.
/// </summary>
private static IConnectionFactory? _factory;
/// <summary>
/// Main interface to an AMQP connection.
/// </summary>
private IConnection? _connection;
/// <summary>
/// 发送通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)
/// </summary>
private IModel? _sendChannel;
/// <summary>
/// 接收通道(发送通道及接收通道分开,避免互相影响,导致整个服务不可用)
/// </summary>
private IModel? _listenChannel;
/// <summary>
/// 监听消费者
/// </summary>
private EventingBasicConsumer? _listenConsumer;
/// <summary>
/// 响应消费者
/// </summary>
private EventingBasicConsumer? _replyConsumer;
/// <summary>
/// RabbitMQ 主机域名
/// </summary>
private readonly string _defaultRabbitMQHostName = "127.0.0.1";
/// <summary>
/// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672
/// </summary>
private readonly int _defaultRabbitMQPort = 5672;
/// <summary>
/// RabbitMQ 用户名, 默认 guest
/// </summary>
private readonly string _defaultRabbitMQUserName = "guest";
/// <summary>
/// RabbitMQ 密码, 默认 guest
/// </summary>
private readonly string _defaultRabbitMQPassword = "guest!";
/// <summary>
/// 虚拟主机
/// </summary>
private readonly string _defaultRabbitMQVirtualHost = "/";
/// <summary>
/// 交换机
/// </summary>
private readonly string _exchangeName = "";
/// <summary>
/// 数据监控队列
/// </summary>
private readonly string _listenQueueName = "queue.listen.test";
/// <summary>
/// 指令响应队列
/// </summary>
private string _replyQueueName = string.Empty;
/// <summary>
/// 注册-路由键
/// </summary>
private readonly string _routingKeyRegister = "queue.register";
/// <summary>
/// 心跳-路由键
/// </summary>
private readonly string _routingKeyHeart = "queue.heart";
/// <summary>
/// 取消信号
/// </summary>
private readonly CancellationTokenSource _cts = new();
/// <summary>
/// 回调函数映射器
/// </summary>
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();
private bool _connectionState;
private bool _sendChannelState;
private bool _listenChannelState;
/// <summary>
/// 连接状态
/// </summary>
public bool ConnectionState
get { return _connectionState; }
if (_connectionState == value)
return;
_connectionState = value;
if (_connectionState)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已打开");
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接已关闭");
/// <summary>
/// 发送通道状态
/// </summary>
public bool SendChannelState
get { return _sendChannelState; }
if (_sendChannelState == value)
return;
_sendChannelState = value;
if (_sendChannelState)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已打开");
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道已关闭");
/// <summary>
/// 接收通道状态
/// </summary>
public bool ListenChannelState
get { return _listenChannelState; }
if (_listenChannelState == value)
return;
_listenChannelState = value;
if (_listenChannelState)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已打开");
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 接收通道已关闭");
/// <summary>
/// 定义私有构造函数,使外界不能创建该类实例
/// </summary>
private RabbitMQRpcClientHandler()
// 创建连接工厂
_factory = new ConnectionFactory()
HostName = _defaultRabbitMQHostName,//MQ服务器地址
Port = _defaultRabbitMQPort,//MQ服务端口号
UserName = _defaultRabbitMQUserName,//账号
Password = _defaultRabbitMQPassword,//密码
VirtualHost = _defaultRabbitMQVirtualHost,
RequestedHeartbeat = TimeSpan.FromSeconds(2),
AutomaticRecoveryEnabled = true,//自动重连
TopologyRecoveryEnabled = true,//拓扑重连
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
/// <summary>
/// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点
/// </summary>
/// <returns></returns>
public static RabbitMQRpcClientHandler GetInstance()
/* *********************
* 当第一个线程运行到这里时,此时会对_locker对象“加锁”
* 当第二个线程运行该方法时,首先检测到_locker对象为“加锁”状态,该线程就会挂起等待第一个线程解锁
* lock语句运行完之后(即线程运行完之后)会对该对象“解锁”
* 双重锁定只需要一句判断就可以了
* *********************/
if (_uniqueInstance == null)
lock (_locker)
// 如果类的实例不存在则创建,否则直接返回
_uniqueInstance ??= new RabbitMQRpcClientHandler();
return _uniqueInstance;
/// <summary>
/// 异步调用
/// </summary>
/// <param name="message"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="Exception"></exception>
public Task<string> CallAsync(string message, EnumMsgType msgType = EnumMsgType.Register, CancellationToken cancellationToken = default)
if (_connection?.IsOpen != true)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(生产者)");
TaskCompletionSource<string> taskCompletionSource = new();
taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
Successed = false,
Message = "连接为空或已经关闭"
}, Newtonsoft.Json.Formatting.None));
return taskCompletionSource.Task;
if (_sendChannel?.IsOpen != true)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(生产者)");
TaskCompletionSource<string> taskCompletionSource = new();
taskCompletionSource.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
Successed = false,
Message = "发送通道为空或已经关闭"
}, Newtonsoft.Json.Formatting.None));
return taskCompletionSource.Task;
// 设置消息ID、类型、非持久性等
IBasicProperties props = _sendChannel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = _replyQueueName;
props.ContentType = "application/json";
props.DeliveryMode = 1;//非持久性
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
_callbackMapper.TryAdd(correlationId, tcs);
switch (msgType)
case EnumMsgType.Register:
/* *********************
* 作用:向默认交换机的指定队列中发送注册消息
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyRegister, basicProperties: props, body: messageBytes);
break;
default:
/* *********************
* 作用:向默认交换机的指定队列中发送心跳消息
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_sendChannel.BasicPublish(exchange: string.Empty, routingKey: _routingKeyHeart, basicProperties: props, body: messageBytes);
break;
// 通知任务已经取消,处理取消后的回调操作
cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));
// 请求超时检测
if (tcs.Task.Wait(TimeSpan.FromSeconds(10)) == false)
_callbackMapper.TryRemove(correlationId, out _);
tcs.TrySetResult(Newtonsoft.Json.JsonConvert.SerializeObject(new ReplyDataEntity()
Successed = false,
Message = $"{(msgType == EnumMsgType.Register ? "注册" : "心跳")}请求超时"
}, Newtonsoft.Json.Formatting.None));
return tcs.Task;
/// <summary>
/// 开始
/// </summary>
public void Start()
new Thread(new ThreadStart(Checking))
IsBackground = true
}.Start();
Reconnect();
/// <summary>
/// 停止
/// </summary>
public void Stop()
Cleanup();
/// <summary>
/// 取消状态监测
/// </summary>
public void CancelChecking()
_cts.Cancel();
/// <summary>
/// 状态监测
/// </summary>
private void Checking()
while (_cts.IsCancellationRequested == false)
Thread.Sleep(1000);
ConnectionState = _connection?.IsOpen == true;
if (_connection?.IsOpen != true)
SendChannelState = false;
ListenChannelState = false;
continue;
SendChannelState = _sendChannel?.IsOpen == true;
ListenChannelState = _listenChannel?.IsOpen == true;
/// <summary>
/// 连接
/// </summary>
private void Connect()
if (_factory == null)
return;
// 创建连接
_connection = _factory.CreateConnection();
_connection.ConnectionShutdown += Connection_ConnectionShutdown;
// 创建发送通道
_sendChannel = _connection.CreateModel();
// 创建接收通道
_listenChannel = _connection.CreateModel();
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接至RabbitMQ服务器[{_defaultRabbitMQHostName}]");
// 监控消息
ListenMessageConsume();
// 响应消息
ReplyMessageConsume();
/// <summary>
/// 重连
/// </summary>
private void Reconnect()
Cleanup();
// state is initially false
var mres = new ManualResetEventSlim(false);
// loop until state is true, checking every 3s
while (!mres.Wait(3 * 1000))
// 连接RabbitMQ服务器
Connect();
// state set to true - breaks out of loop
mres.Set();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 尝试连接RabbitMQ服务器出现错误【{ex.Message}】");
/// <summary>
/// 清理
/// </summary>
private void Cleanup()
if (_replyConsumer != null)
if (_replyConsumer.IsRunning)
_replyConsumer.OnCancel();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的响应消费者,不再执行任何操作,但遇到错误【{ex.Message}】");
_replyConsumer.Received -= ReplyConsumer_Received;
_replyConsumer.Registered -= ReplyConsumer_Registered;
_replyConsumer.Shutdown -= ReplyConsumer_Shutdown;
_replyConsumer.Unregistered -= ReplyConsumer_Unregistered;
_replyConsumer = null;
if (_listenConsumer != null)
if (_listenConsumer.IsRunning)
_listenConsumer.OnCancel();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的监听消费者,不再执行任何操作,但遇到错误【{ex.Message}】");
_listenConsumer.Received -= ListenConsumer_Received;
_listenConsumer.Registered -= ListenConsumer_Registered;
_listenConsumer.Shutdown -= ListenConsumer_Shutdown;
_listenConsumer.Unregistered -= ListenConsumer_Unregistered;
_listenConsumer = null;
if (_sendChannel != null)
if (_sendChannel.IsOpen)
_sendChannel.Close();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的发送通道,但遇到错误【{ex.Message}】");
_sendChannel.Dispose();
_sendChannel = null;
if (_listenChannel != null)
if (_listenChannel.IsOpen)
_listenChannel.Close();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的接收通道,但遇到错误【{ex.Message}】");
_listenChannel.Dispose();
_listenChannel = null;
if (_connection != null)
_connection.ConnectionShutdown -= Connection_ConnectionShutdown;
if (_connection.IsOpen)
_connection.Close();
catch (Exception ex)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误【{ex.Message}】");
_connection.Dispose();
_connection = null;
/// <summary>
/// 响应消息消费
/// </summary>
private void ReplyMessageConsume()
if (_connection?.IsOpen != true)
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(响应消费者)");
if (_sendChannel?.IsOpen != true)
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 发送通道为空或已经关闭(响应消费者)");
// 声明一个服务器命名的队列
_replyQueueName = _sendChannel.QueueDeclare().QueueName;
/* *********************
* 作用:定义消息分发机制
* 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。
* Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。
* prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。
* 参数:
* 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况
* 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量
* 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
* *********************/
_sendChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
/* *********************
* 作用:创建基于该队列的消费者
* 优点:
* 1、基于长连接
* 2、消费方式为发布订阅模式
* 3、节省资源且实时性好
* *********************/
_replyConsumer = new EventingBasicConsumer(_sendChannel);
_replyConsumer.Received += ReplyConsumer_Received;
_replyConsumer.Registered += ReplyConsumer_Registered;
_replyConsumer.Shutdown += ReplyConsumer_Shutdown;
_replyConsumer.Unregistered += ReplyConsumer_Unregistered;
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(响应消费者)");
/* *********************
* 作用:监听队列(绑定消费者)
* 说明:消费者
* 参数:
* 1、queue:队列名称
* 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复
* 3、consumer:消费方法,当消费者接收到消息要执行的方法
* *********************/
_sendChannel.BasicConsume(queue: _replyQueueName, autoAck: false, consumer: _replyConsumer);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_replyQueueName}(响应消费者)");
catch (AggregateException aex)
// 错误信息去重
var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();
// 打印所有错误信息
foreach (var error in errorList)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(响应消费者)");
catch (Exception ex)
throw ex;
/// <summary>
/// 监听消息消费
/// </summary>
private void ListenMessageConsume()
if (_connection?.IsOpen != true)
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 连接为空或已经关闭(监听消费者)");
if (_listenChannel?.IsOpen != true)
throw new Exception($"{DateTime.Now:HH:mm:ss.fff}: 接收通道为空或已经关闭(监听消费者)");
/* *********************
* 作用:声明(创建)队列--RabbitMQ持久化机制(队列持久化)
* 说明:生产者、消费者都有
* 参数:
* 1、queue:队列名称。
* 2、durable:是否持久化。true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息
* 3、exclusive:是否独占队列。队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete:是否自动删除。队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments:其他参数。可以设置一个队列的扩展参数,比如:可设置存活时间
* *********************/
_listenChannel.QueueDeclare(queue: _listenQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
/* *********************
* 作用:定义消息分发机制
* 说明:Qos可以设置消费者一次接收消息的最大条数,能够解决消息拥堵时造成的消费者内存爆满问题。
* Qos也比较适用于耗时任务队列,当任务队列中的任务很多时,使用Qos后我们可以随时添加新的消费者来提高任务的处理效率。
* prefetchCount=1,RabbitMQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的。
* 参数:
* 1、prefetchSize:是可接收消息的大小。但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况
* 2、prefetchCount:处理消息最大的数量。即在下一次发送应答消息前,客户端可以收到的消息最大数量
* 3、global:则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的
* *********************/
_listenChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
/* *********************
* 作用:创建基于该队列的消费者
* 优点:
* 1、基于长连接
* 2、消费方式为发布订阅模式
* 3、节省资源且实时性好
* *********************/
_listenConsumer = new EventingBasicConsumer(_listenChannel);
_listenConsumer.Received += ListenConsumer_Received;
_listenConsumer.Registered += ListenConsumer_Registered;
_listenConsumer.Shutdown += ListenConsumer_Shutdown;
_listenConsumer.Unregistered += ListenConsumer_Unregistered;
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 消费者准备完毕(监听消费者)");
/* *********************
* 作用:监听队列(绑定消费者)
* 说明:消费者
* 参数:
* 1、queue:队列名称
* 2、autoAck:自动回复,当消费者接收到消息后要告诉RabbitMQ消息已接收,如果将此参数设置为true表示会自动回复RabbitMQ,如果设置为false要通过编程实现回复
* 3、consumer:消费方法,当消费者接收到消息要执行的方法
* *********************/
_listenChannel.BasicConsume(queue: _listenQueueName, autoAck: false, consumer: _listenConsumer);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 开始监控RabbitMQ服务器,队列{_listenQueueName}(监听消费者)");
catch (AggregateException aex)
// 错误信息去重
var errorList = (from error in aex.InnerExceptions select error.Message).Distinct().ToList();
// 打印所有错误信息
foreach (var error in errorList)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 总异常之【{error}】(监听消费者)");
catch (Exception ex)
throw ex;
/// <summary>
/// 绑定事件:消息监控(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Received(object? sender, BasicDeliverEventArgs e)
if (_sendChannel == null)
return;
/* *********************
* 作用:手动签收消息
* 说明:消费者
* 参数:
* 1、deliveryTag:消息投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* *********************/
_sendChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
if (!_callbackMapper.TryRemove(e.BasicProperties.CorrelationId, out var tcs))
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 响应消费者收到新消息【{Encoding.UTF8.GetString(e.Body.ToArray())}】");
return;
var body = e.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
/// <summary>
/// 绑定事件:订阅成功(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Registered(object? sender, ConsumerEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(响应消费者)");
/// <summary>
/// 绑定事件:通道被关闭(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Shutdown(object? sender, ShutdownEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(响应消费者)");
/// <summary>
/// 绑定事件:取消订阅(响应消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ReplyConsumer_Unregistered(object? sender, ConsumerEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(响应消费者)");
/// <summary>
/// 绑定事件:消息监控(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Received(object? sender, BasicDeliverEventArgs e)
if (_listenChannel == null)
return;
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var props = e.BasicProperties;
if (props != null && string.IsNullOrWhiteSpace(props.CorrelationId) == false && string.IsNullOrWhiteSpace(props.ReplyTo) == false)
var replyProps = _listenChannel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
replyProps.Persistent = true;
var responseBytes = Encoding.UTF8.GetBytes(message);
/* *********************
* 作用:向指定的队列中发送消息--RabbitMQ持久化机制(消息持久化)
* 说明:生产者
* 参数:
* 1、exchange:交换机名称。如果不指定将使用RabbitMQ的默认交换机(设置为"")
* 2、routingKey:路由键。交换机根据路由键来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、basicProperties:消息的属性。
* 4、body:发送消息的内容
* *********************/
_listenChannel.BasicPublish(exchange: _exchangeName, routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 签收响应【{message}】(监听消费者)");
// TODO: 正常处理回复BasicAck,未正常处理回复BasicReject
/* *********************
* 作用:手动签收消息
* 说明:消费者
* 参数:
* 1、deliveryTag:消息投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* *********************/
_listenChannel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
// 未正常处理的消息,重新放回队列
//_listenChannel.BasicReject(e.DeliveryTag, true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动签收【{message}】(监听消费者)");
catch (OperationInterruptedException oiex)
/* *********************
* 作用:手动拒绝签收,返回消息到Broke
* 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
* 参数:
* 1、deliveryTag:当前消息的投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回
* *********************/
_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{oiex.Message}】,返回消息到Broke(监听消费者)");
catch (Exception ex)
/* *********************
* 作用:手动拒绝签收,返回消息到Broke
* 说明:消费者,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
* 参数:
* 1、deliveryTag:当前消息的投递标签
* 2、multiple:是否批量签收。设置为true,一次性签收所有,设置为false,只签收当前消息
* 3、requeue:是否重回队列。设置为true,重回队列,设置为false,不重回
* *********************/
_listenChannel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 手动拒绝签收【{ex.Message}】,返回消息到Broke(监听消费者)");
/// <summary>
/// 绑定事件:订阅成功(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Registered(object? sender, ConsumerEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经注册上队列(监听消费者)");
/// <summary>
/// 绑定事件:通道被关闭(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Shutdown(object? sender, ShutdownEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 队列已经关闭(监听消费者)");
/// <summary>
/// 绑定事件:取消订阅(监听消费者)
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ListenConsumer_Unregistered(object? sender, ConsumerEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经取消注册队列(监听消费者)");
/// <summary>
/// 绑定事件:断开连接时,调用方法自动重连
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void Connection_ConnectionShutdown(object? sender, ShutdownEventArgs e)
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 已经断开连接,正在尝试重新连接至RabbitMQ服务器");
Reconnect();
方法调用如下:
using RabbitMQConsumerDemo;
RabbitMQRpcClientHandler.GetInstance().Start();
while (true)
var readLine = Console.ReadLine();
if (string.IsNullOrWhiteSpace(readLine))
else if (readLine.Equals("exit", StringComparison.OrdinalIgnoreCase))
break;
else if (readLine.StartsWith("register=", StringComparison.OrdinalIgnoreCase))
var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Register);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 注册指令响应【{response}】");
else if (readLine.StartsWith("heart=", StringComparison.OrdinalIgnoreCase))
var response = await RabbitMQRpcClientHandler.GetInstance().CallAsync(readLine, RabbitMQDemo.Shared.EnumMsgType.Heart);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}: 心跳指令响应【{response}】");
RabbitMQRpcClientHandler.GetInstance().Stop();
Thread.Sleep(2 * 1000);
// 退出程序
int count = 8;
while (true)
if (count == 0)
Console.WriteLine($"Exit in {count} seconds");
Thread.Sleep(1000);
break;
else if (count == 5)
RabbitMQRpcClientHandler.GetInstance().CancelChecking();
//RabbitMQClientHandler.GetInstance().CancelChecking();
Console.WriteLine($"Exit in {count} seconds");
Thread.Sleep(1000);
count--;
微服务解决方案| 通过购物车,订购和结帐队列支持电子商务应用程序。
ASP.NET Core 5.0,RabbitMQ,SQL Server,MongoDb,Redis,Docker,Docker Compose文件(用于应用程序和容器+图像的自动重建),Swagger,AutoMapper,Mediator等
建筑风格:
使用“干净”(N层)体系结构,SOLID原则,域驱动设计(DDD),CQRS(命令查询责任隔离)等。
由于作者极大地改变了这门课程...
我在这里复制他的自述文件的一部分,该文件指示如何下载和安装所有docker容器。 否则,他计划在接下来的2周(2021.03.30)中删除该课程的“旧”部分。
运行项目
您将需要以下工具:
请遵循以下步骤来设置您的开发环境:(在运行之前,启动Docker Desktop)
克隆存储库
在包含docke
安装
RabbitMQ后,有可能出现服务无法启动或者服务启动几秒钟之后
自动停止的问题,一般情况下是因为
RabbitMQ 和ErLang的版本号不一致导致的,
请参考我的另一篇文章windows 系统下安装
RabbitMQ 步骤
Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。
出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。
接下来我们先...
c#工业自动化通信开发库,工业自动软件必备的基本程序。包括串口通信,TCP客户端,tcp服务器端,高并发物联网接收服务器端,udp通信,can总线通信,profinet,modbus tcp/rtu/dtu等,各大品牌plc通信,opc ua,opc da,http通信,mysql常规库,ef6+mysql,ef6+sqlite,firebird数据库,ini配置文件操作,excel表格操作(包括模版化导出),rabbitmq消息队列管理库,Rabiitmq消息队列操作库,常用的数据转换(高低字节排序,转换,取位设位,校验和等),功能可拆分。不懂的,不明白的,包教会。做项目的做服务的,包能用。拥有技术公共群共同探讨技术难题,使用技巧和问题,详细的使用说明文档。
update 2021.7.26----优化消息队列出队缓存,读取更快捷更稳定。
update 2021.7.26----新增加rabbitmq消息队列管理类,可以嵌入到用户界面中管理rabbitmq服务器
update 2021.8.3---新增应用于物联网及mes系统中远程读取modbus rtu协议。
update 2021.8.10---新增加ef6+mssql数据库功能,实现自动创建表,以及存储过程。完善及提高了efmssql,efsqlite,efmysql等安全。
update 2021.08.19---根据行业设备通信现状需要,增加通过dcom组件与opc服务器通信功能,去除原来复杂的操作过程,使用接口式更方便稳定
update-2021-08-30-----新增加dtu服务器,pc做为服务器,远程4G Dtu模块作为客户端主动连接服务器,服务器根据模块注册信息下发modbus rtu指令至远端,终端回复数据服务器自动解析。主要应用于物联网平台.mes系统
Rabbitmq 故障转移例程
为您处理 RabbitMQ 自动重新连接和发布重试例程的轻量级库。 该库旨在使开发人员在使用 RabbitMQ 时免于头痛。
rabbitroutine解决了您的 RabbitMQ 重新连接问题:
分别处理连接错误和通道错误。
考虑到连接后需要在 RabbitMQ 中实体。
通知和连接。
支持和 ,可以用包装。
支持用于发布的通道池。
提供通道统计信息。
停下来做包装,做功能!
go get github.com/furdarius/rabbitroutine
通过“go dep”添加为依赖项
$ dep ensure -add github.com/furdarius/rabbitroutine
您需要实现Consumer并将其注册到StartConsumer或StartMultipleConsumers 。 建立连
在微服务架构下,少不了使用MQ和Redis,为了方便调试,制作了一个简单的RabbitMQ的调试工具,不用总是进入15672去看,主要是觉得麻烦。
工具具备创建Exchange,Queue,发布和订阅,自动应答处理等常用的消息处理操作。
工具使用.net开发,需要安装net461,其他的也没啥,有什么需要的,可以提出要求,方便开发嘛。
Hzdtf.FoundationFramework
基础框架系统,支持.NET和.NET Core平台,语言:C#,DB支持MySql和SqlServer,主要功能有抽象持久化、服务层,将业务基本的增删改查抽离复用;提供代码生成器从DB生成实体、持久化、服务以及MVC控制器,每层依赖接口,并需要在客户端将对应实现层用Autofac程序集依赖注入,用AOP提供日志跟踪、事务、模型验证等。对Autofac、Redis、RabbitMQ封装扩展;DB访问提供自动主从访问,Redis客户端分区。特别适合管理系统。
本框架必须运行在.NET Standard 2.0、.NET Framework 4.7.2和.NET Core 3.1.5以上。下载源码用Visual Studio 2019打开。
工程以Standard或Std结尾是标准库,以Framework或Frm结尾为Framework库,以C
微服务解决方案| 通过购物车,订购和结帐队列支持电子商务应用程序。
ASP
.NET Core 5.0,
RabbitMQ,SQL Server,MongoDb,Redis,Docker,Docker Compose文件(用于应用程序和容器+图像的
自动重建),Swagger,AutoMapper,Mediator等
建筑风格:
使用“干净”(N层)体系结构,SOLID原则,域驱动设计(DDD),CQRS(命令查询责任隔离)等。
没有持久化的测试用例,会出行阻塞的状态: 改为持久化后的测试用例: 使用auto自动模式 注意:创建的simple.queue需要Add Dead letter exchange,其他都跟之前创建的操作差不多 监听
(3)取消订单
根据引用\[1\]和引用\[2\]的内容,RabbitMQ的重试机制可以通过配置文件来设置。在配置文件中,可以设置最大重试次数(max-attempts)、重试间隔时间(initial-interval)、重试最大时间间隔(max-interval)和乘子(multiplier)。当消息在消费过程中抛出异常导致多次重试都失败时,根据配置的最大重试次数,RabbitMQ会自动进行重试。如果消息是自动确认的,重试失败后消息会被自动确认丢失;如果消息是手动确认的,重试失败后消息会一直处于unacked状态,导致消息积压。根据引用\[3\]的内容,可以在配置文件中设置是否开启消费者重试(enabled),以及其他相关参数。所以,RabbitMQ的自动重试次数可以通过配置文件中的max-attempts参数来设置。
#### 引用[.reference_title]
- *1* *2* [RabbitMQ--重试机制](https://blog.csdn.net/feiying0canglang/article/details/127368401)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^insert_down28v1,239^v3^insert_chatgpt"}} ] [.reference_item]
- *3* [rabbitmq重试机制](https://blog.csdn.net/yulingli42/article/details/126386134)[target="_blank" data-report-click={"spm":"1018.2226.3001.9630","extra":{"utm_source":"vip_chatgpt_common_search_pc_result","utm_medium":"distribute.pc_search_result.none-task-cask-2~all~insert_cask~default-1-null.142^v91^insert_down28v1,239^v3^insert_chatgpt"}} ] [.reference_item]
[ .reference_list ]