添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

最近有需求需要实时监控HDFS指定目录,然后对新增文件的每一行进行处理。

采用了Flink DataStream API的 readFile 方法来实现。

文件都是 .gz 结尾的,也会存在脏文件进来,所以需要对文件名进行过滤,于是就用 FilePathFilter

在不过滤之前,可以正常监听目录;但是加了过滤后就不能监听了。

纳闷了好一会,后来点进去仔细看了看源码,才发现是使用 FilePathFilter 的姿势有问题。

这里的 FilePathFilter 居然是用来过滤目录的。。。实现代码如下:

代码摘自 org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

	private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) throws IOException {
		final FileStatus[] statuses;
		try {
            // 列出目录下的所有文件状态
			statuses = fileSystem.listStatus(path);
		} catch (IOException e) {
		if (statuses == null) {
		} else {
			Map<Path, FileStatus> files = new HashMap<>();
			// 处理新文件
			for (FileStatus status : statuses) {
                // 如果不是目录
				if (!status.isDir()) {
					Path filePath = status.getPath();
					long modificationTime = status.getModificationTime();
                    // 看他的修改时间是否应该被忽略
					if (!shouldIgnore(filePath, modificationTime)) {
						files.put(filePath, status);
                // 如果是目录。先判断是否开启了递归扫描;
                // 再判断目录是否应该被接收。其中就是去用`FilePathFilter`来过滤的
				} else if (format.getNestedFileEnumeration() && format.acceptFile(status)){
					files.putAll(listEligibleFiles(fileSystem, status.getPath()));
			return files;
	// 上面用到的acceptFile方法
	public boolean acceptFile(FileStatus fileStatus) {
		final String name = fileStatus.getPath().getName();
		return !name.startsWith("_")
			&& !name.startsWith(".")
			&& !filesFilter.filterPath(fileStatus.getPath());

源码中FilePathFilter类的filterPath中也确实写了是过滤目录的:

// org.apache.flink.api.common.io.FilePathFilter
// 如果在处理目录时,需要忽略给定的filePath,则返回true
public abstract boolean filterPath(Path filePath);

所以是不支持过滤文件名的,只支持过滤目录名。

我最开始是过滤了.gz结尾的,由于没有一个目录是以.gz结尾的,导致一个文件都扫描不到。。。

  • Flink实时监控目录下的文件,官方提供的Source不支持过滤文件名,需要我们自己实现
  • Flink监控文件这块比Spark鸡肋很多,Spark Streaming这块实现还不错
String fileNameTag=".csv"; TextInputFormat format = new TextInputFormat(new Path(filePath)); Configuration configuration = new Configuration(); configuration.setBoolean("recursive.file.enumeration", true); // 设置递归获取文件 读写文件1 读取文件-readFile2 写入到文件-StreamingFileSink    2.1 在了解-StreamingFileSink之前你需要了解的知识点        2.1.1 结论    2.2 行编码        2.2.1 行编码自定义-BucketAssigner    2.3 批量编码        2.3.1 批量编码自定义-BucketAssigner 1 读取文件-readFile Q:什么是文件数据源? A:Apache Flink提供了一个可重置的数据源连接器,支持将 Flink读取文件目录: 因为目录下的文件可能会不断新增,在新增过程文件处于传输阶段 会出现比如01.data文件正在上传,在hdfs显示的是01.data._COPYING_文件,只有真正上传完成后才能读取,而不设置过滤器的话就会报错,会提示._COPYING_文件存在,所以像这样的临时文件需要我们过滤掉, 目前默认过滤器已经满足了我们的需求:方案如下 * 2.流处理: 监听并读取hdfs文件夹目录下的所有文件 * @throws Exception 看到这些需求,我们可能想到spark都支持的不错,Flink支持的怎么样呢?本篇文章详细介绍一下Flink如何实现,递归,正则表达式等方式读取hdfs的目录。 Flink递归读取hdfs上多路径文件 比如,读取如下dat... import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import java.text.ParseException; import java.text.SimpleDa 1.setStartFromEarliest不起作用 在IDEA调试,消费Kafka的数据,然后发现setStartFromEarliest不起作用,Consumer显示默认的offset还是latest。通过Con... Flink基于模型,内置了很多强大功能的算子,可以帮助我们快速开发应用程序。作为Flink开发老手,大多算子的写法和场景想来已是了然于胸,但是使用过程常常会有一些小小的问题:工欲善其事,必先利其器!快速高效的使用合适的算子开发程序,往往可以达到事半功倍的效果。想着这个道理,特此整理一份常见的Flink算子!!也作为自己的工作笔记。欢迎大家收藏~Flink 让用户灵活且高效编写Flink流式程序。... Flink DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文运行,可以独立运行,也可以嵌入到其它程序。任务执行可以运行在本地 JVM ,也可以运行在多台机器的集群上。 DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序的数据 在flink针对读取csv文件的输出可以有3种格式,都是通过引用inputFormat来控制的,分别为 PojoCsvInputFormat输出类型为pojo, RowCsvInputFormat输出类型为Row, TupleCsvInputFormat输出类型为Tuple。本例子就用RowCsvInputFormat。 可以进入到RowCsvInputFormat 看看其构造函数都有哪些 public RowCsvInputFormat( Path filePath, TypeInforma import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.