添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer
	at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
	at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
	at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
	at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
	at com.study.kafka.ProducerNew.<init>(ProducerNew.java:36)
	at com.study.kafka.ProducerNew.run(ProducerNew.java:76)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

上面是本人在使用spring kafka中所遇到的问题,针对此问题做一个记录,根据error log,发现 在调用AppInfoParser.registerAppInfo方法时出现的异常, 根据error log 定位到 Repository.addMBean()截取一部分代码

上图中可以得知,一个clientId对应一个,不可重复。

最后找到ProduceNew,发现是因为自己配置了client.id导致的;如果不配置的话,看KafkaProducer类源码可知道会为每一个线程生成一个clientid,"consumer" +  自增id,原子性递增。

String clientId = config.getString("client.id");
if (clientId.length() <= 0) {
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
基于此可知道配置了并发度大于1,同时配置了kafka的 client.id属性则会出现上述问题,而当你配置为1的时候不会出现上述log 解决方式:不配置client.id这一项,kakfa中会默认为多个线程生成id。
原文链接:https://blog.csdn.net/qq_38286618/article/details/103443896

如果非得自己定义client.id,可以在使用的时候,自己修改配置(可不依赖配置文件):

	@Bean("kafakConsumerProp")
	@ConfigurationProperties(prefix = "daemon.consumer.kafka.properties")
	public Properties getKafkaConsumeProperties() {
		return new Properties();
	@Autowired
	@Qualifier("kafakConsumerProp")
	private Properties properties;
	@PostConstruct
	private void init() {
		rateLimiter = RateLimiter.create(ratelimiterProperties.getUpdateReportNotify());
		propertiesCopy = (Properties) properties.clone();
		propertiesCopy.put("key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class);
		propertiesCopy.put("value.deserializer", org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
		String bootstrapServers = StringUtils.collectionToDelimitedString(kafkaProperties.getBootstrapServers(), ",");
		propertiesCopy.put("bootstrap.servers", bootstrapServers);
		propertiesCopy.put("client.id", properties.get("client.id")+"-update-report-notify");
		propertiesCopy.put("group.id", properties.get("group.id")+"-update-report-notify");
		startPoll();
                    12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbeanjavax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer	at com.sun.jmx.mbeanserver.Repository.addMBe.
				
本文,Verisign实验室大规模数据分析基础设施的技术主管Michael通过示例对Kafka整合到SparkStreaming进行了详细讲解,更分享了该领域的现状和一些注意点。作者MichaelG.Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到SparkStreaming中。期间, Michael还提到了将Kafka整合到SparkStreaming中的一些现状,非常值得阅读,虽然有一些信息在Spark1.2版本中已发生了一些变化,比如HA策略:
在搭建springcloud项目时,多个模块的项目同事启动,但是又两个模块的项目启动报错:javax.management.InstanceAlreadyExistsException; 百度了很多解决办法,说什么domain不可以一样,要在配置文件里添加 spring.jmx.domain的配置,试了也是不可以; 由于是在启动的时候报错,于是查看两个启动报错的启动类,发现启动类的入口方法SpringApplication.run方法多次执行了,导致报错。 解决方法: springboot启动类中只
出现这个异常,一般来说是在并发访问Kafka的时候出现的,解决思路: (1)kafka的client.id重复了,看看代码里的配置是不是配重复了(也有人说可以代码取消此项配置) (2)看看代码并发情况出现成员变量,比如Properties,导致在并发情况下配置混乱造成的 我遇到是第二种导致的,不说了,Someone else's code.
[kafka_spout:7-MultipleThreadSpoutExecutors] WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info
12:51:28.426 [pool-1-thread-218] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer at com.sun.jmx.mbeanserver.Repository.addMBea
错误日志: 2019-10-11 17:50:48.744 WARN []-[o.a.k.clients.consumer.ConsumerConfig :173] The configuration num.replica.fetchers = 1 was supplied but isn't a known config. 2019-10-11 17:50:48.747 INFO []-[o.a.kafka.common.utils.AppInfoParser :82] Kafka version :
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/conf目录,在application.conf文件中修改kafka-manager.zkhosts [root@hadoop102 conf]$ vim application.conf 修改zookeeper地址为: kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181" 5)启动KafkaManager [root@hadoop102 kafka-manager-1.3.3.22]$ nohup bin/kafka-manager -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 & 6)在浏览器中打开 备注:指定端口号看启动过程中 "-Dhttp.port=7456" 端口可以自己设置 http://hadoop102:7456
kafka-connect-zeebe 这个适用于器可以做两件事: 当工作流实例达到特定活动时,将消息发送到Kafka主题。 请注意,一条message更确切地说是一个卡夫卡record ,通常也称为event 。 这是Kafka Connect演讲中的消息来源。 消耗来自Kafka主题的消息,并将它们与工作流程相关联。 这是一个Kafka Connect接收器。 它可以与或独立的Zeebe经纪人合作。 有关实现的一些背景,请参。 示例和演练 以下视频引导您完成连接的示例: 安装和快速入门 您将在此处找到有关如何构建连接器以及如何运行Kafka和Zeebe的信息,以快速入门: 该插件带有两个连接器,即源连接器和接收器连接器。 源连接器激活Zeebe作业,将其发布为Kafka录,并在将它们提交给Kafka后完成。 水槽连接器 在工作流模型中,您可以按名称等待某些事件(通
InstanceAlreadyExistsException错误 在ELFK集群结合Kafka集群的配置过程中,通过filebet收集日志传输到kafka,然后Kafka传输日志到logstash,最后到elasticsearch,并结合zabbix告警。 但在kafka传输日志到logstash的配置完成后,启动logstash时报错: [WARN ][org.apache.kafka.comm...
CORRUPT_MESSAGE 这个错误一般是压缩策略为cleanup.policy=compact的情况下,key不能为空 o.a.k.c.p.i.Sender 595 [WARN] [Producer clientId=producer-1] Got error produce response with correlation id 131 on topic-partition SHI_TOPIC1-0, retrying (2147483521 attempts left). Error: COR
这个错误是Kafka出现的一个异常情况,意味着程序在运行过程中被强制中断了。具体来说,是由于程序在等待某个操作完成时,被外部因素(比如操作系统)强制中断了,导致程序无法正常完成该操作并抛出该异常。这个异常通常会在Kafka客户端的使用中出现,比如在数据消费者的消费过程中。 解决该错误需要找到导致程序被中断的具体原因。可能的原因包括操作系统资源耗尽、网络连接不稳定等等。一些具体的解决方案可能包括增加操作系统资源、优化程序代码、尝试连接其他网络等等。总之,这个异常提示需要我们注意程序在运行过程中可能会出现的问题,及时处理并改进程序的运行方式。