Apache NiFi duomenų srautų valdymas

Kas tas Apache NiFi ir kodėl jis turėtų rūpėti

Kai pradedi dirbti su duomenų integracija, greitai supranti, kad perkelti duomenis iš taško A į tašką B nėra taip paprasta, kaip atrodo. Turiu omenyje ne tik techninę pusę – reikia galvoti apie duomenų transformacijas, saugumą, klaidų tvarkymą, stebėseną. Ir čia į pagalbą ateina Apache NiFi.

Apache NiFi – tai atvirojo kodo platforma, sukurta automatizuoti duomenų srautus tarp sistemų. Projektas gimė NSA (taip, toje pačioje Nacionalinėje saugumo agentūroje) ir 2014 metais buvo perduotas Apache Software Foundation. Tai iškart pasako ką nors apie saugumo prioritetus šioje sistemoje.

Kas daro NiFi unikalų? Pirmiausia – vizualus srautų projektavimas. Užuot rašęs šimtus kodo eilučių, tiesiog tempi ir meti komponentus ant drobės, sujungi juos ir voilà – duomenų srautas veikia. Bet nesijausk apgautas – po šia paprasta sąsaja slypi labai galinga sistema.

Architektūra ir pagrindinės sąvokos

Prieš pradedant kurti srautus, verta suprasti, kaip NiFi veikia po gaubtu. Sistema sukurta ant kelių pagrindinių koncepcijų, kurias būtina įsisavinti.

FlowFile – tai pagrindinis duomenų vienetas NiFi pasaulyje. Galvok apie jį kaip apie voką su turiniu ir atributais. Turinys gali būti bet kas – JSON, XML, dvejetainiai duomenys, tekstas. Atributai – tai metadata, kuri keliauja kartu su duomenimis ir gali būti naudojama maršrutizavimui ar transformacijoms.

Processor – tai darbo arklys. Procesoriai atlieka konkrečius veiksmus su FlowFile: skaito failus, siunčia HTTP užklausas, transformuoja duomenis, rašo į bazes. NiFi turi daugiau nei 300 įvairių procesorių out-of-the-box. Ir jei to nepakanka, gali parašyti savo.

Connection – tai eilės tarp procesorių. Jos ne tik jungia komponentus, bet ir veikia kaip buferiai. Jei vienas procesorius dirba lėčiau nei kitas, connection sulaiko duomenis, kol jie gali būti apdoroti. Tai labai svarbu stabilumui.

Process Group – tai būdas organizuoti sudėtingus srautus į loginius blokus. Galima kurti hierarchiją, pakartotinai naudoti grupes, net importuoti/eksportuoti jas kaip šablonus.

Kaip sukurti pirmąjį duomenų srautą

Gerai, teorijos užtenka. Pažiūrėkime, kaip praktiškai sukurti paprastą, bet naudingą srautą. Tarkime, norime skaityti CSV failus iš katalogo, filtruoti įrašus pagal tam tikrą kriterijų ir įkelti rezultatus į PostgreSQL bazę.

Pirmas žingsnis – GetFile procesorius. Jį nutempk ant drobės ir sukonfigūruok Input Directory parametrą. Čia nurodai, iš kurio katalogo skaityti failus. Svarbu: GetFile ištrina failus po skaitymo (jei nenurodysi kitaip), todėl testuodamas naudok kopijas.

Toliau – ConvertRecord procesorius. Čia prasideda įdomybės. NiFi turi galingą record-based architektūrą, kuri leidžia efektyviai dirbti su struktūruotais duomenimis. Tau reikės sukonfigūruoti du dalykus: CSVReader (kaip skaityti duomenis) ir CSVWriter arba JSONWriter (kaip juos išvesti).

Filtravimui naudok QueryRecord procesorių. Jis leidžia rašyti SQL-tipo užklausas prieš duomenis. Pavyzdžiui: SELECT * FROM FLOWFILE WHERE age > 18. Taip, tiesiog SQL prieš CSV failus. Labai patogu.

Galiausiai – PutDatabaseRecord. Čia nurodai JDBC connection pool (kurį reikia sukonfigūruoti atskirai kaip Controller Service) ir lentelės pavadinimą. NiFi automatiškai sugeneruos INSERT statements pagal įeinančius duomenis.

Tarp kiekvieno procesoriaus nepamirštk sukurti connections. Kiekvienas procesorius turi relationships – success, failure, retry ir pan. Tu nusprendžia, kur nukreipti kiekvieną relationship. Paprastai success eina į kitą procesorių, o failure – į LogAttribute, kad matytum, kas nutiko.

Expression Language – NiFi supergebėjimas

Viena galingiausių NiFi funkcijų yra Expression Language (EL). Tai leidžia dinamiškai manipuliuoti FlowFile atributais ir turiniu be custom kodo rašymo.

Sintaksė paprasta: ${attribute_name}. Bet galimybės – milžiniškos. Pavyzdžiui:

${filename:toUpper()} – paverčia failo vardą didžiosiomis raidėmis.

${fileSize:gt(1000)} – patikrina, ar failas didesnis nei 1000 baitų.

${now():format('yyyy-MM-dd')} – grąžina dabartinę datą norimo formato.

Galima net kurti sudėtingas logines išraiškas: ${filename:startsWith('data_') :and(${fileSize:gt(0)})}

Expression Language naudojama visur – UpdateAttribute procesoriuje atributų kūrimui, RouteOnAttribute procesoriuje maršrutizavimui, net procesorių konfigūracijos parametruose. Pavyzdžiui, galima nustatyti output directory kaip /data/${now():format('yyyy/MM/dd')} ir automatiškai kurti hierarchinę katalogų struktūrą pagal datą.

Patarimas: naudok EL testą, kurį NiFi siūlo konfigūracijos languose. Jis leidžia iš karto pamatyti, ką tavo išraiška grąžins, nepaleidžiant viso srauto.

Controller Services ir pakartotinis naudojimas

Kai pradedi kurti daugiau srautų, pastebėsi, kad tam tikri dalykai kartojasi – duomenų bazės prisijungimai, SSL kontekstai, schema registry konfigūracijos. Čia praverčia Controller Services.

Controller Service – tai bendras resursas, kurį gali naudoti keli procesoriai ar net kelios process groups. Pavyzdžiui, sukuri vieną DBCPConnectionPool service su PostgreSQL prisijungimo parametrais, ir visi procesoriai, kuriems reikia tos bazės, naudoja tą patį service.

Tai ne tik patogu, bet ir efektyvu. Connection pooling reiškia, kad prisijungimai prie bazės yra pakartotinai naudojami, o ne kuriami kiekvienam užklausai. Tai drastiškai sumažina apkrovą ir pagerina našumą.

Kiti dažnai naudojami Controller Services:

AvroSchemaRegistry – saugo Avro schemas, kurias naudoja record procesoriai.

StandardSSLContextService – konfigūruoja SSL/TLS parametrus HTTPS ar kitos saugios komunikacijos.

DistributedMapCacheServer/Client – leidžia dalintis būsena tarp skirtingų NiFi mazgų klasteryje.

Controller Services konfigūruojami root lygyje arba process group lygyje. Jei nori, kad service būtų prieinamas visur, konfigūruok root lygyje. Jei tik tam tikroje grupėje – konfigūruok grupės lygyje.

Klasterizacija ir high availability

Vienas NiFi egzempliorius gali apdoroti nemažai duomenų, bet rimtoms production apkrovoms reikia klasterio. NiFi klasterizacija yra įdomi tuo, kad ji yra zero-master architektūra (nuo 1.0 versijos, kai buvo įvestas embedded ZooKeeper).

Kiekvienas klasterio mazgas gali atlikti bet kokį darbą. Nėra centrinio koordinatoriaus, kuris taptų single point of failure. ZooKeeper naudojamas tik koordinacijai – išrinkti primary node tam tikriems procesoriams, kurie turi veikti tik vienoje vietoje (pvz., GetFile, kad neduotų dublikatų).

Kai kuri procesoriai klasteryje veikia All Nodes režimu – kiekvienas mazgas apdoroja savo duomenų dalį nepriklausomai. Kiti – Primary Node režimu, kur tik vienas mazgas (primary) atlieka darbą, kiti yra standby.

Klasterio konfigūravimas nėra sudėtingas. Pagrindinis failas – nifi.properties. Svarbiausi parametrai:

nifi.cluster.is.node=true – įjungia klasterio režimą.

nifi.cluster.node.address – šio mazgo adresas.

nifi.zookeeper.connect.string – ZooKeeper serverių sąrašas.

Visi klasterio mazgai turi turėti identišką konfigūraciją (išskyrus node-specific parametrus kaip adresą). Srautų dizainas dalijamas automatiškai – pakeitimai viename mazge replikuojami į visus kitus per kelias sekundes.

Saugumas ir prieigos kontrolė

Kadangi NiFi dažnai dirba su jautriais duomenimis, saugumas yra kritinis. Sistema siūlo kelių lygių apsaugą.

Pirmiausia – autentifikacija. NiFi palaiko įvairius metodus: sertifikatais (X.509), LDAP, Kerberos, OpenID Connect. Production aplinkoje beveik visada naudojamas HTTPS su klientų sertifikatais arba SSO integracija.

Konfigūruoti HTTPS nėra trivialus dalykas. Laimei, NiFi ateina su tls-toolkit utility, kuris automatiškai sugeneruoja reikalingus sertifikatus ir keystores. Komanda atrodo taip:

./bin/tls-toolkit.sh standalone -n 'localhost' -C 'CN=admin, OU=NiFi'

Antra – autorizacija. NiFi naudoja policy-based prieigos kontrolę. Galima tiksliai nurodyti, kas gali daryti ką – peržiūrėti srautus, modifikuoti juos, valdyti controller services, matyti provenance duomenis.

Policies konfigūruojamos per UI, bet gali būti importuojamos ir iš XML failų. Tipiškai sukuriamos rolės (pvz., „Developers”, „Operators”, „Viewers”) ir jiems priskiriamos teisės.

Trečia – duomenų šifravimas. NiFi gali šifruoti duomenis poilsio metu (data at rest) naudodamas encryption procesorius kaip EncryptContent. Palaikomi įvairūs algoritmai – AES, RSA, PGP.

Nepamirštk ir provenance – NiFi seka kiekvieno FlowFile kelią per sistemą. Kas sukūrė, kas modifikavo, kas išsiuntė. Tai neįkainojama audito ir debugging tikslams, bet taip pat reiškia, kad reikia apsaugoti šiuos duomenis.

Stebėsena ir troubleshooting

Kai srautai veikia production’e, reikia žinoti, kas vyksta. NiFi siūlo keletą įrankių stebėsenai.

Bulletin Board – tai pranešimų lenta, kur rodomos klaidos ir įspėjimai. Kiekvienas procesorius, kuris susiduria su problema, sukuria bulletin. Tai pirmas dalykas, į kurį žiūri, kai kažkas neveikia.

Status History – grafikai, rodantys procesorių throughput, backpressure, task duration laikui bėgant. Galima zoomintu į konkretų laiko tarpą ir pamatyti, kada prasidėjo problemos.

Data Provenance – čia gali sekti konkretų FlowFile per visą jo gyvavimo ciklą. Matysi, kokie atributai buvo, kaip jie keitėsi, kas transformavo duomenis. Galima net peržiūrėti FlowFile turinį bet kuriame taške (jei jis dar saugomas).

Connection Queues – right-click ant connection ir pasirink „List queue”. Pamatysi, kokie FlowFiles laukia eilėje. Galima juos peržiūrėti, ištrinti, net perkrauti į kitą connection.

Integravimas su išorinėmis stebėsenos sistemomis taip pat svarbus. NiFi teikia:

Reporting Tasks – komponentai, kurie periodiškai siunčia metrikas į išorines sistemas. Yra built-in reporting tasks Prometheus, Ambari, SiteToSite protokolui.

REST API – visa NiFi funkcionalumas prieinamas per REST API. Galima kurti custom monitoring scripts, kurie renka metrikas ir siunčia į Grafana, Datadog ar kur nori.

Praktinis patarimas: sukonfigūruok MonitorActivity procesorių kritiniuose srautų taškuose. Jis generuoja FlowFile, jei tam tikrą laiką nieko nevyko. Tai puikus būdas aptikti, kad srautas „užstrigo”.

Kai viskas sueina į vieną vietą

Apache NiFi nėra paprastas ETL įrankis – tai pilnavertė duomenų orkestracija platforma. Taip, pradžioje gali atrodyti pernelyg sudėtinga, ypač jei atėjai iš tradicinių scripting sprendimų. Bet kai įsigilinsi, suprasi, kodėl tiek organizacijų pasirinko būtent NiFi.

Vizualus srautų dizainas ne tik palengvina kūrimą, bet ir dokumentaciją – pažiūrėjęs į srautą, iš karto supranti, kas vyksta. Expression Language leidžia daryti sudėtingus dalykus be kodo rašymo. Record-based procesoriai efektyviai apdoroja milijonus įrašų. Klasterizacija užtikrina scalability ir atsparumą.

Ar NiFi tinka visiems? Turbūt ne. Jei tavo duomenų srautai labai paprasti, galbūt užteks Python scripto. Bet kai pradedi integruoti daugybę sistemų, kai reikia garantuoti duomenų pristatymą, kai svarbu auditas ir saugumas – NiFi tampa natūraliu pasirinkimu.

Pradėk nuo paprastų srautų. Eksperimentuok su procesoriais. Skaityk dokumentaciją (ji tikrai gera). Prisijunk prie community – Apache NiFi turi aktyvią bendruomenę, kuri mielai padeda. Ir svarbiausia – nebijok klysti. NiFi turi puikią backpressure ir klaidų tvarkymą, tad tavo eksperimentai nesugriovs production sistemų.

Duomenų integracija niekada nebuvo paprasta, bet su tinkamais įrankiais ji tampa valdoma. Apache NiFi yra vienas iš tų įrankių, kurie tikrai verti dėmesio.

Daugiau

Parcel bundler: zero konfigūracijos build tool