因为这里阻塞导致了RocketMQ的20个线程都被阻塞住了,发送的消息过来之后,没有线程能够去执行。
【解决思路】
-
排查 Broker 是否异常,通过查看偏移量来确认是否出现积压
-
RocketMQ 中每一客户端会单独创建一个线程 PullMessageService 会循环从 Broker 拉取一批消息,
然后提交到消费端的线程池中进行消费,线程池中的线程消费完一条消息后会上服务端上报当前消费端的消费进度,
而且在提交消费进度时是提交当前处理队列中消息消费偏移量最小的消息作为消费组的进度,
即如果消息偏移量为 100 的消息,如果由于某种原因迟迟没有消费成功,那该消费组的进度则无法向前推进。
【解决步骤】
-
打印出栈信息,jstack pid > j.log
-
先确定是否线程的状态在正常进行
-
重点搜索ConsumeMessageThread_开头的日志,来确定是否哪里造成了阻塞
"ConsumeMessageThread_1 #1 prio=5 os_prio=0 tid=0x00007fe51000c000 nid=0x8 waiting on condition [0x00007fe519590000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c3a00070> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
at cn.DequeUtil.producer(DequeUtil.java:49)
at cn.bo.impl.TaskBOImpl.lambda$compensateTaskDeque$0(TaskBOImpl.java:42)
at cn.bo.impl.TaskBOImpl$$Lambda$1110/1194575856.accept(Unknown Source)
2. 设置线程数没用,正确设置消费组线程数
-
在源码中,他们两个线程数都设置为20,将这两个值设置为相同。
认为在消费端消息很多的情况下,将最大线程数提高会创建更多的线程来提高消息的处理速度,
-
参数设置过大,导致配置检查失败
【问题分析】
我们先来看一下RocketMQ是如何进行监听消息的
它主要启动了一个线程池,不间断的拉取消息,由于线程池内部持有的队列为一个无界队列,
导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
问题二: 在创建Consumer监听消息的时候,会进行配置的校验,。
那区间只能是 [1,1000] 如果超出这个值则会报错
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]"
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
// consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
throw new MQClientException(
"consumeThreadMax Out of range [1, 1000]"
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
【解决方案】
- 在设置最大和最小线程数量的时候并不会因为最大线程数提高而提高消息的处理速率,所以在设置参数的时候需要注意设置的范围。
在 RocketMQ 中,每一个消费组都会启动一个线程池用来实现消费端在消费组的隔离,
RocketMQ 也提供了 consumeThreadMin、consumeThreadMax 两个参数来设置线程池中的线程个数
// 消费者最小线程数
consumer.setConsumeThreadMin(20);
// 消费者最大线程数
consumer.setConsumeThreadMax(20);
-
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。
-
可以通过加机器,或者在已有机器启动多个进程的方式。
3. 批量拉取数据解决默认32的限制
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setPullBatchSize(100);
consumer.setConsumeMessageBatchMaxSize(200);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
* Subscribe one more topic to consume.
* 设置监听主题以及过滤条件
consumer.subscribe("TopicTest999", "*");
* Register callback to execute on arrival of messages fetched from brokers.
* 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//System.out.println("待消费条数:" msgs.size());
LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
/*try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.info("success");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
* Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
通过设置PullBatchSize、ConsumeMessageBatchMaxSize
consumer.setPullBatchSize(100);
consumer.setConsumeMessageBatchMaxSize(200);
来修改批量拉取消息的值,发现默认情况下一次消息会拉取 32 条消息,但业务监听器收到的消息默认一条
【问题分析】
因为RocketMQ采取了保护机制,需要修改Broker配置的参数才能够允许一次拉取的最大条数调整
- pullBatchSize:消息客户端一次向 Broker 发送拉取消息每批返回最大的消息条数,默认为 32。
- consumeMessageBatchMaxSize:提交到消息消费监听器中的消息条数,默认为 1。
【解决方案】
通过修改Broker配置的参数来解决,通常建议修只修改命中内存相关的
参数的含义:
int maxTransferCountOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大条数,默认值为 32 条。
int maxTransferBytesOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大消息大小,默认为 256K。
如果使用场景是大数据领域,建议的配置如下:
maxTransferCountOnMessageInMemory=5000
maxTransferBytesOnMessageInMemory = 5000 * 1024
如果是业务类场景,建议配置如下:
maxTransferCountOnMessageInMemory=2000
maxTransferBytesOnMessageInMemory = 2000 * 1024
通过修改完配置,我们再次启动就可以看到能够拉取到代销费的数量超过默认的32条。
4. 对当前版本的业务进行修改,业务希望从最新的消息开始消费
对当前版本的业务进行修改,业务希望从最新的消息开始消费
【解决方案】
- 重置点位,sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g CID_CONSUMER_TEST -t TopicTest -s now
- 设置ConsumeFromWhere,从最新的点位开始读取
ConsumeFromWhere 这个参数的含义是,初次启动从何处开始消费。更准确的表述是,如果查询不到消息消费进度时,从什么地方开始消费。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
5. 基于多机房队列负载算法,实现优先消费本机房中的消息
从消费者的角度来看,如果采取平均分配,特别是采取 AllocateMessageQueueAveragelyByCircle 方案,
会出现消费者跨消费这种情况,如果能实现本机房的消费者优先消费本机房中的消息,可有效避免消息跨机房消费。
RocketMQ 设计者已经为我们了提供了解决方案——AllocateMachineRoomNearby。
【解决方案】
AllocateMachineRoomNearby 核心属性
1. AllocateMessageQueueStrategy allocateMessageQueueStrategy
内部分配算法,可以看成机房就近分配算法,其实是一个代理,内部还是需要持有一种分配算法,例如平均分配算法。
2. MachineRoomResolver machineRoomResolver
多机房解析器,即从 brokerName、客户端 clientId 中识别出所在的机房。
-
修改 broker.conf 配置文件,机房信息加上broker名称,这样做是为了识别出哪个 Broker 属于哪个机房
brokerName = MachineRoom1-broker-a
- 修改消费者的clientIp
consumer.setClientIP("MachineRoom1-" RemotingUtil.getLocalAddress());
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
// Broker部署
@Override
public String brokerDeployIn(MessageQueue messageQueue) {
System.out.println(messageQueue.getBrokerName().split("-")[0]);
return messageQueue.getBrokerName().split("-")[0];
// 消费端部署
@Override
public String consumerDeployIn(String clientID) {
System.out.println(clientID.split("-")[0]);
return clientID.split("-")[0];
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
文章目录如何顺序消息?如何定时消息?如何解决重复消费?如何HA?
如何顺序消息?
全局有序,只能配置一个MQ.局部有序的话,设置顺序消息监听就可以了.若是想要根据如订单生命周期有序消费,让他们路由到同一个MQ就行了.自定义MessageQueueSelector,配置MQ路由规则.
如何定时消息?
MQ不支持定时,但支持很多种延时定时,MQ为每个延时单独线程处理,处理步骤是定时消息属性存储了消息的原生ID和Topic,然后定时消息恢复成原生消息,然后丢到CommitLog存储.
如何解决重复消费?
RocketMQ FAQ1)消费端处理消息发生异常没有捕获或是因为其他原因,没有返回消费状态解决方案:消费端捕获异常,如果需要重试,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,如果不需要重试,返回ConsumeConcurrentlyStatus.RECONSUME_SUCCESS可以在消费端增加重试次数判断,例如重试三次就返回成功
2)不同的消费者consumer1,、consumer2配置了相同的单个消费组consumerGroup,订阅了多个topic和
实现方式:重写MessageQueueSelector中send方法。将消息放入同一个队列,来保证顺序消费。
如何放入同一队列:例如,同一订单的N个消息需要保证顺序,则可以根据订单号,Hash取模确定发送到同一个队列。
二、如何避免重复消费
保证消费幂等,即消费多次对各方无影响
三、如何实现分布式事务
方案一:两阶段提交和事务状态回查
RocketMQ常见问题及实现分布式事务时注意事项RocketMQ常见的一些疑问或问题1、如何防止消息丢失的问题2、如何防止消息的重复消费问题(幂等性问题)3、消息如何保证消费顺序的问题4、消息积压、阻塞怎么解决?5、如何保证消息不会丢失RocketMQ实现分布式事务时注意事项1、MQ半消息回查,若得不到该消息是提交还是回滚,会一直隔一段时间就查询一次吗?1、二阶段异常,需要回滚怎么处理?2、消费者消费失败后,会重试吗,多次重试后还是失败会怎么样?
RocketMQ常见的一些疑问或问题
1、如何防止消息丢失
RocketMQ本身不保证消息重复消费,如果业务有要求不能重复消费,需要在自身的业务处理,常见的操作有两种;
接口幂等,消费端业务消息保持幂等性,例如redis的setNx()命令,当然要注意设置key的超时时间,以及key的唯一性。
redis的Incr命令,确定消息的唯一值,在set之前先判断值是否存在,同时也是需要注意超时时间。
2.Rocket...
RocketMQ 常见面试问题包括以下几个方面:
1. 消息重试和死信队列:当一条消息初次消费失败后,RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则将消息发送到该消费者对应的特殊队列中,即死信队列。[1][2]
2. 消息幂等:消息幂等是指对同一条消息的多次消费操作所产生的结果是一致的。在 RocketMQ 中,可以通过设计合适的业务逻辑和消息处理机制来实现消息的幂等性。
3. Broker 主从架构和多副本策略:RocketMQ 的 Broker 采用主从架构,Master 收到消息后会同步给 Slave,这样一条消息就不止一份了,即使 Master 宕机,仍然可以从 Slave 中获取消息,保证了消息队列的可靠性和高可用性。此外,RocketMQ 还支持多副本策略,可以配置多个 Slave 副本来提高消息的可靠性和容错性。[3]
4. Broker 注册到 NameServer:Broker 会将自己的信息注册到 NameServer 上,NameServer 是 RocketMQ 的路由中心,负责管理 Broker 的地址信息,消费者可以通过 NameServer 获取到 Broker 的地址,从而进行消息的发送和消费。[3]
以上是一些常见的 RocketMQ 面试问题,希望对你有所帮助。
CSDN-Ada助手:
Dubbo实现负载均衡原理
CSDN-Ada助手:
SpringCloud整合Duboo+Nacos注册中心使用
CSDN-Ada助手:
ArrayList和LinkedList使用小技巧
CSDN-Ada助手:
RocketMQ的常见问题解析
CSDN-Ada助手: