Serverless WebSocket API: API Gateway, Kinesis, Lambda

Andreas Wittig – 13 Feb 2019

Nowadays, it is a common approach to use a RESTful API following the synchronous request/response model. But what about asynchronous communication? Or communication that is synchronous but could be modeled asynchronously as well? A WebSocket API based on API Gateway, Kinesis, and Lambda is a good tool for that job.

Real-time and event-driven API

AWS announced WebSocket support for the API Gateway in December 2018. Recently, CloudFormation added support for the new resources as well. It is therefore about time to discover how to build an event-driven API based on the following building blocks:

  • An API Gateway providing the WebSocket API
  • A Kinesis Stream to buffer and distribute events
  • Several Lambda Functions implementing event-driven microservices

The following figure illustrates the architecture.

[Figure: Serverless WebSocket API: API Gateway, Kinesis, Lambda]

What are the unique features of this architecture?

  • Low latency: The API Gateway forwards incoming events directly to the Kinesis Stream. No Lambda Function is needed in between, which reduces costs and latency.
  • High decoupling: Each microservice subscribes to the Kinesis Stream to process events from the client. A microservice is able to send events to a client by posting a message to the WebSocket via the API Gateway.
  • Fault tolerant: The Kinesis Stream acts as a buffer for incoming events. If one of the microservices fails, events are not lost.
  • Asynchronous but real-time: Events are processed asynchronously but with minimal latency. The bidirectional model offered by the WebSocket makes it easy to send response events to the client.
  • Ordered: The order of events is preserved from the client to the microservice, because all events of a connection share the same partition key and therefore end up in the same shard.
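The ordering property rests on how Kinesis assigns records to shards: the partition key is hashed with MD5 into a 128-bit integer, and the record goes to the shard whose hash key range contains that value. Since the integration in this article uses the connectionId as the partition key, all events of one connection land in the same shard. The following is a minimal sketch of that mapping, assuming shards that split the hash key space evenly; `shardForPartitionKey` and the connection IDs are hypothetical and only serve as an illustration.

```javascript
'use strict';
const crypto = require('crypto');

// Illustrative only: maps a partition key to a shard index, assuming
// `shardCount` shards that split the 128-bit hash key space evenly.
function shardForPartitionKey(partitionKey, shardCount) {
  const digest = crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  const hashKey = BigInt('0x' + digest); // 128-bit integer
  const rangeSize = (1n << 128n) / BigInt(shardCount);
  const index = hashKey / rangeSize;
  // The last shard also covers the remainder at the top of the key space
  const last = BigInt(shardCount - 1);
  return Number(index > last ? last : index);
}

// The same (hypothetical) connection ID always maps to the same shard
console.log(shardForPartitionKey('conn-1', 4) === shardForPartitionKey('conn-1', 4));
```

With a single shard, as used in this article, every partition key maps to shard 0, so the mapping only starts to matter once the stream is scaled out.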

In the following, you will learn how to set up the described architecture with the help of CloudFormation.

First of all, create the API as well as the default route.

Api:
  Type: 'AWS::ApiGatewayV2::Api'
  Properties:
    Name: !Ref 'AWS::StackName'
    ProtocolType: WEBSOCKET
    RouteSelectionExpression: '\$default'
DefaultRoute:
  Type: 'AWS::ApiGatewayV2::Route'
  Properties:
    ApiId: !Ref Api
    AuthorizationType: NONE
    RouteKey: '$default'
    Target: !Sub 'integrations/${KinesisIntegration}'

In theory, the RouteSelectionExpression in combination with an AWS::ApiGatewayV2::Route allows you to route incoming events to different targets. For example, you could split events into different Kinesis Streams. Doing so is not necessary, and therefore all events are routed to the DefaultRoute route in this example.
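To make the routing idea concrete, here is a simplified model of route selection. It assumes a route selection expression of `$request.body.action`, which is a commonly used value but not the one in this template. The gateway evaluates the expression against the incoming message and falls back to `$default` when no route key matches. `selectRoute` and the route keys `order` and `chat` are hypothetical names for illustration.

```javascript
'use strict';

// Simplified model of API Gateway route selection for
// RouteSelectionExpression: '$request.body.action' (an assumption;
// the template in this article sends everything to $default).
function selectRoute(routeKeys, message) {
  let action;
  try {
    const body = JSON.parse(message);
    action = body !== null && typeof body === 'object' ? body.action : undefined;
  } catch (e) {
    // Non-JSON messages cannot be evaluated and fall through to $default
  }
  return typeof action === 'string' && routeKeys.includes(action) ? action : '$default';
}

const routes = ['order', 'chat', '$default'];
console.log(selectRoute(routes, '{"action":"order","id":1}')); // order
console.log(selectRoute(routes, '{"msg":"Hello World!"}'));    // $default
```

Each route key could then point to its own integration, for example a separate Kinesis Stream per event type.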

The default route sends all events to the KinesisIntegration. The integration calls the PutRecord action on the Kinesis Stream for each incoming event. The default template defined in RequestTemplates assembles the needed request. Note that the connectionId, which can be used to send an event back to the client, is added to each event.

KinesisIntegration:
  Type: 'AWS::ApiGatewayV2::Integration'
  Properties:
    ApiId: !Ref Api
    CredentialsArn: !GetAtt IntegrationRole.Arn
    IntegrationMethod: 'POST'
    IntegrationType: 'AWS'
    IntegrationUri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecord'
    RequestTemplates:
      default: !Sub |
        #set($payload = $input.json('$'))
        #set($data = "{""payload"": $payload, ""connectionId"": ""$context.connectionId""}")
        {
          "Data": "$util.base64Encode($data)",
          "PartitionKey": "$context.connectionId",
          "StreamName": "${EventStream}"
        }
    TemplateSelectionExpression: default
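The mapping template is easier to follow when written out as ordinary code. The following sketch mirrors what the template does for one incoming message: wrap the payload and the connection ID in an envelope, Base64-encode the envelope, and build the PutRecord request. `buildPutRecordRequest`, the stream name, and the connection ID are hypothetical; API Gateway evaluates the VTL template itself, so this code only illustrates the result.

```javascript
'use strict';

// Illustrative equivalent of the request template above
function buildPutRecordRequest(rawBody, connectionId, streamName) {
  const payload = JSON.parse(rawBody); // roughly corresponds to $input.json('$')
  const data = JSON.stringify({ payload, connectionId });
  return {
    Data: Buffer.from(data, 'utf8').toString('base64'), // $util.base64Encode($data)
    PartitionKey: connectionId,
    StreamName: streamName
  };
}

// Hypothetical values for illustration
const request = buildPutRecordRequest('{"msg":"Hello World!"}', 'abc123=', 'example-stream');
// Decoding Data again shows the envelope a consumer will see
console.log(JSON.parse(Buffer.from(request.Data, 'base64').toString('utf8')));
```

The envelope is what allows a consumer to answer: it carries both the client's payload and the connectionId needed to post a message back.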

What else is needed to set up the API Gateway? You need to define a stage and a deployment.

Stage:
  Type: 'AWS::ApiGatewayV2::Stage'
  Properties:
    ApiId: !Ref Api
    DeploymentId: !Ref Deployment
    StageName: 'v1'
    DefaultRouteSettings:
      LoggingLevel: INFO
      DataTraceEnabled: true
Deployment:
  Type: 'AWS::ApiGatewayV2::Deployment'
  DependsOn: DefaultRoute
  Properties:
    ApiId: !Ref Api

Next, create a Kinesis Stream to buffer events.

EventStream:
  Type: 'AWS::Kinesis::Stream'
  Properties:
    ShardCount: 1

The WebSocket API and the Kinesis Stream are now ready to receive events.

In order to process the events, a microservice needs to be attached to the Kinesis Stream. The following snippet shows how to create the necessary resources:

  • EventStreamConsumer: registers an HTTP/2-based consumer with the Kinesis Stream.
  • EventSourceMapping: the source mapping connecting the consumer with the Lambda Function.
  • StreamFunction: the Lambda Function implementing a microservice that processes events from the stream.

EventStreamConsumer:
  Type: 'AWS::Kinesis::StreamConsumer'
  Properties:
    ConsumerName: lambda
    StreamARN: !GetAtt EventStream.Arn
EventSourceMapping:
  Type: 'AWS::Lambda::EventSourceMapping'
  Properties:
    BatchSize: 16
    Enabled: true
    EventSourceArn: !Ref EventStreamConsumer
    FunctionName: !GetAtt 'StreamFunction.Arn'
    StartingPosition: LATEST
StreamFunction:
  Type: 'AWS::Lambda::Function'
  Properties:
    Handler: 'index.handler'
    Runtime: 'nodejs8.10'
    Layers:
    - !Sub 'arn:aws:lambda:${AWS::Region}:853553028582:layer:monitoring-jump-start:1'
    MemorySize: 128
    Timeout: 30
    Role: !GetAtt 'StreamRole.Arn'
    Code: '// ...'
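It helps to know the shape of the event that the event source mapping hands to the Lambda Function: a batch of records whose data field is Base64-encoded. The following sketch decodes such a batch. `decodeRecords` is a hypothetical helper, and the sample event is hand-written and contains only the fields used here.

```javascript
'use strict';

// Decodes the Base64-encoded data of each Kinesis record in a Lambda event
function decodeRecords(event) {
  return event.Records.map((record) =>
    JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'))
  );
}

// Hand-written sample event with hypothetical values
const envelope = { payload: { msg: 'Hello World!' }, connectionId: 'abc123=' };
const sampleEvent = {
  Records: [{
    kinesis: {
      partitionKey: envelope.connectionId,
      data: Buffer.from(JSON.stringify(envelope)).toString('base64')
    }
  }]
};
console.log(decodeRecords(sampleEvent));
```

With a BatchSize of 16, a single invocation receives up to 16 such records.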

The following implementation of the StreamFunction sends each incoming event back to its sender.

StreamFunction:
  Type: 'AWS::Lambda::Function'
  Properties:
    # ...
    Code:
      ZipFile: !Sub |
        'use strict';
        const AWS = require('aws-sdk');
        // Client for sending messages to connected WebSocket clients
        const api = new AWS.ApiGatewayManagementApi({
          apiVersion: '2018-11-29',
          endpoint: '${Api}.execute-api.${AWS::Region}.amazonaws.com/${Stage}'
        });
        exports.handler = async (event) => {
          console.log(JSON.stringify(event));
          for (const record of event.Records) {
            // Kinesis delivers the record data Base64-encoded
            const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
            try {
              await api.postToConnection({
                ConnectionId: data.connectionId,
                Data: JSON.stringify(data.payload)
              }).promise();
            } catch (e) {
              if (e.statusCode === 410) {
                // do nothing, client disconnected
                console.log('client disconnected');
              } else {
                throw e;
              }
            }
          }
          return "OK";
        };
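To try the echo logic without deploying anything, the handler can be written so that the API client is passed in. The following is a sketch under that assumption, not the code from the template. `createHandler`, `toRecord`, and the stub client are hypothetical, and the stub only imitates the `postToConnection(...).promise()` call shape used above.

```javascript
'use strict';

// Same echo logic as above, with the API client injected for local testing
function createHandler(api) {
  return async (event) => {
    for (const record of event.Records) {
      const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
      try {
        await api.postToConnection({
          ConnectionId: data.connectionId,
          Data: JSON.stringify(data.payload)
        }).promise();
      } catch (e) {
        if (e.statusCode !== 410) {
          throw e; // unexpected error: fail the invocation
        }
        // 410 Gone: the client disconnected, skip this record
      }
    }
    return 'OK';
  };
}

// Stub client: records sent messages and treats 'gone' as disconnected
const sent = [];
const stubApi = {
  postToConnection: (params) => ({
    promise: async () => {
      if (params.ConnectionId === 'gone') {
        throw Object.assign(new Error('Gone'), { statusCode: 410 });
      }
      sent.push(params);
    }
  })
};

// Builds a minimal Kinesis record for the given connection and payload
const toRecord = (connectionId, payload) => ({
  kinesis: { data: Buffer.from(JSON.stringify({ connectionId, payload })).toString('base64') }
});

const handler = createHandler(stubApi);
const result = handler({
  Records: [toRecord('gone', { msg: 'lost' }), toRecord('c1', { msg: 'hi' })]
});
result.then((value) => console.log(value, sent));
```

The disconnected client is skipped, and the remaining record in the batch is still delivered.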

I’ve skipped a few resources, mainly IAM roles. You can find the whole CloudFormation template at api-gateway-websocket.yaml. Use the following command to create a CloudFormation stack based on the template:

aws cloudformation deploy --template-file apigateway-websocket.yaml --stack-name apigateway-websocket --capabilities CAPABILITY_IAM

Get the WebSocket URI from the CloudFormation stack.

aws cloudformation describe-stacks --stack-name apigateway-websocket --query 'Stacks[0].Outputs[?OutputKey==`WebSocketURI`].OutputValue' --output text

Use wscat to send events to your WebSocket API. The API will return the event immediately.

wscat -c wss://f124m4srw3.execute-api.eu-west-1.amazonaws.com/v1
> {"msg":"Hello World!"}
< {"msg":"Hello World!"}

That’s it. Building a WebSocket API with an API Gateway, a Kinesis Stream and Lambda Functions is a great approach when developing a real-time and event-driven API.


Do you want to learn more about cutting-edge architectures on AWS? Join Michael and me for our session Cutting-Edge Architectures Based on AppSync, Lambda, and Fargate on February 26th at the AWS Summit in Berlin. Also, if you stumble upon Michael or me during the summit, please say “Hi!”.
