Flume 消费或生产消息到 Kafka 遇到的相关问题及解决
下载 Flume 源码包
地址: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.12.1-src.tar.gz
在使用 Flume 消费或生产启用了 SASL_SSL 认证的 Kafka 时,遇到以下问题。其中 Flume 本身自带的 Kafka 依赖是 0.9.0 版本,而 Kafka 集群是 0.10.0 版本。
遇到的相关问题
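问题一:Flume 本身自带的 Kafka 依赖是 0.9.0 版本,而 Kafka 集群是 0.10.0 版本,两者版本不一致。0.9.0 版本的 kafka-clients 中找不到 org.apache.kafka.common.security.plain.PlainLoginModule 类,因此启动 Kafka Source 时报如下错误: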
19/10/18 13:05:18 ERROR source.BasicSourceSemantics: Unexpected error performing start: name = r1 org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524) at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:514) at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83) at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71) at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577) ... 
13 more Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794) at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) at javax.security.auth.login.LoginContext.login(LoginContext.java:587) at org.apache.kafka.common.security.kerberos.Login.login(Login.java:299) at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44) at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85) at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55) ... 
16 more 19/10/18 13:05:18 ERROR source.PollableSourceRunner: Unhandled exception, logging and sleeping for 5000ms org.apache.flume.FlumeException: Source had error configuring or starting at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:53) at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524) at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:514) at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83) at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71) at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 
1 more Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74) at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60) at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577) ... 13 more Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794) at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682) at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680) at javax.security.auth.login.LoginContext.login(LoginContext.java:587) at org.apache.kafka.common.security.kerberos.Login.login(Login.java:299) at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104) at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44) at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85) at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55) ... 16 more
解决办法:
将 Flume 的 lib 目录下的 kafka_2.10-0.9.0-kafka-2.0.2.jar 和 kafka-clients-0.9.0-kafka-2.0.2.jar 这两个 jar 包替换为 0.10.0 版本的 jar 包。
替换为 0.10.0 版本后可以实现 Kafka Sink (向 Kafka 生产数据),但是使用 Kafka Source (消费 Kafka 数据)会出现问题二。
cp kafka_2.11-0.10.0-kafka-2.1.0.jar /opt/cloudera/parcels/CDH/jars/
cp kafka-clients-0.10.0-kafka-2.1.0.jar /opt/cloudera/parcels/CDH/jars/
cd /opt/cloudera/parcels/CDH/lib/flume-ng/lib
ln -s ../../../jars/kafka_2.11-0.10.0-kafka-2.1.0.jar kafka_2.11-0.10.0-kafka-2.1.0.jar
ln -s ../../../jars/kafka-clients-0.10.0-kafka-2.1.0.jar kafka-clients-0.10.0-kafka-2.1.0.jar
rm -rf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/kafka_2.10-0.9.0-kafka-2.0.2.jar
rm -rf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/kafka-clients-0.9.0-kafka-2.0.2.jar
问题二:替换了使用 Kafka(启用了 SASL_SSL)时需要的两个 jar 包后,使用 Kafka Source 会出现 NoSuchMethodError 问题。
19/10/18 10:49:51 INFO authenticator.AbstractLogin: Successfully logged in.
19/10/18 10:49:51 INFO utils.AppInfoParser: Kafka version : 0.10.2.2
19/10/18 10:49:51 INFO utils.AppInfoParser: Kafka commitId : cd80bc412b9b9701
19/10/18 10:49:51 ERROR lifecycle.LifecycleSupervisor: Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
at org.apache.flume.source.kafka.KafkaSource$TopicListSubscriber.subscribe(KafkaSource.java:152)
at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:517)
at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/10/18 10:49:51 ERROR lifecycle.LifecycleSupervisor: Unsuccessful attempt to shutdown component: {} due to missing dependencies. Please shutdown the agentor disable this component, or the agent will bein an undefined state.
java.lang.NullPointerException
at org.apache.flume.source.PollableSourceRunner$PollingRunner.access$200(PollableSourceRunner.java:119)
at org.apache.flume.source.PollableSourceRunner.stop(PollableSourceRunner.java:90)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:257)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/10/18 10:49:54 INFO lifecycle.LifecycleSupervisor: Component PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } is in error state, and Flume will notattempt to change its state
对比 kafka-clients-0.9.0-kafka-2.0.2.jar 和 kafka-clients-0.10.0-kafka-2.1.0.jar 中的 org.apache.kafka.clients.consumer.Consumer 接口可以发现,subscribe 和 assign 方法的参数类型发生了变化(由 List 变为 Collection)。
0.9.0 版本:
public interface Consumer<K, V> extends Closeable {
Set<TopicPartition> assignment();
Set<String> subscription();
void subscribe(List<String> var1);
void subscribe(List<String> var1, ConsumerRebalanceListener var2);
void assign(List<TopicPartition> var1);
void subscribe(Pattern var1, ConsumerRebalanceListener var2);
void unsubscribe();
0.10.0 版本:
public interface Consumer<K, V> extends Closeable {
public Set<TopicPartition> assignment();
public Set<String> subscription();
public void subscribe(Collection<String> topics);
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
public void assign(Collection<TopicPartition> partitions);
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
public void unsubscribe();
解决办法:
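Flume 依赖的第三方 jar(Kafka 的 jar)发生了改变,需要重新编译。项目所依赖的 API 如果发生更改,即使源代码不需要做任何修改,也应重新编译;如果 API 未发生更改,则无需重新编译。原因是被调用方法的参数类型在编译时就已写入字节码:原来的 flume-kafka-source.jar 是针对 0.9.0 版本编译的,调用的是 subscribe(List, ConsumerRebalanceListener),而 0.10.0 版本中该方法变成了 subscribe(Collection, ConsumerRebalanceListener),运行时找不到原签名的方法,于是抛出 NoSuchMethodError。所以要解决上述异常,需要将 flume-kafka-source 中依赖的 Kafka 版本改为 0.10.0 后重新编译打包,然后替换 Flume 的 lib 中的 flume-kafka-source.jar 包。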
参考: https://stackoverflow.com/questions/536971/do-i-have-to-recompile-my-application-when-i-upgrade-a-third-party-jar
解压源码包,进入 flume-kafka-source 项目,修改 pom.xml 中 Kafka 依赖的版本(将 ${kafka.version} 替换为 0.10.0-kafka-2.1.0)。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<!--<version>${kafka.version}</version>-->
<version>0.10.0-kafka-2.1.0</version>
</dependency>
使用 maven 打成 jar 包。
[root@cdh01 flume-app]# wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.12.1-src.tar.gz
[root@cdh01 flume-app]# tar -zxvf flume-ng-1.6.0-cdh5.12.1-src.tar.gz
[root@cdh01 flume-app]# cd /data/flume-app/flume-ng-1.6.0-cdh5.12.1/flume-ng-sources/flume-kafka-source
[root@cdh01 flume-kafka-source]# vim pom.xml
[root@cdh01 flume-kafka-source]# rm -rf src/test/java/* # 删除测试文件,否则打包时会报错
[root@cdh01 flume-kafka-source]# mvn clean package
替换 /opt/cloudera/parcels/CDH/jars/flume-kafka-source-1.6.0-cdh5.12.1.jar 包。
[root@cdh01 ~]# rm -rf /opt/cloudera/parcels/CDH/jars/flume-kafka-source-1.6.0-cdh5.12.1.jar
[root@cdh01 ~]# cd /data/flume-app/flume-ng-1.6.0-cdh5.12.1/flume-ng-sources/flume-kafka-source/target
[root@cdh01 target]# cp flume-kafka-source-1.6.0-cdh5.12.1.jar /opt/cloudera/parcels/CDH/jars
Flume 配置文件及命令
运行命令:
flume-ng agent --conf-file test-kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=debug,CONSOLE -Djava.security.auth.login.config=/data/flume-app/kafka_client_jaas.conf -Xmx1024m
kafka_client_jaas.conf 配置文件:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="username"
password="123456";
};
test-kafka-flume-hdfs.conf 配置文件:
## flume 配置
## 组件
a1.sources = r1
a1.channels = c1
a1.sinks = k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.1.101:9093,192.168.1.102:9093,192.168.1.103:9093
a1.sources.r1.kafka.topics = test_topic
a1.sources.r1.kafka.consumer.group.id = test_group
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
a1.sources.r1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.r1.kafka.consumer.sasl.mechanism = PLAIN
a1.sources.r1.kafka.consumer.ssl.truststore.location = /data/flume-app/kafka.client.truststore.jks
a1.sources.r1.kafka.consumer.ssl.truststore.password = truststore
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/flume-app/channel/checkpointDir/test/checkpoint
a1.channels.c1.dataDirs = /data/flume-app/channel/checkpointDir/test/data
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
##sink1
a1.sinks.k1.type = logger
#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.k1.kafka.producer.max.request.size = 100000
#a1.sinks.k1.kafka.producer.compression.type = gzip
#a1.sinks.k1.brokerList = 192.168.1.101:9093,192.168.1.102:9093,192.168.1.103:9093
#a1.sinks.k1.topic = sink_test
#a1.sinks.k1.kafka.producer.security.protocol = SASL_SSL
#a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
#a1.sinks.k1.kafka.producer.ssl.truststore.location = /data/flume-app/kafka.client.truststore.jks
#a1.sinks.k1.kafka.producer.ssl.truststore.password = truststore
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1