FADI - Ingest, store and analyse big data flows
This page provides documentation on how to use the FADI big data framework, using a sample use case: monitoring the CETIC office building.
In this example, we ingest temperature measurements from sensors and push them into the Apache Kafka message broker. We then store the data from the broker into a database. Finally, we display the temperature values in a simple dashboard.
To install the FADI framework on your workstation or on a cloud, see the installation instructions.
The components needed for this use case are the following:
Those components are configured in the following sample config file.
The following instructions assume that you deployed FADI on your workstation inside minikube.
Unless specified otherwise, all services can be accessed with the username admin and the password password1. See the user management documentation for detailed information on how to configure user identification and authorization (LDAP, RBAC, …).
First, set up the datalake by creating a table in the PostgreSQL database.
To achieve this you need to:
minikube service -n fadi fadi-adminer
Access the adminer service and connect to the PostgreSQL database using the following credentials:
System: PostgreSQL
Server: fadi-postgresql
Username: admin
Password: password1
Database: postgres
In the adminer browser, launch the query tool by clicking “SQL command”.
Copy/paste the table creation script into the query editor.
Execute the creation query by clicking on the Execute command.
Check that the table example_basic is created in the Tables field of the adminer browser.
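If you prefer the command line to the adminer UI, the same table can be created with psql from inside the PostgreSQL pod. This is only a minimal sketch: the pod name fadi-postgresql-0 is an assumption, and the column layout is inferred from the sample CSV shown below; the table creation script linked above remains the reference.

```bash
# Sketch only: pod name and column layout are assumptions;
# credentials (admin/password1) and database (postgres) come from this guide.
kubectl exec -n fadi fadi-postgresql-0 -- bash -c \
  "PGPASSWORD=password1 psql -U admin -d postgres -c '
     CREATE TABLE IF NOT EXISTS example_basic (
       measure_ts  TIMESTAMP,
       temperature DOUBLE PRECISION
     );'"
```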
“An easy to use, powerful, and reliable system to process and distribute data.”
Apache Nifi provides data ingestion and processing mechanisms (e.g. to connect to a database, a REST API, or CSV/JSON/Avro files on an FTP server): in this case we want to read the temperature sensor data from our HVAC system and store it in a database, using a broker to transmit the data between the various processing steps.
To start, head to the Nifi web interface; if you are using minikube, you can use the following command:
minikube service -n fadi fadi-nifi
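If the minikube service shortcut is not available (for instance when FADI runs on a remote cluster), a plain kubectl port-forward gives similar access. The port mapping below is an assumption; check the port actually exposed by the fadi-nifi service first.

```bash
# Assumption: adjust the target port to the one reported by `kubectl get svc -n fadi fadi-nifi`.
kubectl port-forward -n fadi service/fadi-nifi 8080:8080
# The Nifi UI should then be reachable at http://localhost:8080
```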
For more information on how to use Apache Nifi, see the official Nifi user guide and these Awesome Nifi resources.
Temperature measurements from the last 5 days (see HVAC sample temperatures csv extract) are ingested:
measure_ts,temperature
2019-06-23 14:05:03.503,22.5
2019-06-23 14:05:33.504,22.5
2019-06-23 14:06:03.504,22.5
2019-06-23 14:06:33.504,22.5
2019-06-23 14:07:03.504,22.5
2019-06-23 14:07:33.503,22.5
2019-06-23 14:08:03.504,22.5
2019-06-23 14:08:33.504,22.5
(...)
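If you want to preview the full sample file before wiring it into Nifi, you can fetch it directly; the URL below is the same one that will be given to the InvokeHTTP processor later in this guide.

```bash
# Download the sample measurements and show the header plus a few rows.
curl -s https://raw.githubusercontent.com/cetic/fadi/master/examples/kafka/sample_data.csv | head -n 5
```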
Now we need to tell Nifi to read the CSV file and push the measurements into the broker. This can be done by creating the following components in Nifi:
To create a processor, drag and drop it from the following button:
We need to configure two processors:
InvokeHTTP processor:
Configure > Settings tab > Automatically Terminate Relationships: all except Response
Configure > Properties tab > Remote URL: https://raw.githubusercontent.com/cetic/fadi/master/examples/kafka/sample_data.csv
Configure > Scheduling tab > Run Schedule: 120s (this will download the sample file every 120 seconds)
PublishKafka processor:
Configure > Properties tab > Kafka Brokers: fadi-kafka:9092
Configure > Properties tab > Topic Name: nifi-kafka
To create an output port, drag and drop it from the following button:
Create two output ports: success_port and failure_port.
Then connect the components:
Response connection: InvokeHTTP processor to PublishKafka processor (relationship: Response)
Success connection: PublishKafka processor to the success_port output port (relationship: success)
Failure connection: PublishKafka processor to the failure_port output port (relationship: failure)
Here is the target result:
See also the Nifi “CSV_to_Kafka” template that corresponds to this example.
To reuse the provided template (instead of designing your own flow), you can:
Click on Upload template in the Operate frame, select the template, and upload it.
Start the flow by clicking Start.
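Before building the second flow, you may want to check that the measurements actually reach the nifi-kafka topic. A possible spot check, assuming the broker pod is named fadi-kafka-0 and ships the standard Kafka console tools:

```bash
# Assumptions: pod name fadi-kafka-0 and kafka-console-consumer.sh available on its PATH.
kubectl exec -n fadi fadi-kafka-0 -- \
  kafka-console-consumer.sh --bootstrap-server fadi-kafka:9092 \
  --topic nifi-kafka --from-beginning --max-messages 5
```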
Next, we configure a second flow that reads the measurements from the Kafka topic and stores them in the PostgreSQL database.
To create a processor, drag and drop it from the following button:
We need to configure two processors:
ConsumeKafka processor:
Configure > Properties tab > Kafka Brokers: fadi-kafka:9092
Configure > Properties tab > Topic Name: nifi-kafka
Configure > Properties tab > Group ID: fadi
Configure > Properties tab > Offset Reset: earliest
PutDatabaseRecord processor:
Configure > Settings tab > Automatically Terminate Relationships: uncheck all the boxes in the list
Configure > Properties tab > Statement Type: INSERT
Configure > Properties tab > Schema Name: public
Configure > Properties tab > Table Name: example_basic
Configure > Properties tab > Translate Field Names: false
Right-click on the processor > Configure > Properties tab > Record Reader > Create a new service > CSVReader > click on Go To (the little arrow on the right) > Configure (the gear on the right side) > Properties > set Treat First Line as Header to true
Right-click on the processor > Configure > Properties tab > Database Connection Pooling Service > DBCPConnectionPool > click on Go To (the little arrow on the right) > Configure (the gear on the right side) > Properties > set the following values:
Database Connection URL: jdbc:postgresql://fadi-postgresql:5432/postgres?stringtype=unspecified
Database Driver Class Name: org.postgresql.Driver
Database Driver Location(s): /opt/nifi/psql/postgresql-42.2.6.jar
Database User: admin
Password: password1
To create an output port, drag and drop it from the following button:
Create two output ports: success_port and failure_port.
Then connect the components:
Success connection: ConsumeKafka processor to PutDatabaseRecord processor (relationship: success)
Success connection: PutDatabaseRecord processor to the success_port output port (relationship: success)
Failure connection: PutDatabaseRecord processor to the failure_port output port (relationship: failure)
Here is the result you should arrive at:
See also the Nifi “Kafka_to_PostgreSQL” template that corresponds to this example.
To reuse the provided template (instead of designing your own flow), you can:
Click on Upload template in the Operate frame, select the template, and upload it.
Click on Configuration > View configuration of the DBCPConnectionPool controller service, and in the Properties tab complete the password field with password1.
Enable the CSVReader and DBCPConnectionPool controller services.
Start the flow by clicking Start.
For more information on how to use Apache Nifi, see the official Nifi user guide and these Awesome Nifi resources.
Finally, start the Nifi flow in the operate window.
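Once the flow is running, a quick way to confirm that records land in the datalake is to query the example_basic table, either through adminer (SQL command) or with psql as sketched earlier (the pod name and credentials below are the same assumptions):

```bash
# Count the ingested measurements and show the timestamp of the latest one.
kubectl exec -n fadi fadi-postgresql-0 -- bash -c \
  "PGPASSWORD=password1 psql -U admin -d postgres \
   -c 'SELECT count(*) AS measurements, max(measure_ts) AS latest FROM example_basic;'"
```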
The remaining steps (analysing, displaying and exploring the data, and setting up alerts) are each similar to the corresponding steps in the basic use case.
In this use case, we have demonstrated a streaming configuration for FADI, where we use various services to ingest, store, analyse and explore data, and to provide dashboards and alerts.