
Once Kerberos authentication is enabled on Kafka, how do you produce to or consume from it with Flink? Essentially, you add the authentication-related configuration (jaas.conf, the keytab, and so on) to the producer/consumer code. Let's go straight to the code:


Version information:

Flink 1.9.0

Kafka 0.10.0


A quick note: if the dependency versions are inconsistent you will get an error, so make sure they match (the matching versions are pinned via Maven properties, shown with the pom below):

java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread

1. Connecting to a Kerberos-secured cluster is actually quite simple; you need the following three files:

1) The Kerberos server configuration file krb5.conf, which tells the program which KDC to authenticate against:

[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }

2) Authentication requires specifying a login method, which is done in a jaas.conf file; one is usually available in the cluster's conf directory (an alternative for cluster deployments, using Flink's own Kerberos settings, is sketched after this list):

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};

3) The user's login ticket and keytab file; the ticket and keytab themselves are not included here.
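As an alternative to pointing the JVM at jaas.conf directly (the approach used in the code below, which is convenient on Windows or for local testing), Flink can generate the KafkaClient JAAS entry itself when the job is submitted to a cluster. A minimal sketch of the relevant flink-conf.yaml settings, reusing the principal from the jaas.conf above; the Linux keytab path is a placeholder to adjust to your environment:

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/kafka.service.keytab
security.kerberos.login.principal: kafka/salver32.hadoop.unicom@CHINAUNICOM
security.kerberos.login.contexts: Client,KafkaClient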


2. To save you from dependency errors, here is the pom.xml dependency list; there may be some redundancy, which you can remove yourself:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-fs</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>${httpclient.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  <version>1.9.0</version>
  <scope>compile</scope>
</dependency>
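The pom above references version placeholders (${flink.version}, ${kafka.version}, ${hadoop.version}, ${httpclient.version}) without showing their values. Below is a sketch of a matching <properties> block; the Flink version comes from the article, the other values are assumptions to adjust to your cluster. The streaming job itself also needs flink-streaming-java_2.11, which is usually declared alongside the connector:

<properties>
  <flink.version>1.9.0</flink.version>
  <!-- assumptions: match your broker and the connector's client version -->
  <kafka.version>0.10.2.1</kafka.version>
  <hadoop.version>2.7.3</hadoop.version>
  <httpclient.version>4.5.6</httpclient.version>
</properties>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>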

3. Flink producer: receive messages from a socket and send them to Kafka, code example:

package com.hadoop.ljs.flink.streaming;

import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosProducer {
    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        // Point the JVM at the Kerberos/JAAS files (Windows paths here); they can also be passed with -D options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* Transform the data here as needed; the received records can go through
           arbitrary processing before being sent to Kafka */
        dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Kerberos-related settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
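The producer references com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema, which is not shown in the article. Below is a minimal sketch of what such a class could look like (an assumption: no message key, UTF-8 encoded value, default topic) so the example compiles end to end. To try it locally, open a socket with a tool such as nc -lk 9000 and type a few lines; each line is forwarded to Kafka.

package com.hadoop.ljs.flink.utils;

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

/* Hypothetical minimal implementation: null key, UTF-8 value, default target topic. */
public class CustomKeyedSerializationSchema implements KeyedSerializationSchema<String> {
    @Override
    public byte[] serializeKey(String element) {
        return null;   // no key; records are distributed across partitions
    }

    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(String element) {
        return null;   // null means "use the topic passed to FlinkKafkaProducer010"
    }
}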

4. Flink consumer: connect to Kafka and consume messages, code example:

package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Point the JVM at the Kerberos/JAAS files (Windows paths here); they can also be passed with -D options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
        consumer010.setStartFromEarliest();
        // Use the Kafka consumer as the source
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        // Kerberos-related settings
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
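The comments in both examples note that the two Kerberos files can also be supplied via -D JVM options instead of hard-coded System.setProperty calls. A sketch of those options is below; the paths are placeholders, and on a Flink cluster the same properties are typically added to env.java.opts in flink-conf.yaml:

-Djava.security.krb5.conf=/etc/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf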

