Integrate SQS and Lambda: serverless architecture for asynchronous workloads

The gold standard for modern cloud-native applications is a serverless architecture. AWS Lambda allows you to implement scalable and fault-tolerant applications without managing a single virtual machine.

A serverless infrastructure based on AWS Lambda has two main advantages:

  1. You don’t need to manage a fleet of virtual machines anymore.
  2. Deploying new versions of your code can be fully controlled by API calls.

This article shows you how to process asynchronous tasks in a serverless way. Possible use cases are sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior. An SQS queue will be used to decouple your microservice from other parts of your system, and you’ll learn how to implement the microservice with AWS Lambda.

What is SQS?

A best practice when building scalable and highly available systems on AWS is to decouple your microservices using one of the following options:

  • ELB (Elastic Load Balancing) for synchronous decoupling, e.g. a web application answering HTTPS requests from browsers.
  • SQS queue for asynchronous decoupling, e.g. sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior.

Amazon Simple Queue Service (SQS) is a managed message queuing service. SQS offers fault-tolerant and scalable distributed message queues that are simple to use but very powerful.

The following figure shows a typical architecture based on SQS. A producer sends tasks (also called messages) to an SQS queue. A fleet of consumers reads tasks from the SQS queue and does the actual computing.

The number of consumers can easily be scaled based on the number of tasks in the queue. Because the tasks are stored durably and managed by SQS, it is also easy to build a fault-tolerant system that automatically recovers from failed consumers without losing any tasks.
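The recovery from failed consumers relies on SQS's visibility timeout: a received message is hidden from other consumers for a while and reappears unless the consumer deletes it. The following is a minimal in-memory sketch of that behaviour, not the AWS SDK; the class `InMemoryQueue` and its methods are hypothetical, time is passed in explicitly, and messages are deleted by ID here, whereas real SQS uses a receipt handle.

```javascript
// Hypothetical in-memory stand-in for an SQS queue (not the AWS SDK).
class InMemoryQueue {
  constructor(visibilityTimeout) {
    this.visibilityTimeout = visibilityTimeout; // seconds
    this.messages = [];
    this.nextId = 0;
  }

  send(body) {
    this.messages.push({ id: String(this.nextId++), body: body, visibleAt: 0 });
  }

  // Return the first visible message and hide it until now + visibilityTimeout.
  receive(now) {
    const msg = this.messages.find((m) => m.visibleAt <= now);
    if (!msg) return null;
    msg.visibleAt = now + this.visibilityTimeout;
    return { id: msg.id, body: msg.body };
  }

  // Deleting is the consumer's acknowledgement that the task is done.
  delete(id) {
    this.messages = this.messages.filter((m) => m.id !== id);
  }
}

const queue = new InMemoryQueue(60);
queue.send("transcode video-42");

const first = queue.receive(0);   // consumer A receives the task...
// ...and crashes before calling queue.delete(first.id).
const hidden = queue.receive(30); // still invisible to other consumers
const retry = queue.receive(61);  // visible again after the timeout
queue.delete(retry.id);           // consumer B finishes and deletes it
```

No task is lost: the crashed consumer simply never acknowledges the message, so it becomes available to the next consumer.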

SQS offers an HTTP API, so there are various options for integrating producers and consumers: mobile or client-side web applications, applications running on EC2 instances, and AWS Lambda.
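On the producer side, enqueuing a task is a single `sendMessage` call. The sketch below assumes the AWS SDK for JavaScript v2 (the `aws-sdk` package used later in this article); the helper name `sendTask`, the JSON task format, and the queue URL are illustrative assumptions. The SQS client is passed in as a parameter so the function can be tried with a stub instead of a real queue.

```javascript
// Hypothetical producer helper. `sqs` is expected to behave like
// new AWS.SQS() from aws-sdk v2: sendMessage(params, callback).
function sendTask(sqs, queueUrl, task, callback) {
  const params = {
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(task) // SQS message bodies are strings
  };
  sqs.sendMessage(params, function (err, data) {
    if (err) return callback(err);
    callback(null, data.MessageId); // ID assigned by SQS
  });
}

// Usage with a stub client; in production you would pass new AWS.SQS({region: "..."}).
const sent = [];
const stubSqs = {
  sendMessage: function (params, cb) {
    sent.push(params);
    cb(null, { MessageId: "stub-" + sent.length });
  }
};

let result = null;
sendTask(stubSqs, "https://example.invalid/task-queue",
  { type: "email", to: "user@example.com" },
  function (err, messageId) { result = { err: err, messageId: messageId }; });
```

Injecting the client keeps the producer independent of credentials and regions, which also makes it easy to unit-test.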

What is Lambda?

Amazon announced AWS Lambda in November 2014. Since then, serverless computing has become one of the hottest topics in the public cloud market.

AWS Lambda lets you execute small computing tasks. At the time of writing, a function needs to finish its work within 300 seconds.

AWS Lambda offers:

  • Scalable and highly available computing capacity.
  • Automated deployments allowing you to set up a continuous delivery pipeline.
  • A fully managed and maintenance free service.

At the time of writing, Lambda supports Node.js, Java, and Python. You just have to upload your source code to S3. Afterwards, you can execute your functions by calling the REST API or by using one of the integrations: Kinesis, S3, DynamoDB, CloudTrail, and API Gateway.

At the time of writing, there is no out-of-the-box integration for SQS. But next, you’ll learn how to integrate SQS with Lambda easily.

Integrate Lambda and SQS

The following figure shows all components needed to read and process messages from an SQS queue in a serverless way:

  1. The SQS queue receives and stores tasks from other parts of the system.
  2. The CloudWatch Event Rule triggers the Lambda Consumer based on a schedule (e.g. every minute).
  3. The Lambda Consumer reads as many messages as possible from the SQS queue and executes a Lambda Worker for each message.
  4. The Lambda Worker performs the actual computing tasks and deletes the task from the SQS queue after the task was completed successfully.
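The interplay of steps 1 to 4 can be simulated without AWS. In this hypothetical sketch, an array stands in for the SQS queue and plain functions stand in for the two Lambda functions; unlike the real setup, the Workers run synchronously here. It illustrates two points from the list: the Consumer keeps receiving batches of at most 10 messages (the most a single SQS ReceiveMessage call returns) until the queue is empty, and each Worker deletes its own task.

```javascript
// Hypothetical simulation: arrays and functions stand in for SQS and Lambda.
const MAX_BATCH = 10; // SQS returns at most 10 messages per ReceiveMessage call

function makeQueue(count) {
  const messages = [];
  for (let i = 0; i < count; i++) {
    messages.push({ ReceiptHandle: "rh-" + i, Body: "task-" + i });
  }
  return messages;
}

// Step 4: the Worker does the work, then deletes its message from the queue.
function worker(queue, message, processed) {
  processed.push(message.Body);
  const index = queue.indexOf(message);
  if (index !== -1) queue.splice(index, 1);
}

// Step 3: the Consumer receives batches and fans out one Worker per message.
function consumer(queue, processed) {
  let receiveCalls = 0;
  for (;;) {
    const batch = queue.slice(0, MAX_BATCH); // "receive" up to 10 messages
    receiveCalls++;
    if (batch.length === 0) break;           // queue drained ("DONE")
    batch.forEach((m) => worker(queue, m, processed));
  }
  return receiveCalls;
}

// Step 2: one scheduled run of the Consumer, triggered by the Event Rule.
const queue = makeQueue(25);
const processed = [];
const calls = consumer(queue, processed);
```

With 25 tasks, the Consumer needs three non-empty receives (10, 10, 5) plus one empty receive that ends the run.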

Let’s dive a little deeper into implementing and setting up this infrastructure with the help of CloudFormation, my favourite Infrastructure as Code tool.

Set up the SQS queue

First, you need to set up an SQS queue. The following CloudFormation snippet contains:

  • An SQS queue for your tasks.
  • An SQS queue acting as dead letter queue.

What is a dead letter queue? A dead letter queue collects failed tasks. In this example, a task is moved to the dead letter queue after it has been received 10 times without being deleted, i.e. when the Lambda Consumer or Worker has failed 10 times to complete the task within the 60-second visibility timeout. This mechanism allows your system to get rid of broken tasks without blocking the whole task queue.

"TaskQueue": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
        "QueueName": "task-queue",
        "MessageRetentionPeriod": "1209600",
        "VisibilityTimeout": "60",
        "RedrivePolicy": {
            "deadLetterTargetArn": {"Fn::GetAtt": ["DeadLetterQueue", "Arn"]},
            "maxReceiveCount": "10"
        }
    }
},
"DeadLetterQueue": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
        "QueueName": "dead-letter-queue",
        "MessageRetentionPeriod": "1209600"
    }
}
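The redrive rule configured via `maxReceiveCount` can be modeled in a few lines. This is a hypothetical sketch, not SQS itself: it counts how often each message has been received and, once a message has been received `maxReceiveCount` times without being deleted, moves it to the dead letter queue instead of handing it out again. The visibility timeout is not modeled; a received message is simply put back at the end of the queue.

```javascript
// Hypothetical model of an SQS redrive policy (not the AWS implementation).
function createQueueWithRedrive(maxReceiveCount) {
  const queue = [];      // messages waiting to be processed
  const deadLetter = []; // messages that failed too often
  return {
    queue: queue,
    deadLetter: deadLetter,
    send: function (body) {
      queue.push({ body: body, receiveCount: 0 });
    },
    // Returns the next message, or null. A message that has already been
    // received maxReceiveCount times is moved to the dead letter queue instead.
    receive: function () {
      while (queue.length > 0) {
        const msg = queue.shift();
        if (msg.receiveCount >= maxReceiveCount) {
          deadLetter.push(msg);
          continue;
        }
        msg.receiveCount++;
        queue.push(msg); // stays in the queue until a worker deletes it
        return msg;
      }
      return null;
    },
    delete: function (msg) {
      const i = queue.indexOf(msg);
      if (i !== -1) queue.splice(i, 1);
    }
  };
}

const q = createQueueWithRedrive(10); // maxReceiveCount from the snippet above
q.send("broken task");
q.send("good task");
const done = [];
let attempts = 0;
let msg;
while ((msg = q.receive()) !== null) {
  attempts++;
  if (msg.body === "good task") {
    done.push(msg.body);
    q.delete(msg); // success: the worker deletes the message
  }
  // "broken task" fails every time and is never deleted.
}
```

The healthy task is processed on its first attempt, while the broken one is retried 10 times and then parked in the dead letter queue instead of circulating forever.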

Let’s proceed with the Lambda Consumer.

Set up and implement the Lambda Consumer

The following snippet contains all resources needed for the Lambda Consumer:

  • An Events Rule called ScheduleRuleForConsumerLambda will trigger the Lambda Consumer every minute.
  • A Lambda Permission named PermissionToInvokeConsumerLambda allows the Event Rule to invoke the Lambda Consumer.
  • An IAM Role called ConsumerLambdaRole contains an IAM policy allowing the Lambda Consumer function to read messages from SQS and invoke the Lambda Worker.
  • A Lambda function named ConsumerLambda creates and configures the Lambda Consumer.

"ScheduleRuleForConsumerLambda": {
    "Type": "AWS::Events::Rule",
    "Properties": {
        "Description": "Schedule Rule for Consumer Lambda",
        "ScheduleExpression": "rate(1 minute)",
        "State": "ENABLED",
        "Targets": [{
            "Arn": {"Fn::GetAtt": ["ConsumerLambda", "Arn"]},
            "Id": "DailyScheduleTarget"
        }]
    }
},
"PermissionToInvokeConsumerLambda": {
    "Type": "AWS::Lambda::Permission",
    "Properties": {
        "FunctionName": {"Ref": "ConsumerLambda"},
        "Action": "lambda:InvokeFunction",
        "Principal": "events.amazonaws.com",
        "SourceArn": {"Fn::GetAtt": ["ScheduleRuleForConsumerLambda", "Arn"]}
    }
},
"ConsumerLambdaRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": ["sts:AssumeRole"]
            }]
        },
        "Path": "/",
        "Policies": [{
            "PolicyName": "logs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "arn:aws:logs:*:*:*"
                }]
            }
        },
        {
            "PolicyName": "sqs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "sqs:ReceiveMessage"
                    ],
                    "Resource": {"Fn::GetAtt": ["TaskQueue", "Arn"]}
                }
            ]}
        },
        {
            "PolicyName": "lambda",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "lambda:InvokeFunction"
                    ],
                    "Resource": {"Fn::GetAtt": ["WorkerLambda", "Arn"]}
                }
            ]}
        }]
    }
},
"ConsumerLambda": {
    "Type": "AWS::Lambda::Function",
    "Properties": {
        "Code": {
            "S3Bucket" : {"Ref": "LambdaCodeBucket"},
            "S3Key" : {"Ref": "ConsumerLambdaCodeKey"}
        },
        "Handler": "index.handler",
        "MemorySize": 128,
        "Role": {"Fn::GetAtt": ["ConsumerLambdaRole", "Arn"]},
        "Runtime": "nodejs4.3",
        "Timeout": 60
    }
}
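The `ScheduleExpression` uses the CloudWatch Events rate syntax `rate(value unit)`, where the unit is singular for a value of 1 (`rate(1 minute)`) and plural for larger values (`rate(5 minutes)`); one minute is the shortest interval a rate expression allows. If you generate templates programmatically, a small helper such as the hypothetical `rateExpression` below avoids invalid combinations like `rate(1 minutes)`.

```javascript
// Hypothetical helper that builds a CloudWatch Events rate expression.
const UNITS = ["minute", "hour", "day"];

function rateExpression(value, unit) {
  if (!Number.isInteger(value) || value < 1) {
    throw new Error("value must be a positive integer");
  }
  if (UNITS.indexOf(unit) === -1) {
    throw new Error("unit must be one of: " + UNITS.join(", "));
  }
  // Singular unit for a value of 1, plural for anything larger.
  const suffix = value === 1 ? unit : unit + "s";
  return "rate(" + value + " " + suffix + ")";
}

const everyMinute = rateExpression(1, "minute"); // value used in the snippet above
const everyFive = rateExpression(5, "minute");
```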

The CloudFormation snippet above sets up all the resources needed. What’s still missing is the source code of the Lambda Consumer executed on AWS Lambda. The Lambda function needs to:

  • Read as many tasks as possible from the task queue.
  • Invoke a Lambda Worker for each task.

The following snippet contains an example for the Lambda Consumer written in JavaScript:

var AWS = require("aws-sdk");
var async = require("async");

// Replace these placeholders with your own values.
var QUEUE_URL = "SQS_QUEUE_URL";
var WORKER_LAMBDA_FUNCTION_NAME = "LAMBDA_FUNCTION_NAME";
var REGION = "AWS_REGION";

var sqs = new AWS.SQS({region: REGION});
var lambda = new AWS.Lambda({region: REGION});

// Receive a batch of up to 10 messages (the SQS maximum per call).
function receiveMessages(callback) {
    var params = {
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10
    };
    sqs.receiveMessage(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
            callback(err);
        } else {
            // data.Messages is undefined when the queue is empty.
            callback(null, data.Messages);
        }
    });
}

// Invoke the Worker asynchronously ("Event") and pass the whole message,
// including its ReceiptHandle, so the Worker can delete it when done.
function invokeWorkerLambda(task, callback) {
    var params = {
        FunctionName: WORKER_LAMBDA_FUNCTION_NAME,
        InvocationType: "Event",
        Payload: JSON.stringify(task)
    };
    lambda.invoke(params, function(err, data) {
        if (err) {
            console.error(err, err.stack);
            callback(err);
        } else {
            callback(null, data);
        }
    });
}

// Keep polling while more than 20 seconds of execution time remain.
function handleSQSMessages(context, callback) {
    receiveMessages(function(err, messages) {
        if (err) {
            callback(err);
        } else if (messages && messages.length > 0) {
            var invocations = messages.map(function(message) {
                return function(done) {
                    invokeWorkerLambda(message, done);
                };
            });
            async.parallel(invocations, function(err) {
                if (err) {
                    console.error(err, err.stack);
                    callback(err);
                } else if (context.getRemainingTimeInMillis() > 20000) {
                    handleSQSMessages(context, callback);
                } else {
                    callback(null, "PAUSE");
                }
            });
        } else {
            callback(null, "DONE");
        }
    });
}

exports.handler = function(event, context, callback) {
    handleSQSMessages(context, callback);
};
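The 20-second threshold in `handleSQSMessages` appears to act as a safety margin: with a 60-second timeout, the Consumer stops polling once less than 20 seconds remain, so its last round of receives and invocations can finish before Lambda terminates it, and the next scheduled run continues with the remaining tasks. The decision can be extracted into a pure function and exercised locally with a fake `context` that only implements `getRemainingTimeInMillis`. The names `nextStep`, `fakeContext`, and the `POLL_AGAIN` label are hypothetical and not part of the code above.

```javascript
// Hypothetical pure version of the Consumer's control-flow decision.
const SAFETY_MARGIN_MS = 20000; // same threshold as handleSQSMessages

function nextStep(messageCount, remainingMs) {
  if (messageCount === 0) return "DONE";                   // queue is empty
  if (remainingMs > SAFETY_MARGIN_MS) return "POLL_AGAIN"; // enough time left
  return "PAUSE";                                          // next scheduled run continues
}

// A fake context object, as Lambda would pass it, for local experiments.
function fakeContext(remainingMs) {
  return { getRemainingTimeInMillis: function () { return remainingMs; } };
}

const steps = [
  nextStep(10, fakeContext(45000).getRemainingTimeInMillis()),
  nextStep(10, fakeContext(15000).getRemainingTimeInMillis()),
  nextStep(0, fakeContext(45000).getRemainingTimeInMillis())
];
```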

Only one part is still missing: the Worker Lambda.

Set up the Worker Lambda

The following snippet contains all resources needed to set up the Worker Lambda with the help of CloudFormation:

  • An IAM Role called WorkerLambdaRole contains an IAM policy allowing the Lambda Worker function to delete messages from SQS.
  • A Lambda function named WorkerLambda creates and configures the Lambda Worker.

"WorkerLambdaRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": ["sts:AssumeRole"]
            }]
        },
        "Path": "/",
        "Policies": [{
            "PolicyName": "logs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "logs:CreateLogGroup",
                        "logs:CreateLogStream",
                        "logs:PutLogEvents"
                    ],
                    "Resource": "arn:aws:logs:*:*:*"
                }]
            }
        },
        {
            "PolicyName": "sqs",
            "PolicyDocument": {
                "Statement": [{
                    "Effect": "Allow",
                    "Action": [
                        "sqs:DeleteMessage"
                    ],
                    "Resource": {"Fn::GetAtt": ["TaskQueue", "Arn"]}
                }
            ]}
        }]
    }
},
"WorkerLambda": {
    "Type": "AWS::Lambda::Function",
    "Properties": {
        "Code": {
            "S3Bucket" : {"Ref": "LambdaCodeBucket"},
            "S3Key" : {"Ref": "WorkerLambdaCodeKey"}
        },
        "Handler": "index.handler",
        "MemorySize": 128,
        "Role": {"Fn::GetAtt": ["WorkerLambdaRole", "Arn"]},
        "Runtime": "nodejs4.3",
        "Timeout": 60
    }
}
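A message becomes visible again as soon as its VisibilityTimeout expires, so the TaskQueue setting (60 seconds) should be at least as long as the WorkerLambda Timeout (also 60 seconds); otherwise a second Worker may start on a task that is still being processed. The hypothetical `checkQueueSettings` helper below sketches such a pre-deployment check. It also verifies that the values lie within the SQS limits: 60 to 1209600 seconds (14 days, the value used for MessageRetentionPeriod) for retention, and 0 to 43200 seconds (12 hours) for the visibility timeout.

```javascript
// Hypothetical pre-deployment check for the queue settings (values in seconds).
const DAY = 24 * 60 * 60; // 86400 seconds

function checkQueueSettings(settings, workerTimeout) {
  const problems = [];
  // CloudFormation passes these values as strings, so convert them first.
  const retention = Number(settings.MessageRetentionPeriod);
  const visibility = Number(settings.VisibilityTimeout);
  if (retention < 60 || retention > 14 * DAY) {
    problems.push("MessageRetentionPeriod must be between 60 and 1209600 seconds");
  }
  if (visibility < 0 || visibility > 43200) {
    problems.push("VisibilityTimeout must be between 0 and 43200 seconds");
  }
  // A message must stay hidden at least as long as the Worker may run,
  // otherwise another Worker could pick up the same task in parallel.
  if (visibility < workerTimeout) {
    problems.push("VisibilityTimeout is shorter than the Worker Lambda timeout");
  }
  return problems;
}

// Values from the TaskQueue and WorkerLambda snippets.
const taskQueue = { MessageRetentionPeriod: "1209600", VisibilityTimeout: "60" };
const ok = checkQueueSettings(taskQueue, 60);        // Worker Timeout: 60
const tooShort = checkQueueSettings(taskQueue, 300); // if the Worker timeout were raised
```

If you raise the Worker's timeout, raise the queue's VisibilityTimeout accordingly.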

Of course, you need to implement your own source code for the Lambda Worker. You can do whatever fits within the function’s timeout: 60 seconds in the snippet above, and at most 300 seconds, the maximum execution time for Lambda functions at the time of writing.
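As a starting point, here is a hedged sketch of a Worker in the same callback style as the Consumer. It assumes the Consumer passes the complete SQS message as the event (as `invokeWorkerLambda` does), so the Worker finds the task in `event.Body` and the `ReceiptHandle` needed for `deleteMessage`. `createWorkerHandler` and `processTask` are hypothetical names; `processTask` stands for your own logic. The SQS client and queue URL are passed to a factory so the handler can be tried with a stub; in Lambda you would export something like `createWorkerHandler(new AWS.SQS({region: REGION}), QUEUE_URL, processTask)` as `exports.handler`.

```javascript
// Hypothetical Worker handler factory. `sqs` should behave like
// new AWS.SQS() from aws-sdk v2: deleteMessage(params, callback).
function createWorkerHandler(sqs, queueUrl, processTask) {
  return function (event, context, callback) {
    // The Consumer forwards the whole SQS message, including ReceiptHandle.
    processTask(event.Body, function (err) {
      if (err) {
        // Do not delete: the task becomes visible again after the timeout
        // and ends up in the dead letter queue after too many failures.
        callback(err);
        return;
      }
      const params = { QueueUrl: queueUrl, ReceiptHandle: event.ReceiptHandle };
      sqs.deleteMessage(params, function (deleteErr) {
        if (deleteErr) callback(deleteErr);
        else callback(null, "DELETED");
      });
    });
  };
}

// Usage with a stub client and a trivial task processor.
const deleted = [];
const stubSqs = {
  deleteMessage: function (params, cb) {
    deleted.push(params.ReceiptHandle);
    cb(null, {});
  }
};
const handler = createWorkerHandler(stubSqs, "https://example.invalid/task-queue",
  function (body, done) { done(body === "fail" ? new Error("task failed") : null); });

let okResult;
let failResult;
handler({ Body: "send email", ReceiptHandle: "rh-1" }, {}, function (err, res) { okResult = res; });
handler({ Body: "fail", ReceiptHandle: "rh-2" }, {}, function (err) { failResult = err.message; });
```

Deleting only after successful processing is what gives the at-least-once behaviour described earlier; the Worker role’s `sqs:DeleteMessage` permission covers this call.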

Summary

Asynchronous decoupling can be achieved with the help of SQS, and the managed queuing service can be integrated with Lambda. This allows you to build a serverless microservice that consumes tasks from a queue, for example sending out massive amounts of emails, transcoding video files after upload, or analyzing user behavior.

Andreas Wittig

I’m the author of Amazon Web Services in Action. I work as a software engineer and independent consultant focused on AWS and DevOps. Hire me!

Is anything missing in my article? I'm looking forward to your feedback! @andreaswittig or andreas@widdix.de.


Tagged with AWS, Lambda, SQS, Serverless