Flink跨集群访问开启Kerberos认证的Kafka

Flink提供了三个模块来对集群进行安全验证,分别是HadoopModule、JaasModule、ZooKeeperModule。安全认证相关参数对应的类SecurityOptions。

HadoopModule 用来对使用UserGroupInformation进行身份验证的框架(kudu、hbase同步框架、hdfs等)进行认证配置。
JaasModule 用来对使用JaasConfig进行身份验证的框架(kafka、zk、hbase异步框架等)进行认证配置。
ZooKeeperModule 负责安装整个进程的ZooKeeper安全配置。

Flink组件在启动时,会先加载认证相关模块,在构建的安全上下文中,启动集群各个组件。不过Flink整个集群只能使用一份证书进行相关验证,也就是说,如果Flink任务从开启Kerberos认证的Kafka中读取数据,并写入Kudu,则使用的principal和keytab,具有同时访问hdfs、kafka、kudu的权限。如果使用不同的证书,则需要在Flink任务中单独进行Kerberos相关配置。

以JM启动为例,查看加载安全认证相关组件,记录各个模块调用链。

  • 加载安全认证上下文,从上下文中启动集群各个组件。
  • ClusterEntrypoint#startCluster
    SecurityContext securityContext = installSecurityContext(configuration);
    securityContext.runSecured((Callable<Void>) () -> {
        runCluster(configuration);
        return null;
    SecurityContext installSecurityContext(Configuration configuration)) {
        SecurityUtils.install(new SecurityConfiguration(configuration));
        return SecurityUtils.getInstalledContext();
    
  • SecurityConfiguration默认提供了两个security.context.factory.classes用来构建SecurityContext:
  • org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory:根据构建的UserGroupInformation在doAs方法中启动集群。
  • org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory:默认在不需要进行安全认证的上下文中启动。
  • 提供三个security.module.factory.classes用来准备安全认证使用的配置:

  • org.apache.flink.runtime.security.modules.HadoopModuleFactory
  • org.apache.flink.runtime.security.modules.JaasModuleFactory
  • org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
  • 主要还是installModules配置认证使用的配置

    SecurityUtils#install
    public static void install(SecurityConfiguration config) throws Exception {
       // Install the security modules first before installing the security context
       installModules(config);
       installContext(config);
    #installModules使用SPI动态创建moduleFactory,分别调用其install方法
    static void installModules(SecurityConfiguration config) {
        List<SecurityModule> modules = new ArrayList<>();
        for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
            SecurityModuleFactory moduleFactory = null;
            SecurityModule module = moduleFactory.createModule(config);
                if (module != null) {
                    module.install();
                    modules.add(module);
            installedModules = modules;
    
  • HadoopModule#install。hdfs进行kerberos认证需要UserGroupInformation作为loginUser,该模块用来构建全局的loginUser,如果其他组件能够使用该loginUser进行认证,则不需要单独配置证书。
  • public void install() throws SecurityInstallException {
        ## 传递hadoop相关参数
            UserGroupInformation.setConfiguration(hadoopConfiguration);
        UserGroupInformation loginUser;
        try {
            ## 开启kerberos认证并传递Keytab和Principal
                if (UserGroupInformation.isSecurityEnabled() &&
                    !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
                    String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
                    UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
                    ## 当前登陆用户
                        loginUser = UserGroupInformation.getLoginUser();
                    //  token cache
                    // supplement with any available tokens
                    String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
                    if (fileLocation != null) {
                        // Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
                        // used in the context of reading the stored tokens from UGI.
                        // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
                        // loginUser.addCredentials(cred);
                        try {
                            Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
                                                                                            File.class, org.apache.hadoop.conf.Configuration.class);
                            Credentials cred =
                                (Credentials) readTokenStorageFileMethod.invoke(
                                null,
                                new File(fileLocation),
                                hadoopConfiguration);
                            // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
                            // the UGI would prefer the delegation token instead, which eventually expires
                            // and does not fallback to using Kerberos tickets
                            Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
                            Credentials credentials = new Credentials();
                            final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                            Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
                            //If UGI use keytab for login, do not load HDFS delegation token.
                            for (Token<? extends TokenIdentifier> token : usrTok) {
                                if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                                    final Text id = new Text(token.getIdentifier());
                                    credentials.addToken(id, token);
                            Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
                                                                                               Credentials.class);
                            addCredentialsMethod.invoke(loginUser, credentials);
                        } catch (NoSuchMethodException e) {
                            LOG.warn("Could not find method implementations in the shaded jar.", e);
                        } catch (InvocationTargetException e) {
                            throw e.getTargetException();
                } else {
                    // login with current user credentials (e.g. ticket cache, OS login)
                    // note that the stored tokens are read automatically
                    try {
                        //Use reflection API to get the login user object
                        //UserGroupInformation.loginUserFromSubject(null);
                        Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
                        loginUserFromSubjectMethod.invoke(null, (Subject) null);
                    } catch (NoSuchMethodException e) {
                        LOG.warn("Could not find method implementations in the shaded jar.", e);
                    } catch (InvocationTargetException e) {
                        throw e.getTargetException();
                    loginUser = UserGroupInformation.getLoginUser();
            boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
                loginUser, securityConfig.useTicketCache());
            LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
        } catch (Throwable ex) {
            throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    
  • JaasModule#install。准备jaas文件使用的各个属性,先传递给ConfigFile,当各个框架使用jaas文件进行验证时,从javax.security.auth.login.Configuration中提取。
  • Kafka kerberos认证使用的jaas文件:
  •  KafkaClient {
            com.sun.security.auth.module.Krb5LoginModule required
            keyTab="/Users/xx/kafka.keytab"
            principal="kafka/cdh002@TEST.COM"
            useKeyTab=true
            useTicketCache=true;
    
  • jaas文件使用的参数,写入javax.security.auth.login.Configuration。各个框架使用时会从改配置中取。
  • public void install() {
        priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
        ## 创建一个空的jaas文件,和环境变量java.security.auth.login.config绑定
            if (priorConfigFile == null) {
                File configFile = generateDefaultConfigFile(workingDir);
                System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
                LOG.info("Jaas file will be created as {}.", configFile);
        // read the JAAS configuration file,    创建ConfigFile
        priorConfig = javax.security.auth.login.Configuration.getConfiguration();
        // construct a dynamic JAAS configuration
        currentConfig = new DynamicConfiguration(priorConfig);
        // wire up the configured JAAS login contexts to use the krb5 entries
        AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
        if (krb5Entries != null) {
            for (String app : securityConfig.getLoginContextNames()) {
                currentConfig.addAppConfigurationEntry(app, krb5Entries);   // kafkaClient
        ## 写入javax.security.auth.login.Configuration
            javax.security.auth.login.Configuration.setConfiguration(currentConfig);
    
  • ZooKeeperModule#install
  • public void install() throws SecurityInstallException {
        priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
        System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));
        priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
        if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
            System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
        priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
        if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
            System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    
  • 加载完各个Module后,构建installContext。只会有一个installedContext。
  • static void installContext(SecurityConfiguration config) throws Exception { // install the security context factory for (String contextFactoryClass : config.getSecurityContextFactories()) { try { // spi加载 SecurityContextFactory contextFactory = SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass); // 有hadoop环境就走Hadoop 认证 if (contextFactory.isCompatibleWith(config)) { try { installedContext = contextFactory.createContext(config); // install the first context that's compatible and ignore the remaining. 只加载一个 break; } catch (SecurityContextInitializeException e) { LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e); } else { LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass); } catch (NoMatchSecurityFactoryException ne) { LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass); if (installedContext == null) { LOG.error("Unable to install a valid security context factory!"); throw new Exception("Unable to install a valid security context factory!");

    Flink kafka connector安全认证

  • Flink读取开启Kerberos认证的kafka时,需要进行如下配置。并未传递java.security.auth.login.config以及sasl.jaas.config配置。
  • kafka中添加的参数:
            1. security.protocol='SASL_PLAINTEXT'  //使用SASL认证协议
            2. sasl.mechanism = 'GSSAPI'           // 使用kerberos认证
            3. sasl.kerberos.service.name = 'kafka' // 服务名称
    flinkconf.yaml中参数:
            1. security.kerberos.login.use-ticket-cache:true
            2. security.kerberos.login.keytab: xxxx
            3. security.kerberos.login.principal:xxxxx
            4. security.kerberos.login.contexts: KafkaClient,Client
    
  • 如果配置sasl.jaas.config,则格式为:
  • String config = "com.sun.security.auth.module.Krb5LoginModule  required\n" +
        "\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
        "\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
        "\tuseKeyTab=true\n" +
        "\tuseTicketCache=true;";
    
  • Flink kafka connector 使用的jaas配置流程。
  • FlinkKafkaConsumerBase#open
        this.partitionDiscoverer.open();
        KafkaPartitionDiscoverer#initializeConnections
    KafkaConsumer#KafkaConsumer
        ClientUtils.createChannelBuilder(config);
        ChannelBuilders#clientChannelBuilder
        ChannelBuilders#create
        SaslChannelBuilder(0.10版本)#configure
        JaasUtils#jaasConfig
    
    public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
        Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
        # 绑定了sasl.jaas.config参数,则从val中获取
        if (jaasConfigArgs != null) {
            if (loginType == LoginType.SERVER)
                throw new IllegalArgumentException("JAAS config property not supported for server");
            else {
                JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
                AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
                int numModules = clientModules == null ? 0 : clientModules.length;
                if (numModules != 1)
                    throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module");
                return jaasConfig;
        } else
            return defaultJaasConfig(loginType);
    private static Configuration defaultJaasConfig(LoginType loginType) {
        # 从Flink jaasModule加载时,已经绑定java.security.auth.login.config
        String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
        if (jaasConfigFile == null) {
            LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
                      SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
        #  拿到javax.security.auth.login.Configuration中的配置,jaas中使用的认证实体
        Configuration jaasConfig = Configuration.getConfiguration();
        # KafkaClient
        String loginContextName = loginType.contextName();
        AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
        if (configEntries == null) {
            String errorMessage;
            errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
            throw new IllegalArgumentException(errorMessage);
        return jaasConfig;
    

    跨集群访问开启Kerberos认证的Kafka

    目的,Flink任务运行在开启Kerberos认证的A集群,同时读取A集群以及B集群的Kafka信息。

  • 上传新集群的Krb5.conf文件,追加到原有的Krb5文件中。
  • 单独配置sasl.jaas.config参数。
  • 特别注意:

  • 新上传的Krb5.conf文件中,domain_realm一定要做realms的映射,否则会使用default_realm。
  • [realms]
     PLS.COM = {
      kdc = plscdh00:88
      admin_server = plscdh00
    将plscdh0-3 映射到PLS.COM
    [domain_realm]
         .pls.com = PLS.COM
         pls.com = PLS.COM
         plscdh01  = PLS.COM  
         plscdh02 = PLS.COM
         plscdh03 = PLS.COM
         plscdh00 = PLS.COM
    
  • 将新上传的Krb5.conf文件中的domain_realm以及realms追加到现有的Krb5文件中。
  • 通过使用sasl.jaas.config来传递Jaas文件内容,如果使用AppConfigurationEntry类传递的话,kafka默认LoginContextName为KafkaClient,多个kafka集群下取出的AppConfigurationEntry会混乱。
  • 相关代码:

  • krb5文件构建代码:
  • public class KrbConfManager {
        Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);
        private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
        private KrbConfManager(){}
        public void appendKrbConf(String krbConf) {
            this.appendKrbConf(krbConf, System.getProperty("user.dir"));
         *  将新增的kbr5.conf文件,使用Config类方法进行解析后,追加到现有的kbr5.conf文件中
         * @param krbConf
        public void appendKrbConf(String krbConf, String directory) {
            String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
            LOG.info("krb conf abs path is {}", krbConfPath);
            Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath),"krb file does not exist");
            try {
                Constructor<Config> constructor = Config.class.getDeclaredConstructor();
                constructor.setAccessible(true);
                Config configParser = constructor.newInstance();
                Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
                loadConfigFile.setAccessible(true);
                List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
                Method parseStanzaTable = configParser.getClass().getDeclaredMethod("parseStanzaTable", List.class);
                parseStanzaTable.setAccessible(true);
                Hashtable<String, Object> appendConfig = (Hashtable<String, Object>) parseStanzaTable.invoke(configParser, configFileList);
                Hashtable<String, Object> appendRealms = (Hashtable<String, Object>) appendConfig.get("realms");
                Hashtable<String, Object> appendDomainRealm = (Hashtable<String, Object>) appendConfig.get("domain_realm");
                Config instance = Config.getInstance();
                Field stanzaTable = instance.getClass().getDeclaredField("stanzaTable");
                stanzaTable.setAccessible(true);
                Hashtable<String, Object> currentTable = (Hashtable<String, Object>) stanzaTable.get(instance);
                Hashtable<String, Object> realms = (Hashtable<String, Object>) currentTable.computeIfAbsent("realms", key -> new Hashtable());
                realms.putAll(appendRealms);
                Hashtable<String, Object> domainRealm = (Hashtable<String, Object>) currentTable.computeIfAbsent("domain_realm", key -> new Hashtable());
                domainRealm.putAll(appendDomainRealm);
                StringBuffer stringBuffer = new StringBuffer();
                String newKerbConfigStr = buildKrbConfigStr(currentTable, stringBuffer);
                LOG.info("====buildKerbConf======\n{}", newKerbConfigStr);
                String krb5FilePath = DtFileUtils.createTempFile("krb-", ".conf", newKerbConfigStr, directory);
                System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5FilePath);
                Config.refresh();
            } catch (Exception e) {
                throw new RuntimeException("build krb conf error", e);
        private String buildKrbConfigStr(Hashtable<String, Object> currentTable, StringBuffer stringBuilder) {
            Set<String> keySet = currentTable.keySet();
            for (String key : keySet) {
                stringBuilder = stringBuilder.append("[").append(key).append("]").append("\n");
                if (!StringUtils.equalsIgnoreCase(key, "realms")) {
                    toStringInternal("", currentTable.get(key), stringBuilder);
                } else {
                    dealRealms(currentTable.get(key), stringBuilder);
            return stringBuilder.toString();
        private void dealRealms(Object realms, StringBuffer stringBuilder) {
            if (realms instanceof Hashtable) {
                Hashtable realmsTable = (Hashtable) realms;
                Iterator tabIterator = realmsTable.keySet().iterator();
                while (tabIterator.hasNext()) {
                    Object entity = tabIterator.next();
                    stringBuilder = stringBuilder.append(entity).append(" = ").append("{\n");
                    toStringInternal("", realmsTable.get(entity), stringBuilder);
                    stringBuilder.append("}\n");
        private static void toStringInternal(String prefix, Object obj, StringBuffer sb) {
            if (obj instanceof String) {
                // A string value, just print it
                sb.append(obj).append('\n');
            } else if (obj instanceof Hashtable) {
                // A table, start a new sub-section...
                Hashtable<?, ?> tab = (Hashtable<?, ?>) obj;
                for (Object o : tab.keySet()) {
                    // ...indent, print "key = ", and
                    sb.append(prefix).append("    ").append(o).append(" = ");
                    // ...go recursively into value
                    toStringInternal(prefix + "    ", tab.get(o), sb);
                sb.append(prefix).append("\n");
            } else if (obj instanceof Vector) {
                // A vector of strings, print them inside [ and ]
                Vector<?> v = (Vector<?>) obj;
                boolean first = true;
                for (Object o : v.toArray()) {
                    if (!first) {
                        sb.append(",");
                    sb.append(o);
                    first = false;
                sb.append("\n");
        private enum Singleton {
            INSTANCE;
            private final KrbConfManager instance;
            Singleton() {
                instance = new KrbConfManager();
            private KrbConfManager getInstance() {
                return instance;
        public static KrbConfManager getInstance() {
            return Singleton.INSTANCE.getInstance();
    
  • sasl.jaas.config 文件构建以及填充
  • public interface SecurityManager {
        Logger LOG = LoggerFactory.getLogger(SecurityManager.class);
        String SASL_JAAS_CONFIG = "sasl.jaas.config";
        default void kerberosSecurity(KafkaSourceTableInfo kafkaSourceTableInfo, Properties props) {
            if (StringUtils.equalsIgnoreCase(kafkaSourceTableInfo.getKerberosAuthEnable(), Boolean.TRUE.toString())) {
                Optional.ofNullable(kafkaSourceTableInfo.getKrbConfName())
                        .ifPresent(KrbConfManager.getInstance()::appendKrbConf);
                String jaasContent = JaasConfigUtil.JaasConfig.builder()
                        .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
                        .setLoginModuleFlag("required")
                        .setPrincipal(DtStringUtil.addQuoteForStr(kafkaSourceTableInfo.getPrincipal()))
                        .setKeyTab(DtStringUtil.addQuoteForStr(DtFileUtils.getFileAbsolutePath(kafkaSourceTableInfo.getKeyTab())))
                        .setUseKeyTab(kafkaSourceTableInfo.getUseKeyTab())
                        .setUseTicketCache(kafkaSourceTableInfo.getUseTicketCache())
                        .build()
                        .generateJaasConfigStr();
                LOG.info(" kafka jaas Content: \n{}", jaasContent);
                props.put(SASL_JAAS_CONFIG, jaasContent);
    
  • 构建入口,要在OPEN中调用
  • KafkaConsumer010#open
    @Override