Data Streaming With Apache NiFi and Twitter

Salam! Xoş gördük sizləri blogumda, bu yazımda sizlərə Apache NIFI istifadə etməklə stream dataları götürüb fərqli sistemlərə nece yükləmək olar onu izah edəcəyəm. Daha sonra Grafana istifadə edərək gələn datanı real time vizual olaraq görəcəyik.

İlk öncə Apache NIFI haqqında qısaca məlumat verim. Deməli NiFi, ayrı-ayrı sistemlər arasında məlumatların hərəkətini avtomatlaşdırmaq üçün inteqrasiya olunmuş ETL platformadır. NiFi istənilən mənbə və istənilən hədəf arasında məlumatların hərəkətini idarə etməyi asanlaşdıran real vaxt rejimində nəzarəti təmin edən ETL alətidir. Daha ətraflı burdan oxuya biləriniz.

Bu postumuzda NiFi istifadə edərək bugün nə kimi bir dataflow dizayn edəcəyik ondan danışım və beləliklə NiFi nədir daha da anlaşılır olacaq sizlərə. Deməli biz istəyirik ki, Twitter-dən real time atılan tweet-ləri istədiyimiz formatta götürək və fərqli hədəflərə yazaq. Bunun üçün biz ilk başda Twitter-dən datanı bizə verilən JSON formatta götürəcəyik daha sonra onu transformasiya edib fərqli hədəflərə yazacağıq və bu hədəflərimiz də NoSQL, RDBMS, HDFS, EMAIL, SLACK olmaq üzrə 5 fərqli tipdən ibarət olacaq. Ən sonda isə real time gələn datanı Grafana ilə vizual şəkildə göstərməyi hədəfləyirik.

İlk öncə sizlərə APACHE NIFI necə yüklənilir və necə konfigurasiya olunur onu izah edim ki bu aləti işlək hala gətirək.

1 – İlk öncə burdan NiFi yükləyirik.

Mən MAC istifadəçisi olduğum üçün özümə uygun formatı seçib yükləyirəm daha sonra unzip edib install mərhələsinə başlaya bilərik.

Bütün executable fayllar unzip etdiyimiz folderdə olan bin folderinde yerləşir . Linux üçün *.sh windows üçün isə .bat faylları bizim üçün lazım olacaq.

Application start etmədən öncə bütün konfirqurasiyaları conf folderi altında olan fayllarda edə bilərsiniz bizim üçün əsas nifi.properties faylında düzgün hostport qeyd etmək kifayətdir və mən bu parameterləri bu cür dəyişib faylı save edirəm.

nifi.web.https.host=127.0.0.1

nifi.web.https.port=8443

Daha sonra bin folderine keçib NiFi start edə bilərik

cd bin

./nifi.sh start ve yaxud ./nifi.sh run

Bu əmrləri çalışdırdıqdan sonra brauzerden https://127.0.0.1:8443/nifi/ adresinə daxil olmağa çalışın. NiFi start olmağı bir müddət çəkə bilər.

Ən sonda ./nifi.sh status əmri ilə NIFI application haqqında məlumat əldə edə bilərsiniz.

Əgər hər hansı problemlə üzləşsəniz və ya yuxarıdakı link açılmazsa o zaman logs folderində yerləşən nifi-app.log faylını açıb ən son hissələrdə problemi təyin etməyə çalışın,ən çok rast gəlinən problemlər ya fayl permissionlarla olacaq ya da java_home düzgün qeyd etmədiyinizə görə olacaq.

Əgər heç bir problem yoxdursa və application start olunsa sizi yuxarıdakı linkdə login pəncərəsi qarşılayacaq və bu halda username password hissəsini bu əmr ilə NIFI loglardan əldə edə bilərsiniz.  

nifi-app*log | grep Generated

Bu əmr ilə siz default username password əldə edə bilərsiniz. Əgər yeni username və şifrə qeyd etmək istəsəniz aşağıdakı əmri istifadə edə bilərsiniz.

./bin/nifi.sh set-single-user-credentials USERNAME PASSWORD

Nifi Up olduqdan sonra keçək NiFi-dan artıq istifadə etməyə. Əgər bütün step-lər düzgün şəkildə etmisizinsə https://127.0.0.1:8443/nifi/ sizə NiFİ login səhifəsini açacaq daha sonra uyğun istifadəçi adı və şifrə yazıb NiFi-a daxil oluruq.

Nifi-a daxil olduqda bizi belə bir pəncərə qarşılayır:

Nifi DataFlow Workbench

Yuxarıda qırmızı ilə işarələnən hissə bizim üçün NiFi komponentləri saxlayır bunlara prosessorlar, prosessor grupları, funnel və s aiddir. Aşağısında isə bizim dataflow-da hansı komponentlər işləkdir hansılar dayanıb və ya error varsa bu haqda statistik məlumatlar yer alır.

Bugün biz Nifi isifadə edərək istəyirik ki, Twitter-dən real time atılan tweet-ləri istədiyimiz formatta götürək və fərqli hədəflərə yazaq. Bunun üçün biz ilk başda Twitter-dən datanı bizə verilən default JSON formatta götürəcəyik daha sonra onu istəyimizə uyğun transformasiya edib fərqli hədəflərə yazacağıq və bu hədəflərimiz də NoSQL, RDBMS, HDFS, EMAIL, SLACK olmaq üzrə 5 fərqli tipdən ibarət olacaq.

İlk öncə qeyd edim ki, NiFi-da bütün componentlər Processor adlanır və 300-ə yaxın fərqli processorlar mövcuddur. İlkin olaraq biz GetTwitter processorunu drag/drop edib edit edirik. Burda bizə Twitter API lazım olacaq və bunu əldə etmək üçün developer.twitter.com-a daxil olub müvafiq suallara cavab verdikdən sonra istəyinizə bir neçə gün ərzində cavab veriləcək əgər əlavə məlumatlar lazım olarsa mail vasitəsilə sizinlə əlaqə saxlayacaqlar.

Daha sonra twitter-dən sizə verilən consumer key, consumer secret, access token və access token secret-i müvafiq xanalara əlavə edirik və hansı tweet-ləri track etmək istəyirsinizsə həmin şərtləri aşağıda olan seçimlərə əlavə edərək GetTwitter processorunu hazır hala gətiririk. Processor-un SETTINGS, SCHEDULING,PROPERTIES,COMMENTS tablarında yer alan konfiqurasiyaları kəşf etməyi sizin öhdənizə buraxıram 🙂

GetTwitter Processor

Indi gələk twitter-dən gələcək olan default JSON datanı transform etmək hissəsinə, bunun üçün bizə JoltTransformJSON processoru lazım olacaq. Bu zaman ilk öncə sample JSON data və bizim istədiyimiz strukturu qeyd etməklə processor-a gələn datanı necə transform etməli olduğunu bildiririk.

JoltTransformJSON prosessorunu edit edib Advanced hissəsinə keçirik.

JoltTransformJSON Processor

1ci stepdə hazır ettiyimiz GetTwitter processorunu run ettikten sonra Twitterden gələn hər hansı JSON datanı input olaraq qeyd edib transform edirik.

JOLT Transformation in NIFI

JoltTransformation hissədə biz input JSON-u hansı formada transform edəcəyiksə o formatı hazırlayırıq və aşağıda test üçün input data göndərib output-un bizim formata uyğun olub olmadığını yoxlayırıq. 

Düzgün formatta transform etdikden sonra JoltTransformJSON processorunu save edib keçirik növbəti addıma.

Növbəti addımda biz transform etdiyimiz JSON datanın içərisində olan parameterlərin hər birini və ya sizin istəyinizə uyğun olar hansılar lazımdırsa onları müəyyən dəyişkənlərə(variable) mənimsədirik ki, növbəti addımlarda bu datadan hər tərəfdə istifadə edə bilək. Bunun üçün EvaluateJSONPath processorundan istifadə edəcəyik.

EvaluateJSONPath Processor

Bu stepi də hazırladıqdan sonra demək olar ki biz artıq Dataflow-muzun ExtractTransformation hissələrini tam hazır hala gətirmişik. Ümumi olaraq desək yuxarıdakı steplərdə biz Twitterdən datanı extract etdik daha sonra onu öz istəyimizə uygun transform edib gələn parameterləri xüsusi dəyişkənlərə mənimsətdik. Əgər yuxarıdakı stepləri düzgün şəkildə etmisinizsə nəticə olaraq belə bir dataflow əmələ gəlmiş olacaq.

Extract & Transformation by Nifi

Növbəti step əlimizdə olan data və dəyişkənləri müxtəlif hədəflərə yazmaqdan ibarətdir. Mən bu hədəflər üçün NoSQL – MongoDB, RDBMS – POSTGRESQL, EMAIL, SLACK, HDFS – HADOOP təyin etmişəm və hər birini necə sazlayıb NIFI-a qoşmaq lazımdır ətraflı izah edəcəyəm.

Müxtəlif hədəflərə keçməzdən öncə datanı fərqli hədəflərə yönəltmək üçün RouteOnAttribute prosessorundan istifadə edəcəyik. Mən bu prosessorda 4 fərqli attribute yaradıb hər birinə xusüsi şərt verirəm ki tweetləri qruplaşdıra bilsin və istədiyimiz qrupu istədiyimiz hədəfə yönəldə bilsin.

RouteOnAttribute Processor

Məsələn tweetdataday2021 attribute üçün bücür bir şərt verirəm: ${text:isEmpty():not():and(${text:contains(‘dataday’)})}

Burdakı “text” bir əvvəlki stepdə yaratmış olduğumuz “text” dəyişkənidir və biz bir növü deyirik ki JSON-dan “text” dəyişkəninə mənimsədilmiş məlumat NULL deyilsə və tərkibində “dataday” sözü varsa bu tweetləri tweetdataday2021 grupuna at . Digər gruplar üçün də eyni məntiqdir. 

Tweet Gruplarımız da hazır olduğuna görə başlayaq bu qruplardakı dataları fərqi hədəflərə yazmağa. İlk öncə başlayaq document based NoSQL baza olan Mongo DB ilə. MongoDB on-premise install etməmisinizsə cloud versiyasından istifadə edə bilərsiniz nəticədə connection string eyni olacaq sadəcə ip-lər və database adları fərqlənəcək. Mən öz MongoDB bazamı cloud üzərində config etmişəm. Eynisini sizdə bu linkdən daxil olaraq edə bilərsiniz bu hissənin detallarını sizin öhdənizə buraxıram etməli olduğunuz sadəcə qeydiyyatdan keçib yeni 1 cluster qurmaq və yeni baza yaratmaq olacaq daha sonra sizə verilən connection string-i götürüb gəlirsiniz NİFi -a və PutMongoDb processorunu müvafiq xanaları dolduraraq sazlayırıq.

PutMongoDB processor

PutMongoDB processoru hazır olduqdan sonra RouteOnAttribute-dan drag/drop edib hansı tweet qrupundakı datalar MongDB-e yazılacaqsa seçim edib Apply edirik. Mən dataopsPashaBank tweetlərini MongoDB-ə yazmağa qərar vermişəm aşağıdakı sazlamada da bunu görə bilərsiniz.

Nifi Route to MongoDB

Aşağıdakı screen DATAOPS TO MONGO DB queue-a aiddir.

Example Nifi queue config

Artıq MongoDB hissəsi hazırdır. 

Növbəti addım gələn tweetlərin bir hissəsini EMAIL vasitəsilə məsul şəxsə göndərməkdir. Bunun üçün PutEmail prosessorundan istifadə edəcəyik və sazlaması aşağıdaki kimidir.

PutEmail Processor
PutEmail Processor Davamı

Tweet TEXT:

TWEET: ${text}

BY: ${screen_name}

TWEET TIME: ${created_at}

Best Regards,

APACHE NIFI

Gmail -dən Nifi ilə mail göndərə bilməyiniz üçün gmail-in less secure app seçimini aktiv etməlisiniz. Burdan daha ətraflı oxuya bilərsiniz.

Email hissəsi də hazır olduğuna görə artıq keçək bilərik PostgreSQL hissəsini sazlamağa. Bunun üçün PutSQL prosessorundan istifadə edəcəyik və aşağıdaki kimi sazlama edib gələn bütün tweetləri burada saxlayacağıq. İlk öncə postgresql bazamıza qoşulmaq üçün JDBC Connection Pool set etməliyik bu həmçinin yuxarıda qeyd olunan MongoDB üçün də keçərlidir daha sonra isə SQL Statement hissəsində hansı cədvələ data yüklənəcəksə o cədvəl üçün insert statement yazırıq.

PutSQL Processor

İlk öncə JDBC Connection Pool hissəsinindən DBCPConnectionPool seçib sag tərəfdə yerləşən –> oxa klikləyirik və açılan NiFi controller servisləri siyahısında postgresql bazamızın sazlamasını edirik.

Controller Service Lists

PostgreSQL bazamızın connection string hissəsini yazırıq daha sonra postgresql üçün JDBC driver path göstəririk və ən son bazaya qoşulmaq üçün istifadəçi adı şifrə yazıb servisi yadda saxlayırıq.

Controller Service Configuration Example

Beləliklə PostgreSQL hissəsi də hazırdır. Əgər bütün addımları doğru etmisinizsə belə bir DataFlow dizaynınız olmuş olacaq.

Nifi DataFlow

Davam edək HADOOPSLACK hədəflərinə data göndərmək hissəsinə.

Hadoop -a datanı yazmadan öncə mən gələn JSON formatındaki datanı müəyyən sayda birləşdirib AVRO formata çevirəcəm və daha sonra HDFS-də saxlayacağam.

Bunun üçün MergeRecord processorundan istifadə edib ilk başda 10 mesajı merge edirəm və daha sonra AVRO formatda HDFS-ə göndərirəm.

MergeRecord Processor

Gələn data JSON formatında olduğu üçün record reader olaraq JsonTreeReader və AVRO-ya yazdığım üçün də Record writer AvroRecordSetWritter secirəm və bu servisləri yene —> klik edib servis siyahısında aktiv hala gətirirəm.

Apache Nifi Controller Services

Daha sonra PutHDFS processorundan istifadə edərək AVRO formatında olan faylları HDFS-ə yazıram. Bu processor üçün əsas 2 parameter sazlamaq lazımdır. Hadoop install etdiyiniz zaman sazladığınız core-site.xml, hdfs-site.xml faylların harda yerləşdiyini qeyd etməlisiniz və daha sonra AVRO fayların harda saxlanılacağını qeyd etməlisiniz.

PutHDFS Processor

Beləliklə HDFS hissəsi də hazırdır. Ən son isə dərhal atılan tweetləri mesaj formatında SLACK ilə ala bilmək üçün müəyyən tweetləri digitalheights slack kanalına göndərək. Bunun üçün bizə ilk öncə SLACK WEBHOOK URL lazımdır daha ətraflı burdan oxuya bilərsiniz.

PutSLACK processor vasitəsilə web hook url , kanalımızı və mesajımızı sazladıqdan sonra SLACK kanalımıza da mesajları göndərə bilərik.

PutSLACK Processor

TEXT:

New Tweet about digitalheights:

Text: ${text} by: ${screen_name} at ${created_at}

Beləliklə Nifi Tərəfdə bütün işlərimizi yekunlaşdırdıq və ən son DataFlow-muz bu halda görünür.

Gördüyü iş isə:

Twitter-dən real time atılan tweet-ləri istədiyimiz formatta götürərək fərqli hədəflərə yazır. Bu hədəflərimiz də NoSQL RDBMS HDFS EMAİL SLACK olmaq üzrə 5 fərqli tipdən ibarətdir.

Apache NIFI TWITTER data streaming DataFlow

Ən son gəlin bütün queue-larda olan mesajları eyni zamanda proses olunmuş bütün datanı və prosessorlarımızın iş yükünü göstərən sadə bir data vizuallaşdırması edərək daha rahat bir formatta bütün datanı Grafana-da göstərək.

Bunun üçün on-premise Grafana yükləyək və arxada automatic olaraq Nifi-dan log datalarını götürə bilməsi üçün Prometheus serveri start edək.

Grafana install və start haqqında daha ətraflı burdan oxuya bilərsiniz.

Prometheus install və start haqqında daha ətraflı burdan oxuya bilərsiniz.

Vəəəəə budur Grafana ilə hər gələn datanı real time vizual olaraq görə bilərik.

Grafana NIFI Statistics

Bu səfərlik bu qədər. Növbəti Nifi postlarda daha fərqli prosessorlardan danışmaq ümidi ilə hələlik 🙂