添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Kafka使用KafkaTemplate发送消息,需要先实例化bean.配置如下

<!-- 定义producer的参数 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
				<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
				<entry key="group.id" value="0" />
				<entry key="retries" value="2" />
				<entry key="batch.size" value="16384" />
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="max.request.size" value="10000000"></entry>
				<entry key="send.buffer.bytes" value="10000000"></entry>
				<entry key="key.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
		</constructor-arg>
	</bean>
	<!-- 创建kafkatemplate需要使用的producerfactory bean -->
	<bean id="producerFactory"
		class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>
	<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
	<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="mhb-test" />
	</bean>

使用时直接注入就可以使用了.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,key, JSON.toJSONString(obj));
future.get();

这个是通过Spring包装后的用法.Spring增加了ProducerFactory创建Producer对象,并且给Producer增加了事务功能,把参数包装成ProducerRecord对象,调用Kafka-client包中Producer类的send方法.

protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
//增加事务功能,使用factory创建producer
        Producer<K, V> producer = this.getTheProducer();
        this.logger.trace(() -> {
            return "Sending: " + producerRecord;
        SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
//发送Kafka,包装返回结果  
producer.send(producerRecord, this.buildCallback(producerRecord, producer, future));
        if (this.autoFlush) {
            this.flush();
        this.logger.trace(() -> {
            return "Sent: " + producerRecord;
        return future;

接下来都是kafka-client包内的内容了.KafkaProducer实现了Producer接口,在发送前还调用了拦截器ProducerInterceptor,这个拦截器能拦截甚至更改record数据.官方介绍如下.

A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster.

拦截器处理完后就是真正发送到Kafka了.调用了org.apache.kafka.clients.producer.KafkaProducer#doSend方法.源码如下:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
//0.检查和一些参数的初始化,计算等待时间,因为Kafka是批量发送.
            this.throwIfProducerClosed();
            KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
            } catch (KafkaException var19) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var19);
                throw var19;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {//1.序列化key
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var18) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var18);
            byte[] serializedValue;
            try {//2.序列化value
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var17) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var17);
//3,计算根据key,计算所在分区
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
//组装TopicPartition对象
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
            this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
//组装Callback 对象
            Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
//3.添加到RecordAccumulator中等待发送
            RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
//4.返回结果
            return result.future;
        } catch (ApiException var20) {
            this.log.debug("Exception occurred during message send:", var20);
            if (callback != null) {
                callback.onCompletion((RecordMetadata)null, var20);
            this.errors.record();
            this.interceptors.onSendError(record, tp, var20);
            return new KafkaProducer.FutureFailure(var20);
        } catch (InterruptedException var21) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var21);
            throw new InterruptException(var21);
        } catch (BufferExhaustedException var22) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, var22);
            throw var22;
        } catch (KafkaException var23) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            throw var23;
        } catch (Exception var24) {
            this.interceptors.onSendError(record, tp, var24);
            throw var24;

 这里是发送Kafka消息的核心逻辑了,这段代码非常重要,尤其是计算分区逻辑和Kafka批量发送逻辑.

分区默认是采用hash算法计算key,转32位后与总分区取余.

发送消息是批量发送,先把数据在client中存下来,等队列满了或者等待时间到了就发送给Kafka服务器.重点关注org.apache.kafka.clients.producer.internals.RecordAccumulator#append方法,代码如下,具体逻辑将在下一篇中补充.

public RecordAccumulator.RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
        this.appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) {
            headers = Record.EMPTY_HEADERS;
        RecordAccumulator.RecordAppendResult var16;
        try {
//1.检查是否有包含该主题分区的批处理对象的双端队列,如果没有则新建
            Deque<ProducerBatch> dq = this.getOrCreateDeque(tp);
            synchronized(dq) {
                if (this.closed) {
                    throw new KafkaException("Producer closed while send in progress");
//尝试向批处理对象追加消息,并返回追加结果,如果队列里没有批处理对象,则返回空
                RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    RecordAccumulator.RecordAppendResult var14 = appendResult;
                    return var14;
            byte maxUsableMagic = this.apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, this.compression, key, value, headers));
            this.log.trace("Allocating a new {} byte message buffer for topic {} partition {}", new Object[]{size, tp.topic(), tp.partition()});
            buffer = this.free.allocate(size, maxTimeToBlock);
            synchronized(dq) {
                if (this.closed) {
                    throw new KafkaException("Producer closed while send in progress");
                RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult == null) {
//2. 将消息写入内存中,封装成一个内存消息对象
                    MemoryRecordsBuilder recordsBuilder = this.recordsBuilder(buffer, maxUsableMagic);
//根据内存消息对象新建一个批处理对象
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, this.time.milliseconds());
//批量处理
                    FutureRecordMetadata future = (FutureRecordMetadata)Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, this.time.milliseconds()));
//将批处理对象添加到双端队列中
                    dq.addLast(batch);
                    this.incomplete.add(batch);
                    buffer = null;
                    RecordAccumulator.RecordAppendResult var19 = new RecordAccumulator.RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
                    return var19;
                var16 = appendResult;
        } finally {
            if (buffer != null) {
                this.free.deallocate(buffer);
            this.appendsInProgress.decrementAndGet();
        return var16;
                    Kafka使用KafkaTemplate发送消息,需要先实例化bean.配置如下&lt;!-- 定义producer的参数 --&gt;	&lt;bean id="producerProperties" class="java.util.HashMap"&gt;		&lt;constructor-arg&gt;			&lt;map&gt;				&lt;entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" /&gt;
				
Kafka Scala模板 这是使用scala与kafka一起工作的模板。 使用自动生成的avro代码,一切都被docker化,所有消息均以avro格式发送。 该模板针对使用Linux / Ubuntu / Mac和Intellij的人群。 该程序做什么 该程序在生产者中产生小的“ Hello there”消息,并且kafka-connect自动将它们传输到postgres。 流模块添加了!!! 在每封邮件的末尾。 消费者显示生产者和流模块已发出的消息。 build.gradle文件(在根文件夹中)包含所有必需的命令。 如果转到项目的根文件夹并键入gradle -t,则所有默认任务都将运行(即prepare,startContainers,producer)。 您可以在build.gradle顶部轻松更改默认任务。 现在,您可以键入:docker ps,以查看启动了哪些容器
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 2.application.yml中引入kafka相关配置 spring: kafka.. 1,本文主要关注 KafkaTemplate的重点方法,并非全部方法; 2,KafkaTemplate 底层依赖于 DefaultKafkaProducerFactory , 关于 DefaultKafkaProducerFactory 的介绍,refer2 spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客【1】 类描述类描述:单例共享 Producer 实例的 Prod..
1、发后即忘(fire-and-forget) 只管往kafka发送消息而并不关心消息是否正确到达。正常情况没什么问题,不过有些时候(比如不可重试异常)会造成消息的丢失。这种发送方式性能最高,可靠性最差。 kafkaTemplate.send(topic, msg) 2、同步发送(sync) 其实kafkaTemplate.send方法并不是返回void,而是ListenableFuture<SendResult<K, V>>,该类继承了jdk concurrent包的Fu
1.为什么使用kafka?以实际业务中用户提现为例,比如一笔交易发生,这笔交易是否是欺诈交易,这笔交易是否已经达到限额了,相关用户是否在黑名单中,交易结果联动短信、Email等渠道通知用户……等等,也许一笔简单的交易,就要与反欺诈系统、黑名单系统、限额或授信管控系统、短信邮件系统进行通讯 (1)解耦,避免子系统之间过度依赖 (2)异步处理,使主程序关注自己的业务点(即正常交易逻辑的处理),将其...
主要讲两个重要点:1、分区选择 2、ack机制 springboot中现在已经嵌入了kafka,可以直接用KafkaTemplate进行操作,和RestTemplate类似 maven中配置: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifac...
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式。 confirm 确认模式 return 退回模式 rabbitmq整个消息投递的路径为: producer—>rabbitmq broker—> exchange—>queue—>consumer 消息从producer 到exchange则会返回一个confirmCallback 。 ·消息从exchange–&g 其中topic,message是范指。 异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。 注:batch.size默认是16384,linger.ms默认是0 同步发送: 代码使用: kafkaTemplate.send(topic,message
好的,我可以帮助您解答有关KafkaTemplate使用案例的问题。KafkaTemplate是Spring Kafka提供的一个生产者API,它可以让您轻松地在Spring应用程序中使用Apache Kafka消息代理。 在使用KafkaTemplate之前,您需要进行一些配置。例如,您需要配置KafkaTemplate的生产者工厂,以及Kafka的bin目录的路径等。一旦配置完成,您就可以使用KafkaTemplate发送消息。以下是一个简单的KafkaTemplate使用案例: ```java @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("myTopic", message); 在这个例子中,我们使用@Autowired注解来注入KafkaTemplate实例。在sendMessage方法中,我们使用send()方法将消息发送到名为“myTopic”的Kafka主题中。 当然,这只是一个简单的使用案例,KafkaTemplate还有其他更高级的特性,例如事务以及实现自定义序列化和反序列化逻辑等。 希望这个回答能够帮助您解决问题。如果您有其他问题,可以随时问我。
Spring启动异常之ConflictingBeanDefinitionException: Annotation-specified bean name ‘XXXXXService‘ 14899 MySQL主从复制报错Authentication plugin ‘caching_sha2_password‘ reported error: Authentication @体验人生的大都: 我提示的就是这个 错误,但是没有办法解决 , FlowableException: Could not update Flowable database schema: unknown version from database: 'XXX' HezhezhiyuLe: 有用 我的升级到了 6.7.2.0 通过查看jar 包版本 把所有叫这名字的都改为6.5.0.1 然后就可以启动了 注意他说的schema.version 是包含这个关键字的 修改10处两个表[code=html] https://blog.csdn.net/qq_46258463/article/details/126017142 [/code] FlowableException: Could not update Flowable database schema: unknown version from database: 'XXX' HezhezhiyuLe: https://blog.csdn.net/qq_46258463/article/details/126017142 直接看你依赖的jar 文章有方式 FlowableException: Could not update Flowable database schema: unknown version from database: 'XXX' 飞翔的咩咩: 这个是你引入的flowable包的源码,你下载的哪个版本就是哪个,你也可以用解压,然后反编译去查看 FlowableException: Could not update Flowable database schema: unknown version from database: 'XXX' m0_59514902: 大神ProcessEngine.java版本怎么查看啊表情包 The server selected protocol version TLS10 is not accepted by client preferences [TLS13, TLS12] Mybatis3源码全解析