Connecting Kinesis Analytics with AWS IoT

In a previous post, Getting started with AWS IoT and Tessel, you learned how to send temperature data from your Tessel microcontroller to an AWS IoT topic. You also learned how to set up an alarm that fires whenever the temperature goes above a certain point (in the example, 10 degrees Celsius). But fixed thresholds don’t work well with dynamic datasets. If you want to receive an alert when the temperature becomes unusual, things get more complicated.

Defining the unusual

An unusual temperature needs to be defined first. Statistics can help us with the problem.

  • The arithmetic mean (in colloquial language, an average) is a measure of central tendency in data (avg).
  • The standard deviation is a measure that is used to quantify the amount of variation in data (sd).

Look at the following visualization of this idea.

[Figure: Standard Deviation]

The idea is that most of the values are very close to the average (0 in this case).
To be more precise, 68.2% of the values are within the [avg-sd, avg+sd] range, also called 1-sigma.
And 95.4% of the values are within the [avg-2sd, avg+2sd] range, also called 2-sigma.
So 4.6% of the values are not within the 2-sigma range. We will define them as unusual values.
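
The percentages above can be checked with a few lines of Python. This is a minimal sketch that assumes a perfect normal distribution; the helper name fraction_within is made up for this example and is not part of any library.

```python
import math


def fraction_within(k: float) -> float:
    """Fraction of a normal distribution within k standard deviations of the mean."""
    return math.erf(k / math.sqrt(2))


one_sigma = fraction_within(1)        # roughly 0.68
two_sigma = fraction_within(2)        # roughly 0.95
outside_two_sigma = 1 - two_sigma     # roughly 0.05, the "unusual" share

print(f"{one_sigma:.1%} inside 1-sigma, {two_sigma:.1%} inside 2-sigma")
```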

Excursion: My first job was in high frequency / algorithmic trading. I did a lot of real-time analytics on top of financial markets data like fx, stock, futures, and options prices. So let me explain the idea of averages in more detail.
The average applies to normally distributed data sets like you see in the above figure. Many phenomena in nature are bell shaped/normally distributed, e.g. the height of humans.
If you apply the average to numbers like personal incomes of the people of a country, things get funny. Assume we have 100 people in our country. 99 people earn $100 while the leader makes $10,000. The average is $199. The standard deviation is $990. So an income of $10,000 is roughly a 10-sigma event. If you take one sample a day, you would expect to see a 7-sigma event once every 1.07 billion years (a quarter of Earth’s history). A 10-sigma event is practically impossible. Still, it’s in the data. The issue is that the data is not normally distributed.
The crux of all that is that we are usually interested in outliers while most statistics focus on the opposite. Some people even remove outliers from their data to make it normally distributed.
In our temperature example we assume a normal distribution but keep in mind that this assumption is most likely not true. You will not end up with 4.6% unusual temperatures!
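
The income example can be reproduced with Python's statistics module. This is only a sketch of the arithmetic. Note that the $990 figure matches the sample standard deviation (statistics.stdev); the population version (statistics.pstdev) comes out slightly lower, at about $985.

```python
import statistics

# 99 people earn $100, the leader earns $10,000.
incomes = [100] * 99 + [10_000]

avg = statistics.mean(incomes)      # 199
sd = statistics.stdev(incomes)      # sample standard deviation, 990
z_leader = (10_000 - avg) / sd      # about 9.9, i.e. roughly a 10-sigma event

print(avg, sd, round(z_leader, 1))
```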

Let’s do some math:

temperatures = [15, 10, 11, 12, 14, 13, 14, 15, 9, 12]
avg = average(temperatures)
sd = standard_deviation(temperatures)

We define a temperature as unusual if it is not in the 2-sigma range of past values.

temperature < (avg - 2*sd)
or
temperature > (avg + 2*sd)
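
As a runnable sketch of the pseudocode above, here is one way to write it in Python. It assumes the population standard deviation (statistics.pstdev), which corresponds to the STDDEV_POP function used in the SQL later in this post. The function name is_unusual is made up for this example.

```python
import statistics

temperatures = [15, 10, 11, 12, 14, 13, 14, 15, 9, 12]

avg = statistics.mean(temperatures)    # 12.5
sd = statistics.pstdev(temperatures)   # population standard deviation, about 1.96

# 2-sigma range of the past values
lower, upper = avg - 2 * sd, avg + 2 * sd


def is_unusual(temperature: float) -> bool:
    """True if the temperature is outside the 2-sigma range of past values."""
    return temperature < lower or temperature > upper


print(is_unusual(9), is_unusual(17))
```

With these numbers the 2-sigma range is roughly 8.6 to 16.4, so a reading of 9 still counts as usual while 17 does not.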

This works pretty well for static data sets. But in our case, new temperature data arrives every minute.

Sliding windows

What we would like to do is compare the current temperature against the average and standard deviation of the temperature over the past hour. This is called a sliding window: you add new values at the front while old values fall out at the end after 1 hour.

Let’s do some math:

temperatures = [...]
temperatures_in_window = sliding_window(1h, temperatures)
avg = average(temperatures_in_window)
sd = standard_deviation(temperatures_in_window)

So we can define that a temperature is unusual if it is not in the 2-sigma range of values that are not older than 1 hour.

temperature < (avg - 2*sd)
or
temperature > (avg + 2*sd)
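
The sliding window can be sketched in plain Python with a deque. This is an illustration, not how Kinesis Analytics works internally. The class name SlidingWindowDetector is made up for this example, timestamps are assumed to be seconds in non-decreasing order, and the current reading is assumed to count as part of its own window.

```python
import statistics
from collections import deque


class SlidingWindowDetector:
    """Flags readings outside the sigma range of a time-based sliding window."""

    def __init__(self, window_seconds: float = 3600.0, sigmas: float = 2.0):
        self.window_seconds = window_seconds
        self.sigmas = sigmas
        self.readings = deque()  # (timestamp, temperature), oldest first

    def add(self, timestamp: float, temperature: float) -> bool:
        """Add a reading and return True if it is unusual for the current window."""
        self.readings.append((timestamp, temperature))
        # Old values fall out of the window once they are older than window_seconds.
        while self.readings[0][0] < timestamp - self.window_seconds:
            self.readings.popleft()
        values = [value for _, value in self.readings]
        avg = statistics.mean(values)
        sd = statistics.pstdev(values)
        return (temperature < avg - self.sigmas * sd
                or temperature > avg + self.sigmas * sd)


detector = SlidingWindowDetector()
for minute in range(60):
    detector.add(minute * 60, 12 + minute % 2)   # steady readings of 12 and 13
print(detector.add(3600, 25))                    # sudden jump to 25
```

Because the current reading is part of the window in this sketch, the window has to hold at least six readings before a 2-sigma outlier can be flagged at all.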

Implementation

To implement this, we are going to pipe all temperatures from the IoT topic temperature into a Kinesis Stream. A Kinesis Analytics application will connect to that stream and run the analytics logic to calculate averages and standard deviations over sliding windows. Finally, the Kinesis Analytics application writes the unusual temperatures back into a Kinesis Stream. The following figure demonstrates the flow of data.

[Figure: Kinesis Analytics flow]

Let’s get started! Log in to the AWS Management Console.

You need to set up the IoT example as described in Getting started with AWS IoT and Tessel.

Creating the Kinesis Streams

First, we need to create two Kinesis Streams: one for the raw temperature data (input) and one for the unusual sensor data (result).

  1. Open the AWS Kinesis Management Console
  2. Click Go to Streams button
  3. Click Create Stream button
  4. Set the Stream Name to temperature
  5. Set the Number of Shards to 1
  6. Click Create button

We need a second stream for the results.

  1. Click Create Stream button
  2. Set the Stream Name to unusual-temperature
  3. Set the Number of Shards to 1
  4. Click Create button
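
If you prefer scripting over clicking, the two streams could also be created with the AWS SDK for Python (boto3). This is a sketch, not part of the original walkthrough. The helper create_streams is made up for this example and expects a boto3 Kinesis client to be passed in; it assumes AWS credentials and a region are already configured.

```python
def create_streams(kinesis_client,
                   names=("temperature", "unusual-temperature"),
                   shard_count=1):
    """Create one Kinesis stream per name using a boto3-style Kinesis client."""
    for name in names:
        kinesis_client.create_stream(StreamName=name, ShardCount=shard_count)
    return list(names)


# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   create_streams(boto3.client("kinesis"))
```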

Creating the IoT Rule

Now it’s time to pipe the data from AWS IoT into the temperature Kinesis stream.

  1. Open the AWS IoT Management Console
  2. Click Create a resource button
  3. Click Create a rule
  4. Set Name to forward_temperature
  5. Set Description to Forwards temperature to Kinesis Stream where it is analyzed
  6. Defining the message source
  7. Set SQL version to 2016-03-23-beta
  8. Set Attribute to temperature
  9. Set Topic filter to temperature
  10. Defining the action
  11. Set Choose an action to Send message to a real-time data stream (Amazon Kinesis)
  12. Set Stream name to temperature
  13. Set Partition key to ${topic()}
  14. Click Create a new role
  15. Set Role name to forward-temperature
  16. Click Create button
  17. Click Add Action button
  18. Click Create button
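
The rule from the steps above could roughly be expressed as a payload for the create_topic_rule call of the boto3 IoT client. This is a sketch under assumptions: the SQL statement is what I would expect the console to build from the Attribute and Topic filter fields, the helper build_topic_rule_payload is made up for this example, and the role ARN must point to a role that is allowed to write to the stream.

```python
def build_topic_rule_payload(role_arn: str) -> dict:
    """Build a topicRulePayload that forwards the temperature topic to Kinesis."""
    return {
        # Assumed equivalent of Attribute "temperature" and Topic filter "temperature"
        "sql": "SELECT temperature FROM 'temperature'",
        "description": "Forwards temperature to Kinesis Stream where it is analyzed",
        "awsIotSqlVersion": "2016-03-23-beta",
        "ruleDisabled": False,
        "actions": [
            {
                "kinesis": {
                    "roleArn": role_arn,
                    "streamName": "temperature",
                    "partitionKey": "${topic()}",
                }
            }
        ],
    }


# Usage, assuming boto3 is installed and role_arn holds the ARN of your role:
#   import boto3
#   boto3.client("iot").create_topic_rule(
#       ruleName="forward_temperature",
#       topicRulePayload=build_topic_rule_payload(role_arn),
#   )
```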

Creating the Kinesis Analytics Application

You have temperature data coming in so we can start to analyze the data. A Kinesis Analytics application consists of three parts:

  • Source: The Kinesis Stream temperature
  • Real-time analytics: SQL to analyze the data
  • Destination: The Kinesis Stream unusual-temperature

But first, you need to create the application.

Kinesis Analytics is not covered by the free tier. This experiment will cost you a few dollars!

  1. Open the AWS Kinesis Management Console
  2. Click Go to Analytics button
  3. Click Create new application button
  4. Set Application name to unusual-temperature
  5. Set Description to Analyze temperature for unusual values
  6. Click Save and continue button

Creating the Source of the Application

  1. Click Connect to a source button
  2. Select the temperature stream
  3. Set Permission to access the stream to Create/update unusual-temperature IAM role

The schema is discovered automatically now. Discovery only works while new data is arriving on the stream. The result should look like this:

[Figure: Kinesis Analytics discovered schema]

  4. Click Save and continue button

Creating the Real-time analytics logic of the Application

  1. Click Go to SQL editor button
  2. Click Yes, start application button
  3. Wait 30-90 seconds until your application is started…
  4. Insert the following SQL:
CREATE OR REPLACE STREAM "temperature" ("temperature" DOUBLE, "temperature_avg" DOUBLE, "temperature_sd" DOUBLE);
CREATE OR REPLACE PUMP "temperature_pump" AS INSERT INTO "temperature"
SELECT STREAM "temperature",
  AVG("temperature") OVER SLIDING_WINDOW AS "temperature_avg",
  STDDEV_POP("temperature") OVER SLIDING_WINDOW AS "temperature_sd"
FROM "SOURCE_SQL_STREAM_001"
WINDOW SLIDING_WINDOW AS (PARTITION BY PARTITION_KEY RANGE INTERVAL '1' HOUR PRECEDING);

-- select only the unusual events for the destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("temperature" DOUBLE, "temperature_avg" DOUBLE, "temperature_sd" DOUBLE);
CREATE OR REPLACE PUMP "output_tpump" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM * FROM "temperature"
WHERE "temperature" < ("temperature_avg" - 2 * "temperature_sd")
   OR "temperature" > ("temperature_avg" + 2 * "temperature_sd");
  5. Click Save and run SQL button
  6. Saving and running SQL will take some time…
  7. Inspect what’s going on by selecting the In-application stream temperature. You should see new values coming in every minute together with the average and the SD of the last hour.
  8. Click Exit (done editing)

Creating the Destination of the Application

  1. Click Connect to a destination button
  2. Select the unusual-temperature stream
  3. Set Output format to JSON
  4. Set Permission to access the stream to Create/update unusual-temperature IAM role
  5. Click Save button

Consuming the unusual sensor data from the Kinesis stream

Unfortunately, we cannot connect a Kinesis stream with an SNS topic directly to send out the alert. But you can create a Lambda function that connects to the unusual-temperature stream and forwards the values to SNS. This is out of the scope of this post. You can also have a look at the In-application stream DESTINATION_SQL_STREAM.
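
To give a rough idea of what such a Lambda function could look like, here is a minimal Python sketch. It is not a tested deployment. It assumes that the records arrive as JSON with the column names from the SQL above, and that the SNS topic ARN is provided in an environment variable named ALERT_TOPIC_ARN, which is a name made up for this example. The helper forward_records is also hypothetical.

```python
import base64
import json
import os


def forward_records(event: dict, publish) -> int:
    """Decode the Kinesis records of a Lambda event and publish one message each."""
    count = 0
    for record in event.get("Records", []):
        # Kinesis record payloads are base64 encoded inside the Lambda event.
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        reading = json.loads(payload)
        message = ("Unusual temperature {temperature} "
                   "(1h average {temperature_avg}, SD {temperature_sd})"
                   ).format(**reading)
        publish(message)
        count += 1
    return count


def lambda_handler(event, context):
    import boto3  # provided by the Lambda Python runtime

    sns = boto3.client("sns")
    topic_arn = os.environ["ALERT_TOPIC_ARN"]  # hypothetical variable name
    return forward_records(
        event,
        lambda message: sns.publish(
            TopicArn=topic_arn, Subject="Unusual temperature", Message=message
        ),
    )
```

Keeping the decoding logic in forward_records, separate from the SNS client, makes it possible to try the function locally without AWS access.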

Summary

  • We used AWS IoT to ingest sensor data into our system over MQTT. AWS IoT runs rules to subscribe to topics and trigger actions such as writing to a Kinesis Stream.
  • You can analyze real-time data with Kinesis Analytics. A Kinesis Stream is used as the source and destination of the analytics.
  • Kinesis Analytics uses SQL to query the real-time data stream.
  • Sliding windows can be used to calculate aggregations over the last hour of data.

Clean up

Make sure to clean up the example after you are finished. Otherwise additional charges will apply.

  • Delete the Kinesis Analytics Application
  • Delete the two Kinesis streams
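
The clean up could also be scripted with boto3. This is a sketch only. The helper clean_up is made up for this example and expects a boto3 kinesisanalytics client and a boto3 kinesis client. It assumes the application and stream names used in this post.

```python
def clean_up(analytics_client, kinesis_client,
             application_name="unusual-temperature",
             stream_names=("temperature", "unusual-temperature")):
    """Delete the Kinesis Analytics application and the two Kinesis streams."""
    # Deleting an application requires its creation timestamp.
    detail = analytics_client.describe_application(
        ApplicationName=application_name
    )["ApplicationDetail"]
    analytics_client.delete_application(
        ApplicationName=application_name,
        CreateTimestamp=detail["CreateTimestamp"],
    )
    for name in stream_names:
        kinesis_client.delete_stream(StreamName=name)


# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   clean_up(boto3.client("kinesisanalytics"), boto3.client("kinesis"))
```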

Michael Wittig


I’m the author of Amazon Web Services in Action. I work as a software engineer and independent consultant focused on AWS and DevOps. Hire me!

Is anything missing in my article? I'm looking forward to your feedback! @hellomichibye or michael@widdix.de.


Published on