今天要帶大家做另外一個簡單的場境應用,我們繼續沿用昨天所處理的 parquet File 來做今天的小實作,大致上今天要實作的內容如下:
讀取 local 端的 parquet file,並且依據 Stage 欄位的值,只選擇值 Armor、Champion 和 Mega 的資料送到 AWS SQS。
這邊的事前準備就是記得在 AWS 建立一個要來用的 SQS,並且複製
SQS 的 URL
,實際的畫面如下:
How to Build?
這次的範例的 data pipeline 大致會長的如下圖:
GetFile
這個 Processor 是可以讓我們讀取 Local 端的某一個 Folder 下的檔案,來看一下如何設定:
這邊以我的範例來說,我是將前一天實作完的檔案暫存到
/tmp/data
這個路徑下,所以只要在
Input Directory
這個 Property 設定好,他就會將底下的檔案讀上來做使用。
SplitRecord
因為前一個 Processor 只是將檔案讀取變成一個 FlowFiles 而已,尚未將裡面的資料取出來,所以我們可以透過該 Processor 做到這件事情:
原先的檔案為 Parquet 格式,所以我們以
ParquetReader
的方式來做讀取,並且以
一筆 Record
為單位,接著再以
JsonRecordSetWriter
來轉換成 Json 格式來給下游 Processor 做處理,因為後續我們需要透過 Json 格式來做欄位的判斷。
因此我們可以看到經過這個 Processor 的 Content,都會轉換成 Json 格式,且一筆為單位,內容如下:
EvaluateJsonPath
這個 Processor 是可以讓我們去解析將原先 Content 的某一個欄位轉換成 Attribute,所以來看一下設定:
Destination
: 代表要轉換的目的地,這邊我們先選擇城
flowfile-attribute
Stage
: 是我新增的一個 Property,後面的
$.Stage
代表他會去解析進來的 FlowFiles 中的 Stage 這個欄位,並且帶到名為 Stage 的 Attribute。
所以經過該 Processor 的 Flowfiles,我們會發現都會多帶一個名為 Stage 的 Attribute:
有了 Stage 這個 Attribute 之後,原則上該 Attribute 的值會跟 Content 內的 Stage 這個欄位值相同,接著就可以做過濾的動作,所以就會用到 RouteOnAttribute 這個 Processor。
RouteOnAttribute
該 Processor 是讓我們可依據 FlowFiles 的狀況動代增加下游的 Connection 的 Route,我們先看一下原先的 Processor 設定只會有
unmatched
的 Route:
但是我們可以在 Property 增加更多的條件,如下設定:
這邊我們加入了
7 個 Property
,分別對應的是:
0, Baby
1, In-Training
2, Rookie
3, Champion
4, Ultimate
5, Mega
6, Ultra
7, Armor
一但設定完成之後,我們會發現 Route 會多出這些剛剛設定的 Property Name:
接著我們在連接下游 Processor 的時候,就可以選定符合哪些條件的 Route 可以連接到下游的 Processor。
以接下來的 PutSQS 的範例為例,我們只需要Armor、Champion 和 Mega 的資料送到 AWS SQS 即可其他的都不要,所以既勾選對應的 Route 即可。
接著就可以看到 RouteOnAttribute 和 PutSQS 之間就只會有這三個的 Connection:
然後其他不會用到的我們先傳送到 Wait Processor。
PutSQS
一切準備就緒之後,接下來就可以設定 PutSQS 這個 Processor,還記得一開始要你們先事前建立好 SQS,這邊就會用到了:
Queue URL: 這裡就填上你剛剛建立好 SQS URL
AWS Credentials Provider service: 對接好 AWS Controller Service
Region: 設定好 AWS Region
上述的設定完成,就可以將符合條件的資料送到 AWS SQS 了。
Wait 這個 Processor 其實就是一個暫停
的 Processor。通常會是什麼時候會用他呢?
通常用於開發的階段,因為 Date Pipeline 是由多個 Processor 所建構而成的,所以會需要一個一個 Processor 做設定與測試,所以會在需要做測試的 Processor 的下游接一個 Wait Processor。
另一個用法就是會用在不需要的資料,以這次的範例來說,我們就可以把不符合條件的 FlowFiles 先送到 Wait,主要是用來確定資料確實有依照我們的限制來做判斷,來進一步地決定下游的流向。
上述就是我帶給各位的第二個範例,這些看似簡單的 Processor,其實都是很常用的,所以希望透過這兩天的小實作那大家可以對於 NiFi 在做 Data Pipeline 的設定與流程可以有更多的體悟與理解。
那明天會介紹一個國外企業是如何使用 Apache NiFi 的小案例分享,以及他的架構是如何做的,對於未來要導入該 Tool 的企業或許有一定的幫助。
Reference
Apache NiFi Document