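Flink provides three modules for securing a cluster: HadoopModule, JaasModule, and ZooKeeperModule. The security-related options are defined in the SecurityOptions class.
HadoopModule
Configures authentication for frameworks that authenticate through UserGroupInformation (Kudu, the synchronous HBase client, HDFS, and so on).
JaasModule
Configures authentication for frameworks that authenticate through a JAAS configuration (Kafka, ZooKeeper, the asynchronous HBase client, and so on).
ZooKeeperModule
Installs the process-wide ZooKeeper security configuration.
When a Flink component starts, it first loads the security modules and then starts the cluster components inside the resulting security context. A Flink cluster can use only one set of credentials for this, however. For example, if a Flink job reads from a Kerberos-enabled Kafka and writes to Kudu, the configured principal and keytab must have permission to access HDFS, Kafka, and Kudu at the same time. To use different credentials, the Kerberos settings have to be configured separately inside the Flink job.
Taking JobManager (JM) startup as an example, the following traces how the security components are loaded and records the call chain of each module.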
ClusterEntrypoint#startCluster
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
    runCluster(configuration);
    return null;
});

SecurityContext installSecurityContext(Configuration configuration) throws Exception {
    SecurityUtils.install(new SecurityConfiguration(configuration));
    return SecurityUtils.getInstalledContext();
}
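SecurityConfiguration provides two default security.context.factory.classes for building the SecurityContext:
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory: starts the cluster inside the doAs method of the constructed UserGroupInformation.
org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory: the default; starts the cluster in a context that requires no security authentication.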
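The difference between the two contexts can be illustrated with a small, self-contained sketch. Every type here (SketchSecurityContext, NoOpSketchContext, LoginUserSketchContext, SecurityContextSketch) is a hypothetical stand-in, not a Flink class, and a ThreadLocal takes the place of UserGroupInformation#doAs, so this only shows the shape of runSecured under those assumptions.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for Flink's SecurityContext interface.
interface SketchSecurityContext {
    <T> T runSecured(Callable<T> action) throws Exception;
}

// Idea behind the no-op context: run the action directly, with no login user.
class NoOpSketchContext implements SketchSecurityContext {
    @Override
    public <T> T runSecured(Callable<T> action) throws Exception {
        return action.call();
    }
}

// Idea behind the Hadoop context: run the action on behalf of a login user.
// The real context would delegate to UserGroupInformation#doAs; a ThreadLocal
// stands in for that here.
class LoginUserSketchContext implements SketchSecurityContext {
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();
    private final String loginUser;

    LoginUserSketchContext(String loginUser) {
        this.loginUser = loginUser;
    }

    @Override
    public <T> T runSecured(Callable<T> action) throws Exception {
        String previous = CURRENT_USER.get();
        CURRENT_USER.set(loginUser);
        try {
            return action.call();
        } finally {
            // Restore whatever user was active before the secured call.
            if (previous == null) {
                CURRENT_USER.remove();
            } else {
                CURRENT_USER.set(previous);
            }
        }
    }
}

class SecurityContextSketch {
    // Starts a pretend "cluster" inside the given context and reports the effective user.
    static String startCluster(SketchSecurityContext context) {
        try {
            return context.runSecured(() -> {
                String user = LoginUserSketchContext.CURRENT_USER.get();
                return user == null ? "no login user" : user;
            });
        } catch (Exception e) {
            throw new IllegalStateException("cluster start failed", e);
        }
    }
}
```

Calling SecurityContextSketch.startCluster with a NoOpSketchContext runs the action with no user attached, while a LoginUserSketchContext makes the configured principal visible for the duration of the call only.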
It also provides three security.module.factory.classes that prepare the configuration used for authentication:
org.apache.flink.runtime.security.modules.HadoopModuleFactory
org.apache.flink.runtime.security.modules.JaasModuleFactory
org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
The main work is done by installModules, which sets up the configuration used for authentication.
SecurityUtils#install
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    installContext(config);
}

SecurityUtils#installModules uses SPI to create each moduleFactory dynamically, then calls install on the module each factory creates.
static void installModules(SecurityConfiguration config) throws Exception {
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        // look up the factory through SPI
        SecurityModuleFactory moduleFactory =
            SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        SecurityModule module = moduleFactory.createModule(config);
        // the factory may return null when the module is not supported in this environment
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
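The install loop above can be reduced to a minimal sketch, assuming a factory is allowed to return null when its module does not apply. SketchModule, SketchModuleFactory, and the hadoopOnClasspath flag are hypothetical names used only for illustration; real factories are discovered through SPI rather than added to a list by hand.

```java
import java.util.ArrayList;
import java.util.List;

class ModuleInstallSketch {
    // Hypothetical stand-ins for SecurityModule and SecurityModuleFactory.
    interface SketchModule {
        void install();
    }

    interface SketchModuleFactory {
        SketchModule createModule(boolean hadoopOnClasspath);
    }

    // Creates a module from every factory and installs the non-null ones, in order.
    static List<SketchModule> installModules(List<SketchModuleFactory> factories,
                                             boolean hadoopOnClasspath) {
        List<SketchModule> installed = new ArrayList<>();
        for (SketchModuleFactory factory : factories) {
            SketchModule module = factory.createModule(hadoopOnClasspath);
            if (module != null) {
                module.install();
                installed.add(module);
            }
        }
        return installed;
    }

    // Records which modules were installed for a given environment.
    static List<String> demo(boolean hadoopOnClasspath) {
        List<String> log = new ArrayList<>();
        List<SketchModuleFactory> factories = new ArrayList<>();
        factories.add(hadoop -> {
            if (!hadoop) {
                return null; // module not supported without Hadoop
            }
            return () -> log.add("hadoop");
        });
        factories.add(hadoop -> () -> log.add("jaas"));
        factories.add(hadoop -> () -> log.add("zookeeper"));
        installModules(factories, hadoopOnClasspath);
        return log;
    }
}
```

In this sketch, ModuleInstallSketch.demo(false) skips the Hadoop module and installs only the other two, which is the behaviour the null check is there to allow.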
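HadoopModule#install. Kerberos authentication against HDFS requires a UserGroupInformation as the loginUser. This module builds the global loginUser; any other component that can authenticate with this loginUser does not need its own credentials configured.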
public void install() throws SecurityInstallException {
    // pass in the Hadoop configuration
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // Kerberos is enabled and both keytab and principal are configured
        if (UserGroupInformation.isSecurityEnabled() &&
            !StringUtils.isBlank(securityConfig.getKeytab()) && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

            // the current login user
            loginUser = UserGroupInformation.getLoginUser();

            // token cache
            // supplement with any available tokens
            String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            if (fileLocation != null) {
                // Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
                // used in the context of reading the stored tokens from UGI.
                // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
                // loginUser.addCredentials(cred);
                try {
                    Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile",
                        File.class, org.apache.hadoop.conf.Configuration.class);
                    Credentials cred =
                        (Credentials) readTokenStorageFileMethod.invoke(
                            null,
                            new File(fileLocation),
                            hadoopConfiguration);

                    // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
                    // the UGI would prefer the delegation token instead, which eventually expires
                    // and does not fallback to using Kerberos tickets
                    Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
                    Credentials credentials = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
                    // If UGI uses a keytab for login, do not load the HDFS delegation token.
                    for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                            final Text id = new Text(token.getIdentifier());
                            credentials.addToken(id, token);
                        }
                    }

                    Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials",
                        Credentials.class);
                    addCredentialsMethod.invoke(loginUser, credentials);
                } catch (NoSuchMethodException e) {
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }
            }
        } else {
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // Use reflection API to get the login user object
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }

            loginUser = UserGroupInformation.getLoginUser();
        }

        boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
            loginUser, securityConfig.useTicketCache());

        LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);

    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
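The token handling in the keytab branch comes down to one filter: every token from the token file is kept except those of kind HDFS_DELEGATION_TOKEN. A rough sketch of that filter follows, with token kinds modelled as plain strings instead of Hadoop Token objects; TokenFilterSketch and the sample kinds in the usage note are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

class TokenFilterSketch {
    static final String HDFS_DELEGATION_TOKEN = "HDFS_DELEGATION_TOKEN";

    // Returns the token kinds that would be added to the login user after a keytab login.
    // HDFS delegation tokens are dropped because the login user would otherwise prefer
    // them over Kerberos tickets, and they eventually expire.
    static List<String> tokensToLoad(List<String> kindsInTokenFile) {
        List<String> kept = new ArrayList<>();
        for (String kind : kindsInTokenFile) {
            if (!HDFS_DELEGATION_TOKEN.equals(kind)) {
                kept.add(kind);
            }
        }
        return kept;
    }
}
```

For a token file holding an HDFS delegation token and, say, an HBase token, only the HBase token would be passed on to the login user.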
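JaasModule#install. Prepares the properties that a JAAS file would carry and hands them to ConfigFile first; when a framework authenticates through JAAS, it reads them from javax.security.auth.login.Configuration.
The JAAS file Kafka uses for Kerberos authentication:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab="/Users/xx/kafka.keytab"
    principal="kafka/cdh002@TEST.COM"
    useKeyTab=true
    useTicketCache=true;
};
The parameters used by the JAAS file are written into javax.security.auth.login.Configuration, and each framework reads them from that configuration when it needs them.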
public void install() {
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // create an empty JAAS file and bind it to the system property java.security.auth.login.config
    if (priorConfigFile == null) {
        File configFile = generateDefaultConfigFile(workingDir);
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file; this creates the ConfigFile
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        for (String app : securityConfig.getLoginContextNames()) {
            currentConfig.addAppConfigurationEntry(app, krb5Entries); // e.g. KafkaClient
        }
    }

    // write the result into javax.security.auth.login.Configuration
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
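The idea of layering programmatic entries over an existing JAAS configuration can be sketched with the JDK classes alone. DynamicJaasSketch below is a simplified, hypothetical model and not Flink's DynamicConfiguration: here a dynamic entry simply takes precedence over the delegate, and the keytab path and principal in the usage note are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

class DynamicJaasSketch extends Configuration {
    private final Configuration delegate; // prior configuration, may be null
    private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();

    DynamicJaasSketch(Configuration delegate) {
        this.delegate = delegate;
    }

    // Registers entries for one login context name, for example "KafkaClient".
    void addEntry(String contextName, AppConfigurationEntry... entries) {
        dynamicEntries.put(contextName, entries);
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        AppConfigurationEntry[] dynamic = dynamicEntries.get(name);
        if (dynamic != null) {
            return dynamic;
        }
        return delegate == null ? null : delegate.getAppConfigurationEntry(name);
    }

    // Builds a Krb5LoginModule entry carrying the same options as the JAAS file above.
    static AppConfigurationEntry keytabEntry(String keytabPath, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("keyTab", keytabPath);
        options.put("principal", principal);
        options.put("useKeyTab", "true");
        options.put("useTicketCache", "true");
        return new AppConfigurationEntry(
            "com.sun.security.auth.module.Krb5LoginModule",
            LoginModuleControlFlag.REQUIRED,
            options);
    }
}
```

A client that later asks for the "KafkaClient" context would receive the keytab entry, while a context name that was never registered falls through to the delegate (or to null when there is none).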
ZooKeeperModule#install
public void install() throws SecurityInstallException {
    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}
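The install method records each prior property value before overwriting it, presumably so that the original state can be restored later. A minimal sketch of that save-and-restore pattern follows; SystemPropertySwap is a hypothetical helper, and zookeeper.sasl.client is used in the usage note as the ZooKeeper client property that toggles SASL.

```java
class SystemPropertySwap {
    private final String key;
    private final String prior; // value before the swap, null if the property was unset

    private SystemPropertySwap(String key, String prior) {
        this.key = key;
        this.prior = prior;
    }

    // Sets the property and remembers what was there before.
    static SystemPropertySwap set(String key, String value) {
        String prior = System.getProperty(key);
        System.setProperty(key, value);
        return new SystemPropertySwap(key, prior);
    }

    // Puts the property back to its earlier state.
    void restore() {
        if (prior == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, prior);
        }
    }
}
```

For example, SystemPropertySwap.set("zookeeper.sasl.client", "true") enables SASL for the ZooKeeper client in this JVM, and calling restore() on the returned object undoes the change.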
After all modules are loaded, installContext builds the security context. Only one installedContext is ever installed.
static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // load through SPI
            SecurityContextFactory contextFactory = SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // if a Hadoop environment is available, use Hadoop authentication
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e);
                }
            } else {
                LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}
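The selection rule is "first compatible factory wins". A minimal sketch follows, assuming the Hadoop factory is tried before the no-op one (the order in which they are listed earlier) and that compatibility depends only on whether Hadoop is available; SketchFactory and both of those simplifications are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ContextSelectionSketch {
    // Hypothetical, simplified stand-in for a security context factory.
    static final class SketchFactory {
        final String contextName;
        final boolean requiresHadoop;

        SketchFactory(String contextName, boolean requiresHadoop) {
            this.contextName = contextName;
            this.requiresHadoop = requiresHadoop;
        }

        boolean isCompatibleWith(boolean hadoopAvailable) {
            return !requiresHadoop || hadoopAvailable;
        }
    }

    // Assumed order: the Hadoop context first, the no-op context as the fallback.
    static List<SketchFactory> defaultOrder() {
        List<SketchFactory> factories = new ArrayList<>();
        factories.add(new SketchFactory("hadoop", true));
        factories.add(new SketchFactory("no-op", false));
        return factories;
    }

    // Returns the name of the first compatible context and ignores the rest.
    static String select(List<SketchFactory> factories, boolean hadoopAvailable) {
        for (SketchFactory factory : factories) {
            if (factory.isCompatibleWith(hadoopAvailable)) {
                return factory.contextName;
            }
        }
        throw new IllegalStateException("Unable to install a valid security context factory!");
    }
}
```

With Hadoop available the sketch picks the Hadoop context; without it the loop falls through to the no-op context, and an empty or fully incompatible list ends in the same kind of failure as the original code.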
Flink Kafka connector authentication
When Flink reads from a Kerberos-enabled Kafka, the following configuration is required. Neither java.security.auth.login.config nor sasl.jaas.config is passed explicitly.
Parameters added on the Kafka side:
1. security.protocol = 'SASL_PLAINTEXT' // use the SASL authentication protocol
2. sasl.mechanism = 'GSSAPI' // use Kerberos authentication
3. sasl.kerberos.service.name = 'kafka' // service name
Parameters in flink-conf.yaml:
1. security.kerberos.login.use-ticket-cache: true
2. security.kerberos.login.keytab: xxxx
3. security.kerberos.login.principal: xxxxx
4. security.kerberos.login.contexts: KafkaClient,Client
If sasl.jaas.config is configured, its format is:
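On the client side, the three Kafka parameters above would typically end up in the Properties object handed to the Kafka consumer. The following is a small sketch under that assumption; the class name, method name, and bootstrap address are placeholders, and sasl.jaas.config is left out on purpose because the credentials are expected to come from the flink-conf.yaml settings.

```java
import java.util.Properties;

class KafkaKerberosPropsSketch {
    // Builds consumer properties for a Kerberos-enabled Kafka cluster.
    // No sasl.jaas.config is set here: the login entries are expected to be
    // provided through the KafkaClient login context configured in Flink.
    static Properties kerberosClientProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        return props;
    }
}
```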
String config = "com.sun.security.auth.module.Krb5LoginModule required\n" +
"\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
"\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
"\tuseKeyTab=true\n" +
"\tuseTicketCache=true;";
The JAAS configuration flow used by the Flink Kafka connector:
FlinkKafkaConsumerBase#open
this.partitionDiscoverer.open();
KafkaPartitionDiscoverer#initializeConnections
KafkaConsumer#KafkaConsumer
ClientUtils.createChannelBuilder(config);
ChannelBuilders#clientChannelBuilder
ChannelBuilders#create
SaslChannelBuilder#configure (Kafka 0.10)
JaasUtils#jaasConfig
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
    Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
    // if sasl.jaas.config is set, build the configuration from its value
    if (jaasConfigArgs != null) {
        if (loginType == LoginType.SERVER)
            throw new IllegalArgumentException("JAAS config property not supported for server");
        else {
            JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
            AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
            int numModules = clientModules == null ? 0 : clientModules.length;
            if (numModules != 1)
                throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be one module");
            return jaasConfig;
        }
    } else
        return defaultJaasConfig(loginType);
}

private static Configuration defaultJaasConfig(LoginType loginType) {
    // java.security.auth.login.config was already bound when Flink's JaasModule was installed
    String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    if (jaasConfigFile == null) {
        LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
            SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
    }

    // fetch the entries held in javax.security.auth.login.Configuration, i.e. the credentials JAAS will use
    Configuration jaasConfig = Configuration.getConfiguration();

    // KafkaClient
    String loginContextName = loginType.contextName();
    AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
    if (configEntries == null) {
        String errorMessage;
        errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
            JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
        throw new IllegalArgumentException(errorMessage);
    }
    return jaasConfig;
}
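Accessing a Kerberos-enabled Kafka across clusters
Goal: a Flink job runs on cluster A, which has Kerberos enabled, and reads from Kafka on both cluster A and cluster B.
Upload the new cluster's krb5.conf file and append it to the existing krb5 file.
Configure the sasl.jaas.config parameter separately.
Important:
In the newly uploaded krb5.conf, domain_realm must map the hosts to the realm; otherwise default_realm is used.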
[realms]
PLS.COM = {
  kdc = plscdh00:88
  admin_server = plscdh00
}
Map plscdh00 through plscdh03 to PLS.COM:
[domain_realm]
.pls.com = PLS.COM
pls.com = PLS.COM
plscdh01 = PLS.COM
plscdh02 = PLS.COM
plscdh03 = PLS.COM
plscdh00 = PLS.COM
Append the domain_realm and realms sections of the newly uploaded krb5.conf to the existing krb5 file.
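Why a missing domain_realm entry ends up at default_realm can be seen in a simplified model of the lookup: an exact host match first, then successively shorter domain suffixes, then the default. DomainRealmSketch is a hypothetical illustration of those rules and not the JDK's or MIT Kerberos's actual implementation; the default realm A.COM in the usage note is a made-up value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DomainRealmSketch {
    // Resolves a host name to a realm using [domain_realm]-style mappings.
    static String realmFor(String host, Map<String, String> domainRealm, String defaultRealm) {
        String name = host.toLowerCase();
        String exact = domainRealm.get(name);
        if (exact != null) {
            return exact;
        }
        // For "broker.pls.com" this tries ".pls.com", then ".com".
        for (int dot = name.indexOf('.'); dot >= 0; dot = name.indexOf('.', dot + 1)) {
            String realm = domainRealm.get(name.substring(dot));
            if (realm != null) {
                return realm;
            }
        }
        return defaultRealm;
    }

    // A subset of the mappings shown in the [domain_realm] section above.
    static Map<String, String> sampleMapping() {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put(".pls.com", "PLS.COM");
        mapping.put("plscdh00", "PLS.COM");
        mapping.put("plscdh01", "PLS.COM");
        return mapping;
    }
}
```

With these mappings, the short host name plscdh01 resolves to PLS.COM only because it is listed explicitly; an unlisted short name such as plscdh09 would fall back to the default realm of cluster A, which is the failure the note above warns about.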
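Pass the JAAS content through sasl.jaas.config. If it is passed through AppConfigurationEntry instead, Kafka's default LoginContextName is KafkaClient, so with multiple Kafka clusters the AppConfigurationEntry values that are retrieved get mixed up.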
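The collision can be made concrete with a small model: a process-wide JAAS configuration has a single slot per login context name, whereas sasl.jaas.config travels with each client's own properties. The sketch below assumes that model; PerClientJaasSketch and the principals and keytab paths used with it are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class PerClientJaasSketch {
    // Builds a sasl.jaas.config value in the single-line form Kafka accepts.
    static String jaasConfig(String principal, String keytabPath) {
        return "com.sun.security.auth.module.Krb5LoginModule required"
            + " useKeyTab=true"
            + " useTicketCache=true"
            + " keyTab=\"" + keytabPath + "\""
            + " principal=\"" + principal + "\";";
    }

    // Per-client scope: every consumer carries its own credentials.
    static Properties clientProps(String principal, String keytabPath) {
        Properties props = new Properties();
        props.setProperty("sasl.jaas.config", jaasConfig(principal, keytabPath));
        return props;
    }

    // Process-wide scope: one slot for "KafkaClient", so the last registration wins.
    static String sharedContextPrincipal(String... principals) {
        Map<String, String> processWide = new HashMap<>();
        for (String principal : principals) {
            processWide.put("KafkaClient", principal);
        }
        return processWide.get("KafkaClient");
    }
}
```

Registering the principals of cluster A and cluster B under the shared context leaves only the second one visible, while two calls to clientProps yield two independent property sets, one per cluster.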
Related code:
Code that builds the krb5 file:
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.security.krb5.Config;
// DtFileUtils, Preconditions and StringUtils come from the project and its dependencies.

public class KrbConfManager {
    private static final Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);
    private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    private KrbConfManager() {}

    public void appendKrbConf(String krbConf) {
        this.appendKrbConf(krbConf, System.getProperty("user.dir"));
    }

    /**
     * Parses the additional krb5.conf file with the Config class and appends it to the current krb5.conf.
     * Relies on private members of sun.security.krb5.Config, which may differ between JDK versions.
     * @param krbConf path of the krb5.conf file to append
     * @param directory directory in which the merged file is created
     */
    public void appendKrbConf(String krbConf, String directory) {
        String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
        LOG.info("krb conf abs path is {}", krbConfPath);
        Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath), "krb file does not exist");

        try {
            // parse the new file with a private Config instance
            Constructor<Config> constructor = Config.class.getDeclaredConstructor();
            constructor.setAccessible(true);
            Config configParser = constructor.newInstance();
            Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
            loadConfigFile.setAccessible(true);
            List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
            Method parseStanzaTable = configParser.getClass().getDeclaredMethod("parseStanzaTable", List.class);
            parseStanzaTable.setAccessible(true);
            Hashtable<String, Object> appendConfig = (Hashtable<String, Object>) parseStanzaTable.invoke(configParser, configFileList);
            Hashtable<String, Object> appendRealms = (Hashtable<String, Object>) appendConfig.get("realms");
            Hashtable<String, Object> appendDomainRealm = (Hashtable<String, Object>) appendConfig.get("domain_realm");

            // merge realms and domain_realm into the configuration currently in use
            Config instance = Config.getInstance();
            Field stanzaTable = instance.getClass().getDeclaredField("stanzaTable");
            stanzaTable.setAccessible(true);
            Hashtable<String, Object> currentTable = (Hashtable<String, Object>) stanzaTable.get(instance);
            Hashtable<String, Object> realms = (Hashtable<String, Object>) currentTable.computeIfAbsent("realms", key -> new Hashtable<>());
            realms.putAll(appendRealms);
            Hashtable<String, Object> domainRealm = (Hashtable<String, Object>) currentTable.computeIfAbsent("domain_realm", key -> new Hashtable<>());
            domainRealm.putAll(appendDomainRealm);

            // write the merged configuration to a new file and switch the JVM over to it
            StringBuffer stringBuffer = new StringBuffer();
            String newKerbConfigStr = buildKrbConfigStr(currentTable, stringBuffer);
            LOG.info("====buildKerbConf======\n{}", newKerbConfigStr);
            String krb5FilePath = DtFileUtils.createTempFile("krb-", ".conf", newKerbConfigStr, directory);
            System.setProperty(JAVA_SECURITY_KRB5_CONF, krb5FilePath);
            Config.refresh();
        } catch (Exception e) {
            throw new RuntimeException("build krb conf error", e);
        }
    }

    private String buildKrbConfigStr(Hashtable<String, Object> currentTable, StringBuffer stringBuilder) {
        Set<String> keySet = currentTable.keySet();
        for (String key : keySet) {
            stringBuilder = stringBuilder.append("[").append(key).append("]").append("\n");
            if (!StringUtils.equalsIgnoreCase(key, "realms")) {
                toStringInternal("", currentTable.get(key), stringBuilder);
            } else {
                dealRealms(currentTable.get(key), stringBuilder);
            }
        }
        return stringBuilder.toString();
    }

    private void dealRealms(Object realms, StringBuffer stringBuilder) {
        if (realms instanceof Hashtable) {
            Hashtable realmsTable = (Hashtable) realms;
            Iterator tabIterator = realmsTable.keySet().iterator();
            while (tabIterator.hasNext()) {
                Object entity = tabIterator.next();
                stringBuilder = stringBuilder.append(entity).append(" = ").append("{\n");
                toStringInternal("", realmsTable.get(entity), stringBuilder);
                stringBuilder.append("}\n");
            }
        }
    }

    private static void toStringInternal(String prefix, Object obj, StringBuffer sb) {
        if (obj instanceof String) {
            // A string value, just print it
            sb.append(obj).append('\n');
        } else if (obj instanceof Hashtable) {
            // A table, start a new sub-section...
            Hashtable<?, ?> tab = (Hashtable<?, ?>) obj;
            for (Object o : tab.keySet()) {
                // ...indent, print "key = ", and
                sb.append(prefix).append(" ").append(o).append(" = ");
                // ...go recursively into value
                toStringInternal(prefix + " ", tab.get(o), sb);
            }
            sb.append(prefix).append("\n");
        } else if (obj instanceof Vector) {
            // A vector of strings, print them comma-separated on one line
            Vector<?> v = (Vector<?>) obj;
            boolean first = true;
            for (Object o : v.toArray()) {
                if (!first) {
                    sb.append(",");
                }
                sb.append(o);
                first = false;
            }
            sb.append("\n");
        }
    }

    private enum Singleton {
        INSTANCE;

        private final KrbConfManager instance;

        Singleton() {
            instance = new KrbConfManager();
        }

        private KrbConfManager getInstance() {
            return instance;
        }
    }

    public static KrbConfManager getInstance() {
        return Singleton.INSTANCE.getInstance();
    }
}
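Stripped of the reflection, the manager does two things: it merges the realms and domain_realm tables, and it renders the result back into krb5.conf syntax. A simplified sketch of just those two steps follows, using plain maps instead of the JDK's internal Hashtable structure; Krb5MergeSketch and the sample realm A.COM with its KDC a-kdc:88 are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class Krb5MergeSketch {
    // Entries from 'extra' are added to 'current'; on a key clash the new entry wins.
    static <V> Map<String, V> merge(Map<String, V> current, Map<String, V> extra) {
        Map<String, V> merged = new LinkedHashMap<>(current);
        merged.putAll(extra);
        return merged;
    }

    // Renders the two sections in krb5.conf syntax.
    static String render(Map<String, Map<String, String>> realms, Map<String, String> domainRealm) {
        StringBuilder sb = new StringBuilder("[realms]\n");
        for (Map.Entry<String, Map<String, String>> realm : realms.entrySet()) {
            sb.append(realm.getKey()).append(" = {\n");
            for (Map.Entry<String, String> setting : realm.getValue().entrySet()) {
                sb.append("  ").append(setting.getKey()).append(" = ").append(setting.getValue()).append('\n');
            }
            sb.append("}\n");
        }
        sb.append("[domain_realm]\n");
        for (Map.Entry<String, String> mapping : domainRealm.entrySet()) {
            sb.append(mapping.getKey()).append(" = ").append(mapping.getValue()).append('\n');
        }
        return sb.toString();
    }

    // Merges a made-up cluster A configuration with the PLS.COM realm from the example above.
    static String demo() {
        Map<String, Map<String, String>> currentRealms =
            Collections.singletonMap("A.COM", Collections.singletonMap("kdc", "a-kdc:88"));
        Map<String, Map<String, String>> extraRealms =
            Collections.singletonMap("PLS.COM", Collections.singletonMap("kdc", "plscdh00:88"));
        Map<String, String> currentDomains = Collections.emptyMap();
        Map<String, String> extraDomains = Collections.singletonMap(".pls.com", "PLS.COM");
        return render(merge(currentRealms, extraRealms), merge(currentDomains, extraDomains));
    }
}
```

The output of Krb5MergeSketch.demo() contains both realms and the new domain mapping, which is the content the real manager writes to a temporary file before calling Config.refresh().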
Building and populating sasl.jaas.config
public interface SecurityManager {
    Logger LOG = LoggerFactory.getLogger(SecurityManager.class);

    String SASL_JAAS_CONFIG = "sasl.jaas.config";

    default void kerberosSecurity(KafkaSourceTableInfo kafkaSourceTableInfo, Properties props) {
        if (StringUtils.equalsIgnoreCase(kafkaSourceTableInfo.getKerberosAuthEnable(), Boolean.TRUE.toString())) {
            Optional.ofNullable(kafkaSourceTableInfo.getKrbConfName())
                .ifPresent(KrbConfManager.getInstance()::appendKrbConf);

            String jaasContent = JaasConfigUtil.JaasConfig.builder()
                .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
                .setLoginModuleFlag("required")
                .setPrincipal(DtStringUtil.addQuoteForStr(kafkaSourceTableInfo.getPrincipal()))
                .setKeyTab(DtStringUtil.addQuoteForStr(DtFileUtils.getFileAbsolutePath(kafkaSourceTableInfo.getKeyTab())))
                .setUseKeyTab(kafkaSourceTableInfo.getUseKeyTab())
                .setUseTicketCache(kafkaSourceTableInfo.getUseTicketCache())
                .build()
                .generateJaasConfigStr();

            LOG.info(" kafka jaas Content: \n{}", jaasContent);
            props.put(SASL_JAAS_CONFIG, jaasContent);
        }
    }
}
Entry point: kerberosSecurity must be called from open.
KafkaConsumer010#open
@Override