Serverless: Invalidating a DynamoDB Cache

A cache in front of DynamoDB boosts performance and saves costs, especially for read-intensive and spiky workloads. Why? Have a look at one of my recent articles: Performance boost and cost savings for DynamoDB.

Caching itself is easy:

  1. Incoming request
  2. Load data from cache
  3. Query database if needed data is not available in cache
  4. Insert data to the cache
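The four steps above can be sketched in a few lines of Node.js. This is only an illustration: the in-memory objects stand in for Redis and DynamoDB, and the names loadFromCache, queryDatabase, insertIntoCache and getItem are hypothetical helpers, not part of any library.

```javascript
// Hypothetical in-memory stand-ins for Redis and DynamoDB,
// so the sketch runs without any infrastructure.
var cache = {};
var database = {"3": {Id: "3", Name: "John Doe"}};
var databaseReads = 0;

function loadFromCache(key, callback) {
    callback(null, cache.hasOwnProperty(key) ? cache[key] : null);
}

function queryDatabase(key, callback) {
    databaseReads++;
    callback(null, database.hasOwnProperty(key) ? database[key] : null);
}

function insertIntoCache(key, value, callback) {
    cache[key] = value;
    callback(null);
}

// Steps 2 to 4: try the cache first, fall back to the database,
// then store the result in the cache for the next request.
function getItem(key, callback) {
    loadFromCache(key, function(err, cached) {
        if (err) { return callback(err); }
        if (cached !== null) { return callback(null, cached); }
        queryDatabase(key, function(err, item) {
            if (err) { return callback(err); }
            if (item === null) { return callback(null, null); }
            insertIntoCache(key, item, function(err) {
                callback(err, item);
            });
        });
    });
}
```

Calling getItem("3", callback) twice hits the database only on the first call; the second call is served from the cache.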

But as Phil Karlton said wisely:

There are only two hard things in Computer Science: cache invalidation and naming things.

Probably true, especially the part about naming things. But setting up cache invalidation is not that hard when using the following building blocks: ElastiCache, DynamoDB, DynamoDB Streams, and Lambda, as shown in the following figure. In this article, you’ll learn how to implement a Lambda function that takes care of cache invalidation.

DynamoDB with ElastiCache

Combining DynamoDB Streams and Lambda

DynamoDB can publish an event for every change to an item to a DynamoDB Stream, once a stream is enabled for the table. These events are perfectly suited for cache invalidation.

If you are into serverless, today is your lucky day!

  • DynamoDB Streams are integrated with AWS Lambda.
  • Lambda functions can run inside a VPC, which allows them to access ElastiCache.

A typical event received by a Lambda function looks like this:

{
    "Records": [
        {
            "eventID": "fcd425364ee62afae661c7e69b5bbac0",
            "eventName": "INSERT",
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": "us-east-1",
            "dynamodb": {
                "ApproximateCreationDateTime": 1467991740,
                "Keys": {
                    "Id": {
                        "S": "3"
                    }
                },
                "NewImage": {
                    "Id": {
                        "S": "3"
                    },
                    "Name": {
                        "S": "John Doe"
                    }
                },
                "SequenceNumber": "800000000009116419690",
                "SizeBytes": 30,
                "StreamViewType": "NEW_IMAGE"
            },
            "eventSourceARN": "arn:aws:dynamodb:us-east-1:166876438428:table/serverless/stream/2016-07-08T15:00:10.731"
        }
    ]
}
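Note that NewImage uses DynamoDB's typed attribute format ({"S": "John Doe"} instead of "John Doe"). If the cache should hold plain objects, the image can be converted first. The following is a minimal sketch, assuming only the S, N, BOOL, NULL, L and M types occur; unmarshalValue and unmarshalImage are hypothetical helper names, and sets or binary values are deliberately rejected.

```javascript
// Converts a single typed attribute value into a plain JavaScript value.
// Assumption: only S, N, BOOL, NULL, L and M are needed; other types throw.
function unmarshalValue(attr) {
    if (attr.hasOwnProperty("S")) { return attr.S; }
    // Numbers arrive as strings; Number() may lose precision for very large values.
    if (attr.hasOwnProperty("N")) { return Number(attr.N); }
    if (attr.hasOwnProperty("BOOL")) { return attr.BOOL; }
    if (attr.hasOwnProperty("NULL")) { return null; }
    if (attr.hasOwnProperty("L")) { return attr.L.map(unmarshalValue); }
    if (attr.hasOwnProperty("M")) { return unmarshalImage(attr.M); }
    throw new Error("Unsupported attribute type: " + Object.keys(attr).join(","));
}

// Converts a whole image (a map of attribute name to typed value).
function unmarshalImage(image) {
    var item = {};
    Object.keys(image).forEach(function(name) {
        item[name] = unmarshalValue(image[name]);
    });
    return item;
}
```

For the event shown above, unmarshalImage(record.dynamodb.NewImage) yields an object with plain Id and Name string properties.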

The function receiving such an event needs to:

  1. Loop over all incoming events.
  2. Decide whether a cached item needs to be updated or deleted.
  3. Update or delete a cached item by sending a request to Redis (or Memcached).

Just a few lines of code are necessary to implement these steps with Node.js:

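var redis = require("redis");
var async = require("async");

function handler(event, context, globalCallback) {
    // Replace <REDIS_HOST> with the endpoint of your ElastiCache cluster.
    var redisClient = redis.createClient({host: "<REDIS_HOST>"});
    // Process every record of the batch delivered by the DynamoDB stream.
    async.each(event.Records, function(record, callback) {
        var key = record.dynamodb.Keys.Id.S;
        if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
            // Item created or changed: overwrite the cached copy.
            var value = JSON.stringify(record.dynamodb.NewImage);
            redisClient.set(key, value, function(err) {
                callback(err);
            });
        } else if (record.eventName === "REMOVE") {
            // Item deleted: evict it from the cache.
            redisClient.del(key, function(err) {
                callback(err);
            });
        } else {
            // Unknown event type: skip it, otherwise the batch would never finish.
            callback();
        }
    }, function(err) {
        // Close the connection before reporting the result to Lambda.
        redisClient.quit();
        if (err) {
            globalCallback(err);
        } else {
            globalCallback(null, "DONE");
        }
    });
}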

exports.handler = handler;
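To test the decision logic without a Redis connection, it can help to pull it into a pure function. The following is a sketch of that idea rather than part of the original function; toCacheOperation and the shape of the returned object are assumptions made for illustration.

```javascript
// Translates one DynamoDB stream record into a description of the
// cache command to run. Returns null for unknown event names.
function toCacheOperation(record) {
    var key = record.dynamodb.Keys.Id.S;
    switch (record.eventName) {
        case "INSERT":
        case "MODIFY":
            // Created or changed items overwrite the cached copy.
            return {
                command: "set",
                key: key,
                value: JSON.stringify(record.dynamodb.NewImage)
            };
        case "REMOVE":
            // Deleted items are evicted from the cache.
            return {command: "del", key: key};
        default:
            return null;
    }
}
```

A handler could then map event.Records through toCacheOperation and send each resulting command to Redis, while unit tests check the mapping with hand-written records.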

Almost done. The next step is to set up the whole infrastructure.

Wiring all the parts together

It’s time to create all the needed parts of the infrastructure:

  • DynamoDB table and stream
  • Security Group allowing access from Lambda to ElastiCache
  • ElastiCache cluster
  • Event source mapping between DynamoDB stream and Lambda
  • IAM role for Lambda
  • Lambda function

The following snippet contains a CloudFormation template including all the needed parts.

{
    "AWSTemplateFormatVersion": "2010-09-09",
    "Description": "DynamoDB with ElastiCache",
    "Parameters": {
        "LambdaCodeBucket": {
            "Description": "The name of the S3 bucket containing the source code for the Lambda function.",
            "Type": "String"
        },
        "LambdaCodeKey": {
            "Description": "The key of the S3 object containing the source code for the Lambda function.",
            "Type": "String"
        },
        "VPC": {
            "Description": "The VPC for Lambda function and ElastiCache.",
            "Type": "AWS::EC2::VPC::Id"
        },
        "Subnets": {
            "Description": "The Subnets for Lambda function and ElastiCache.",
            "Type": "List<AWS::EC2::Subnet::Id>"
        }
    },
    "Resources": {
        "DynamoDB": {
            "Type": "AWS::DynamoDB::Table",
            "Properties": {
                "AttributeDefinitions": [{"AttributeName": "Id","AttributeType": "S"}],
                "KeySchema": [{"AttributeName": "Id", "KeyType": "HASH"}],
                "ProvisionedThroughput": {
                    "ReadCapacityUnits": "1",
                    "WriteCapacityUnits": "1"
                },
                "TableName": {"Ref": "AWS::StackName"},
                "StreamSpecification": {
                    "StreamViewType": "NEW_IMAGE"
                }
            }
        },
        "ElastiCacheSecurityGroup": {
            "Type": "AWS::EC2::SecurityGroup",
            "Properties": {
                "GroupDescription": "Elasticache Security Group",
                "VpcId": {"Ref": "VPC"},
                "SecurityGroupIngress": [{ 
                    "IpProtocol": "tcp", 
                    "FromPort": "6379", 
                    "ToPort": "6379",
                    "SourceSecurityGroupId": {"Ref": "LambdaSecurityGroup"}
                }]
            }
        },
        "ElastiCacheSubnetGroup" : {
            "Type" : "AWS::ElastiCache::SubnetGroup",
            "Properties" : {
                "Description" : "ElastiCache Subnet Group",
                "SubnetIds" : {"Ref": "Subnets"}
            }
        },
        "ElasticacheCluster": {
            "Type": "AWS::ElastiCache::CacheCluster",
            "Properties": {
                "AutoMinorVersionUpgrade": "true",
                "Engine": "redis",
                "CacheNodeType": "cache.t1.micro",
                "NumCacheNodes": "1",
                "VpcSecurityGroupIds": [{"Fn::GetAtt": ["ElastiCacheSecurityGroup", "GroupId"]}],
                "CacheSubnetGroupName": {"Ref": "ElastiCacheSubnetGroup"}
            }
        },
        "LambdaSecurityGroup": {
            "Type": "AWS::EC2::SecurityGroup",
            "Properties": {
                "GroupDescription": "Lambda Security Group",
                "VpcId": {"Ref": "VPC"}
            }
        },
        "EventSourceMapping": {
            "Type": "AWS::Lambda::EventSourceMapping",
            "Properties": {
                "EventSourceArn": {"Fn::GetAtt": ["DynamoDB", "StreamArn"]},
                "FunctionName": {"Fn::GetAtt": ["Lambda", "Arn"]},
                "StartingPosition": "TRIM_HORIZON"
            }
        },
        "LambdaRole": {
            "Type": "AWS::IAM::Role",
            "Properties": {
                "AssumeRolePolicyDocument": {
                    "Version": "2012-10-17",
                    "Statement": [{
                        "Effect": "Allow",
                        "Principal": {"Service": "lambda.amazonaws.com"},
                        "Action": ["sts:AssumeRole"]
                    }]
                },
                "Path": "/",
                "Policies": [{
                    "PolicyName": "logs",
                    "PolicyDocument": {
                        "Statement": [{
                            "Effect": "Allow",
                            "Action": [
                                "logs:CreateLogGroup",
                                "logs:CreateLogStream",
                                "logs:PutLogEvents"
                            ],
                            "Resource": "arn:aws:logs:*:*:*"
                        }]
                    }
                },
                {
                    "PolicyName": "ec2",
                    "PolicyDocument": {
                        "Statement": [{
                            "Effect": "Allow",
                            "Action": [
                                "ec2:CreateNetworkInterface",
                                "ec2:DescribeNetworkInterfaces",
                                "ec2:DeleteNetworkInterface"
                            ],
                            "Resource": "*"
                        }]
                    }
                },
                {
                    "PolicyName": "dynamodb",
                    "PolicyDocument": {
                        "Statement": [{
                            "Action": [
                                "dynamodb:PutItem",
                                "dynamodb:UpdateItem",
                                "dynamodb:DescribeStream",
                                "dynamodb:GetRecords",
                                "dynamodb:GetShardIterator",
                                "dynamodb:ListStreams"
                            ],
                            "Effect": "Allow",
                            "Resource": [
                                {"Fn::Join": ["", ["arn:aws:dynamodb:*:", {"Ref": "AWS::AccountId"}, ":table/", {"Ref": "DynamoDB"}]]},
                                {"Fn::Join": ["", ["arn:aws:dynamodb:*:", {"Ref": "AWS::AccountId"}, ":table/", {"Ref": "DynamoDB"}, "/stream/*"]]}
                            ]
                        }]
                    }
                }]
            }
        },
        "Lambda": {
            "Type": "AWS::Lambda::Function",
            "Properties": {
                "Code": {
                    "S3Bucket" : {"Ref": "LambdaCodeBucket"},
                    "S3Key" : {"Ref": "LambdaCodeKey"}
                },
                "Handler": "index.handler",
                "MemorySize": 128,
                "Role": {"Fn::GetAtt": ["LambdaRole", "Arn"]},
                "Runtime": "nodejs4.3",
                "Timeout": 60,
                "VpcConfig": {
                    "SecurityGroupIds": [{"Ref": "LambdaSecurityGroup"}],
                    "SubnetIds": {"Ref": "Subnets"} 
                }
            }
        }
    }
}

Use this template to create your own environment by following these steps:

  1. git clone https://github.com/widdix/lambda-dynamodb-elasticache.git
  2. cd lambda-dynamodb-elasticache
  3. Bundle the files into a ZIP file and upload it to S3.
  4. Launch Stack
  5. Click Next to proceed with the next step of the wizard.
  6. Specify the parameters for the stack.
  7. Click Next to proceed with the next step of the wizard.
  8. Click Next to skip the Options step of the wizard.
  9. Click Create to start the creation of the stack.
  10. Wait until the stack reaches the state CREATE_COMPLETE.
  11. Update index.js with the host of the ElastiCache cluster created by CloudFormation.
  12. Bundle the files into a ZIP file again and upload it to S3.
  13. Update the stack: keep the template and change the LambdaCodeKey parameter to the key of the new ZIP file.
  14. Wait until the stack reaches the state UPDATE_COMPLETE.

Everything is ready for testing! Insert, update, and delete items in the DynamoDB table, which the template names after your stack. Watch the CloudWatch Logs of the Lambda function to gain insight into cache invalidation.
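One way to produce all three event types is to issue a put, an update, and a delete for the same key. The sketch below only builds the request parameters; buildTestRequests and the key "test-1" are hypothetical, and the parameter shapes assume the DocumentClient of the AWS SDK for JavaScript (v2).

```javascript
// Builds DocumentClient-style requests that should result in one INSERT,
// one MODIFY and one REMOVE record on the table's stream.
function buildTestRequests(tableName) {
    var key = {Id: "test-1"}; // hypothetical test key
    return [
        {method: "put", params: {
            TableName: tableName,
            Item: {Id: key.Id, Name: "John Doe"}
        }},
        {method: "update", params: {
            TableName: tableName,
            Key: key,
            // "Name" is a reserved word in DynamoDB, hence the #n placeholder.
            UpdateExpression: "SET #n = :n",
            ExpressionAttributeNames: {"#n": "Name"},
            ExpressionAttributeValues: {":n": "Jane Doe"}
        }},
        {method: "delete", params: {TableName: tableName, Key: key}}
    ];
}
```

With an AWS.DynamoDB.DocumentClient instance named docClient, each entry could be sent as docClient[request.method](request.params, callback), one after another, passing the name of your stack as tableName.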

Andreas Wittig

I’m the author of Amazon Web Services in Action. I work as a software engineer and independent consultant focused on AWS and DevOps. Hire me!

Is anything missing in my article? I'm looking forward to your feedback! @andreaswittig or andreas@widdix.de.

