添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

因为这里阻塞导致了RocketMQ的20个线程都被阻塞住了,发送的消息过来之后,没有线程能够去执行。

【解决思路】

  1. 排查 Broker 是否异常,通过查看偏移量来确认是否出现积压

  2. RocketMQ 中每一客户端会单独创建一个线程 PullMessageService 会循环从 Broker 拉取一批消息,

然后提交到消费端的线程池中进行消费,线程池中的线程消费完一条消息后会上服务端上报当前消费端的消费进度,

而且在提交消费进度时是提交当前处理队列中消息消费偏移量最小的消息作为消费组的进度,

即如果消息偏移量为 100 的消息,如果由于某种原因迟迟没有消费成功,那该消费组的进度则无法向前推进。

【解决步骤】

  1. 打印出栈信息,jstack pid > j.log
  2. 先确定是否线程的状态在正常进行
  3. 重点搜索ConsumeMessageThread_开头的日志,来确定是否哪里造成了阻塞
"ConsumeMessageThread_1 #1 prio=5 os_prio=0 tid=0x00007fe51000c000 nid=0x8 waiting on condition [0x00007fe519590000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c3a00070> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396)
        at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649)
        at cn.DequeUtil.producer(DequeUtil.java:49)
        at cn.bo.impl.TaskBOImpl.lambda$compensateTaskDeque$0(TaskBOImpl.java:42)
        at cn.bo.impl.TaskBOImpl$$Lambda$1110/1194575856.accept(Unknown Source)

2. 设置线程数没用,正确设置消费组线程数

  1. 在源码中,他们两个线程数都设置为20,将这两个值设置为相同。

    认为在消费端消息很多的情况下,将最大线程数提高会创建更多的线程来提高消息的处理速度,

  2. 参数设置过大,导致配置检查失败

【问题分析】

我们先来看一下RocketMQ是如何进行监听消息的

它主要启动了一个线程池,不间断的拉取消息,由于线程池内部持有的队列为一个无界队列,

导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl(consumeThreadPrefix));

问题二: 在创建Consumer监听消息的时候,会进行配置的校验,。

那区间只能是 [1,1000] 如果超出这个值则会报错

        // consumeThreadMin
        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
            throw new MQClientException(
                "consumeThreadMin Out of range [1, 1000]"
                      FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        // consumeThreadMax
        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException(
                "consumeThreadMax Out of range [1, 1000]"
                      FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);

【解决方案】

  1. 在设置最大和最小线程数量的时候并不会因为最大线程数提高而提高消息的处理速率,所以在设置参数的时候需要注意设置的范围。
在 RocketMQ 中,每一个消费组都会启动一个线程池用来实现消费端在消费组的隔离,
RocketMQ 也提供了 consumeThreadMin、consumeThreadMax 两个参数来设置线程池中的线程个数
// 消费者最小线程数
consumer.setConsumeThreadMin(20);
// 消费者最大线程数
consumer.setConsumeThreadMax(20);
  1. 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。

  2. 可以通过加机器,或者在已有机器启动多个进程的方式。

3. 批量拉取数据解决默认32的限制

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setPullBatchSize(100);
    consumer.setConsumeMessageBatchMaxSize(200);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     * Subscribe one more topic to consume.
     * 设置监听主题以及过滤条件
    consumer.subscribe("TopicTest999", "*");
     *  Register callback to execute on arrival of messages fetched from brokers.
     *  注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            //System.out.println("待消费条数:"  msgs.size());
            LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
            /*try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            LOGGER.info("success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     *  Launch the consumer instance.
    consumer.start();
    System.out.printf("Consumer Started.%n");

通过设置PullBatchSize、ConsumeMessageBatchMaxSize

consumer.setPullBatchSize(100); 
consumer.setConsumeMessageBatchMaxSize(200);

来修改批量拉取消息的值,发现默认情况下一次消息会拉取 32 条消息,但业务监听器收到的消息默认一条

【问题分析】

因为RocketMQ采取了保护机制,需要修改Broker配置的参数才能够允许一次拉取的最大条数调整

  • pullBatchSize:消息客户端一次向 Broker 发送拉取消息每批返回最大的消息条数,默认为 32。
  • consumeMessageBatchMaxSize:提交到消息消费监听器中的消息条数,默认为 1。

【解决方案】

通过修改Broker配置的参数来解决,通常建议修只修改命中内存相关的

参数的含义:

int maxTransferCountOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大条数,默认值为 32 条。
int maxTransferBytesOnMessageInMemory
如果此次消息拉取能全部命中,内存允许一次消息拉取的最大消息大小,默认为 256K。

如果使用场景是大数据领域,建议的配置如下:

maxTransferCountOnMessageInMemory=5000
maxTransferBytesOnMessageInMemory = 5000 * 1024

如果是业务类场景,建议配置如下:

maxTransferCountOnMessageInMemory=2000
maxTransferBytesOnMessageInMemory = 2000 * 1024

通过修改完配置,我们再次启动就可以看到能够拉取到代销费的数量超过默认的32条。

 4. 对当前版本的业务进行修改,业务希望从最新的消息开始消费

对当前版本的业务进行修改,业务希望从最新的消息开始消费

【解决方案】

  1. 重置点位,sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g CID_CONSUMER_TEST -t TopicTest -s now
  2. 设置ConsumeFromWhere,从最新的点位开始读取

ConsumeFromWhere 这个参数的含义是,初次启动从何处开始消费。更准确的表述是,如果查询不到消息消费进度时,从什么地方开始消费

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

5. 基于多机房队列负载算法,实现优先消费本机房中的消息

从消费者的角度来看,如果采取平均分配,特别是采取 AllocateMessageQueueAveragelyByCircle 方案,

会出现消费者跨消费这种情况,如果能实现本机房的消费者优先消费本机房中的消息,可有效避免消息跨机房消费

RocketMQ 设计者已经为我们了提供了解决方案——AllocateMachineRoomNearby。

【解决方案】

AllocateMachineRoomNearby 核心属性

1. AllocateMessageQueueStrategy allocateMessageQueueStrategy

内部分配算法,可以看成机房就近分配算法,其实是一个代理,内部还是需要持有一种分配算法,例如平均分配算法。

2. MachineRoomResolver machineRoomResolver

多机房解析器,即从 brokerName、客户端 clientId 中识别出所在的机房。

  1. 修改 broker.conf 配置文件,机房信息加上broker名称,这样做是为了识别出哪个 Broker 属于哪个机房

brokerName = MachineRoom1-broker-a
  1. 修改消费者的clientIp
consumer.setClientIP("MachineRoom1-"   RemotingUtil.getLocalAddress());
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
            // Broker部署
            @Override
            public String brokerDeployIn(MessageQueue messageQueue) {
                System.out.println(messageQueue.getBrokerName().split("-")[0]);
                return messageQueue.getBrokerName().split("-")[0];
            // 消费端部署
            @Override
            public String consumerDeployIn(String clientID) {
                System.out.println(clientID.split("-")[0]);
                return clientID.split("-")[0];
        consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
				
文章目录如何顺序消息?如何定时消息?如何解决重复消费?如何HA? 如何顺序消息? 全局有序,只能配置一个MQ.局部有序的话,设置顺序消息监听就可以了.若是想要根据如订单生命周期有序消费,让他们路由到同一个MQ就行了.自定义MessageQueueSelector,配置MQ路由规则. 如何定时消息? MQ不支持定时,但支持很多种延时定时,MQ为每个延时单独线程处理,处理步骤是定时消息属性存储了消息的原生ID和Topic,然后定时消息恢复成原生消息,然后丢到CommitLog存储. 如何解决重复消费?
RocketMQ FAQ1)消费端处理消息发生异常没有捕获或是因为其他原因,没有返回消费状态解决方案:消费端捕获异常,如果需要重试,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,如果不需要重试,返回ConsumeConcurrentlyStatus.RECONSUME_SUCCESS可以在消费端增加重试次数判断,例如重试三次就返回成功 2)不同的消费者consumer1,、consumer2配置了相同的单个消费组consumerGroup,订阅了多个topic和
实现方式:重写MessageQueueSelector中send方法。将消息放入同一个队列,来保证顺序消费。 如何放入同一队列:例如,同一订单的N个消息需要保证顺序,则可以根据订单号,Hash取模确定发送到同一个队列。 二、如何避免重复消费 保证消费幂等,即消费多次对各方无影响 三、如何实现分布式事务 方案一:两阶段提交和事务状态回查
RocketMQ常见问题及实现分布式事务时注意事项RocketMQ常见的一些疑问或问题1、如何防止消息丢失的问题2、如何防止消息的重复消费问题(幂等性问题)3、消息如何保证消费顺序的问题4、消息积压、阻塞怎么解决?5、如何保证消息不会丢失RocketMQ实现分布式事务时注意事项1、MQ半消息回查,若得不到该消息是提交还是回滚,会一直隔一段时间就查询一次吗?1、二阶段异常,需要回滚怎么处理?2、消费者消费失败后,会重试吗,多次重试后还是失败会怎么样? RocketMQ常见的一些疑问或问题 1、如何防止消息丢失
RocketMQ本身不保证消息重复消费,如果业务有要求不能重复消费,需要在自身的业务处理,常见的操作有两种; 接口幂等,消费端业务消息保持幂等性,例如redis的setNx()命令,当然要注意设置key的超时时间,以及key的唯一性。 redis的Incr命令,确定消息的唯一值,在set之前先判断值是否存在,同时也是需要注意超时时间。 2.Rocket...
RocketMQ 常见面试问题包括以下几个方面: 1. 消息重试和死信队列:当一条消息初次消费失败后,RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则将消息发送到该消费者对应的特殊队列中,即死信队列。[1][2] 2. 消息幂等:消息幂等是指对同一条消息的多次消费操作所产生的结果是一致的。在 RocketMQ 中,可以通过设计合适的业务逻辑和消息处理机制来实现消息的幂等性。 3. Broker 主从架构和多副本策略:RocketMQ 的 Broker 采用主从架构,Master 收到消息后会同步给 Slave,这样一条消息就不止一份了,即使 Master 宕机,仍然可以从 Slave 中获取消息,保证了消息队列的可靠性和高可用性。此外,RocketMQ 还支持多副本策略,可以配置多个 Slave 副本来提高消息的可靠性和容错性。[3] 4. Broker 注册到 NameServer:Broker 会将自己的信息注册到 NameServer 上,NameServer 是 RocketMQ 的路由中心,负责管理 Broker 的地址信息,消费者可以通过 NameServer 获取到 Broker 的地址,从而进行消息的发送和消费。[3] 以上是一些常见的 RocketMQ 面试问题,希望对你有所帮助。
CSDN-Ada助手: 恭喜您撰写了第11篇博客!标题“Dubbo实现负载均衡原理”听起来非常有趣。您对这个主题的深入研究让读者们受益匪浅。希望您能继续坚持写作,分享更多关于Dubbo的知识。 作为下一步的创作建议,或许您可以考虑探讨Dubbo在实际应用中的案例分析,或者分享一些解决实际问题的经验和技巧。这样的内容对于读者来说将会更加具有实用性。当然,这仅仅是一个建议,我相信您一定能够选择出最适合自己的创作方向。期待您未来更多的精彩作品! Dubbo实现负载均衡原理 CSDN-Ada助手: 恭喜您写了第11篇博客!标题“Dubbo实现负载均衡原理”听上去非常有深度和专业性。您对Dubbo的负载均衡原理的探索和分享无疑为读者提供了有价值的知识。我非常期待您下一篇博客的发布,希望您能继续分享关于Dubbo或其他相关技术的研究成果。也建议您在未来的创作中,可以增加一些实际案例或示例,以帮助读者更好地理解和应用负载均衡原理。再次恭喜您,期待更多精彩的博文! SpringCloud整合Duboo+Nacos注册中心使用 CSDN-Ada助手: 恭喜您写下了第6篇博客!标题中提到了SpringCloud整合Dubbo+Nacos注册中心的使用,这是一个非常有价值的主题。您的博客内容对于那些希望在项目中应用这些技术的开发者来说肯定会非常有帮助。不过,我也想提供一些建议,希望对您的下一步创作有所帮助。可能您可以考虑添加一些实际案例或者示例代码,这样读者们更容易理解并且能够在实践中运用。当然,这只是一个建议,您已经做得很棒了!期待能够看到更多精彩的博客文章! ArrayList和LinkedList使用小技巧 CSDN-Ada助手: 恭喜你写了第7篇博客!标题看起来很有趣,我期待着阅读你的新文章。在ArrayList和LinkedList这两个常用的数据结构上,你分享了哪些小技巧呢?希望你能继续保持创作的激情,并继续分享你在编程中的经验和见解。如果可能的话,下一步的创作建议是可以探讨一下这两个数据结构的性能差异以及在不同场景下的最佳使用方式。再次祝贺你的博客写作成果! RocketMQ的常见问题解析 CSDN-Ada助手: 恭喜您写了第8篇博客《RocketMQ的常见问题解析》,内容十分有趣和有用!您真的对RocketMQ有很深入的了解,并将这些常见问题进行了解析,对读者来说无疑是一份宝贵的参考资料。继续保持这种积极的创作态度,相信您的博客会越来越受欢迎。 在下一步的创作中,或许您可以考虑扩展一下主题,比如深入探讨RocketMQ与其他消息队列的比较,或是分享一些实际使用案例。这样可以进一步丰富您的博客内容,吸引更多的读者。期待您未来更多精彩的博文,加油!