Flink 广播变量在实时处理程序中扮演着很重要的角色,适当的使用广播变量会大大提升程序处理效率。
本文从简单的 demo 场景出发,引入生产中实际的需求并提出思路与部分示例代码,应对一般需求应该没有什么问题,话不多说,赶紧来看看这篇干货满满的广播程序使用实战吧。
1 啥是广播
Flink 支持广播变量,允许在每台机器上保留一个只读的缓存变量,数据存在内存中,在不同的 task 所在的节点上的都能获取到,可以减少大量的 shuffle 操作。
换句话说,广播变量可以理解为一个公共的共享变量,可以把一个 dataset 的数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。
如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存 (也就是一个节点中可能会存在多份 dataset 数据)
2 用法总结
//1 初始化数据
DataSet<Integer> toBroadcast = env.fromElements(1,2,3)
//2 广播数据 api
withBroadcastSet(toBroadcast,"broadcastSetName")
//3 获取数据
Collection<integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
广播变量由于要常驻内存,程序结束时才会失效,所以数据量不宜过大
广播变量广播在初始化后不支持修改 (修改场景也有办法)
3 基础案例演示
基础案例广播变量使用
这种场景下广播变量就是加载参数表,参数表不会变化,记住第二部分常用总结公式即可。
*
@author
大数据江湖
*
@version
1.0
* @date 2021/5/17.
public
class
BaseBroadCast {
* broadcast广播变量
* 需求:
* flink会从数据源中获取到用户的姓名
* 最终需要把用户的姓名和年龄信息打印出来
* 分析:
* 所以就需要在中间的map处理的时候获取用户的年龄信息
* 建议吧用户的关系数据集使用广播变量进行处理
public
static
void
main(String[] args)
throws
Exception {
//
获取运行环境
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//
1:准备需要广播的数据
ArrayList<Tuple2<String, Integer>> broadData =
new
ArrayList<>
();
broadData.add(
new
Tuple2<>("zs", 18
));
broadData.add(
new
Tuple2<>("ls", 20
));
broadData.add(
new
Tuple2<>("ww", 17
));
DataSet
<Tuple2<String, Integer>> tupleData =
env.fromCollection(broadData);
//
1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
DataSet
<HashMap<String, Integer>> toBroadcast = tupleData.map(
new
MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>
() {
@Override
public
HashMap<String, Integer> map(Tuple2<String, Integer>
value)
throws
Exception {
HashMap
<String, Integer> res =
new
HashMap<>
();
res.put(value.f0, value.f1);
return
res;
//
源数据
DataSource<String> data = env.fromElements("zs", "ls", "ww"
);
//
注意:在这里需要使用到RichMapFunction获取广播变量
DataSet<String> result = data.map(
new
RichMapFunction<String, String>
() {
List
<HashMap<String, Integer>> broadCastMap =
new
ArrayList<HashMap<String, Integer>>
();
HashMap
<String, Integer> allMap =
new
HashMap<String, Integer>
();
* 这个方法只会执行一次
* 可以在这里实现一些初始化的功能
* 所以,就可以在open方法中获取广播变量数据
@Override
public
void
open(Configuration parameters)
throws
Exception {
super
.open(parameters);
//
3:获取广播数据
this
.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName"
);
for
(HashMap map : broadCastMap) {
allMap.putAll(map);
@Override
public
String map(String value)
throws
Exception {
Integer age
=
allMap.get(value);
return
value + "," +
age;
}).withBroadcastSet(toBroadcast,
"broadCastMapName");
//
2:执行广播数据的操作
result.print();
4
生产案例演示
实际生产中有时候是需要更新广播变量的,但不是实时更新的,一般会设置一个更新周期,几分钟,几小时的都很常见,根据业务而定。
由于广播变量需要更新,解决办法一般是需要将广播变量做成另一个 source,进行流与流之间的 connect 操作,定时刷新广播的source,从而达到广播变量修改的目的。
4.1.1 使用 redis 中的数据作为广播变量的思路:
消费 kafka 中的数据,使用 redis 中的数据作为广播数据,进行数据清洗后 写到 kafka中。
示例代码分为三个部分:kafka 生产者,redis 广播数据源,执行入口类
构建 kafka 生成者,模拟数据 (以下代码的消费消息来源均是此处生产)
public
static
void
main(String[] args)
throws
Exception{
Properties prop
=
new
Properties();
//
指定kafka broker地址
prop.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
//
指定key value的序列化方式
prop.put("key.serializer", StringSerializer.
class
.getName());
prop.put(
"value.serializer", StringSerializer.
class
.getName());
//
指定topic名称
String topic = "data_flink_bigdata_test"
;
//
创建producer链接
KafkaProducer<String, String> producer =
new
KafkaProducer<String,String>
(prop);
//
{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
while
(
true
){
String message
= "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}"
;
System.out.println(message);
//
同步的方式,往Kafka里面生产数据
producer.send(
new
ProducerRecord<String, String>
(topic,message));
Thread.sleep(
2000
);
//
关闭链接
//
producer.close();
public
static
String getCurrentTime(){
SimpleDateFormat sdf
=
new
SimpleDateFormat("YYYY-MM-dd HH:mm:ss"
);
return
sdf.format(
new
Date());
public
static
String getCountryCode(){
String[] types
= {"US","TW","HK","PK","KW","SA","IN"
};
Random random
=
new
Random();
int
i =
random.nextInt(types.length);
return
types[i];
public
static
String getRandomType(){
String[] types
= {"s1","s2","s3","s4","s5"
};
Random random
=
new
Random();
int
i =
random.nextInt(types.length);
return
types[i];
public
static
double
getRandomScore(){
double
[] types = {0.3,0.2,0.1,0.5,0.8
};
Random random
=
new
Random();
int
i =
random.nextInt(types.length);
return
types[i];
public
static
String getRandomLevel(){
String[] types
= {"A","A+","B","C","D"
};
Random random
=
new
Random();
int
i =
random.nextInt(types.length);
return
types[i];
public
class
BigDataRedisSource
implements
SourceFunction<HashMap<String,String>>
{
private
Logger logger= LoggerFactory.getLogger(BigDataRedisSource.
class
);
private
Jedis jedis;
private
boolean
isRunning=
true
;
@Override
public
void
run(SourceContext<HashMap<String, String>> cxt)
throws
Exception {
this
.jedis =
new
Jedis("localhost",6379
);
HashMap
<String, String> map =
new
HashMap<>
();
while
(isRunning){
try
{
map.clear();
Map
<String, String> areas = jedis.hgetAll("areas"
);
* AREA_CT TT,AA
* map:
* TT,AREA_CT
* AA,AREA_CT
for
(Map.Entry<String,String>
entry: areas.entrySet()){
String area
=
entry.getKey();
String value
=
entry.getValue();
String[] fields
= value.split(","
);
for
(String country:fields){
map.put(country,area);
if
(map.size() > 0
){
cxt.collect(map);
Thread.sleep(
60000
);
}
catch
(JedisConnectionException e){
logger.error(
"redis连接异常"
,e.getCause());
this
.jedis =
new
Jedis("localhost",6379
);
}
catch
(Exception e){
logger.error(
"数据源异常"
,e.getCause());
@Override
public
void
cancel() {
isRunning
=
false
;
if
(jedis !=
null
){
jedis.close();
public
class
广播方式1分两个流进行connnect操作 {
public
static
void
main(String[] args)
throws
Exception {
//
1 获取执行环境
StreamExecutionEnvironment env
=
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(
3);
//
并行度取决于 kafka 中的分区数 保持与kafka 一致
//
2 设置 checkpoint
//
开启checkpoint 一分钟一次
env.enableCheckpointing(60000
);
//
设置checkpoint 仅一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//
两次checkpoint的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000
);
//
最多只支持1个checkpoint同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1
);
//
checkpoint超时的时间
env.getCheckpointConfig().setCheckpointTimeout(60000
);
//
任务失败后也保留 checkPonit数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
//
尝试重启的次数
Time.of(10, TimeUnit.SECONDS)
//
间隔
//
设置 checkpoint 路径
//
env.setStateBackend(new FsStateBackend("hdfs:
//
192.168.123.103:9000/flink/checkpoint"));
//
3 设置 kafka Flink 消费
//
创建 Kafka 消费信息
String topic
="data_flink_bigdata_test"
;
Properties consumerProperties
=
new
Properties();
consumerProperties.put(
"bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
consumerProperties.put(
"group.id","data_test_new_1"
);
consumerProperties.put(
"enable.auto.commit", "false"
);
consumerProperties.put(
"auto.offset.reset","earliest"
);
//
4 获取 kafka 与 redis 数据源
FlinkKafkaConsumer consumer =
new
FlinkKafkaConsumer<String>(topic,
new
SimpleStringSchema(), consumerProperties);
DataStreamSource
<String> kafkaSourceData =
env.addSource(consumer);
//
直接使用广播的方式 后续作为两个数据流来操作
DataStream<HashMap<String, String>> redisSourceData = env.addSource(
new
NxRedisSource()).broadcast();
//
5 两个数据源进行 ETL 处理 使用 connect 连接处理
SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).flatMap(
new
MyETLProcessFunction());
//
6 新创建一个 kafka 生产者 进行发送
String outputTopic="allDataClean"
;
//
输出给下游 kafka
Properties producerProperties =
new
Properties();
producerProperties.put(
"bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
FlinkKafkaProducer
<String> producer =
new
FlinkKafkaProducer<>
(outputTopic,
new
KeyedSerializationSchemaWrapper<String>(
new
SimpleStringSchema()),
producerProperties);
etlData.addSink(producer);
//
7 提交任务执行
env.execute("DataClean"
);
* in 1 kafka source :
* {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
* in 2 redis source
* US,AREA_US
* TW,AREA_CT
* HK,AREA_CT
* out 合并后的source
private
static
class
MyETLProcessFunction
implements
CoFlatMapFunction<String,HashMap<String,String>,String>
{
//
用来存储 redis 中的数据
HashMap<String,String> allMap =
new
HashMap<String,String>
();
@Override
public
void
flatMap1(String line, Collector<String> collector)
throws
Exception {
//
将 kafka 数据 按 redis 数据进行替换
//
s -> kafka 数据
//
allMap -> redis 数据
JSONObject jsonObject
=
JSONObject.parseObject(line);
String dt
= jsonObject.getString("dt"
);
String countryCode
= jsonObject.getString("countryCode"
);
//
可以根据countryCode获取大区的名字
String area =
allMap.get(countryCode);
JSONArray data
= jsonObject.getJSONArray("data"
);
for
(
int
i = 0; i < data.size(); i++
) {
JSONObject dataObject
=
data.getJSONObject(i);
System.out.println(
"大区:"+
area);
dataObject.put(
"dt"
, dt);
dataObject.put(
"area"
, area);
//
下游获取到数据的时候,也就是一个json格式的数据
collector.collect(dataObject.toJSONString());
@Override
public
void
flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector)
throws
Exception {
//
将 redis 中 数据进行赋值
allMap =
stringStringHashMap;
public
class
广播方式2使用MapState对方式1改造 {
public
static
void
main(String[] args)
throws
Exception {
//
1 获取执行环境
StreamExecutionEnvironment env
=
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(
3);
//
并行度取决于 kafka 中的分区数 保持与kafka 一致
//
2 设置 checkpoint
//
开启checkpoint 一分钟一次
env.enableCheckpointing(60000
);
//
设置checkpoint 仅一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//
两次checkpoint的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000
);
//
最多只支持1个checkpoint同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1
);
//
checkpoint超时的时间
env.getCheckpointConfig().setCheckpointTimeout(60000
);
//
任务失败后也保留 checkPonit数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
//
尝试重启的次数
Time.of(10, TimeUnit.SECONDS)
//
间隔
//
设置 checkpoint 路径
//
env.setStateBackend(new FsStateBackend("hdfs:
//
192.168.123.103:9000/flink/checkpoint"));
//
3 设置 kafka Flink 消费
//
创建 Kafka 消费信息
String topic = "data_flink_bigdata_test"
;
Properties consumerProperties
=
new
Properties();
consumerProperties.put(
"bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
consumerProperties.put(
"group.id", "data_flink_fpy_test_consumer"
);
consumerProperties.put(
"enable.auto.commit", "false"
);
consumerProperties.put(
"auto.offset.reset", "earliest"
);
//
4 获取 kafka 与 redis 数据源
FlinkKafkaConsumer consumer =
new
FlinkKafkaConsumer<String>(topic,
new
SimpleStringSchema(), consumerProperties);
DataStreamSource
<String> kafkaSourceData =
env.addSource(consumer);
//
获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法
MapStateDescriptor<String, String> descriptor =
new
MapStateDescriptor<String, String>
(
"RedisBdStream"
,
String.
class
,
String.
class
//
5 两个数据源进行 ETL 处理 使用 connect 连接处理 TODO process 替换 FlatMap
//
TODO 使用 MapState 来进行广播
BroadcastStream<HashMap<String, String>> redisSourceData = env.addSource(
new
NxRedisSource()).broadcast(descriptor);
SingleOutputStreamOperator
<String> etlData = kafkaSourceData.connect(redisSourceData).process(
new
MyETLProcessFunction());
//
6 新创建一个 kafka 生产者 进行发送
String outputTopic = "allDataClean"
;
//
输出给下游 kafka
Properties producerProperties =
new
Properties();
producerProperties.put(
"bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
FlinkKafkaProducer
<String> producer =
new
FlinkKafkaProducer<>
(outputTopic,
new
KeyedSerializationSchemaWrapper<String>(
new
SimpleStringSchema()),
producerProperties);
etlData.addSink(producer);
etlData.print();
//
7 提交任务执行
env.execute("DataClean"
);
* in 1 kafka source
* in 2 redis source
* out 合并后的source
private
static
class
MyETLProcessFunction
extends
BroadcastProcessFunction<String, HashMap<String, String>, String>
{
//
TODO 注意此处 descriptor 的名称需要与 广播时 (99行代码) 名称一致
MapStateDescriptor<String, String> descriptor =
new
MapStateDescriptor<String, String>
(
"RedisBdStream"
,
String.
class
,
String.
class
//
逻辑的处理方法 kafka 的数据
@Override
public
void
processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector)
throws
Exception {
//
将 kafka 数据 按 redis 数据进行替换
//
s -> kafka 数据
//
allMap -> redis 数据
System.out.println("into processElement "
);
JSONObject jsonObject
=
JSONObject.parseObject(line);
String dt
= jsonObject.getString("dt"
);
String countryCode
= jsonObject.getString("countryCode"
);
//
可以根据countryCode获取大区的名字
//
String area = allDataMap.get(countryCode);
//
TODO 从MapState中获取对应的Code
String area =
readOnlyContext.getBroadcastState(descriptor).get(countryCode);
JSONArray data
= jsonObject.getJSONArray("data"
);
for
(
int
i = 0; i < data.size(); i++
) {
JSONObject dataObject
=
data.getJSONObject(i);
System.out.println(
"大区:" +
area);
dataObject.put(
"dt"
, dt);
dataObject.put(
"area"
, area);
//
下游获取到数据的时候,也就是一个json格式的数据
collector.collect(dataObject.toJSONString());
//
广播流的处理方法
@Override
public
void
processBroadcastElement(HashMap<String, String> stringStringHashMap, Context context, Collector<String> collector)
throws
Exception {
//
将接收到的控制数据放到 broadcast state 中
//
key , flink
//
将 RedisMap中的值放入 MapState 中
for
(Map.Entry<String, String>
entry : stringStringHashMap.entrySet()) {
//
TODO 使用 MapState 存储 redis 数据
context.getBroadcastState(descriptor).put(entry.getKey(), entry.getValue());
System.out.println(entry);
4.2 关系型数据库广播变量案例思路:
在 flink 流式处理中常常需要加载数据库中的数据作为条件进行数据处理,有些表作为系统表,实时查询效率很低,这时候就需要将这些数据作为广播数据,而同时这些数据可能也需要定期的更新。
数据库表的广播变量思路同redis等缓存广播数据的思路类似,也是使用 两个source 进行 connect 处理 , 在数据库表的 source 中定时刷新数据就可以了。
不同点在于这里把数据库查询的操作转成另一个工具类,在初始化时使用了静态代码块,在广播时使用了流的 connect 操作。
示例代码分为三个部分:数据库表广播源,数据库操作类,执行入口类
数据库表广播源
* DB source 源头 进行广播
public
class
BigDataDBBroadSource
extends
RichSourceFunction<Map<String,Object>>
{
private
final
Logger logger = LoggerFactory.getLogger(BigDataDBBroadSource.
class
);
private
volatile
boolean
isRunning =
true
;
public
BigDataDBBroadSource() {
@Override
public
void
open(Configuration parameters)
throws
Exception {
super
.open(parameters);
@Override
public
void
run(SourceContext<Map<String,Object>> sourceContext)
throws
Exception {
while
(isRunning) {
//
TODO 使用的是一个 DB 源头的 source 60 s 刷新一次 进行往下游发送
TimeUnit.SECONDS.sleep(60
);
Map
<String,Object> map =
new
HashMap<String,Object>
();
//
规则匹配关键词
final
DbBroadCastListInitUtil.Build ruleListInitUtil =
new
DbBroadCastListInitUtil.Build();
ruleListInitUtil.reloadRule();
map.put(
"dbsource"
, ruleListInitUtil);
if
(map.size() > 0
) {
sourceContext.collect(map);
@Override
public
void
cancel() {
this
.isRunning =
false
;
@Override
public
void
close()
throws
Exception {
super
.close();
public
class
DbBroadCastListInitUtil
implements
Serializable {
private
static
final
Logger LOG = LoggerFactory.getLogger(DbBroadCastListInitUtil.
class
);
//
数据库规则信息
public
static
Map<String, String> areasMap =
new
HashMap<String, String>
();
static
{
LOG.info(
"初始化 db 模块"
);
Connection dbConn
=
null
;
try
{
if
(dbConn ==
null
||
dbConn.isClosed()) {
LOG.info(
"init dbConn start...."
);
LOG.info(
"init dbConn end...."
);
HashMap
<String, String> map =
Maps.newHashMap();
map.put(
"US","AREA_US"
);
map.put(
"TW","AREA_CT"
);
map.put(
"HK","AREA_CT"
);
areasMap
=
map;
}
catch
(Exception e) {
LOG.error(
"init database [status:error]"
, e);
throw
new
RuntimeException(" static article rule list db select error! , "+
e.getMessage()) ;
}
finally
{
if
(dbConn !=
null
) {
try
{
dbConn.close();
}
catch
(SQLException e) {
LOG.error(
"dbConn conn close error!"
,e);
public
static
Map<String, String> newAreasMap =
new
HashMap<String, String>
();
public
void
reloadRule()
throws
Exception {
LOG.info(
"重新初始化 DB reloadRule 模块"
);
Connection dbConn
=
null
;
try
{
if
(dbConn ==
null
||
dbConn.isClosed()) {
LOG.info(
"init dbConn start...."
);
LOG.info(
"init dbConn end...."
);
HashMap
<String, String> map =
Maps.newHashMap();
map.put(
"US","AREA_US"
);
map.put(
"TW","AREA_CT"
);
map.put(
"HK","AREA_CT"
);
map.put(
"AM","AREA_CT"
);
newAreasMap
=
map;
}
catch
(Exception e) {
LOG.error(
"init database [status:error]"
, e);
throw
e;
}
finally
{
if
(dbConn !=
null
) {
try
{
dbConn.close();
}
catch
(SQLException e) {
LOG.error(
"dbConn conn close error!"
,e);
public
static
Map<String, String>
getNewAreasMap() {
return
newAreasMap;
public
static
Build build()
throws
Exception {
final
DbBroadCastListInitUtil.Build build =
new
DbBroadCastListInitUtil.Build();
build.reloadRule();
return
build;
public
class
广播方式3使用DB对方式广播 {
public
static
void
main(String[] args)
throws
Exception {
//
1 获取执行环境
StreamExecutionEnvironment env
=
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(
3);
//
并行度取决于 kafka 中的分区数 保持与kafka 一致
//
2 设置 checkpoint
//
开启checkpoint 一分钟一次
env.enableCheckpointing(60000
);
//
设置checkpoint 仅一次语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//
两次checkpoint的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000
);
//
最多只支持1个checkpoint同时执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1
);
//
checkpoint超时的时间
env.getCheckpointConfig().setCheckpointTimeout(60000
);
//
任务失败后也保留 checkPonit数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
//
尝试重启的次数
Time.of(10, TimeUnit.SECONDS)
//
间隔
//
设置 checkpoint 路径
//
env.setStateBackend(new FsStateBackend("hdfs:
//
192.168.123.103:9000/flink/checkpoint"));
//
3 设置 kafka Flink 消费
//
创建 Kafka 消费信息
String topic
= "data_flink_bigdata_test"
;
Properties consumerProperties
=
new
Properties();
consumerProperties.put(
"bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"
);
consumerProperties.put(
"group.id", "data_flink_bigdata_test_consumer"
);
consumerProperties.put(
"enable.auto.commit", "false"
);
consumerProperties.put(
"auto.offset.reset", "earliest"
);
//
4 获取 kafka 与 redis 数据源
FlinkKafkaConsumer consumer
=
new
FlinkKafkaConsumer<String>(topic,
new
SimpleStringSchema(), consumerProperties);
DataStreamSource
<String> kafkaSourceData =
env.addSource(consumer);
//
获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法
MapStateDescriptor
<String, String> descriptor =
new
MapStateDescriptor<String, String>
(
"RedisBdStream"
,
String.
class
,
String.
class
//
使用 数据库源 来进行广播
BroadcastStream<Map<String, Object>> broadcast = env.addSource(
new
BigDataDBBroadSource()).broadcast(descriptor);
//
5 两个数据源进行 ETL 处理 使用 connect 连接处理 数据库表信息进行广播
SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(broadcast).process(
new
MyETLProcessFunction());
//
6 新创建一个 kafka 生产者 进行发送
String outputTopic = "allDataClean"
;
//
输出给下游 kafka
/*
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic,
new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
producerProperties);
etlData.addSink(producer);
*/
etlData.print();
//
7 提交任务执行
env.execute("DataClean"
);
* in 1 kafka source
* in 2 redis source
* out 合并后的source
* TODO 程序启动后发生的事:
* 1 运行 open 方法 ,触发静态方法给 areasMap 赋值
* 2 运行 processElement 方法前, areasMap 肯定是值的,正常进行处理
* 3 当到 BigDataDBBroadSource 轮训的时间后 ,刷新数据库表数据到 areasMap ,此时 areasMap 会加入新值,完成广播变量的更新
* 4 广播变量更新后 继续进行 processElement 数据处理
private
static
class
MyETLProcessFunction
extends
BroadcastProcessFunction<String, Map<String, Object>, String>
{
public
Map<String, String> areasMap =
new
HashMap<String, String>
();
@Override
public
void
open(Configuration parameters)
throws
Exception {
super
.open(parameters);
//
触发静态方法去赋值
areasMap =
DbBroadCastListInitUtil.areasMap;
//
逻辑的处理方法 kafka 的数据
@Override
public
void
processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector)
throws
Exception {
//
将 kafka 数据 按 redis 数据进行替换
//
s -> kafka 数据
//
allMap -> redis 数据
System.out.println("into processElement "
);
JSONObject jsonObject
=
JSONObject.parseObject(line);
String dt
= jsonObject.getString("dt"
);
String countryCode
= jsonObject.getString("countryCode"
);
//
可以根据countryCode获取大区的名字
//
String area = allDataMap.get(countryCode);
//
从MapState中获取对应的Code
String area =
areasMap.get(countryCode);
JSONArray data
= jsonObject.getJSONArray("data"
);
for
(
int
i = 0; i < data.size(); i++
) {
JSONObject dataObject
=
data.getJSONObject(i);
System.out.println(
"大区:" +
area);
dataObject.put(
"dt"
, dt);
dataObject.put(
"area"
, area);
//
下游获取到数据的时候,也就是一个json格式的数据
collector.collect(dataObject.toJSONString());
@Override
public
void
processBroadcastElement(Map<String, Object> value, Context ctx, Collector<String> out)
throws
Exception {
//
广播算子定时刷新后 将数据发送到下游
if
(value !=
null
&& value.size() > 0
) {
Object obj
= value.getOrDefault("dbsource",
null
);
if
(obj !=
null
) {
DbBroadCastListInitUtil.Build biulder
=
(DbBroadCastListInitUtil.Build) obj;
//
更新了 数据库数据
areasMap =
biulder.getNewAreasMap();
System.out.println(
"数据库刷新算子运行完成!"
);