添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Flink 广播变量在实时处理程序中扮演着很重要的角色,适当的使用广播变量会大大提升程序处理效率。

本文从简单的 demo 场景出发,引入生产中实际的需求并提出思路与部分示例代码,应对一般需求应该没有什么问题,话不多说,赶紧来看看这篇干货满满的广播程序使用实战吧。

1 啥是广播

Flink 支持广播变量,允许在每台机器上保留一个只读的缓存变量,数据存在内存中,在不同的 task 所在的节点上的都能获取到,可以减少大量的 shuffle 操作。

换句话说,广播变量可以理解为一个公共的共享变量,可以把一个 dataset 的数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。

如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存 (也就是一个节点中可能会存在多份 dataset 数据)

2 用法总结

//1 初始化数据
DataSet<Integer>  toBroadcast = env.fromElements(1,2,3)
//2 广播数据 api
withBroadcastSet(toBroadcast,"broadcastSetName")
//3 获取数据
Collection<integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName"); 

广播变量由于要常驻内存,程序结束时才会失效,所以数据量不宜过大

广播变量广播在初始化后不支持修改 (修改场景也有办法)

3 基础案例演示

基础案例广播变量使用

这种场景下广播变量就是加载参数表,参数表不会变化,记住第二部分常用总结公式即可。

* @author 大数据江湖 * @version 1.0 * @date 2021/5/17. public class BaseBroadCast { * broadcast广播变量 * 需求: * flink会从数据源中获取到用户的姓名 * 最终需要把用户的姓名和年龄信息打印出来 * 分析: * 所以就需要在中间的map处理的时候获取用户的年龄信息 * 建议吧用户的关系数据集使用广播变量进行处理 public static void main(String[] args) throws Exception { // 获取运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 1:准备需要广播的数据 ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<> (); broadData.add( new Tuple2<>("zs", 18 )); broadData.add( new Tuple2<>("ls", 20 )); broadData.add( new Tuple2<>("ww", 17 )); DataSet <Tuple2<String, Integer>> tupleData = env.fromCollection(broadData); // 1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄 DataSet <HashMap<String, Integer>> toBroadcast = tupleData.map( new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>> () { @Override public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception { HashMap <String, Integer> res = new HashMap<> (); res.put(value.f0, value.f1); return res; // 源数据 DataSource<String> data = env.fromElements("zs", "ls", "ww" ); // 注意:在这里需要使用到RichMapFunction获取广播变量 DataSet<String> result = data.map( new RichMapFunction<String, String> () { List <HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>> (); HashMap <String, Integer> allMap = new HashMap<String, Integer> (); * 这个方法只会执行一次 * 可以在这里实现一些初始化的功能 * 所以,就可以在open方法中获取广播变量数据 @Override public void open(Configuration parameters) throws Exception { super .open(parameters); // 3:获取广播数据 this .broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName" ); for (HashMap map : broadCastMap) { allMap.putAll(map); @Override public String map(String value) throws Exception { Integer age = allMap.get(value); return value + "," + age; }).withBroadcastSet(toBroadcast, "broadCastMapName"); // 2:执行广播数据的操作 result.print();

4 生产案例演示

实际生产中有时候是需要更新广播变量的,但不是实时更新的,一般会设置一个更新周期,几分钟,几小时的都很常见,根据业务而定。

由于广播变量需要更新,解决办法一般是需要将广播变量做成另一个 source,进行流与流之间的 connect 操作,定时刷新广播的source,从而达到广播变量修改的目的。

4.1.1 使用 redis 中的数据作为广播变量的思路:

消费 kafka 中的数据,使用 redis 中的数据作为广播数据,进行数据清洗后 写到 kafka中。

示例代码分为三个部分:kafka 生产者,redis 广播数据源,执行入口类

构建 kafka 生成者,模拟数据 (以下代码的消费消息来源均是此处生产)

public static void main(String[] args) throws Exception{ Properties prop = new Properties(); // 指定kafka broker地址 prop.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); // 指定key value的序列化方式 prop.put("key.serializer", StringSerializer. class .getName()); prop.put( "value.serializer", StringSerializer. class .getName()); // 指定topic名称 String topic = "data_flink_bigdata_test" ; // 创建producer链接 KafkaProducer<String, String> producer = new KafkaProducer<String,String> (prop); // {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} while ( true ){ String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}" ; System.out.println(message); // 同步的方式,往Kafka里面生产数据 producer.send(
new ProducerRecord<String, String> (topic,message)); Thread.sleep( 2000 ); // 关闭链接 // producer.close(); public static String getCurrentTime(){ SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss" ); return sdf.format( new Date()); public static String getCountryCode(){ String[] types = {"US","TW","HK","PK","KW","SA","IN" }; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; public static String getRandomType(){ String[] types = {"s1","s2","s3","s4","s5" }; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; public static double getRandomScore(){ double [] types = {0.3,0.2,0.1,0.5,0.8 }; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; public static String getRandomLevel(){ String[] types = {"A","A+","B","C","D" }; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; public class BigDataRedisSource implements SourceFunction<HashMap<String,String>> { private Logger logger= LoggerFactory.getLogger(BigDataRedisSource. class ); private Jedis jedis; private boolean isRunning= true ; @Override public void run(SourceContext<HashMap<String, String>> cxt) throws Exception { this .jedis = new Jedis("localhost",6379 ); HashMap <String, String> map = new HashMap<> (); while (isRunning){ try { map.clear(); Map <String, String> areas = jedis.hgetAll("areas" ); * AREA_CT TT,AA * map: * TT,AREA_CT * AA,AREA_CT for (Map.Entry<String,String> entry: areas.entrySet()){ String area = entry.getKey(); String value = entry.getValue(); String[] fields = value.split("," ); for (String country:fields){ map.put(country,area); if (map.size() > 0 ){ cxt.collect(map); Thread.sleep( 60000 ); } catch (JedisConnectionException e){ logger.error( "redis连接异常" ,e.getCause()); this .jedis = new Jedis("localhost",6379 ); } catch (Exception e){ logger.error( "数据源异常" ,e.getCause()); @Override public void cancel() { isRunning = false ; if (jedis != null ){ jedis.close(); public class 广播方式1分两个流进行connnect操作 { public static void main(String[] args) throws Exception { // 1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism( 3); // 并行度取决于 kafka 中的分区数 保持与kafka 一致 // 2 设置 checkpoint // 开启checkpoint 一分钟一次 env.enableCheckpointing(60000 ); // 设置checkpoint 仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次checkpoint的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000 ); // 最多只支持1个checkpoint同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); // checkpoint超时的时间 env.getCheckpointConfig().setCheckpointTimeout(60000 ); // 任务失败后也保留 checkPonit数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 间隔 // 设置 checkpoint 路径 // env.setStateBackend(new FsStateBackend("hdfs: // 192.168.123.103:9000/flink/checkpoint")); // 3 设置 kafka Flink 消费 // 创建 Kafka 消费信息 String topic ="data_flink_bigdata_test" ; Properties consumerProperties = new Properties(); consumerProperties.put( "bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); consumerProperties.put( "group.id","data_test_new_1" ); consumerProperties.put( "enable.auto.commit", "false" ); consumerProperties.put( "auto.offset.reset","earliest" ); // 4 获取 kafka 与 redis 数据源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource <String> kafkaSourceData = env.addSource(consumer); // 直接使用广播的方式 后续作为两个数据流来操作 DataStream<HashMap<String, String>> redisSourceData = env.addSource( new NxRedisSource()).broadcast(); // 5 两个数据源进行 ETL 处理 使用 connect 连接处理 SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).flatMap( new MyETLProcessFunction()); // 6 新创建一个 kafka 生产者 进行发送 String outputTopic="allDataClean" ; // 输出给下游 kafka Properties producerProperties = new Properties(); producerProperties.put( "bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); FlinkKafkaProducer <String> producer = new FlinkKafkaProducer<> (outputTopic, new KeyedSerializationSchemaWrapper<String>( new SimpleStringSchema()), producerProperties); etlData.addSink(producer); // 7 提交任务执行 env.execute("DataClean" ); * in 1 kafka source : * {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} * in 2 redis source * US,AREA_US * TW,AREA_CT * HK,AREA_CT * out 合并后的source private static class MyETLProcessFunction implements CoFlatMapFunction<String,HashMap<String,String>,String> { // 用来存储 redis 中的数据 HashMap<String,String> allMap = new HashMap<String,String> (); @Override public void flatMap1(String line, Collector<String> collector) throws Exception { // 将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 // allMap -> redis 数据 JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt" ); String countryCode = jsonObject.getString("countryCode" ); // 可以根据countryCode获取大区的名字 String area = allMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data" ); for ( int i = 0; i < data.size(); i++ ) { JSONObject dataObject = data.getJSONObject(i); System.out.println( "大区:"+ area); dataObject.put( "dt" , dt); dataObject.put( "area" , area); // 下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); @Override public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector) throws Exception { // 将 redis 中 数据进行赋值 allMap = stringStringHashMap; public class 广播方式2使用MapState对方式1改造 { public static void main(String[] args) throws Exception { // 1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism( 3); // 并行度取决于 kafka 中的分区数 保持与kafka 一致 // 2 设置 checkpoint // 开启checkpoint 一分钟一次 env.enableCheckpointing(60000 ); // 设置checkpoint 仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次checkpoint的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000 ); // 最多只支持1个checkpoint同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); // checkpoint超时的时间 env.getCheckpointConfig().setCheckpointTimeout(60000 ); // 任务失败后也保留 checkPonit数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 间隔 // 设置 checkpoint 路径 // env.setStateBackend(new FsStateBackend("hdfs: // 192.168.123.103:9000/flink/checkpoint")); // 3 设置 kafka Flink 消费 // 创建 Kafka 消费信息 String topic = "data_flink_bigdata_test" ; Properties consumerProperties = new Properties(); consumerProperties.put( "bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); consumerProperties.put( "group.id", "data_flink_fpy_test_consumer" ); consumerProperties.put( "enable.auto.commit", "false" ); consumerProperties.put( "auto.offset.reset", "earliest" ); // 4 获取 kafka 与 redis 数据源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource <String> kafkaSourceData = env.addSource(consumer); // 获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法 MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String> ( "RedisBdStream" , String. class , String. class // 5 两个数据源进行 ETL 处理 使用 connect 连接处理 TODO process 替换 FlatMap // TODO 使用 MapState 来进行广播 BroadcastStream<HashMap<String, String>> redisSourceData = env.addSource( new NxRedisSource()).broadcast(descriptor); SingleOutputStreamOperator <String> etlData = kafkaSourceData.connect(redisSourceData).process( new MyETLProcessFunction()); // 6 新创建一个 kafka 生产者 进行发送 String outputTopic = "allDataClean" ; // 输出给下游 kafka Properties producerProperties = new Properties(); producerProperties.put( "bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); FlinkKafkaProducer <String> producer = new FlinkKafkaProducer<> (outputTopic, new KeyedSerializationSchemaWrapper<String>( new SimpleStringSchema()), producerProperties); etlData.addSink(producer); etlData.print(); // 7 提交任务执行 env.execute("DataClean" ); * in 1 kafka source * in 2 redis source * out 合并后的source private static class MyETLProcessFunction extends BroadcastProcessFunction<String, HashMap<String, String>, String> { // TODO 注意此处 descriptor 的名称需要与 广播时 (99行代码) 名称一致 MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String> ( "RedisBdStream" , String. class , String. class // 逻辑的处理方法 kafka 的数据 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception { // 将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 // allMap -> redis 数据 System.out.println("into processElement " ); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt" ); String countryCode = jsonObject.getString("countryCode" ); // 可以根据countryCode获取大区的名字 // String area = allDataMap.get(countryCode); // TODO 从MapState中获取对应的Code String area = readOnlyContext.getBroadcastState(descriptor).get(countryCode); JSONArray data = jsonObject.getJSONArray("data" ); for ( int i = 0; i < data.size(); i++ ) { JSONObject dataObject = data.getJSONObject(i); System.out.println( "大区:" + area); dataObject.put( "dt" , dt); dataObject.put( "area" , area); // 下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); // 广播流的处理方法 @Override public void processBroadcastElement(HashMap<String, String> stringStringHashMap, Context context, Collector<String> collector) throws Exception { // 将接收到的控制数据放到 broadcast state 中 // key , flink // 将 RedisMap中的值放入 MapState 中 for (Map.Entry<String, String> entry : stringStringHashMap.entrySet()) { // TODO 使用 MapState 存储 redis 数据 context.getBroadcastState(descriptor).put(entry.getKey(), entry.getValue()); System.out.println(entry);

4.2 关系型数据库广播变量案例思路:

在 flink 流式处理中常常需要加载数据库中的数据作为条件进行数据处理,有些表作为系统表,实时查询效率很低,这时候就需要将这些数据作为广播数据,而同时这些数据可能也需要定期的更新。

数据库表的广播变量思路同redis等缓存广播数据的思路类似,也是使用 两个source 进行 connect 处理 , 在数据库表的 source 中定时刷新数据就可以了。

不同点在于这里把数据库查询的操作转成另一个工具类,在初始化时使用了静态代码块,在广播时使用了流的 connect 操作。

示例代码分为三个部分:数据库表广播源,数据库操作类,执行入口类

数据库表广播源

* DB source 源头 进行广播 public class BigDataDBBroadSource extends RichSourceFunction<Map<String,Object>> { private final Logger logger = LoggerFactory.getLogger(BigDataDBBroadSource. class ); private volatile boolean isRunning = true ; public BigDataDBBroadSource() { @Override public void open(Configuration parameters) throws Exception { super .open(parameters); @Override public void run(SourceContext<Map<String,Object>> sourceContext) throws Exception { while (isRunning) { // TODO 使用的是一个 DB 源头的 source 60 s 刷新一次 进行往下游发送 TimeUnit.SECONDS.sleep(60 ); Map <String,Object> map = new HashMap<String,Object> (); // 规则匹配关键词 final DbBroadCastListInitUtil.Build ruleListInitUtil = new DbBroadCastListInitUtil.Build(); ruleListInitUtil.reloadRule(); map.put( "dbsource" , ruleListInitUtil); if (map.size() > 0 ) { sourceContext.collect(map); @Override public void cancel() { this .isRunning = false ; @Override public void close() throws Exception { super .close(); public class DbBroadCastListInitUtil implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DbBroadCastListInitUtil. class ); // 数据库规则信息 public static Map<String, String> areasMap = new HashMap<String, String> (); static { LOG.info( "初始化 db 模块" ); Connection dbConn = null ; try { if (dbConn == null || dbConn.isClosed()) { LOG.info( "init dbConn start...." ); LOG.info( "init dbConn end...." ); HashMap <String, String> map = Maps.newHashMap(); map.put( "US","AREA_US" ); map.put( "TW","AREA_CT" ); map.put( "HK","AREA_CT" ); areasMap = map; } catch (Exception e) { LOG.error( "init database [status:error]" , e); throw new RuntimeException(" static article rule list db select error! , "+ e.getMessage()) ; } finally { if (dbConn != null ) { try { dbConn.close(); } catch (SQLException e) { LOG.error( "dbConn conn close error!" ,e); public static Map<String, String> newAreasMap = new HashMap<String, String> (); public void reloadRule() throws Exception { LOG.info( "重新初始化 DB reloadRule 模块" ); Connection dbConn = null ; try { if (dbConn == null || dbConn.isClosed()) { LOG.info( "init dbConn start...." ); LOG.info( "init dbConn end...." ); HashMap <String, String> map = Maps.newHashMap(); map.put( "US","AREA_US" ); map.put( "TW","AREA_CT" ); map.put( "HK","AREA_CT" ); map.put( "AM","AREA_CT" ); newAreasMap = map; } catch (Exception e) { LOG.error( "init database [status:error]" , e); throw e; } finally { if (dbConn != null ) { try { dbConn.close(); } catch (SQLException e) { LOG.error( "dbConn conn close error!" ,e); public static Map<String, String> getNewAreasMap() { return newAreasMap; public static Build build() throws Exception { final DbBroadCastListInitUtil.Build build = new DbBroadCastListInitUtil.Build(); build.reloadRule(); return build; public class 广播方式3使用DB对方式广播 { public static void main(String[] args) throws Exception { // 1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism( 3); // 并行度取决于 kafka 中的分区数 保持与kafka 一致 // 2 设置 checkpoint // 开启checkpoint 一分钟一次 env.enableCheckpointing(60000 ); // 设置checkpoint 仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次checkpoint的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000 ); // 最多只支持1个checkpoint同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1 ); // checkpoint超时的时间 env.getCheckpointConfig().setCheckpointTimeout(60000 ); // 任务失败后也保留 checkPonit数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 间隔 // 设置 checkpoint 路径 // env.setStateBackend(new FsStateBackend("hdfs: // 192.168.123.103:9000/flink/checkpoint")); // 3 设置 kafka Flink 消费 // 创建 Kafka 消费信息 String topic = "data_flink_bigdata_test" ; Properties consumerProperties = new Properties(); consumerProperties.put( "bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092" ); consumerProperties.put( "group.id", "data_flink_bigdata_test_consumer" ); consumerProperties.put( "enable.auto.commit", "false" ); consumerProperties.put( "auto.offset.reset", "earliest" ); // 4 获取 kafka 与 redis 数据源 FlinkKafkaConsumer consumer
= new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource <String> kafkaSourceData = env.addSource(consumer); // 获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法 MapStateDescriptor
<String, String> descriptor = new MapStateDescriptor<String, String> ( "RedisBdStream" , String. class , String. class // 使用 数据库源 来进行广播 BroadcastStream<Map<String, Object>> broadcast = env.addSource( new BigDataDBBroadSource()).broadcast(descriptor); // 5 两个数据源进行 ETL 处理 使用 connect 连接处理 数据库表信息进行广播 SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(broadcast).process( new MyETLProcessFunction()); // 6 新创建一个 kafka 生产者 进行发送 String outputTopic = "allDataClean" ; // 输出给下游 kafka /* Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), producerProperties); etlData.addSink(producer); */ etlData.print(); // 7 提交任务执行 env.execute("DataClean" ); * in 1 kafka source * in 2 redis source * out 合并后的source * TODO 程序启动后发生的事: * 1 运行 open 方法 ,触发静态方法给 areasMap 赋值 * 2 运行 processElement 方法前, areasMap 肯定是值的,正常进行处理 * 3 当到 BigDataDBBroadSource 轮训的时间后 ,刷新数据库表数据到 areasMap ,此时 areasMap 会加入新值,完成广播变量的更新 * 4 广播变量更新后 继续进行 processElement 数据处理 private static class MyETLProcessFunction extends BroadcastProcessFunction<String, Map<String, Object>, String> { public Map<String, String> areasMap = new HashMap<String, String> (); @Override public void open(Configuration parameters) throws Exception { super .open(parameters); // 触发静态方法去赋值 areasMap = DbBroadCastListInitUtil.areasMap; // 逻辑的处理方法 kafka 的数据 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception { // 将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 // allMap -> redis 数据 System.out.println("into processElement " ); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt" ); String countryCode = jsonObject.getString("countryCode" ); // 可以根据countryCode获取大区的名字 // String area = allDataMap.get(countryCode); // 从MapState中获取对应的Code String area = areasMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data" ); for ( int i = 0; i < data.size(); i++ ) { JSONObject dataObject = data.getJSONObject(i); System.out.println( "大区:" + area); dataObject.put( "dt" , dt); dataObject.put( "area" , area); // 下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); @Override public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<String> out) throws Exception { // 广播算子定时刷新后 将数据发送到下游 if (value != null && value.size() > 0 ) { Object obj = value.getOrDefault("dbsource", null ); if (obj != null ) { DbBroadCastListInitUtil.Build biulder = (DbBroadCastListInitUtil.Build) obj; // 更新了 数据库数据 areasMap = biulder.getNewAreasMap(); System.out.println( "数据库刷新算子运行完成!" );