session.transfer(flowFile, REL_FAILURE)
3.设置文件属性
flowFile = session.putAttribute(flowFile, 'attr', 'attr')
4.完整代码
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
flowFile = session.get()
class PyStreamCallback(StreamCallback):
def __init__(self):
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
outputStream.write(bytearray(text.encode('utf-8')))
if(flowFile != None):
flowFile = session.write(flowFile, PyStreamCallback())
flowFile = session.putAttribute(flowFile, 'attr', 'atr')
session.transfer(flowFile, REL_SUCCESS)
RouteOnAttribute
根据表达式匹配属性进行文件路由
添加路由条件,当符合条件时匹配description路由,否则走其他路由
ExecuteScript本文使用python,其他语言请参考这里1.从FlowFile获取文件:flowFile = session.get()2.脚本结束后路由(不路由会报错)#路由到成功session.transfer(flowFile, REL_SUCCESS)#路由到失败session.transfer(flowFile, REL_FAILURE)3.设...
Apache
NiFi
是一个易于
使用
、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。
NiFi
原来是NSA的一个项目,目前已经代码开源,是Apache基金会的顶级项目之一。
NiFi
是基于Java的,
使用
Maven支持包的构建管理。
NiFi
基于Web方式工作,后台在服务器上进行调度。用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。
NiFi
是一个数据处理和分发系统,其中很重要的一部分是处理器(Processors)。一个处理器组合系统间的数据路由、转换或调解。处理器可以访问给定FlowFile的属性及其内容流。处理器可以在给定的工作单元中对零个或多个FlowFile进行操作,并提交该工作或回滚。
本文介绍
NiFi
处理器之一 :**Rout
eO
nAt
tribute
**。
使用
At
tribute
Expression L...
NiFi
是一款功能强大的开源的ESB软件,适合多个系统间的数据流转以及大数据的处理。这里记录下Rout
eO
nAt
tribute
(路由属性)的
使用
方法。
首先,拖拽一个Processor组件,选择Rout
eO
nAt
tribute
。如图1所示:
右键选中Configure选项进行配置,如图2所示:
重点是属性的配置,如图3所示:
其中,Propo
Apache
NiFi
是基于流程编程概念的数据流系统。它支持强大且可扩展的数据路由,转换和系统中介逻辑的有向图。
NiFi
具有基于Web的用户界面,用于设计,控制,反馈和监控数据流。它在服务质量的几个方面具有高度可配置性,例如容错与保证交付,低延迟与高吞吐量以及基于优先级的排队。
NiFi
为所有接收,分叉,加入克隆,修改,发送和最终在达到其配置的最终状态时丢弃的数据提供细粒度数据来源。
有关...
网上关于
Nifi
自定义Processor的中文资料,要么是很古老的eclipse版本,要么太过于简单,学习
Nifi
的道路确实有点看不清楚,好在找到一篇从零构造一个JsonPROCESSOR的英文文章,单纯翻译外加心得记录,
使用
的
Nifi
版本是1.3.0,希望对大家有帮助。
源地址:http://www.
nifi
.rocks/developing-a-custom-apache-
nifi
-proces
[错误]java.lang.IllegalArgumentException: Index for header 'XXX' is 1 but CSVRecord only has 1 val
[错误]java.lang.IllegalArgumentException: Index for header 'XXX' is 1 but CSVRecord only has 1 val
bootstrap 搜索建议插件 suggest + 使用FastJSON进行JSON和String转换