12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
at com.study.kafka.ProducerNew.<init>(ProducerNew.java:36)
at com.study.kafka.ProducerNew.run(ProducerNew.java:76)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
上面是本人在使用spring kafka中所遇到的问题,针对此问题做一个记录,根据error log,发现
在调用AppInfoParser.registerAppInfo方法时出现的异常,
根据error log
定位到 Repository.addMBean()截取一部分代码
上图中可以得知,一个clientId对应一个,不可重复。
最后找到ProduceNew,发现是因为自己配置了client.id导致的;如果不配置的话,看KafkaProducer类源码可知道会为每一个线程生成一个clientid,"consumer" + 自增id,原子性递增。
String clientId = config.getString("client.id");
if (clientId.length() <= 0) {
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
基于此可知道配置了并发度大于1,同时配置了kafka的 client.id属性则会出现上述问题,而当你配置为1的时候不会出现上述log 解决方式:不配置client.id这一项,kakfa中会默认为多个线程生成id。
原文链接:https://blog.csdn.net/qq_38286618/article/details/103443896
如果非得自己定义client.id,可以在使用的时候,自己修改配置(可不依赖配置文件):
@Bean("kafakConsumerProp")
@ConfigurationProperties(prefix = "daemon.consumer.kafka.properties")
public Properties getKafkaConsumeProperties() {
return new Properties();
@Autowired
@Qualifier("kafakConsumerProp")
private Properties properties;
@PostConstruct
private void init() {
rateLimiter = RateLimiter.create(ratelimiterProperties.getUpdateReportNotify());
propertiesCopy = (Properties) properties.clone();
propertiesCopy.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
propertiesCopy.put("value.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
String bootstrapServers = StringUtils.collectionToDelimitedString(kafkaProperties.getBootstrapServers(), ",");
propertiesCopy.put("bootstrap.servers", bootstrapServers);
propertiesCopy.put("client.id", properties.get("client.id")+"-update-report-notify");
propertiesCopy.put("group.id", properties.get("group.id")+"-update-report-notify");
startPoll();
12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbeanjavax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer at com.sun.jmx.mbeanserver.Repository.addMBe.
本文,Verisign实验室大规模数据分析基础设施的技术主管Michael通过示例对Kafka整合到SparkStreaming进行了详细讲解,更分享了该领域的现状和一些注意点。作者MichaelG.Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到SparkStreaming中。期间, Michael还提到了将Kafka整合到SparkStreaming中的一些现状,非常值得阅读,虽然有一些信息在Spark1.2版本中已发生了一些变化,比如HA策略:
在搭建springcloud项目时,多个模块的项目同事启动,但是又两个模块的项目启动报错:javax.management.InstanceAlreadyExistsException;
百度了很多解决办法,说什么domain不可以一样,要在配置文件里添加 spring.jmx.domain的配置,试了也是不可以;
由于是在启动的时候报错,于是查看两个启动报错的启动类,发现启动类的入口方法SpringApplication.run方法多次执行了,导致报错。
解决方法:
springboot启动类中只
出现这个异常,一般来说是在
并发访问
Kafka的时候出现的,解决思路:
(1)
kafka的cli
ent.id重复了,看看代码里的
配置是不是配重复了(也有人说可以代码取消此项
配置)
(2)看看代码
并发情况出现成员变量,比如Properties,导致在
并发情况下
配置混乱造成的
我遇到是第二种导致的,不说了,Someone else's code.
[
kafka_spout:7-MultipleThreadSpoutExecutors] WARN o.a.
kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.
management.
InstanceAlready
ExistsException:
kafka.consumer:type=app-info
12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer
at com.sun.jmx.mbeanserver.Repository.addMBea
错误日志:
2019-10-11 17:50:48.744 WARN []-[o.a.k.clients.consumer.ConsumerConfig :173] The configuration num.replica.fetchers = 1 was supplied but isn't a known config.
2019-10-11 17:50:48.747 INFO []-[o.a.kafka.common.utils.AppInfoParser :82] Kafka version :
已编译
Kafka-
Manager-1.3.3.22
linux下直接解压解压
kafka-
manager-1.3.3.22.zip到/opt/module目录
[root@hadoop102 module]$ unzip
kafka-
manager-1.3.3.22.zip
4)进入到/opt/module/
kafka-
manager-1.3.3.22/conf目录,在application.conf文件中修改
kafka-
manager.zkhosts
[root@hadoop102 conf]$ vim application.conf
修改zookeeper地址为:
kafka-
manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
5)启动
KafkaManager
[root@hadoop102
kafka-
manager-1.3.3.22]$ nohup bin/
kafka-
manager -Dhttp.port=7456 >/opt/module/
kafka-
manager-1.3.3.22/start.log 2>&1 &
6)在浏览器中打开
备注:指定端口号看启动过程中 "-Dhttp.port=7456" 端口可以自己设置
http://hadoop102:7456
kafka-connect-zeebe
这个适用于器可以做两件事:
当工作流实例达到特定活动时,将消息发送到Kafka主题。 请注意,一条message更确切地说是一个卡夫卡record ,通常也称为event 。 这是Kafka Connect演讲中的消息来源。
消耗来自Kafka主题的消息,并将它们与工作流程相关联。 这是一个Kafka Connect接收器。
它可以与或独立的Zeebe经纪人合作。
有关实现的一些背景,请参。
示例和演练
以下视频引导您完成连接的示例:
安装和快速入门
您将在此处找到有关如何构建连接器以及如何运行Kafka和Zeebe的信息,以快速入门:
该插件带有两个连接器,即源连接器和接收器连接器。
源连接器激活Zeebe作业,将其发布为Kafka记录,并在将它们提交给Kafka后完成。
水槽连接器
在工作流模型中,您可以按名称等待某些事件(通
InstanceAlreadyExistsException错误
在ELFK集群结合Kafka集群的配置过程中,通过filebet收集日志传输到kafka,然后Kafka传输日志到logstash,最后到elasticsearch,并结合zabbix告警。
但在kafka传输日志到logstash的配置完成后,启动logstash时报错:
[WARN ][org.apache.kafka.comm...
CORRUPT_MESSAGE
这个错误一般是压缩策略为cleanup.policy=compact的情况下,key不能为空
o.a.k.c.p.i.Sender 595 [WARN] [Producer clientId=producer-1] Got error produce response with correlation id
131 on topic-partition SHI_TOPIC1-0, retrying (2147483521 attempts left). Error: COR
这个错误是Kafka出现的一个异常情况,意味着程序在运行过程中被强制中断了。具体来说,是由于程序在等待某个操作完成时,被外部因素(比如操作系统)强制中断了,导致程序无法正常完成该操作并抛出该异常。这个异常通常会在Kafka客户端的使用中出现,比如在数据消费者的消费过程中。
解决该错误需要找到导致程序被中断的具体原因。可能的原因包括操作系统资源耗尽、网络连接不稳定等等。一些具体的解决方案可能包括增加操作系统资源、优化程序代码、尝试连接其他网络等等。总之,这个异常提示需要我们注意程序在运行过程中可能会出现的问题,及时处理并改进程序的运行方式。