Integrate with External Analytics Service

The BlackBerry IoT Platform provides a Search API to query for data using the Elasticsearch DSL. While this is powerful, there may be cases where the data being collected needs to be fed into an external analytics tool outside of the BlackBerry IoT Platform for a customized solution.

The platform has a feature called Firehose that allows incoming data to securely stream into external systems. These systems could be analytics tools, regional data centers, etc.


We will continue to build on top of the Monitor a fleet of devices use case. As location data is being collected from a fleet of vehicles, the data can be streamed to multiple firehose consumers.


1. Configure firehose subscriptions

Data residing on the BlackBerry IoT Platform from vehicles (devices) running the QNX CAR platform is secured and not accessible to external systems unless Firehose Subscriptions are configured. Using a firehose explicitly whitelists the data that is streamed to authorized systems.

  1. Create a Firehose to receive location.status updates from the QNX CAR platform.

  2. Add subscriptions to the Firehose. You use subscriptions to control what type of events arrive through the firehose. For this scenario, location.status data from all devices running the QNX CAR platform is all that's required.
    More details on the Subscription types are available at Working with Firehoses.
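
The exact request format is described in Working with Firehoses. Purely as an illustration, creating a firehose with a single location.status subscription from NodeJS might look something like the sketch below; the /api/1/firehoses endpoint, the request body, and the admin access token used here are assumptions for this example, not the documented API.

'use strict';
var request = require('request');

var ACCESS_TOKEN = '{ADMIN_ACCESS_TOKEN}';   // a token with permission to manage firehoses (assumption)

// NOTE: The endpoint and body below are illustrative assumptions.
// Consult Working with Firehoses for the actual Firehose API contract.
request({
  url: 'https://bbryiot.com/api/1/firehoses',            // assumed collection endpoint
  method: 'POST',
  json: true,
  headers: { Authorization: 'Bearer ' + ACCESS_TOKEN },
  body: {
    name: 'fleet-location-firehose',
    subscriptions: [
      { data_id: 'location.status' }                      // stream location.status from all QNX CAR devices
    ]
  }
}, function (err, response, body) {
  if (err) { return console.error(err); }
  console.log('Created firehose:', body);                 // the response should include the firehose identifier
});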

2. Grant capabilities to a firehose consumer

At this point, the firehose is set up to stream event data from all devices (vehicles) running the application called QNX CAR Application with the data_id of location.status. Because capabilities have not been granted yet, no device is able to consume data from the firehose that was created.


For the firehose to be useful, it needs to allow a device to attach to it. Devices are allowed to attach to the firehose after they are granted the firehose.attach capability. Attaching to a firehose involves a client streaming data from an HTTP GET connection. An example client is shown later.

Grant the firehose.attach capability to the user.


3. Get an access token for the firehose consumer

Follow the instructions in Understanding Authentication to obtain an access token. The Firehose consumer uses the access token to attach to the defined firehose and receive event data.

Note: The access token is used in the following steps.


4. Get the firehose identifier

A firehose consumer listens for and consumes event data using the Firehose API. To use the API, the firehose identifier (firehose_id) must be known.
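
The firehose_id is typically available in the response returned when the firehose is created. As an illustration only, it could also be looked up by listing the existing firehoses; the GET request on the collection shown below is an assumption for this sketch, so check the Firehose API reference for the documented lookup.

'use strict';
var request = require('request');

var ACCESS_TOKEN = '{ACCESS_TOKEN}';

// NOTE: Listing firehoses with a GET on the collection is an assumption for this sketch.
request({
  url: 'https://bbryiot.com/api/1/firehoses',
  method: 'GET',
  json: true,
  headers: { Authorization: 'Bearer ' + ACCESS_TOKEN }
}, function (err, response, body) {
  if (err) { return console.error(err); }
  console.log('Firehoses:', body);   // note the firehose_id of the firehose created in step 1
});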


Checkpoint

At this point, the following information is known: the access token for the firehose consumer (from step 3) and the firehose identifier, firehose_id (from step 4).

5. Develop the firehose consumer

The firehose consumer in this use case is implemented in NodeJS for simplicity; you can develop a firehose consumer in any language. The code below is set up to listen to the created firehose and write the data to Elasticsearch. Because the data is available in Elasticsearch, Kibana 4 can be used to visualize it. At a high level, the consumer attaches to the firehose with an HTTP GET request, assembles the streamed chunks into complete JSON events, and passes each event to a data handler.

Ensure that the FIREHOSE_URL and ACCESS_TOKEN variables in the code below are set up correctly:

var FIREHOSE_URL        = 'https://bbryiot.com/api/1/firehoses/{firehose_id}/data';
var ACCESS_TOKEN        = '{ACCESS_TOKEN}';

The complete consumer looks like this:
'use strict';
var request       = require('request');

var FIREHOSE_URL        = 'https://bbryiot.com/api/1/firehoses/{firehose_id}/data';
var ACCESS_TOKEN        = '{ACCESS_TOKEN}';

//#############################################################
//Setup Stream Writer to handle data from the Firehose
//#############################################################

var NopStream = function () {
  this.chunk = '';
  this.readable = true;
  this.writable = true;
};
// Inherit from base stream class.
require('util').inherits(NopStream, require('stream'));

// Accumulate streamed chunks until a complete JSON event can be parsed.
NopStream.prototype.write = function (chunk) {
  var str = chunk.toString();
  var notif;

  if (str === '\r\n') {
    // A bare CRLF carries no event data (for example, a keep-alive), so emit null.
    notif = null;
    this.emit('data', notif);
  } else {
    try {
      // Append the new chunk to any partial event received earlier and
      // try to parse the result as a complete JSON event.
      var temp = this.chunk + str;
      notif = JSON.parse(temp);
      this.chunk = '';
      this.emit('data', notif);
    } catch (e) {
      // Not yet a complete JSON document; keep the partial data for the next chunk.
      this.chunk += str;
      this.emit('data', null);
    }
  }
  return true;
};

NopStream.prototype.end = function () {
  var args = Array.prototype.slice.call(arguments, 0);
  this.emit.apply(this, ['end'].concat(args))
};


//#############################################################
//Setup stream connection and wait for data.
//#############################################################

var options = {
  url: FIREHOSE_URL,
  method: 'GET',
  headers: {
    'Authorization': 'Bearer ' + ACCESS_TOKEN
  }
}
var stream = new NopStream();
request(options).pipe(stream);

stream.on('data', function(data){
  if (data) {
    console.log(data);
  }
});

stream.on('end', function(data){
  //Do something when the firehose connection is closed.
});

You can download the code above from here. To run the code, extract the zip file, replace the ACCESS_TOKEN and FIREHOSE_URL values in the index.js file, and then run these commands:

npm install
node index.js


6. Working with location data from a device

If the map application from the QNX CAR platform is running and navigating, data starts to appear through the firehose and is visible in the console where you ran your NodeJS code.

The structure of the data looks like this:

{
  "token": "your firehose token",
  "data": {
    "orgId": "91baf36b-4ce8-11e5-9e2f-6d55ca06319a",
    "appId": "c81721da-4cec-11e5-836c-af54b9e6a100",
    "deviceId": "9dbac0ba-4ced-11e5-ae1b-373b279dfab6",
    "eventType": "data",
    "eventSubType": "create",
    "eventMeta": {
      "device_id": "9dbac0ba-4ced-11e5-ae1b-373b279dfab6",
      "standard": "",
      "category": {},
      "id": "location.status",
      "app_id": "c81721da-4cec-11e5-836c-af54b9e6a100",
      "created_on": 1440702544532,
      "recorded_on": 1434598918222,
      "values": {
        "destination": {
          "country": "Canada",
          "province": "Ontario",
          "city": "Ottawa",
          "street": "Madhu Cres",
          "number": "215",
          "longitude": -75.69891600000001,
          "latitude": 45.358923
        },
        "eta": 1434600290,
        "maneuver": {
          "street": "Bankfield Rd",
          "turn_type": "tr-r"
        },
        "maneuver_distance": 389,
        "maneuvers": [
          {
            "street": "Bankfield Rd",
            "command": "tr-r",
            "distance": 389
          },
          {
            "street": "Prince of Wales Dr",
            "command": "tr-l",
            "distance": 2073
          },
          {
            "street": "Prince of Wales Dr",
            "command": "tr-r",
            "distance": 7183
          }
        ],
        "navigating": true,
        "speed_limit": 27,
        "total_distance_remaining": 18290,
        "total_time_remaining": 23
      }
    }
  }
}

Make a note of the token attribute in the payload from the firehose because it's used in the next step to acknowledge event data.
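
For example, the consumer's data handler can pull the acknowledgement token and a few of the location values out of this structure; the field names below are taken directly from the payload above.

stream.on('data', function (data) {
  if (!data) { return; }                       // ignore keep-alives and partial chunks

  var token  = data.token;                     // needed to acknowledge this event in the next step
  var values = data.data.eventMeta.values;

  console.log('Device %s is %d m from the next maneuver (%s onto %s)',
    data.data.deviceId,
    values.maneuver_distance,
    values.maneuver.turn_type,
    values.maneuver.street);
});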

7. Acknowledge event data from the firehose

To avoid receiving duplicate event data (data that was already handled or processed) if the consumer disconnects and reconnects to the firehose, the event data must be acknowledged. Event data is acknowledged using the ACK method from the Firehose API. More information about acknowledging event data is available at Acknowledge Event Data.

Note: Only clients that have attached to a firehose can acknowledge the event data that they receive.

Because the client that opened the connection has to acknowledge the received event data, the consumer has to be modified to do this. The NodeJS code to acknowledge the event data from the firehose should look like this:

function ackEventData(firehoseId, token){
  var body = {
    token: token
  }
  var options = {
    url: 'https://bbryiot.com/api/1/firehoses/' + firehoseId + '/token',
    headers: {
      Authorization: 'Bearer ' + ACCESS_TOKEN,
      'Content-Type': 'application/json'
    },
    method: 'PUT',
    json: true,
    body: body
  };
  request(options, function(err, response){
    console.log('ACK ', firehoseId, ' with token ', token, err, response && response.statusCode);
  });
}
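
With this function in place, the data handler from step 5 can acknowledge each event after it has been processed. A minimal sketch, where FIREHOSE_ID is a placeholder for the identifier obtained in step 4:

var FIREHOSE_ID = '{firehose_id}';             // the identifier obtained in step 4

stream.on('data', function (data) {
  if (!data) { return; }

  // Process the event first (for example, index it into Elasticsearch),
  // then acknowledge it so it is not redelivered after a reconnect.
  console.log(data);
  ackEventData(FIREHOSE_ID, data.token);
});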

8. Load balance data with multiple firehose consumers

With data-intensive solutions, there may be situations where one firehose consumer is unable to process all the data. The data processing overhead can be distributed across multiple instances of the consumer described above by running it multiple times.

If thousands of cars were producing location.status updates, or multiple data points were produced by a single car, the event data would be distributed among the multiple consumers.

Note: The platform currently allows eight concurrent connections to a single firehose. For event data to be distributed evenly among the consumers, there must be a sufficient amount of event data flowing to the firehose.
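
One simple way to run several consumers on a single machine is Node's built-in cluster module. The sketch below forks a few worker processes, each of which runs the consumer from step 5 and receives its own share of the event data; the worker count of four is arbitrary and must stay within the eight-connection limit.

'use strict';
var cluster = require('cluster');

var WORKER_COUNT = 4;   // arbitrary; keep at or below the platform's limit of eight connections

if (cluster.isMaster) {
  // The master process only forks workers; it does not attach to the firehose itself.
  for (var i = 0; i < WORKER_COUNT; i++) {
    cluster.fork();
  }
} else {
  // Each worker runs the consumer code (index.js from step 5) and opens its own connection.
  require('./index.js');
}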


Next steps