Introducing the real-time data store: Kinesis Streams

Kinesis is all about real-time data:

  • Kinesis Streams are a temporary store for real-time data. Append new events or read all events in order.
  • Kinesis Analytics helps you to analyze data in real-time.
  • Kinesis Firehose ingests real-time data into data stores like S3, Elasticsearch or Redshift for batch analytics.

This blog post will introduce Kinesis Streams as the foundation of real-time data processing on AWS.

What is a stream?

Imagine a timeline where you can add events at the current time, and you will have a good mental model of a Kinesis stream. The following animation will demonstrate the idea.

Kinesis Stream Animation

Arriving data is symbolized as yellow circles and called a Record. The current time is generated by the blue Kinesis Stream System. You can only insert records at the current time. You can not add records in the past or the future. This implies that records are ordered by time, inside a stream.

If you want to read records from a stream you have three options:

  • read from the beginning of the stream to the present
  • read from a certain record (identified by a sequence number) to the present
  • read from a certain time to the present
    Depending on your use case you are interested in historical records or not. A Kinesis Stream supports both cases. But there is one limitation: Kinesis Streams can keep records only 1-7 days. After that, records are deleted. Kinesis is not a long term storage.

How does it scale?

A single Kinesis Stream is composed of Shards. A single shard supports:

  • 5 read operations per second (you read in batches)
  • maximum total data read rate of 2 MB per second
  • insertion of up to 1,000 records per second
  • maximum total data write rate of 1 MB per second

Records are assigned to shards based on a Partition Key. Records with the same partition key are always assigned to the same shard. The following animation will demonstrate the idea using two shards and representing partition keys with colors.

Kinesis Shard Animation

It’s important to know that there is no total order of records in the stream. The order is only guaranteed for records of the same shard. So if you want to track actions by many users, it makes sense to select the user id as the partition key. Otherwise, you will not be able to retrieve the records (actions) of a user in order.

How do you insert records?

Kinesis Streams make it very simple to insert records. The application that inserts records is also called the Producer.
When you insert records, you don’t care about shards. You just provide the name of the stream, a partition key, and the payload. A simple Node.js example follows:

var kinesis = new AWS.Kinesis();
kinesis.putRecord({
  Data: '{"action": "click", "productId": "product-123"}',
  PartitionKey: 'user-123',
  StreamName: 'cloudonaut-stream'
}, function(err, data) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(data); // successful response
  }
});

putRecord Output:

{
  ShardId: 'shardId-000000000001',
  SequenceNumber: '49566461377660667441689347227228529918554355624453341202'
}

How do you read records?

The application that reads records is also called the Consumer.
Reading records is more tricky, because you can only read from a shard, not from a stream.
You need to create a Shard Iterator to read from a shard. The shard iterator contains the information of the next record to read. So when you read records, you will get the records and a new shard iterator that you can use to read the next records from the shard. A simple Node.js example follows:

var kinesis = new AWS.Kinesis();
kinesis.describeStream({
  StreamName: 'cloudonaut-stream'
}, function(err, streamData) {
  if (err) {
    console.log(err, err.stack); // an error occurred
  } else {
    console.log(streamData); // successful response
    streamData.StreamDescription.Shards.forEach(shard => {
      kinesis.getShardIterator({
        ShardId: shard.ShardId,
        ShardIteratorType: 'TRIM_HORIZON',
        StreamName: 'cloudonaut-stream'
      }, function(err, shardIteratordata) {
        if (err) {
          console.log(err, err.stack); // an error occurred
        } else {
          console.log(shardIteratordata); // successful response
          kinesis.getRecords({
            ShardIterator: shardIteratordata.ShardIterator
          }, function(err, recordsData) {
            if (err) {
              console.log(err, err.stack); // an error occurred
            } else {
              console.log(recordsData); // successful response
            }
          });
        }
      });
    });
  }
});

describeStream Output:

{
  StreamDescription: {
    StreamName: 'cloudonaut-stream',
    StreamARN: 'arn:aws:kinesis:eu-west-1:123456123456:stream/cloudonaut-stream',
    StreamStatus: 'ACTIVE',
    Shards: [ [Object], [Object] ],
    HasMoreShards: false,
    RetentionPeriodHours: 24,
    EnhancedMonitoring: [ [Object] ] 
  }
}

getShardIterator Outputs:

{
  ShardIterator: 'AAAAAAAAAAFczeAFwB21etXem1x93AV2dp2aLsJ+8W9Kntx72U9TewMHgG1zQSQbykcdepxW5I'
}

{
  ShardIterator: 'AAAAAAAAAAEyoN6Nw/vClRIOJ/dzg/7cE9PawEzP/KxbQHm3q41cvDXOpCzni0MP2lknXF'
}

getRecords Outputs:

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227114890891510548871071662098',
    ApproximateArrivalTimestamp: 2016-10-07T16:50:58.055Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227116099817330163775124275218',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:00.726Z,
    Data: '{"action": "click", "productId": "product-234"}',
    PartitionKey: 'user-123'
  }, {
    SequenceNumber: '49566461377660667441689347227117308743149778541737934866',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:03.203Z,
    Data: '{"action": "click", "productId": "product-456"}',
    PartitionKey: 'user-123'
  }],
  NextShardIterator: 'AAAAAAAAAAH+L+N0v7t9gU2myNC7TmpJh15RXfHqSifCMnA+74HFvAtwlpInF5756vQi8NHTMZHfHzP7W2XWRWOiJ',
  MillisBehindLatest: 0
}

{
  Records: [{
    SequenceNumber: '49566461377660667441689347227168083627573593860428791826',
    ApproximateArrivalTimestamp: 2016-10-07T16:51:15.855Z,
    Data: '{"action": "click", "productId": "product-123"}',
    PartitionKey: 'user-234'
  }],
  NextShardIterator: 'AAAAAAAAAAHcxQHyANnjIVul8/W8AYN9LJSkfm0mCEE8P2zm+tExzLAZgrzuQxOwyOhCfosSKZ/3wKXP2CmJPzea5',
  MillisBehindLatest: 0
}

That sounds pretty complicated, right? And that’s why most people use either the Kinesis Client Library or a Lambda function to read from Kinesis Streams. If possible, I recommend to use a Lambda function and connect it the Kinesis stream.

Summary

Kinesis Streams store real-time data for up to 7 days. A stream is composed of shards. Only inside a shard, there is an order of the records. When you insert records you can provide a partition key to group records together. When you read records, you read from a shard, not the whole stream. To keep track of the next record to read shard iterators are used.
Kinesis Streams is a managed service that is fault tolerant. It scales based on the number of shards you provision. You can change the number of shards at any time.
The pricing model is based on shard hours and insert record operations.

Read on


Share

           

RSS

  RSS

Newsletter


Michael Wittig

Michael Wittig

I’m the author of Amazon Web Services in Action. I work as a software engineer, and independent consultant focused on AWS and DevOps. Hire me!

Is anything missing in my article? I'm looking forward to your feedback! @hellomichibye or michael@widdix.de.


Published on