Integrate SQS and Lambda: serverless architecture for asynchronous workloads

Andreas Wittig · Updated 19 May 2017

The gold standard for modern cloud-native applications is a serverless architecture. AWS Lambda allows you to implement scalable and fault-tolerant applications without managing a single virtual machine.

A serverless infrastructure based on AWS Lambda has two key benefits:

  1. You don’t need to manage a fleet of virtual machines anymore.
  2. Deploying new versions of your code can be entirely controlled by API calls.

(Figure: Lambda function as SQS consumer)

This article shows you how to process asynchronous tasks in a serverless way. Possible use cases are: sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior. An SQS queue will be used to decouple your microservice from other parts of your system. You’ll learn how to implement the microservice with AWS Lambda.

What is SQS?

A best practice when building scalable and highly available systems on AWS is to decouple your microservice by using one of the following options:

  • ELB (Load Balancer) for synchronous decoupling: web application answering HTTPS requests from browsers
  • SQS queue for asynchronous decoupling: sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior.

Amazon Simple Queue Service (SQS) is a managed message queuing service. SQS offers fault tolerant and scalable distributed message queues: simple to use but very powerful.

The following figure shows a typical architecture based on SQS. A producer sends tasks (also called messages) to an SQS queue. A fleet of consumers reads tasks from the SQS queue and does the actual computing.

(Figure: SQS producer and consumer)

The number of consumers can be scaled based on the number of tasks in the queue. Because tasks are stored durably and managed within SQS, it is also easy to build a fault-tolerant system that recovers from failed consumers automatically without losing any tasks.

SQS offers a REST API, so there are various options for integrating producers and consumers: mobile or client-side web applications, applications running on EC2 instances, and AWS Lambda.
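For example, a producer could enqueue a task with the AWS SDK for JavaScript. The following is a minimal sketch, assuming the region and queue URL come from your configuration; the message body is an arbitrary example:

var AWS = require('aws-sdk');

// placeholders: replace with your region and the URL of your task queue
var sqs = new AWS.SQS({region: 'eu-west-1'});
var queueUrl = 'https://sqs.eu-west-1.amazonaws.com/123456789012/task-queue';

sqs.sendMessage({
  QueueUrl: queueUrl,
  MessageBody: JSON.stringify({email: 'user@example.com'}) // example payload
}, function(err, data) {
  if (err) {
    console.error(err, err.stack);
  } else {
    console.log('task queued: ' + data.MessageId);
  }
});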

What is Lambda?

Amazon announced AWS Lambda in November 2014. Since then serverless computing has become one of the hottest topics within the public cloud market.

You can execute small computing tasks with the help of AWS Lambda. Currently, your function needs to finish its work within 300 seconds.

AWS Lambda offers:

  • Scalable and highly available computing capacity.
  • Automated deployments allowing you to create a continuous delivery pipeline.
  • A fully managed and maintenance-free service.

Lambda supports Node.js, Java, Python, and .NET Core. You just have to upload your source code to S3. Afterward, you can execute your functions by calling the REST API or by using one of the integrations: Kinesis, S3, DynamoDB, CloudTrail, and API Gateway.
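To give you an idea of what a function looks like, here is a minimal Node.js handler (just a sketch; the content of event depends on how the function is invoked):

exports.handler = function(event, context, callback) {
  // event carries the input of the invocation, e.g. an S3 notification or a custom payload
  console.log('received event:', JSON.stringify(event));
  // report success (pass an error as the first argument to report a failure)
  callback(null, 'DONE');
};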

At the moment, there is no out-of-the-box integration for SQS. But next, you’ll learn how to integrate SQS with Lambda easily.

Integrate Lambda and SQS

The following figure shows all components needed to read and process messages from an SQS queue in a serverless way:

  1. The SQS queue receives and stores tasks from other parts of the system.
  2. The CloudWatch Event Rule triggers the Lambda Consumer based on a schedule (e.g. every minute).
  3. The Lambda Consumer reads as many messages as possible from the SQS queue and executes a Lambda Worker for each message.
  4. The Lambda Worker performs the actual computing task and deletes it from the SQS queue after it has been completed successfully.

(Figure: Lambda function consumer)

Let’s dive a little deeper into implementing and setting up this infrastructure with the help of CloudFormation, my favorite Infrastructure as Code tool.

You can find the example code on GitHub at widdix/sqs-lambda-example as well.

Set up the SQS queue

First, you need to set up an SQS queue. The following CloudFormation snippet contains:

  • An SQS queue for your tasks.
  • An SQS queue acting as dead letter queue.

What is a dead letter queue? A dead letter queue is used to collect failed tasks. In this example, a task is moved to the dead letter queue if the Lambda Consumer or Worker fails to complete it successfully within 60 seconds ten times in a row. This mechanism allows your system to get rid of broken tasks without blocking the whole task queue.

TaskQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 60
    RedrivePolicy:
      deadLetterTargetArn: !Sub ${DeadLetterQueue.Arn}
      maxReceiveCount: 10
DeadLetterQueue:
  Type: AWS::SQS::Queue

Let’s proceed with the Lambda Consumer.

Set up and implement the Lambda Consumer

The following snippet contains all resources needed for the Lambda Consumer:

  • An IAM Role called ConsumerLambdaRole includes an IAM policy allowing the Lambda Consumer function to read messages from SQS and invoke the Lambda Worker.
  • A Lambda function named ConsumerLambda and a scheduler rule that executes the Lambda function every minute.

ConsumerLambdaRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
      - Effect: Allow
        Principal:
          Service: lambda.amazonaws.com
        Action:
        - sts:AssumeRole
    Path: '/'
    Policies:
    - PolicyName: logs
      PolicyDocument:
        Statement:
        - Effect: Allow
          Action:
          - logs:CreateLogGroup
          - logs:CreateLogStream
          - logs:PutLogEvents
          Resource: arn:aws:logs:*:*:*
    - PolicyName: sqs
      PolicyDocument:
        Statement:
        - Effect: Allow
          Action:
          - sqs:ReceiveMessage
          Resource: !Sub ${TaskQueue.Arn}
    - PolicyName: lambda
      PolicyDocument:
        Statement:
        - Effect: Allow
          Action:
          - lambda:InvokeFunction
          Resource: !Sub ${WorkerLambda.Arn}
ConsumerLambda:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: ./consumer
    Handler: index.handler
    MemorySize: 128
    Role: !Sub ${ConsumerLambdaRole.Arn}
    Runtime: nodejs6.10
    Timeout: 60
    Environment:
      Variables:
        TASK_QUEUE_URL: !Ref TaskQueue
        WORKER_LAMBDA_NAME: !Ref WorkerLambda
    Events:
      Timer:
        Type: Schedule
        Properties:
          Schedule: rate(1 minute)

The previous snippet can be used to set up the resources needed with the help of CloudFormation. What’s still missing is the source code for the Lambda Consumer executed on AWS Lambda. The Lambda function needs to:

  • Read as many tasks as possible from the task queue.
  • Invoke a Lambda Worker for each task.

The following snippet contains an example for the Lambda Consumer written in JavaScript:

var AWS = require('aws-sdk');
var async = require('async');

var TASK_QUEUE_URL = process.env.TASK_QUEUE_URL;
var WORKER_LAMBDA_NAME = process.env.WORKER_LAMBDA_NAME;
var AWS_REGION = process.env.AWS_REGION;

var sqs = new AWS.SQS({region: AWS_REGION});
var lambda = new AWS.Lambda({region: AWS_REGION});

// reads up to 10 messages (the maximum allowed per request) from the task queue
function receiveMessages(callback) {
  var params = {
    QueueUrl: TASK_QUEUE_URL,
    MaxNumberOfMessages: 10
  };
  sqs.receiveMessage(params, function(err, data) {
    if (err) {
      console.error(err, err.stack);
      callback(err);
    } else {
      callback(null, data.Messages);
    }
  });
}

// invokes the Lambda Worker asynchronously with the SQS message as payload
function invokeWorkerLambda(task, callback) {
  var params = {
    FunctionName: WORKER_LAMBDA_NAME,
    InvocationType: 'Event',
    Payload: JSON.stringify(task)
  };
  lambda.invoke(params, function(err, data) {
    if (err) {
      console.error(err, err.stack);
      callback(err);
    } else {
      callback(null, data);
    }
  });
}

function handleSQSMessages(context, callback) {
  receiveMessages(function(err, messages) {
    if (messages && messages.length > 0) {
      var invocations = [];
      messages.forEach(function(message) {
        invocations.push(function(callback) {
          invokeWorkerLambda(message, callback);
        });
      });
      async.parallel(invocations, function(err) {
        if (err) {
          console.error(err, err.stack);
          callback(err);
        } else {
          // keep polling as long as at least 20 seconds remain before the timeout
          if (context.getRemainingTimeInMillis() > 20000) {
            handleSQSMessages(context, callback);
          } else {
            callback(null, 'PAUSE');
          }
        }
      });
    } else {
      callback(null, 'DONE');
    }
  });
}

exports.handler = function(event, context, callback) {
  handleSQSMessages(context, callback);
};

Only one more part is missing: the Worker Lambda.

Set up the Worker Lambda

The following snippet contains all resources needed to set up the Worker Lambda with the help of CloudFormation:

  • An IAM Role called WorkerLambdaRole includes an IAM policy allowing the Lambda Worker function to delete messages from SQS.
  • A Lambda function named WorkerLambda that creates and configures the Lambda Worker.

WorkerLambdaRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
      - Effect: Allow
        Principal:
          Service: lambda.amazonaws.com
        Action:
        - sts:AssumeRole
    Path: '/'
    Policies:
    - PolicyName: logs
      PolicyDocument:
        Statement:
        - Effect: Allow
          Action:
          - logs:CreateLogGroup
          - logs:CreateLogStream
          - logs:PutLogEvents
          Resource: arn:aws:logs:*:*:*
    - PolicyName: sqs
      PolicyDocument:
        Statement:
        - Effect: Allow
          Action:
          - sqs:DeleteMessage
          Resource: !Sub ${TaskQueue.Arn}
WorkerLambda:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: ./worker
    Handler: index.handler
    MemorySize: 128
    Role: !Sub ${WorkerLambdaRole.Arn}
    Runtime: nodejs6.10
    Timeout: 60
    Environment:
      Variables:
        TASK_QUEUE_URL: !Ref TaskQueue

Of course, you need to implement the source code for the Lambda Worker yourself. You can do whatever can be done within the Lambda execution timeout: 60 seconds in this example, up to a maximum of 300 seconds at the moment. The Lambda Worker needs to delete the message from the SQS queue after processing it successfully.

The following snippet shows an example implementation for the Lambda Worker.

var AWS = require('aws-sdk');

var TASK_QUEUE_URL = process.env.TASK_QUEUE_URL;
var AWS_REGION = process.env.AWS_REGION;

var sqs = new AWS.SQS({region: AWS_REGION});
var s3 = new AWS.S3({region: AWS_REGION}); // not used in this skeleton, but a typical worker needs clients like this

// removes the task from the queue once it has been processed successfully
function deleteMessage(receiptHandle, cb) {
  sqs.deleteMessage({
    ReceiptHandle: receiptHandle,
    QueueUrl: TASK_QUEUE_URL
  }, cb);
}

// place your business logic here (e.g. sending an email or transcoding a video)
function work(task, cb) {
  console.log(task);
  // TODO implement
  cb();
}

exports.handler = function(event, context, callback) {
  work(event.Body, function(err) {
    if (err) {
      callback(err);
    } else {
      deleteMessage(event.ReceiptHandle, callback);
    }
  });
};

Summary

SQS helps you to decouple your asynchronous workloads. Using Lambda with SQS is a challenge, as AWS does not provide an out-of-the-box integration so far. But as you have learned in this article, it is possible to combine SQS and Lambda with a little bit of glue code. Doing so allows you to build a serverless microservice consuming tasks from a queue, e.g. sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior.


P.S. Looking for a way to monitor your Serverless application? Send CloudWatch alarms to Slack with marbot.

Andreas Wittig

I’ve been building on AWS since 2012 together with my brother Michael. We are sharing our insights into all things AWS on cloudonaut and have written the book AWS in Action. Besides that, we’re currently working on bucketAV, HyperEnv for GitHub Actions, and marbot.

Here are the contact options for feedback and questions.