package com.upupfeng.demo;
import com.upupfeng.source.TailHdfsFileSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* @author mawf
public class TailHdfsFileDemo {
public static void main(String[] args) throws Exception {
System.setProperty("user.name", "root");
Configuration configuration = new Configuration();
configuration.setString("fs.default-scheme", "hdfs://hadoop1:8020");
String path = "/user/mwf/a.log";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TailHdfsFileSource tailHdfsFileSource = new TailHdfsFileSource(configuration, path, 5);
env.addSource(tailHdfsFileSource)
.setParallelism(1)
.print();
env.execute();
背景需要读取HDFS上变化的日志文件,对每一行进行处理,就是类似于Linux中tail -f实现的功能。看了看好像Spark和Flink都没有类似的支持,于是就用Flink自定义了Source实现了这个功能。实现思路维持一个当前读取位置的偏移量,然后每隔几秒去看下文件的大小是否大于当前偏移量。如果最新文件大小大于当前偏移量就读取数据,并将当前偏移量设置为最新的文件大小;反之,不做任何操作。以下的代码,还没有把当前读取位置存储到状态中,如果重启会重头开始读。实现代码自定义Sourcepacka
本博客基于1.13.6版本Flink,实现批处理提交到yarn执行,并实现读取HDFS上文件实现批处理,有完整的Flink代码,已完成编译,开箱简单修改即可用,避免了大家编程、编译、提交yarn上的一些错误。
本工程提供
1、项目源码及详细注释,简单修改即可用在实际生产代码
2、成功编译截图
3、linux提交命令
4、提交到yarn上截图
5、自己编译过程中可能出现的问题
6、执行结果
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<versio...
目录1.引入依赖2.创建hdfs文件3.从hdfs读取数据
1.引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12&
文章目录01 引言02 Source2.1 基于集合的Source2.2 基于文件的Source2.3 基于Socket的Source2.4 自定义Source2.4.1 案例 - 随机生成数据2.4.2 案例 - MySQL03 Source
01 引言
在前面的博客,我们已经对Flink的原理有了一定的了解了,有兴趣的同学可以参阅下:
《Flink教程(01)- Flink知识图谱》
《Flink教程(02)- Flink入门》
《Flink教程(03)- Flink环境搭建》
《Flink教程(04
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import jav
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源(Source)。
从java的集合中读取数据
一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。
//1.创建流环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.创建集合
List<WaterSe.
Flink是一个高性能流式处理引擎,可以读取各种各样的数据源,包括自定义的源。自定义源是使用Flink的一种方式,主要是为了读取一些非标准的数据源或者改善性能表现。
自定义source是一个接口,需要实现org.apache.flink.streaming.api.functions.source.SourceFunction接口。该接口只有两个方法,一个是run(),另一个是cancel()。在run()中实现数据读取的逻辑,cancel()用于取消读取。自定义source主要包括数据什么时候开始读取,如何读取数据及什么时候读取结束等。
实现自定义source需要在程序入口处调用StreamExecutionEnvironment对象中的addSource()方法,将自定义source添加到批处理中。示例如下:
```java
DataStreamSource<String> dataSource = env.addSource(new MySource());
其中,MySource是自定义的数据源。
在自定义source中,可以采用文件缓存方式来提升读取性能。通过FileChannel打开文件,使用ByteBuffer读取文件,然后将ByteBuffer通过Flink的DataStream传递给后续算子处理。这种方式可以大大提升文件读取的性能,减少文件IO的次数。示例如下:
```java
try {
FileInputStream inputStream = new FileInputStream(filePath);
FileChannel inChannel = inputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 5);
while (inChannel.read(buffer) != -1) {
buffer.flip();
sourceContext.collect(buffer);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
自定义source的实现需要根据具体的数据源进行,但总体来说,实现自定义源并不复杂,只需要理解Flink数据处理的机制,并编写封装好的代码即可。