When Kerberos authentication is enabled on Kafka, how do you produce or consume data with Flink? Essentially you just add the authentication-related settings (jaas.conf, keytab, and so on) to the producer/consumer code. Let's go straight to the code.
Version information:
Flink 1.9.0
Kafka 0.10.0
A quick note: if the dependency versions do not match, you will hit an error like the one below, so make sure the versions correspond:
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
1. Connecting to a Kerberos-secured cluster is actually quite simple; you need the following three files:
1). The Kerberos server configuration file krb5.conf, which tells the program which KDC it should authenticate against:
[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }
2). Authentication also requires specifying the login method, which is done through a jaas.conf file; one can usually be found in the cluster's conf directory:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};
3). The user's Kerberos login ticket and keytab authentication file; the ticket and keytab themselves are not pasted here. A minimal login check using these files is sketched right below.
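Before wiring these files into Flink, it can help to confirm that the keytab and principal in jaas.conf actually log in. The class below is not from the original article; it is a minimal sketch (the class name JaasLoginCheck is hypothetical) that assumes the krb5.conf and jaas.conf paths shown above and simply performs a JAAS login against the KafkaClient entry.
package com.hadoop.ljs.flink.utils;

import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;

/**
 * Minimal sketch (not part of the original article): verifies that the
 * keytab/principal configured in the "KafkaClient" entry of jaas.conf
 * can obtain a Kerberos login. Adjust the paths for your environment.
 */
public class JaasLoginCheck {
    public static void main(String[] args) throws LoginException {
        System.setProperty("java.security.krb5.conf", "D:\\kafkaSSL\\krb5.conf");
        System.setProperty("java.security.auth.login.config", "D:\\kafkaSSL\\kafka_client_jaas.conf");
        /* "KafkaClient" is the entry name defined in jaas.conf above */
        LoginContext login = new LoginContext("KafkaClient");
        login.login();
        System.out.println("Kerberos login succeeded: " + login.getSubject());
    }
}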
2. To save you from dependency errors, here are the pom.xml dependencies; there may be some redundancy, which you can prune yourself:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${httpclient.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.9.0</version>
    <scope>compile</scope>
</dependency>
3. Flink receives data from a socket and sends it to Kafka, code example:
package com.hadoop.ljs.flink.streaming;
import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-02-29 09:31
* @version: v1.0
* @description: com.hadoop.ljs.flink.streaming
*/
public class FlinkKafkaKerberosProducer {
    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        /* Set the Kerberos/JAAS configuration on Windows; these can also be passed in as -D JVM options */
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* The received data can go through any transformations you need before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
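The producer above references com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema, which the article does not show. Below is a minimal sketch of what such a class might look like, assuming it simply writes each record's value as UTF-8 bytes with no message key; substitute the original implementation if you have it.
package com.hadoop.ljs.flink.utils;

import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import java.nio.charset.StandardCharsets;

/**
 * Minimal sketch of the CustomKeyedSerializationSchema used by the producer above
 * (assumed implementation; the original class is not shown in the article).
 */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        /* no message key */
        return null;
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        /* use the topic passed to the FlinkKafkaProducer010 constructor */
        return null;
    }
}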
4. Flink connects to Kafka and consumes messages, code example:
package com.hadoop.ljs.flink.streaming;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
/**
* @author: Created By lujisen
* @company ChinaUnicom Software JiNan
* @date: 2020-02-29 09:31
* @version: v1.0
* @description: com.hadoop.ljs.flink.streaming
*/
public class FlinkKafkaKerberosConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        /* Set the Kerberos/JAAS configuration on Windows; these can also be passed in as -D JVM options */
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
        consumer010.setStartFromEarliest();
        /* Source: read from Kafka */
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}