Designing asynchronous event systems with AWS IoT and Serverless Application Model (SAM)

Michael Wittig · Updated 06 Apr 2017

An event system receives and processes events by following rules that are defined inside the system. All processing happens asynchronously: when an event is sent to the system, it is processed at some point in time, but you will not get an immediate response. Asynchronous processing has advantages if you want to build a scalable solution, because it frees you from the burden of responding immediately. Instead, you can queue events and process them as fast as you can. In this article, I will demonstrate how the AWS IoT service can be used to process events that have nothing to do with IoT. I will also use the new Serverless Application Model (SAM) to deploy the solution. Needless to say, the solution will be serverless and highly cost-effective for workloads up to 1 million events per day.

How AWS IoT works

AWS IoT can do many things, but here I focus on messages, topics, rules, and actions.

A message is sent to a topic. In this case, a message is an event. AWS IoT can deal with JSON so I choose JSON as my data representation.

A topic has a name like event/buy. As you can see, you can create a hierarchy by using up to 7 forward slashes (/).

A rule subscribes to a topic and triggers actions when a message is received. A simple rule can subscribe to the topic event/buy. But you can also use wildcards like event/+ or event/#: + matches exactly one hierarchy level, while # matches any number of levels. For example, event/+ matches event/buy but not event/buy/retry, whereas event/# matches both.

AWS IoT comes with many built-in actions. To mention just a few:

  • Write the message to DynamoDB.
  • Save the message as a file to S3.
  • Send the message to SNS.
  • Invoke a Lambda function to process the message.

So a message is sent to a topic. If a rule matches the topic’s name, it triggers the defined actions. That’s an event system, isn’t it?
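
To make this less abstract, here is a minimal sketch of what publishing such an event message looks like with the AWS SDK for Node.js. The endpoint is a placeholder; every account has its own data endpoint, which you can look up with describeEndpoint (the implementation later in this article does exactly that):

'use strict';

const AWS = require('aws-sdk');

// Placeholder endpoint; look up the real one for your account with iot.describeEndpoint().
const iotdata = new AWS.IotData({endpoint: 'EXAMPLE.iot.us-east-1.amazonaws.com'});

iotdata.publish({
  topic: 'event/buy',                                    // the topic that rules subscribe to
  qos: 0,                                                // fire-and-forget delivery
  payload: JSON.stringify({type: 'buy', price: 123.45})  // the event, encoded as JSON
}, (err) => {
  if (err) {
    console.error(err);
  } else {
    console.log('event published');
  }
});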

Architecture

The event system I design in this article can handle events that are generated at exchanges, like buy and sell events. It is important to store all events in durable storage for archival. For some reason, buy events also need to be checked for fraud. If a fraud event is detected, external systems must be notified. The following figure shows the architecture of the system.

AWS IoT Architecture

The diagram was created with Cloudcraft.

Event Flow

  1. It all starts with an HTTP API (provided by API Gateway) that triggers a Lambda function for every HTTP POST to /event.
  2. The event-api Lambda does some input validation. Depending on the payload, the event is published to a topic like event/buy, event/sell, …
  3. One rule subscribes to all event/+ topics with an action to write the message to DynamoDB.
  4. Another rule subscribes only to the event/buy topic and triggers the buy-event Lambda for every message.
  5. The buy-event Lambda decides whether the event is fraudulent. If it is, it publishes the event to the alert/fraud topic.
  6. A rule subscribes to the alert/fraud topic and triggers two actions: Save message to S3 and send event to SNS.
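
To make the flow concrete, here is what a hypothetical buy event looks like at the two main stages (the price attribute is just an example; only type is required by the API). The body of the HTTP POST to /event:

{"type": "buy", "price": 123.45}

The message published to event/buy by the event-api Lambda (an id is generated if none was passed in):

{"type": "buy", "price": 123.45, "id": "<generated uuid>"}

If the buy-event Lambda flags the message as fraud, the same payload is re-published to alert/fraud, archived to S3 as <id>.json, and forwarded to SNS.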

Implementation

I use the brand new Serverless Application Model (SAM) for this example. SAM builds upon CloudFormation, so most of the interesting pieces happen inside a file that I will name template.yml.

You can find the full source code on GitHub.

The first file defines Node.js dependencies that are needed in the Lambda functions.

package.json

{
  "name": "sam-iot-example",
  "version": "1.0.0",
  "author": "Michael Wittig",
  "license": "MIT",
  "dependencies": {
    "uuid": "3.0.1"
  },
  "devDependencies": {
    "aws-sdk": "2.6.9"
  }
}

REST API

This is how you define an API Gateway with SAM.

template.yml

---
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
  EventApiLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: 'event-api-handler.create'
      Runtime: 'nodejs6.10'
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action: 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/event/*'
        - Effect: Allow
          Action: 'iot:DescribeEndpoint'
          Resource: '*'
      Events:
        Http:
          Type: Api
          Properties:
            Path: /event
            Method: post

And here comes the implementation that will run inside Lambda. The name of the file must match the Handler defined above.

event-api-handler.js

'use strict';

const uuid = require('uuid');
const cache = require('./cache.js');

function publish(iotdata, payload) {
  return iotdata.publish({
    topic: `event/${payload.type}`,
    qos: 0,
    payload: JSON.stringify(payload),
  }).promise();
}

module.exports.create = (event, context, cb) => {
  console.log(JSON.stringify(event));
  try {
    var payload = JSON.parse(event.body); // safely parse body JSON
  } catch (err) {
    cb(null, {statusCode: 400});
    return;
  }
  if (payload.id === undefined || payload.id === null) {
    payload.id = uuid.v4(); // assign id if no id was passed in
  }
  if (payload.type === undefined || payload.type === null) {
    cb(null, {statusCode: 400});
  } else {
    cache.iotdata
      .then((iotdata) => publish(iotdata, payload))
      .then(() => cb(null, {statusCode: 204}))
      .catch((err) => cb(err));
  }
};

Event archival

Now it’s time to create a rule that inserts the events into a DynamoDB table.

template.yml

[...]
Resources:
  [...]
  EventTable:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
      - AttributeName: timestamp
        AttributeType: S
      KeySchema:
      - AttributeName: id
        KeyType: HASH
      - AttributeName: timestamp
        KeyType: RANGE
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
  EventTableRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 'dynamodb'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action: 'dynamodb:PutItem'
            Resource: !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${EventTable}'
  EventTableRule:
    Type: 'AWS::IoT::TopicRule'
    Properties:
      TopicRulePayload:
        Actions:
        - DynamoDB:
            HashKeyField: id
            HashKeyValue: '${id}'
            RangeKeyField: timestamp # unfortunately this field is required by CloudFormation
            RangeKeyValue: '${timestamp()}' # unfortunately this field is required by CloudFormation
            RoleArn: !GetAtt 'EventTableRole.Arn'
            TableName: !Ref EventTable
        RuleDisabled: false
        Sql: "SELECT * FROM 'event/+'"

Buy Event Lambda

This is a rule that subscribes to the event/buy topic and triggers a Lambda function for every message.

template.yml

[...]
Resources:
  [...]
  BuyEventLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: 'buy-event-handler.analyze'
      Runtime: 'nodejs6.10'
      Policies:
      - Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Action: 'iot:Publish'
          Resource: !Sub 'arn:aws:iot:${AWS::Region}:${AWS::AccountId}:topic/alert/fraud'
        - Effect: Allow
          Action: 'iot:DescribeEndpoint'
          Resource: '*'
      Events:
        IoT:
          Type: IoTRule
          Properties:
            Sql: "SELECT * FROM 'event/buy'"
            AwsIotSqlVersion: '2016-03-23'

This is the implementation of the fraud detection that runs inside Lambda.

buy-event-handler.js

'use strict';

const cache = require('./cache.js');

function publish(iotdata, payload) {
  return iotdata.publish({
    topic: 'alert/fraud',
    qos: 0,
    payload: JSON.stringify(payload),
  }).promise();
}

function isFraud(payload) {
  return Promise.resolve(Math.random() < 0.5); // very simple implementation :)
}

module.exports.analyze = (payload, context, cb) => {
  console.log(JSON.stringify(payload));
  isFraud(payload)
    .then(fraud => {
      if (fraud === true) {
        return cache.iotdata
          .then((iotdata) => publish(iotdata, payload));
      } else {
        return fraud;
      }
    })
    .then(() => cb(null, {statusCode: 204}))
    .catch((err) => cb(err));
};

Fraud archival

And here we define what happens with messages that are published to the alert/fraud topic.

template.yml

[...]
Resources:
  [...]
  ArchiveFraudBucket:
    Type: 'AWS::S3::Bucket'
    Properties: {}
  ArchiveFraudS3Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 's3'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action: 's3:PutObject'
            Resource: !Sub 'arn:aws:s3:::${ArchiveFraudBucket}/*'
  ArchiveFraudTopic:
    Type: 'AWS::SNS::Topic'
    Properties: {}
  ArchiveFraudTopicRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'iot.amazonaws.com'
          Action: 'sts:AssumeRole'
      Policies:
      - PolicyName: 'sns'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action: 'sns:Publish'
            Resource: !Ref ArchiveFraudTopic
  ArchiveFraudRule:
    Type: 'AWS::IoT::TopicRule'
    Properties:
      TopicRulePayload:
        Actions:
        - S3:
            BucketName: !Ref ArchiveFraudBucket
            Key: '${id}.json'
            RoleArn: !GetAtt 'ArchiveFraudS3Role.Arn'
        - Sns:
            MessageFormat: RAW
            RoleArn: !GetAtt 'ArchiveFraudTopicRole.Arn'
            TargetArn: !Ref ArchiveFraudTopic
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: "SELECT * FROM 'alert/fraud'"

Shared caching library

And finally, we have some shared code that is needed by both Lambdas. It creates a singleton promise of an AWS.IotData client configured with the account-specific endpointAddress.

cache.js

'use strict';

const AWS = require('aws-sdk');

const iotApiVersion = '2015-05-28';
const iot = new AWS.Iot({
  apiVersion: iotApiVersion
});

module.exports.iotdata = new Promise((resolve, reject) => {
  iot.describeEndpoint({}, (err, data) => {
    if (err) {
      reject(err);
    } else {
      resolve(new AWS.IotData({
        apiVersion: iotApiVersion,
        endpoint: data.endpointAddress
      }));
    }
  });
});

You may be surprised how little code was needed. Most of the functionality is provided by AWS IoT; we only need to configure it through the CloudFormation template and the SAM extensions.

Deploy

Make sure to update the AWS CLI. Otherwise, you may not have support for SAM:

sudo pip install --upgrade awscli

Select a region that supports the IoT service:

export AWS_DEFAULT_REGION=us-east-1

Then create an artifacts bucket:

aws s3 mb s3://$USER-artifacts

Clone the repository:

git clone git@github.com:michaelwittig/sam-iot-example.git
cd sam-iot-example/

Then install the Node.js dependencies:

npm install --production

Then deploy the template using SAM:

aws cloudformation package --template-file template.yml --output-template-file template.tmp.yml --s3-bucket "$USER-artifacts"
aws cloudformation deploy --template-file template.tmp.yml --stack-name sam-iot-example --capabilities CAPABILITY_IAM

Done. You now have a running event system.

Usage

  1. Go to https://console.aws.amazon.com/apigateway/
  2. Select sam-iot-example
  3. Select /event -> POST and click Test
  4. Fill the Request Body with: {"type":"buy","price":123.45} and submit a few times
  5. Go to https://console.aws.amazon.com/dynamodb/
  6. Select Tables
  7. Select the table that starts with sam-iot-example-EventTable-
  8. Click on Items, and you should see a few events
  9. Go to https://console.aws.amazon.com/s3/
  10. Select the bucket that starts with sam-iot-example-archivefraudbucket-
  11. You should see a few fraud events
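
If you prefer the command line over the console, a minimal sketch like the following scans the event table with the AWS SDK for Node.js (run npm install without --production first so the aws-sdk dev dependency is available locally; the table name is a placeholder that you need to copy from the DynamoDB console):

'use strict';

const AWS = require('aws-sdk');

const dynamodb = new AWS.DynamoDB({region: 'us-east-1'});

// Placeholder table name; copy the generated name from the DynamoDB console.
dynamodb.scan({TableName: 'sam-iot-example-EventTable-REPLACEME', Limit: 10}, (err, data) => {
  if (err) {
    throw err;
  }
  console.log(JSON.stringify(data.Items, null, 2)); // items in DynamoDB attribute-value format
});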

Cleanup

Remove the archived events from the S3 bucket by using the AWS Management Console. The name of the bucket starts with sam-iot-example-archivefraudbucket-.

Then remove the stack:

aws cloudformation delete-stack --stack-name sam-iot-example

Then remove the artifacts bucket:

aws s3 rb s3://$USER-artifacts --force

Summary

  • The Serverless Application Model (SAM) makes deploying CloudFormation templates very easy. I like it :)
  • Using topics to decouple your event system is very powerful. You can always add topic rules and actions if the business process changes or someone else becomes interested in an event in the system. Keep each Lambda small: it is better to have many small Lambdas than a few big ones.
  • The event system is very cost-effective for workloads up to 1 million messages per day. If you want to process more than that, I recommend that you first calculate the costs and compare them to an architecture using Kinesis.

Michael Wittig

I’ve been building on AWS since 2012 together with my brother Andreas. We are sharing our insights into all things AWS on cloudonaut and have written the book AWS in Action. Besides that, we’re currently working on bucketAV, HyperEnv for GitHub Actions, and marbot.
