Introducing the real-time data store: Kinesis Streams

Michael Wittig – 07 Oct 2016

Kinesis is all about real-time data:

  • Kinesis Streams are a temporary store for real-time data. Append new events or read all events in order.
  • Kinesis Analytics helps you analyze data in real time.
  • Kinesis Firehose ingests real-time data into data stores like S3, Elasticsearch or Redshift for batch analytics.

This blog post will introduce Kinesis Streams as the foundation of real-time data processing on AWS.

What is a stream?

Imagine a timeline where you can add events at the current time, and you will have a good mental model of a Kinesis stream. The following animation will demonstrate the idea.

[Animation: Kinesis Stream]

Arriving data is symbolized by yellow circles; each piece of data is called a Record. The current time is generated by the blue Kinesis Stream System. You can only insert records at the current time. You cannot add records in the past or the future. This implies that the records inside a stream are ordered by time.

If you want to read records from a stream you have three options:

  • read from the beginning of the stream to the present
  • read from a certain record (identified by a sequence number) to the present
  • read from a certain time to the present

Depending on your use case, you may or may not be interested in historical records. A Kinesis stream supports both cases. But there is one limitation: Kinesis Streams keeps records for only 1 to 7 days. After that, records are deleted. Kinesis is not a long-term store.
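
The three read options correspond to values of the ShardIteratorType parameter of the getShardIterator API call (reading happens per shard, as explained further below). The following is a minimal sketch that builds the request parameters for each option. The helper name shardIteratorParams and the shape of its start argument are made up for illustration and are not part of the AWS SDK.

```javascript
// Hypothetical helper (not part of the AWS SDK): builds the parameters
// for kinesis.getShardIterator() depending on where reading should start.
function shardIteratorParams(streamName, shardId, start) {
  var params = { StreamName: streamName, ShardId: shardId };
  if (start && start.sequenceNumber) {
    // read from a certain record to the present
    params.ShardIteratorType = 'AT_SEQUENCE_NUMBER';
    params.StartingSequenceNumber = start.sequenceNumber;
  } else if (start && start.timestamp) {
    // read from a certain time to the present
    params.ShardIteratorType = 'AT_TIMESTAMP';
    params.Timestamp = start.timestamp;
  } else {
    // read from the oldest record that is still retained
    params.ShardIteratorType = 'TRIM_HORIZON';
  }
  return params;
}

console.log(shardIteratorParams('cloudonaut-stream', 'shardId-000000000001', {
  timestamp: new Date('2016-10-07T16:50:00Z')
}));
```

If you are only interested in records that arrive from now on, the API also offers the iterator type LATEST.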

How does it scale?

A single Kinesis Stream is composed of Shards. A single shard supports:

  • 5 read operations per second (you read in batches)
  • maximum total data read rate of 2 MB per second
  • insertion of up to 1,000 records per second
  • maximum total data write rate of 1 MB per second
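
To get a feeling for these limits, here is a rough sketch that estimates how many shards a workload needs. It only uses the per-shard numbers listed above; the helper name requiredShards and the shape of the workload object are assumptions made for this example, and the limit of 5 read operations per second is not taken into account.

```javascript
// Per-shard limits as listed above.
var LIMITS = {
  recordsPerSecond: 1000, // inserted records per second
  writeMBPerSecond: 1,    // total data write rate
  readMBPerSecond: 2      // total data read rate
};

// Hypothetical helper: the strictest limit decides the number of shards.
function requiredShards(workload) {
  return Math.max(
    1, // a stream has at least one shard
    Math.ceil(workload.recordsPerSecond / LIMITS.recordsPerSecond),
    Math.ceil(workload.writeMBPerSecond / LIMITS.writeMBPerSecond),
    Math.ceil(workload.readMBPerSecond / LIMITS.readMBPerSecond)
  );
}

// 2,500 small records per second need 3 shards,
// although 2 shards would be enough for the data volume.
console.log(requiredShards({
  recordsPerSecond: 2500,
  writeMBPerSecond: 1.5,
  readMBPerSecond: 3
})); // 3
```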

Records are assigned to shards based on a Partition Key. Records with the same partition key are always assigned to the same shard. The following animation will demonstrate the idea using two shards and representing partition keys with colors.

[Animation: Kinesis Shard]


It’s important to know that there is no total order of records in the stream. The order is only guaranteed for records of the same shard. So if you want to track actions by many users, it makes sense to select the user id as the partition key. Otherwise, you will not be able to retrieve the records (actions) of a user in order.
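
Kinesis picks the shard by hashing the partition key with MD5 into a 128-bit integer and checking which shard's hash key range contains that value. The sketch below imitates this locally to show why records with the same partition key always end up in the same shard. It assumes that the hash key space is split evenly among the shards, which is a simplification, and shardIndexFor is a made-up name.

```javascript
var nodeCrypto = require('crypto');

// Hypothetical helper: maps a partition key to a shard index (0-based),
// assuming the 128-bit hash key space is split evenly among the shards.
function shardIndexFor(partitionKey, shardCount) {
  var hex = nodeCrypto.createHash('md5').update(partitionKey, 'utf8').digest('hex');
  var hash = BigInt('0x' + hex);                    // 128-bit integer
  var rangeSize = (2n ** 128n) / BigInt(shardCount); // size of one shard's range
  var index = Number(hash / rangeSize);              // integer division
  // Integer division can leave a small remainder at the very end of the
  // key space, so clamp to the last shard.
  return Math.min(index, shardCount - 1);
}

// The same user always maps to the same shard, so the user's actions stay in order.
console.log(shardIndexFor('user-123', 2) === shardIndexFor('user-123', 2)); // true
['user-123', 'user-234', 'user-345'].forEach(function(userId) {
  console.log(userId, '->', 'shard', shardIndexFor(userId, 2));
});
```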

How do you insert records?

Kinesis Streams makes it very simple to insert records. The application that inserts records is called the Producer.
When you insert records, you don’t need to care about shards. You just provide the name of the stream, a partition key, and the payload. A simple Node.js example follows:

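var AWS = require('aws-sdk'); // AWS SDK for JavaScript (v2)

// region and credentials are taken from the environment
var kinesis = new AWS.Kinesis();
kinesis.putRecord({
  Data: '{"action": "click", "productId": "product-123"}', // the payload
  PartitionKey: 'user-123', // decides which shard receives the record
  StreamName: 'cloudonaut-stream'
}, function(err, data) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(data); // successful response
  }
});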

putRecord Output:

{
  ShardId: 'shardId-000000000001',
  SequenceNumber: '49566461377660667441689347227228529918554355624453341202'
}
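
If you track user actions as suggested above, a small helper can make sure that the user id is always used as the partition key. This is only a sketch: the function name toPutRecordParams and the shape of the action object are assumptions, not something the SDK provides.

```javascript
// Hypothetical helper: turns a user action into parameters for putRecord.
function toPutRecordParams(streamName, userId, action) {
  return {
    StreamName: streamName,
    PartitionKey: userId,        // same user => same shard => ordered actions
    Data: JSON.stringify(action) // the payload is serialized as JSON
  };
}

console.log(toPutRecordParams('cloudonaut-stream', 'user-123', {
  action: 'click',
  productId: 'product-123'
}));
```

The returned object can be passed to kinesis.putRecord as the first argument, in place of the literal object used in the example above.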

How do you read records?

The application that reads records is also called the Consumer.
Reading records is trickier because you can only read from a shard, not from a stream.
You need to create a Shard Iterator to read from a shard. The shard iterator contains the information of the next record to read. So when you read records, you will get the records and a new shard iterator that you can use to read the next records from the shard. A simple Node.js example follows:

var AWS = require('aws-sdk'); // AWS SDK for JavaScript (v2)

var kinesis = new AWS.Kinesis();
// 1. look up the shards of the stream
kinesis.describeStream({
  StreamName: 'cloudonaut-stream'
}, function(err, streamData) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(streamData); // successful response
    streamData.StreamDescription.Shards.forEach(function(shard) {
      // 2. create a shard iterator that starts at the oldest record
      kinesis.getShardIterator({
        ShardId: shard.ShardId,
        ShardIteratorType: 'TRIM_HORIZON',
        StreamName: 'cloudonaut-stream'
      }, function(err, shardIteratorData) {
        if (err) {
          console.log(err, err.stack); // an error occurred
        } else {
          console.log(shardIteratorData); // successful response
          // 3. read the first batch of records from the shard
          kinesis.getRecords({
            ShardIterator: shardIteratorData.ShardIterator
          }, function(err, recordsData) {
            if (err) {
              console.log(err, err.stack); // an error occurred
            } else {
              console.log(recordsData); // successful response
            }
          });
        }
      });
    });
  }
});

describeStream Output:

{
  StreamDescription: {
    StreamName: 'cloudonaut-stream',
    StreamARN: 'arn:aws:kinesis:eu-west-1:123456123456:stream/cloudonaut-stream',
    StreamStatus: 'ACTIVE',
    Shards: [ [Object], [Object] ],
    HasMoreShards: false,
    RetentionPeriodHours: 24,
    EnhancedMonitoring: [ [Object] ]
  }
}

getShardIterator Outputs:

{
  ShardIterator: 'AAAAAAAAAAFczeAFwB21etXem1x93AV2dp2aLsJ+8W9Kntx72U9TewMHgG1zQSQbykcdepxW5I'
}

{
  ShardIterator: 'AAAAAAAAAAEyoN6Nw/vClRIOJ/dzg/7cE9PawEzP/KxbQHm3q41cvDXOpCzni0MP2lknXF'
}

getRecords Outputs:

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227114890891510548871071662098',
    ApproximateArrivalTimestamp: 2016-10-07T16:50:58.055Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227116099817330163775124275218',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:00.726Z,
    Data: '{"action": "click", "productId": "product-234"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227117308743149778541737934866',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:03.203Z,
    Data: '{"action": "click", "productId": "product-456"}',
    PartitionKey: 'user-123'
  }],
  NextShardIterator: 'AAAAAAAAAAH+L+N0v7t9gU2myNC7TmpJh15RXfHqSifCMnA+74HFvAtwlpInF5756vQi8NHTMZHfHzP7W2XWRWOiJ',
  MillisBehindLatest: 0
}

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227168083627573593860428791826',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:15.855Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-234'
  }],
  NextShardIterator: 'AAAAAAAAAAHcxQHyANnjIVul8/W8AYN9LJSkfm0mCEE8P2zm+tExzLAZgrzuQxOwyOhCfosSKZ/3wKXP2CmJPzea5',
  MillisBehindLatest: 0
}
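
The example above reads only one batch per shard. To keep reading, you pass the NextShardIterator of each response to the next getRecords call. The following sketch shows this loop. The function readShard and its onRecord callback are made up for illustration; it expects an AWS.Kinesis client from the AWS SDK for JavaScript (v2) as its first argument and stops as soon as MillisBehindLatest reports that the consumer has caught up.

```javascript
// Hypothetical helper: reads a shard batch by batch until the consumer has
// caught up. `kinesis` is expected to be an AWS.Kinesis client (aws-sdk v2).
async function readShard(kinesis, shardIterator, onRecord) {
  var iterator = shardIterator;
  while (iterator) {
    var data = await kinesis.getRecords({ ShardIterator: iterator }).promise();
    data.Records.forEach(function(record) {
      onRecord(record);
    });
    // the new iterator points to the next record to read
    iterator = data.NextShardIterator;
    if (data.MillisBehindLatest === 0) {
      break; // caught up with the latest record in the shard
    }
  }
  // keep this iterator to continue reading later
  return iterator;
}
```

A real consumer also has to respect the limit of 5 read operations per second per shard, for example by waiting between two getRecords calls. That is left out here to keep the sketch short.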

That sounds pretty complicated, right? And that’s why most people use either the Kinesis Client Library or a Lambda function to read from Kinesis Streams. If possible, I recommend using a Lambda function and connecting it to the Kinesis stream.
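
A minimal sketch of such a Lambda function could look like this. It assumes the event format that Lambda uses for Kinesis event sources, where the payload of each record arrives base64-encoded in record.kinesis.data. What the function does with the decoded actions (here: logging them) is just a placeholder.

```javascript
// Sketch of a Lambda handler that is connected to a Kinesis stream.
// Lambda takes care of shards and shard iterators and passes in batches of records.
function handler(event, context, callback) {
  var actions = event.Records.map(function(record) {
    // the payload is base64-encoded in the Lambda event
    var json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    return {
      partitionKey: record.kinesis.partitionKey,
      sequenceNumber: record.kinesis.sequenceNumber,
      payload: JSON.parse(json)
    };
  });
  actions.forEach(function(action) {
    console.log(action.partitionKey, action.payload); // placeholder for real processing
  });
  callback(null, actions.length); // report the number of processed records
}

exports.handler = handler;
```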

Summary

Kinesis Streams stores real-time data for up to 7 days. A stream is composed of shards. Records are ordered only within a shard. When you insert records, you provide a partition key to group records together. When you read records, you read from a shard, not from the whole stream. Shard iterators keep track of the next record to read.
Kinesis Streams is a managed service that is fault tolerant. It scales based on the number of shards you provision. You can change the number of shards at any time.
The pricing model is based on shard hours and insert record operations.
