Use Case - Streaming ingestion with Apache Kafka

This page provides documentation on how to use the FADI big data framework through a sample use case: monitoring the CETIC office building.

FADI sample use case - building monitoring

In this example, we will ingest temperature measurements from sensors and push them into the Apache Kafka message broker. We are then going to store the data from the broker into a database. Finally, we display the temperature values in a simple dashboard.

1. Install FADI

To install the FADI framework on your workstation or on a cloud, see the installation instructions.

The components needed for this use case are the following:

  • Apache Nifi (data flow ingestion and processing)
  • Apache Kafka (message broker)
  • PostgreSQL (database storing the measurements)
  • Grafana (dashboards and alerts)

Those components are configured in the following sample config file.

The following instructions assume that you deployed FADI on your workstation inside minikube.
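If FADI is not deployed yet, here is a minimal minikube sketch; the repository URL points to the CETIC Helm charts, but the chart name, namespace and resource sizing below are assumptions, so prefer the official installation instructions:

# Start minikube and install the FADI chart (sizing and names are assumptions)
minikube start --cpus 4 --memory 8192
helm repo add cetic https://cetic.github.io/helm-charts
helm repo update
helm install fadi cetic/fadi --namespace fadi --create-namespace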

Unless specified otherwise, all services can be accessed with the username / password pair admin / password1. See the user management documentation for detailed information on how to configure user identification and authorization (LDAP, RBAC, …).

2. Prepare the database to store measurements

First, set up the datalake by creating a table in the PostgreSQL database.

To achieve this you need to:

  • connect to the PostgreSQL database with a SQL client such as psql, using the admin / password1 credentials;
  • create the example_basic table in the public schema, with columns matching the sample CSV (measure_ts and temperature).
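A minimal sketch using psql from your workstation, assuming the fadi-postgresql service name and the admin / password1 credentials used throughout this guide:

# Expose the PostgreSQL service locally (service name from the sample config)
kubectl port-forward -n fadi svc/fadi-postgresql 5432:5432 &

# Create the table that will receive the measurements
# (columns match the sample CSV: measure_ts, temperature)
PGPASSWORD=password1 psql -h localhost -U admin -d postgres -c "
CREATE TABLE IF NOT EXISTS example_basic (
    measure_ts  timestamp,
    temperature real
);"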

3. Prepare Nifi to interconnect the different components

“An easy to use, powerful, and reliable system to process and distribute data.”

Apache Nifi provides data ingestion and processing mechanisms (e.g. to connect to a database, a REST API, or CSV/JSON/Avro files on an FTP server). In this case, we want to read the temperature sensor data from our HVAC system and store it in a database, using a broker to transmit the data between the various processing steps.

To start, head to the Nifi web interface. If you are using minikube, you can open it with the following command:

minikube service -n fadi fadi-nifi

Nifi web interface
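Alternatively, a plain port-forward also works; the port below is an assumption, so check the fadi-nifi service definition in your deployment and adjust accordingly:

# Forward the Nifi UI locally (port is an assumption; check the fadi-nifi service)
kubectl port-forward -n fadi svc/fadi-nifi 8080:8080
# then browse to http://localhost:8080/nifi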

For more information on how to use Apache Nifi, see the official Nifi user guide and these Awesome Nifi resources.

3.1 Measurements ingestion

Temperature measurements from the last 5 days (see the HVAC sample temperatures CSV extract) are ingested:

measure_ts,temperature
2019-06-23 14:05:03.503,22.5
2019-06-23 14:05:33.504,22.5
2019-06-23 14:06:03.504,22.5
2019-06-23 14:06:33.504,22.5
2019-06-23 14:07:03.504,22.5
2019-06-23 14:07:33.503,22.5
2019-06-23 14:08:03.504,22.5
2019-06-23 14:08:33.504,22.5
(...)
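You can preview the sample file before building the flow; this fetches the same URL that the InvokeHTTP processor will poll:

# Preview the first measurements of the sample CSV
curl -s https://raw.githubusercontent.com/cetic/fadi/master/examples/kafka/sample_data.csv | head -n 10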

Now we need to tell Nifi to read the CSV file and push the measurements to the Kafka broker. This can be done by creating the following components in Nifi:

Processors

To create a processor, drag and drop the following button onto the canvas:

Nifi processor button

We need to configure two processors:

  1. InvokeHTTP processor:
    • right-click on the processor > Click on Configure >
      • On the Settings tab > Automatically Terminate Relationships: all except Response
      • On the Properties tab > Remote URL: https://raw.githubusercontent.com/cetic/fadi/master/examples/kafka/sample_data.csv
      • On the Scheduling tab > Run Schedule: 120s (this will download the sample file every 120 seconds)
  2. PublishKafka processor:
    • right-click on the processor > Click on Configure > go to Properties tab >
      • Kafka Brokers: fadi-kafka:9092
      • Topic Name: nifi-kafka (you can check that messages arrive on this topic with the consumer sketch after this list)
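To verify that the measurements actually land on the topic, you can run Kafka's console consumer from inside the broker pod. A sketch, where the pod name fadi-kafka-0 is an assumption based on the fadi-kafka service:

# Read the nifi-kafka topic from the beginning (pod name is an assumption)
kubectl exec -n fadi -it fadi-kafka-0 -- \
  kafka-console-consumer.sh \
    --bootstrap-server fadi-kafka:9092 \
    --topic nifi-kafka \
    --from-beginning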

Output Port

To create an output port, drag and drop the following button onto the canvas:

Nifi output port button

Create two output ports: success_port and failure_port.

Connections

Connect the InvokeHTTP Response relationship to PublishKafka, and the PublishKafka success and failure relationships to the success_port and failure_port output ports.

Here is the target result:

Nifi ingest CSV and publish to Kafka

See also the Nifi “CSV_to_Kafka” template that corresponds to this example.

To reuse the provided template (instead of designing your own), you can:

  • download the template XML file;
  • upload it to Nifi with the Upload Template button in the Operate window;
  • drag and drop the Template icon from the top toolbar onto the canvas and select the uploaded template.

3.2 Measurements digestion

Processors

To create a processor, drag and drop the following button onto the canvas:

Nifi processor button

We need to configure two processors:

  1. ConsumeKafka processor:
    • right-click on the processor > Click on Configure > go to Properties tab >
      • Kafka Brokers: fadi-kafka:9092
      • Topic Name: nifi-kafka
      • Group ID: fadi
      • Offset Reset: earliest
  2. PutDatabaseRecord processor:
    • right-click on the processor > Click on Configure > go to Settings tab > uncheck all the boxes in the Automatically Terminate Relationships list
    • right-click on the processor > Click on Configure > go to Properties tab > set:
      • Statement Type: INSERT
      • Schema Name: public
      • Table Name: example_basic
      • Translate Field Names: false
      • Record Reader > Create a new service > CSV Reader > click on Go To (the little arrow on the right) > click on Configure (the gear on the right side) > Properties > set Treat First Line as Header to true
      • Database Connection Pooling Service > DBCPConnectionPool > click on Go To (the little arrow on the right) > click on Configure (the gear on the right side) > Properties > set the following values:
        • Database Connection URL: jdbc:postgresql://fadi-postgresql:5432/postgres?stringtype=unspecified
        • Database Driver Class Name: org.postgresql.Driver
        • Database Driver Location(s): /opt/nifi/psql/postgresql-42.2.6.jar
        • Database User: admin
        • Password: password1
    • Enable both controller services (CSV Reader and DBCPConnectionPool) by clicking on the lightning icon.
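Once the whole flow is started (see below), you can check that the fadi consumer group is reading the nifi-kafka topic. This is a sketch run from inside the broker pod; the pod name fadi-kafka-0 is an assumption based on the fadi-kafka service:

# Describe the consumer group used by the ConsumeKafka processor
kubectl exec -n fadi -it fadi-kafka-0 -- \
  kafka-consumer-groups.sh \
    --bootstrap-server fadi-kafka:9092 \
    --describe --group fadi

The output should list the nifi-kafka topic partitions with advancing offsets.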

Output Port

To create an output port, drag and drop the following button onto the canvas:

Nifi output port button

Create two output ports: success_port and failure_port.

Connections

Connect the ConsumeKafka success relationship to PutDatabaseRecord, and the PutDatabaseRecord success and failure relationships to the success_port and failure_port output ports.

Here is the result you should arrive at:

Nifi consume from Kafka and store in PostgreSQL

See also the Nifi “Kafka_to_PostgreSQL” template that corresponds to this example.

To reuse the provided template (instead of designing your own), you can:

  • download the template XML file;
  • upload it to Nifi with the Upload Template button in the Operate window;
  • drag and drop the Template icon from the top toolbar onto the canvas and select the uploaded template.


Finally, start the Nifi flow from the Operate window.
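To confirm the end-to-end path, query the datalake. A minimal check, reusing the port-forward and credentials from step 2:

# Count the ingested measurements and show the latest timestamp
PGPASSWORD=password1 psql -h localhost -U admin -d postgres \
  -c "SELECT count(*), max(measure_ts) FROM example_basic;"

The row count should grow every time the InvokeHTTP processor downloads the sample file.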

4. Display dashboards and configure alerts

This step is similar to the basic use case.

5. Explore

This step is similar to the basic use case.

6. Process

This step is similar to the basic use case.

7. Summary

In this use case, we have demonstrated a streaming configuration for FADI, using various services to ingest, store, analyse and explore data, and to provide dashboards and alerts.