假设消费者对messageQueue的加锁已经成功,那么就进入到了第二个步骤,创建pullRequest进行消息拉取,消息拉取部分的代码实现在PullMessageService中,消息拉取完后,需要提交到ConsumeMessageService中进行消费,顺序消费的实现为ConsumeMessageOrderlyService
,提交消息进行消费的方法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下
可以看到,构建了一个ConsumeRequest对象,并提交给了ThreadPoolExecutor来并行消费,看下顺序消费的ConsumeRequest的run方法实现
里面先从messageQueueLock中获取了messageQueue对应的一个锁对象,看下messageQueueLock的实现
其中维护了一个ConcurrentMap<MessageQueue, Object> mqLockTable,使得一个messageQueue对应一个锁对象object
获取到锁对象后,使用synchronized尝试申请线程级独占锁
- 如果加锁成功,同一时刻只有一个线程进行消息消费
- 如果加锁失败,会延迟100ms重新尝试向broker端申请锁定messageQueue,锁定成功后重新提交消费请求
至此,第三个关键点的解决思路也清晰了,基本上就两个步骤
- 创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费
- 消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费
在使用顺序消息时,一定要注意其异常情况的出现,对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 版会自动不断地进行消息重试(每次间隔时间为 1 秒),重试最大值是Integer.MAX_VALUE
.这时,应用会出现消息消费被阻塞的情况。因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生
重要的事再强调一次:在使用顺序消息时,一定要注意其异常情况的出现!
RocketMQ-顺序消息Demo及实现原理分析
文章目录场景分析全局顺序消费局部顺序消费DemoProducer DemoNormal Consumer DemoOrder Consumer Demo源码分析锁定MessageQueue客户端实现broker端实现锁定ProcessQueue场景分析顺序消费是指消息的产生顺序和消费顺序相同假设有个下单场景,每个阶段需要发邮件通知用户订单状态变化。用户付款完成时系统给用户发送订单已付款邮件,...
目录说明生产端消费端总结
RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。
核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪个队列,从而保证某类数据的顺序性。同时,可以在send方法中指定入参,方便MessageQueueSelector接口内部根据入参选择指定的队列。
@Test
public void
1. 前言
顺序消息是RocketMQ的特性之一,它可以让Consumer消费消息的顺序严格按照消息的发送顺序来进行。例如:一条订单产生的三条消息:订单创建、订单付款、订单完成。消费时要按照这个顺序依次消费才有意义,但是不同的订单之间这些消息是可以并行消费的。
顺序消息可以分为全局有序和分区有序,绝大部分场景下,分区有序就已经能够满足需求了,因此本文会重点分析。
全局有序:某个Topic下所有的消息都是有序的,所有消息按照严格的先进先出的顺序进行生产和消费,要求Topic下只能有一个分区队列,且Cons
kafka篇二11、讲一讲 kafka 的 ack 的三种机制12、消费者如何不自动提交偏移量,由应用提交?13、消费者故障,出现活锁问题如何解决?14、如何控制消费的位置15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?16、kafka 的高可用机制是什么?17、kafka 如何减少数据丢失18、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。
11、讲一讲 kafka 的 ack 的三种机制
request.required.acks 有三个值 0 1 -1(all)
在项目中使用MQ消息队列时,某些业务场景可能需要保证消息的顺序消费执行。
比如在订单流程场景中:创建订单、支付订单、订单完成这三个订单状态需要保证顺序执行,不能先支付订单,再创建订单。也不能订单完成,再去支付订单。
在项目中如果使用默认的普通消息,那么就算生产者按照顺序发布消息后,消费端也有可能会不会按照顺序进行消费消息。
RocketMQ
RocketMQ在其官方文档中指出,除了支持无序消息以外,还支持顺序消息。
无序消息:普通消息,事务消息,延迟消息和定时消息
顺序消息:全局顺序消息,
一、rocketmq顺序消费的原理
1、消息的有序性是指消息的消费顺序能够与消息的发送顺序一致。但是有时候我们从业务需要上面并不需要保证所有消息严格按照消费顺序完全一致。例如,一个订单的下单、付款、出库等操作是不同替换顺序。但是有A订单和B订单,并不需要保证A订单与B订单的顺序。
2、RocketMQ采用了局部顺序一致性的机制,一组消息发送到同一个队列中来保证发送顺序的有序性,然后再由消费者进行。消费的时候通过一个队列只会被一个线程取到 ,第二个线程无法访问这个队列 来保证队列有序性。rocketmq可以
// 有序消费MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, content) -> {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
try {
for (MessageExt msg : msgs) {
String body = ne
Java 操作RocketMQ简单Demo,顺序与事务案例,不容错过!
前言:MQ的概述与认识这里就暂时不赘述,以及RocketMQ,想了解这方面的知识点,可以推荐伙伴们看 “ 墨家巨子@俏如来 ”大佬写的文章
注:- - 启动Broker时,需要指定可以支持自动创建主题(start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true),如果出现闪退的情况,可以把 MQ文件夹\bin’下,Bean目录下的runbroker.cmd 中JVM占用
1,单机模式的配置:进入目录\rocketmq-all-4.7.0-bin-release\conf\2m-noslave,修改broker-a.properties目录如下:
# 集群名称
brokerClusterName=DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=127.0.