添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
直爽的棒棒糖  ·  C++ ...·  1 年前    · 

第八章 Flink - Sink数据目标

Flink针对DataStream提供了大量的已经实现的数据目标(Sink),包括文件、Kafka、Redis、HDFS、Elasticsearch等等。

flink官网: flink.apache.org/

在官网左侧,document下选择版本,目前是1.13。如果想看之前版本的文档,直接修改url中的版本号即可查看。

1) 基于HDFS的Sink( StreamingFileSink )

首先配置支持Hadoop FileSystem的连接器依赖。

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>

这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。

Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此 每个桶中的数据被划分为多个有限大小的文件 。如何分桶是可以配置的, 默认使用基于时间的分桶策略 ,这种策略 每个小时创建一个新的桶 ,桶中包含的文件将记录所有该小时内从流中接收到的数据。

桶目录中的实际输出数据会被划分为多个部分文件(part file) ,每一个接收桶数据的 Sink Subtask , 至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建, 滚动策略是可以配置的。默认的策略是 根据文件大小和超时时间来滚动文件 。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。

重要: 使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

文件格式

StreamingFileSink 支持行编码格式和批量编码格式( 如 Apache Parquet ) 。

  • Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)

创建行或批量编码的 Sink 时,我们需要指定存储桶的基本路径和数据的编码逻辑。

行编码格式 (Row-encoded sink)

行编码格式需要指定一个 Encoder 。Encoder 负责为每个处于 In-progress 状态文件的 OutputStream 序列化数据。

除了桶分配器之外, RowFormatBuilder 还允许用户指定:

  • Custom RollingPolicy :自定义滚动策略以覆盖默认的 DefaultRollingPolicy
  • bucketCheckInterval (默认为1分钟):毫秒间隔,用于基于时间的滚动策略。

字符串元素写入示例:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
val input: DataStream[String] = ...
val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build()
input.addSink(sink)

这个例子创建了一个简单的 Sink ,将记录分配给默认的一小时时间桶。它还指定了一个滚动策略,该策略在以下三种情况下滚动处于 In-progress 状态的部分文件(part file):

  • 它至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的记录
  • 文件大小达到 1GB (写入最后一条记录后)

批量编码格式 (Bulk-encoded sink)

批量编码 Sink 的创建与行编码 Sink 相似,不过在这里我们不是指定编码器 Encoder 而是指定 BulkWriter. Factory BulkWriter 定义了如何添加、刷新元素,以及如何批量编码。

Flink 有四个内置的 BulkWriter Factory :

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory
重要: 批量编码模式仅支持 OnCheckpointRollingPolicy 策略, 在每次 checkpoint 的时候切割文件。

Parquet 格式

Flink 包含为不同 Avro 类型,创建 ParquetWriterFactory 的便捷方法,更多信息请参考 ParquetAvroWriters

要编写其他 Parquet 兼容的数据格式,用户需要创建 ParquetWriterFactory 并实现 ParquetBuilder 接口。

在应用中使用 Parquet 批量编码器,你需要添加以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

这个例子使用 StreamingFileSink 将 Avro 数据写入 Parquet 格式:

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.avro.Schema
val schema: Schema = ...
val input: DataStream[GenericRecord] = ...
val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
    .build()
input.addSink(sink)

除了上述schema的方式,还可以通过反射的方式创建parquet格式。例如:

DataStream<GenericRecord> stream = ...;
final StreamingFileSink<GenericRecord> sink = StreamingFileSink
	.forBulkFormat(outputBasePath,ParquetAvroWriters.forReflectRecord(MyBean.class))
	.build();
input.addSink(sink);

类似的,将 Protobuf 数据写入到 Parquet 格式可以通过:

import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
// ProtoRecord is a generated protobuf Message class.
val input: DataStream[ProtoRecord] = ...
val sink: StreamingFileSink[ProtoRecord] = StreamingFileSink
    .forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
    .build()
input.addSink(sink)

Bucket Assignment:(分桶策略)

桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录

行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd--HH 。日期格式(即桶的大小)和时区都可以手动配置。

我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner

Flink 有两个内置的 BucketAssigners :

  • DateTimeBucketAssigner :默认基于时间的分配器
  • BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)

除此之外,还可以实现BucketAssigner接口,自定义分桶策略。

滚动策略

滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。

Flink 有两个内置的滚动策略:

  • DefaultRollingPolicy : 当超过文件大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;
  • OnCheckpointRollingPolicy : 当 checkpoint 的时候,滚动文件。

部分文件(part file) 生命周期

为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。

部分文件(part file)可以处于以下三种状态之一:

  1. In-progress :当前文件正在写入中
  2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
  3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

重要: 部分文件的索引在每个 subtask 内部是严格递增的(按文件创建顺序)。但是索引并不总是连续的。当 Job 重启后,所有部分文件的索引从 `max part index + 1` 开始, 这里的 `max part index` 是所有 subtask 中索引的最大值。

对于每个活动的桶,Writer 在任何时候都只有一个处于 In-progress 状态的部分文件(part file),但是可能有几个 Penging 和 Finished 状态的部分文件(part file)。

部分文件(part file)例子

为了更好地理解这些文件的生命周期,让我们来看一个包含 2 个 Sink Subtask 的简单例子:

当部分文件 part-1-0 被滚动(假设它变得太大了)时,它将成为 Pending 状态,但是它还没有被重命名。然后 Sink 会创建一个新的部分文件: part-1-1

part-1-0 现在处于 Pending 状态等待完成,在下一次成功的 Checkpoint 后,它会变成 Finished 状态:

根据分桶策略创建新的桶,但是这并不会影响当前处于 In-progress 状态的文件:

因为分桶策略基于每条记录进行评估,所以旧桶仍然可以接受新的记录。

部分文件的配置项

可以通过命名,将Finished的文件与in-progress的文件区分开。默认:

  • In-progress / Pending:part-<subtaskIndex>-<partFileIndex>.inprogress.uid
  • Finished:part-<subtaskIndex>-<partFileIndex>

Flink 允许用户通过 OutputFileConfig 指定部分文件名的前缀和后缀。 举例来说,前缀设置为 “prefix” 以及后缀设置为 “.ext” 之后,Sink 创建的文件名如下所示:

用户可以通过如下方式设置 OutputFileConfig :

val config = OutputFileConfig
 .builder()
 .withPartPrefix("prefix")
 .withPartSuffix(".ext")
 .build()
val sink = StreamingFileSink
 .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
 .withBucketAssigner(new KeyBucketAssigner())
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 .withOutputFileConfig(config)
 .build()

2) 基于Kafka的Sink

由于前面已经讲过Flink的Kafka连接器,所以还是一样需要配置Kafka连接器的依赖配置,接下我们还是把WordCout的结果写入Kafka:

package com.jht.flink.sink
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
object KafkaSink1 {
  //从netcat中读取数据,统计单词的数量,写入Kafka
  def main(args: Array[String]): Unit = {
    //初始化Flink流计算环境
    val streamEvn: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //导入隐式转换
    import org.apache.flink.streaming.api.scala._
    //设置默认的分区
    //读取流数据
    val ds: DataStream[String] = streamEvn.socketTextStream("hadoop101",9999)
    //转换计算
    val result: DataStream[(String, Int)] = ds.flatMap(_.split(" ")).filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)//分组:必须指定根据哪个字段分组,参数代表当前要分组的字段的下标。
      .sum(1) //1代表下标
    //写数据Kafka
    //创建Kafka的连接参数
    val props = new Properties()
    props.setProperty("bootstrap.servers","hadoop101:9092,hadoop102:9092,hadoop103:9092")
    result.addSink(new FlinkKafkaProducer[(String, Int)]("t_topic",new MySerializer2,props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    streamEvn.execute()
  class MySerializer2 extends KafkaSerializationSchema[(String,Int)] {
    override def serialize(t: (String, Int), aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
      new ProducerRecord("t_topic",t._1.getBytes(),t._2.toString.getBytes())

3) 自定义的Sink

当然你可以自己定义Sink,有两种实现方式:1、实现SinkFunction接口。2、实现RichSinkFunction类。后者增加了生命周期的管理功能。比如需要在Sink初始化的时候创建连接对象,则最好使用第二种。案例需求:把StationLog对象写入Mysql数据库中。

package com.jht.flink.sink
import java.sql.DriverManager
import java.sql.{Connection, PreparedStatement}
import com.jht.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object DiySink {
  //自定义一个Sink写入Mysql
  class MyCustomSink extends RichSinkFunction[StationLog]{
    var conn:Connection = _
    var pst :PreparedStatement =_
    //生命周期管理,在Sink初始化的时候调用
    override def open(parameters: Configuration): Unit = {
      conn=DriverManager.getConnection("jdbc:mysql://192.21.19.200:3306/test","kk","kk")
      pst=conn.prepareStatement("insert into t_station_log (sid,call_out,call_in,call_type,call_time,duration) values (?,?,?,?,?,?)")
    //把StationLog 写入到表t_station_log,每写入一条执行一次
    override def invoke(value: StationLog, context: SinkFunction.Context[_]): Unit = {
      pst.setString(1,value.sid)
      pst.setString(2,value.callOut)
      pst.setString(3,value.callIn)
      pst.setString(4,value.callType)
      pst.setLong(5,value.callTime)
      pst.setLong(6,value.duration)
      pst.executeUpdate()
    override def close(): Unit = {
      pst.close()
      conn.close()
  def main(args: Array[String]): Unit = {
    //初始化Flink的Streaming(流计算)上下文执行环境
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._
    val data: DataStream[StationLog] = streamEnv.addSource(new MyCustomerSource)