An in-depth look at Amazon Kinesis and a comparison to Apache Kafka


In this article, we take an in-depth look at the Amazon Kinesis product family and report from experience on the advantages and disadvantages of Amazon Kinesis Data Streams compared to Apache Kafka. Who is winning the race?

Amazon Kinesis, or more precisely Amazon Kinesis Data Streams, is Amazon’s answer to Apache Kafka – the de facto standard in real-time data streaming. Amazon’s Apache Kafka alternative is used primarily in the USA but, in terms of user numbers, sits in second place well behind Apache Kafka. Why is this, and is it still worth taking a look at Amazon Kinesis? This article answers these and many other questions. The focus is on Amazon Kinesis Data Streams and, in particular, the comparison to Apache Kafka. However, the other services of the Amazon Kinesis family are also briefly presented to give a first impression of the entire range of services as a starting point for further research.

Amazon Kinesis was introduced in late 2013 as an Amazon Web Services (AWS) service for real-time processing of data streams. Although the term Amazon Kinesis is often used to refer to Amazon Kinesis Data Streams – the streaming service itself and thus the actual “Apache Kafka alternative” – Amazon Kinesis also includes other products: Kinesis Video Streams, Kinesis Data Firehose, and Kinesis Data Analytics. But more on that later. Amazon Kinesis is used by big players such as Netflix, Comcast, Adobe, and Thomson Reuters, but also by many smaller companies. Just recently, Amazon Kinesis made headlines because of an outage in Northern Virginia (us-east-1 region). Although the outage affected only one of the 24 AWS regions, at noon on Nov. 25, 2020, several services from vendors such as Adobe, Roku, Flickr, Coinbase, Garmin, and Amazon’s own Health Dashboard were no longer accessible or functional. At first glance, this extraordinary event seems daunting – but put in relation to, for example, the downtimes of other managed Apache Kafka providers, it looks considerably less dramatic, not to mention the comparison to self-operated (Apache Kafka) clusters.

As part of AWS, an evaluation of Amazon Kinesis only makes sense if your own company is already using AWS or at least the use of AWS is on the roadmap. If the cloud is out of the question for whatever reason, it is not worth looking at Amazon Kinesis: unlike Apache Kafka, Amazon Kinesis is not available on-prem and thus cannot be operated separately from AWS. If, on the other hand, various AWS services are already in use or are planned for use, Amazon Kinesis should be at the top of the list when evaluating (real-time) event streaming technologies. But first things first: what are the differences between the individual services “Data Streams”, “Data Firehose”, etc., and how does Amazon Kinesis Data Streams actually work?

The four musketeers

Amazon Kinesis is the umbrella term for four services that all revolve around real-time streaming at their core and complement each other: Video Streams, Data Streams, Data Analytics, and Data Firehose.

Amazon Kinesis Video Streams, as the name suggests, is about delivering, processing, and analyzing video streams from various devices in real-time, including in combination with machine learning (e.g., real-time computer vision capabilities).

Amazon Kinesis Data Streams is the focus of this article and refers to a scalable and fully managed service for streaming data sets in real-time.

Amazon Kinesis Data Analytics provides the ability to build applications based on either SQL or Apache Flink. Apache Flink is a framework and distributed processing engine for stateful computations over bounded (batch) and unbounded data streams. Flink’s strengths are computations at in-memory speed and high scalability. In addition, flowcharts of the individual processing steps can be visualized via Flink’s separate Plan Visualizer.

When it comes to typical data integration scenarios and thus the connection of data sinks such as Amazon Redshift, Elasticsearch, or external systems, Amazon Kinesis Data Firehose comes into play. The fully managed service takes care of ingesting, transforming, and delivering streaming data to Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and third-party providers like Splunk, Datadog, New Relic, etc.

Most companies will use a combination of the latter three services, i.e. Amazon Kinesis Data Streams, Analytics, and Firehose, with Amazon Kinesis Data Streams being the technical foundation. So how is Amazon Kinesis Data Streams built?

The technical structure

In Amazon Kinesis Data Streams (hereafter referred to as Amazon Kinesis), a data set (or “event”) consists of a sequence number, a partition key, and a data blob. The data blob contains the actual data to be stored as bytes (internally Base64-serialized), i.e. – as in Apache Kafka – without schema information, and must not be larger than 1 MiB.
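As a minimal sketch of this structure, the following class (hypothetical, not part of any AWS SDK) models such a record and enforces the 1 MiB blob limit before a record would be sent:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: a minimal model of a Kinesis record, enforcing the
// 1 MiB limit on the data blob. The sequence number is assigned by Kinesis
// on write and is therefore not part of this client-side model.
public class RecordSketch {

    static final int MAX_BLOB_BYTES = 1024 * 1024; // 1 MiB

    final String partitionKey;
    final byte[] data;

    RecordSketch(String partitionKey, byte[] data) {
        if (data.length > MAX_BLOB_BYTES) {
            throw new IllegalArgumentException("Data blob exceeds 1 MiB");
        }
        this.partitionKey = partitionKey;
        this.data = data;
    }

    public static void main(String[] args) {
        RecordSketch ok = new RecordSketch("user-42",
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok.data.length); // 5

        try {
            new RecordSketch("user-42", new byte[MAX_BLOB_BYTES + 1]);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```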

As is common with message queues, records are generated by applications (or AWS services) called producers and, in turn, consumed by other applications called consumers. This process is asynchronous. A message queue (or message broker) holds the messages for a certain time so that consumers can retrieve and process them at any time. The message broker thus acts as a buffer between producers and consumers.

Fig. 1: Messages are assigned to a shard within a stream in Kinesis Data Streams via the partition key.

Duplicates can occur when events are delivered or written. It is therefore important to design the complete pipeline and its logic to be idempotent.

Similar to Apache Kafka, Amazon Kinesis also stores generated events immutably as a sequence in temporal order (“append-only”). While such a sequence is called a topic in Apache Kafka, it is simply called a stream in Amazon Kinesis. A stream is divided into individual shards that allow horizontal scaling. Unlike partitions in Apache Kafka, shards are much more flexible: in conjunction with the Amazon Kinesis scaling utilities, for example, new shards can be added under heavier load (“splitting”) and removed again under lighter load (“merging”) – collectively known as “resharding”. Since Amazon Kinesis costs are primarily based on the number of shards, responding to different load scenarios is much more cost-effective. With Apache Kafka, such dynamic adaptation is not possible: once created, partitions cannot be removed. That is, unlike with Amazon Kinesis shards, reducing the number of partitions in a topic is not supported in Apache Kafka.

The assignment of events to a shard in Amazon Kinesis is done using the partition key. A partition key (or simply the “key” of an event) is a Unicode string of up to 256 characters. Using an MD5 hash function, the key is automatically mapped to a matching shard. This behavior can be overridden so that an event can also be explicitly assigned to a shard.
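Conceptually, the mapping works by hashing the key with MD5 into a 128-bit value and locating that value in the shards’ hash key ranges. The following is an illustrative sketch of this idea (not the AWS implementation), assuming evenly sized ranges as created by default:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch of the partition-key-to-shard mapping: hash the key
// with MD5, interpret the digest as an unsigned 128-bit integer, and find
// the shard whose hash key range contains it.
public class ShardMapping {

    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, md5); // unsigned 128-bit value
            // Assume evenly sized hash key ranges, as created by default.
            BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(shardCount));
            return hash.divide(rangeSize)
                    .min(BigInteger.valueOf(shardCount - 1))
                    .intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // The same key always lands in the same shard.
        System.out.println(shardFor("customer-4711", 3));
        System.out.println(shardFor("customer-4711", 3));
    }
}
```

Because the mapping is deterministic, all events with the same partition key end up in the same shard – which is what preserves their relative order.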

As with partitions in Apache Kafka, ordering is guaranteed only within a shard. To uniquely identify an event in a shard, Kinesis uses a sequence number, in contrast to the offset in Apache Kafka. However, due to the technical underpinnings of Amazon Kinesis, this sequence number is composed completely differently than the event offset in Apache Kafka: while in Apache Kafka the first event in a partition starts at 0 and this number is incremented by one for each subsequent event (i.e., 0, 1, …, n), the sequence number in Kinesis is significantly larger and does not start at 0. Between two successive records, the sequence number can jump by an enormous amount. To illustrate this, here are two sequence numbers recorded only 10 seconds apart:

15:03:33: 49616572585056148483474760884126200414157508034292613122
15:03:43: 49616572585056148483474760886251492005040026810620837890

While in Apache Kafka the offset can be used to skip specific records (offset + n), this is not possible in Amazon Kinesis. Although the sequence number is incremented continuously, the difference between two sequence numbers does not allow any statement about the actual events in between. For example, the two sequence numbers shown belong to two successive records – there are no other records between them.
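To make this tangible, the following short sketch subtracts the two example sequence numbers shown above. Although the records are direct neighbours in the shard, their numerical distance is enormous:

```java
import java.math.BigInteger;

// Sequence numbers exceed the range of long, so BigInteger is required.
// The gap between two *consecutive* records says nothing about how many
// records lie in between (here: none) -- Kafka-style offset arithmetic
// is therefore not possible.
public class SequenceGap {

    static BigInteger gapBetween(String first, String second) {
        return new BigInteger(second).subtract(new BigInteger(first));
    }

    public static void main(String[] args) {
        BigInteger gap = gapBetween(
            "49616572585056148483474760884126200414157508034292613122",
            "49616572585056148483474760886251492005040026810620837890");
        System.out.println(gap); // far greater than 1, despite consecutive records
    }
}
```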

Another difference to Apache Kafka is the lack of a separate area for own meta information for a record. In contrast to Apache Kafka, Amazon Kinesis does not know any “headers” that can be used to include schema versions or a producer ID, for example. Thus, it is not uncommon to add this information to the payload of a dataset in Amazon Kinesis at the top level. The actual data then moves down one level. A JSON object could then contain a list for “Header” under an attribute of the same name and the actual payload as a “Data” object.
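A payload following this convention might look like the following JSON sketch; the attribute names inside “Header” and “Data” are hypothetical and merely illustrate the pattern described above:

```json
{
  "Header": [
    { "schemaVersion": "1.2" },
    { "producerId": "order-service-7" }
  ],
  "Data": {
    "orderId": "A-1001",
    "amount": 49.99
  }
}
```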

How fast is Amazon Kinesis?

When considering real-time data streaming technologies, a crucial aspect is data throughput and speed. Here, Amazon Kinesis makes it very easy to give a solid answer – in contrast to Apache Kafka, where the actual (stable) performance depends heavily on the setup and the settings chosen:

  • Per shard, 1 MiB or 1,000 records can be written per second (“ingress”).
  • Per shard, 2 MiB/second can be read (“egress”), i.e. all consumers share this quota, or
  • Per shard, 2 MiB/second can be read per consumer if they are Enhanced Fan-Out (EFO) consumers.

With Apache Kafka, throughput naturally depends on your own infrastructure, settings, or provider. While most Apache Kafka providers require renting VMs on which the individual Apache Kafka brokers run (which, of course, greatly reduces scalability), Confluent Cloud offers a comparable pricing model based on topics and partitions. In Confluent Cloud, the throughput per partition is limited to 5 MiB/second for ingress and 15 MiB/second for egress, shared among all consumers. However, the total throughput for the entire cluster must not exceed 100 MiB/second for each of ingress and egress. In comparison, Amazon Kinesis limits the default quota to 200 shards per AWS account in small regions and 500 shards in large regions like US East (N. Virginia), US West (Oregon), and Europe (Ireland) – so the limit there is 1,000 MiB/second egress and 500 MiB/second ingress in total. If this is not enough, a quota increase can be requested.
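Since the aggregate limits simply scale with the shard count, they can be derived in one line each; a back-of-the-envelope sketch of the figures above:

```java
// Each shard provides 1 MiB/s ingress and 2 MiB/s (shared) egress, so the
// aggregate default limit of a region is just the shard quota times these
// per-shard rates.
public class QuotaMath {

    static int ingressMiBPerSecond(int shards) { return shards * 1; }
    static int egressMiBPerSecond(int shards)  { return shards * 2; }

    public static void main(String[] args) {
        int largeRegionShardQuota = 500; // default quota, e.g. us-east-1
        System.out.println(ingressMiBPerSecond(largeRegionShardQuota)); // 500
        System.out.println(egressMiBPerSecond(largeRegionShardQuota));  // 1000
    }
}
```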

However, in addition to the data throughput limitation, there are other limitations that need to be considered: for example, various API calls in Amazon Kinesis are limited to a certain number per second. This also applies to the API call to GetRecords, which returns the actual records of a shard. I discuss the resulting idiosyncrasies further in the “Pitfalls from the field” section.

A comparison with a self-operated Apache Kafka cluster (e.g. on VMs) would go beyond the scope of this article. To ensure stable operation at higher data throughput, some tuning of the settings is necessary. Not without reason is the Total Cost of Ownership (TCO) of your own Apache Kafka cluster calculated not only from infrastructure costs but also from person-days.

Enhanced Fan-Out (EFO) Consumers
By default, consumers in Kinesis Data Streams share the available bandwidth of 2 MiB/s per shard. This means that the sum of the data throughput of all “simple” consumers cannot exceed this value. If this data throughput is required for a dedicated consumer or group of consumers, Enhanced Fan-Out (EFO) Consumers are used.
Enhanced fan-out consumers have a dedicated 2 MiB/s available per shard, independent of other consumers of the shard. In addition, such a consumer benefits from a lower latency of around 70 milliseconds, in contrast to 200 to (depending on the situation) 1,000 milliseconds for “simple” consumers. The latency corresponds to the time required to pass a record from producer to consumer.
Enhanced fan-out does not use a pull methodology like “simple” consumers, but a push methodology with backpressure. Kinesis keeps track of the bytes that have been sent to the client but not yet acknowledged. If the client cannot handle the data rate, Kinesis pauses the data flow until the next acknowledgement arrives, or marks the client as inactive after 5 minutes. The client must then resubscribe to the shard. Using Enhanced Fan-Out incurs additional costs, which are listed in the pricing table.

Pricing model – a sample calculation

Amazon Kinesis offers a highly flexible pricing model that is based on the actual usage of the service. As with other AWS services, the calculation of a usage scenario is based on cent amounts. However, the price calculation for a scenario at Amazon Kinesis is quite simple: the two decisive dimensions for the price are the number of shards, which can be dynamically added or removed depending on the load, and the so-called PUT payload units, which measure the amount of data to be fed in.

One PUT Payload Unit corresponds to one record or, if the record is larger than 25 KB, to each (started) 25 KB chunk that is added to a stream. For example, if 50 records with a size of 50 KB (i.e. two PUT Payload Units per record) and 50 records with a size of 3 KB (each rounded up to one PUT Payload Unit) are added per second, this scenario corresponds to 150 PUT Payload Units per second. The price for PUT Payload Units is given per 1,000,000 units. In our fictitious example with static load behavior (in reality, the number of records generated would likely vary over time), 150 PUT Payload Units are consumed per second, or 401,760,000 PUT Payload Units per month (31 days). At the current price of $0.0175 per million units in the eu-central-1 (Frankfurt) region, the PUT Payload Units for this scenario cost $7.03 (401,760,000 / 1,000,000 × $0.0175).

Added to this are the costs for the shards: with 50 records of 50 KB and 50 records of 3 KB (each per second), the data throughput is 2,650 KB/second, or 2.65 MiB/second – rounded up to 3 MiB/second. Since 1 MiB/second is available per shard for writing data, three shards are required, each currently costing $0.018 per shard hour. The costs for the shards thus add up to $40.176 (3 × $0.018 × 24 × 31) per month.

In summary, the scenario incurs a cost of ~$47.21 per month, assuming only events from the last 24 hours are relevant to consuming applications or data storage is external to Amazon Kinesis.
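The sample calculation above can be reproduced in a few lines of code. The prices are the eu-central-1 list prices quoted in this article (April 2021) and will differ by region and over time:

```java
// Reproduces the article's sample calculation: 50 records of 50 KB plus
// 50 records of 3 KB per second, over a 31-day month.
public class KinesisCostExample {

    static final double PRICE_PER_MILLION_PUT_UNITS = 0.0175; // USD, eu-central-1
    static final double PRICE_PER_SHARD_HOUR = 0.018;         // USD, eu-central-1

    // One PUT payload unit per started 25 KB chunk of each record.
    static long putUnits(int recordsPerSecond, int recordSizeKB) {
        return recordsPerSecond * ((recordSizeKB + 24) / 25);
    }

    public static void main(String[] args) {
        long unitsPerSecond = putUnits(50, 50) + putUnits(50, 3); // 100 + 50 = 150
        long unitsPerMonth = unitsPerSecond * 60L * 60 * 24 * 31; // 401,760,000

        double putCost = unitsPerMonth / 1_000_000.0 * PRICE_PER_MILLION_PUT_UNITS;
        double shardCost = 3 * PRICE_PER_SHARD_HOUR * 24 * 31; // 3 shards needed

        System.out.printf("PUT units: $%.2f%n", putCost);   // $7.03
        System.out.printf("Shards:    $%.2f%n", shardCost); // $40.18
        System.out.printf("Total:     $%.2f%n", putCost + shardCost);
    }
}
```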

If the data is to be kept in a stream for longer than 24 hours, the “extended data retention” option must be enabled. Additional costs of $0.024 per shard hour are then due, and the retention period is extended to up to 7 days. If this is not sufficient, the “long-term data storage” option can be added. The data is then retained for a freely configurable period of up to one year. The costs for this option are calculated according to the amount of data in GB retained per month. In addition, there are costs for retrieving data that has been stored for more than 7 days with this option.

The retention period can be flexibly adjusted: if an application still needs to rework some data from the past, the retention period can thus be increased for a short time and, after the consumer has caught up with enough data, reset again. An overview of all costs using the example of the Frankfurt region (eu-central-1) can be found in Table 1.

Price | Unit | Description
$0.0175 | PUT Payload Unit (per 1 million units) | Data throughput measured by the number of records, or per 25 KB chunk if a record is larger than 25 KB. Example: 50 KB record = 2 PUT Payload Units, 3 KB record = 1 PUT Payload Unit
$0.018 | Shard hour | Each shard provides 1 MiB/second ingress and 2 MiB/second egress. Shards can be dynamically added, removed, or merged (“resharding”). Data is retained for 24 hours.
$0.024 | Optional: Extended data retention (up to 7 days), per shard hour | Data is retained for up to 7 days per shard. Data older than 7 days is successively deleted.
$0.0245 | Optional: Long-term data storage (longer than 7 days), per GB/month | Data is not deleted. The cost is calculated per GB per month; the maximum retention period is 1 year. Example for 1,000 GB of new data per month: 1,000 × $0.0245 = $24.50 per month
$0.0252 | Optional: Retrieval from long-term data storage (longer than 7 days), per GB | Cost of retrieving data that has been stored for more than 7 days, per GB.
$0.018 | Optional: Enhanced Fan-Out (EFO), per shard hour | Cost for an EFO consumer, per shard.
$0.0156 | Optional: Enhanced Fan-Out (EFO) data retrieval, per GB | Cost of retrieving data per EFO consumer, per GB.
Table 1: Cost composition when using Amazon Kinesis using the example of the eu-central-1 region (Frankfurt), as of April 2021.

A few steps to the finished use case

Amazon Kinesis already integrates with many other AWS services in just a few clicks – whether connecting event sourcing to the highly scalable NoSQL database Amazon DynamoDB, loading data into Amazon Redshift, converting JSON data to Apache Parquet for cost-efficient storage, or building sophisticated streaming applications with Kinesis Data Analytics based on Apache Flink (incl. Apache Beam support) without having to manage a server. Without much effort, complete use cases can be implemented in a few hours with the Amazon Kinesis tools.

Fig. 2: Applications can be created using SQL or Apache Flink via Kinesis Data Analytics.

Apache Kafka, in direct comparison, also offers the possibility of implementing complete use cases with on-board means – Schema Registry, KSQL, and Kafka Connect. In practice, however, this is much more complex and often requires developing separate applications (e.g. with Kafka Streams), which must run and be managed in their own containers.

Amazon Kinesis benefits from strong integration with more and more AWS services. The change data capture scenario mentioned above with DynamoDB, which can be done with just a few clicks, is just one of many examples.

Again, typically only what is actually used is paid for. Amazon Kinesis Data Analytics applications are scaled automatically based on memory consumption and data throughput. An example calculation can be found on the pricing page of Amazon Kinesis Data Analytics. The AWS Pricing Calculator can also be used to generate an estimate of the costs for the desired scenario.

Consuming data and making many decisions

So how do I read data from Amazon Kinesis Data Streams? In principle, this is not so easy to answer, since AWS provides a variety of options for this. In addition to the Kinesis Data Analytics and Kinesis Data Firehose services already presented, i.e. processing data via SQL or Flink, or the managed streaming data delivery service, there are considerably more options. I briefly present the most important of these below.

Fig. 3: Kinesis Data Firehose can even send records to an HTTP endpoint after processing.

Consuming with AWS Lambda

If you want to remain completely serverless, AWS Lambda is the obvious choice. When connecting to Kinesis Data Streams, the Event Source Mapping feature of the AWS Lambda service comes into play. This allows a Lambda function to be directly connected to a Kinesis Data Stream. The event source mapping offers a lot of configuration options. For example, the individual data sets of a shard can be combined into configurable (micro) batches in advance and thus trigger the Lambda function. The size of the batch and the time window can be defined so that processing via an instance of a Lambda function only takes place when:

  • the number of records defined as the batch size has accumulated, or
  • the specified time window has expired, or
  • the payload limit of 6 MiB of the Lambda Service has been reached.
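In infrastructure-as-code terms, such a trigger might be declared as follows. This is a hypothetical CloudFormation sketch; the stream and function names are placeholders:

```yaml
# Hypothetical sketch of a Lambda event source mapping for a Kinesis stream.
MyStreamMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !GetAtt MyStream.Arn
    FunctionName: !Ref MyConsumerFunction
    StartingPosition: LATEST
    BatchSize: 100                     # trigger once 100 records have accumulated ...
    MaximumBatchingWindowInSeconds: 5  # ... or after 5 seconds, whichever comes first
```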

By default, one instance of the Lambda function is invoked synchronously for each shard. If this is not sufficient for large data volumes, a parallelization factor can be specified, allowing up to 10 Lambda function instances to execute per shard. The shard’s ordering is then no longer completely preserved if the parallelization factor is greater than 1. However, the incoming records are distributed to the individual Lambda instances using the same methodology already applied to the shards, namely the hash of the partition key. This way, one instance of a Lambda function always receives all events with the same partition key. Listing 1 shows an example event.

Listing 1

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
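As Listing 1 shows, the “data” attribute of each record is the Base64-encoded payload; a consumer has to decode it before processing. A minimal sketch using only the JDK:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Decodes the Base64 "data" attribute of a Kinesis record as delivered to a
// Lambda function (see Listing 1), assuming a UTF-8 text payload.
public class DecodeRecordData {

    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The two "data" values from Listing 1:
        System.out.println(decode("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="));
        System.out.println(decode("VGhpcyBpcyBvbmx5IGEgdGVzdC4="));
    }
}
```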

Lambda functions are basically stateless. However, state can be passed from one invocation to the next via time windows, allowing, for example, sums or averages to be calculated. At the end of the time window, a result can be computed from the state, on which a configurable action can then be executed.

If an error occurs within the AWS Lambda function, the batch is redelivered in its entirety by default. This is regardless of whether part of the batch has already been successfully processed: Duplicates are the result.

Three major factors play a role in error handling:

  • the configurable maximum age of records,
  • the configurable number of retries,
  • and the retention time of the shard.

When one of these limits is reached, the records (i.e. the complete batch) are discarded and the subsequent records, combined into a batch, are delivered. During the period in which the faulty (micro) batch is repeatedly passed to the Lambda function, all subsequent batches of the shard – or of the “mini-shard” created by Lambda (with a parallelization factor greater than 1) – are not processed, in order to preserve the sequence. Depending on the settings, a permanent failure can thus delay the processing of the remaining batches by up to one week.

There is another aspect to consider: if the subsequent records were generated at about the same time as those of the faulty batch, they will correspondingly also expire at about the same time and thus drop out of the stream unprocessed. In the worst case, if the Lambda function does not catch up in time, successive records can disappear from the stream before they have been processed. For this reason, it is extremely important to choose sensible values for the above settings and to configure a strategy for faulty records. Lambda offers the following possibilities to deal with such errors:

  • The faulty record can be reported back as return value of the Lambda function (see Listing 2).
  • An on-failure destination can be defined to which faulty records are delivered after reaching the maximum retries (e.g. SQS or SNS).

If the Lambda function, as described in the first point, returns the faulty data set or its sequence number, the batch is split at this point and only the part starting with the faulty data set is transferred again.

Listing 2

{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNo>"
        }
    ]
}

Consumption, the second: Kinesis Client Library

In addition to the ability to use AWS Lambda as a consumer for data streams, the Kinesis Client Library (KCL) is provided for building custom applications in the classic sense (for example, in a container environment). KCL is more than just an abstraction layer over the AWS Kinesis API calls. For KCL to work, a worker (scheduler) is started within your own application. It takes care of automatic load balancing between multiple (distributed) consumer applications, provides mechanisms for error handling, and checkpoints records that have already been processed. The Kinesis Client Library uses Amazon DynamoDB to store leases; workers and shards are linked via these leases. Metrics about the workers and consumers are also published to CloudWatch.

In combination with the Kinesis Producer Library (KPL), which can be used to write records, high-performance data pipelines become possible. For example, (micro) batches created by the Kinesis Producer Library are automatically disaggregated by the Kinesis Client Library. For more details, see the section “Writing data with the Kinesis Producer Library”.

To implement a consumer with KCL, a Kinesis scheduler must be created at application startup. When the scheduler is created, the stream name, a Kinesis client, a DynamoDB client and CloudWatch client, and a class that implements the ShardRecordProcessorFactory interface are passed as parameters. This class in turn returns an implementation of the ShardRecordProcessor interface that provides the actual processing logic. Thus, for example, it is not necessary for developers to worry about dynamic retrieval of current shards, coordination between distributed consumers (“who reads from which shard?”), and retry logic in case of failure. At least not to the full extent.

KCL thus allows the developer to focus on developing the essential application logic without having to worry about the complex mechanisms involved in distributed streaming applications. With version 2.x, KCL now also supports Enhanced Fan-Out Consumers (see box).

Consuming, the third: Kinesis Data Streams API

Of course, instead of the methodologies already listed, the Kinesis Data Streams API of the AWS SDK can also be used directly.

This is most similar to using Apache Kafka’s consumer or client API, although the distribution of shards to consumers is not handled by Kinesis. If you run the same consumer several times in parallel, for example to achieve faster processing, the distribution of the shards is left to the developer. If you do not want this burden, you should fall back on the Kinesis Client Library mentioned in the previous section.

To consume data via the Kinesis Data Streams API of the AWS SDK, a KinesisClient object is created via the associated builder, which is used in the further course to retrieve the streams, shards, and data. A KinesisAsyncClient is also provided for asynchronous calls. To keep the code examples a bit shorter, the KinesisClient is used in the following. In practice, I recommend the use of the asynchronous variant.

Unlike Apache Kafka, where various actions are only possible through either the KafkaProducer, KafkaConsumer or AdminClient object, all Kinesis Data Streams functions are available through the KinesisClient.

To consume data from a shard, a GetRecords request must be executed. The GetRecords request needs a ShardIterator as a reference, which provides the current position of the pointer in the shard for the call. The builder pattern is used consistently when creating the request objects (e.g. ShardIteratorRequest or GetRecordsRequest).

To create a ShardIterator, the name of the stream, the Shard Id, and the Shard Iterator type are necessary. There are five different types to choose from, which influence the starting position of the pointer in the shard:

  • AT_SEQUENCE_NUMBER – Read from a specified sequence number (inclusive).
  • AFTER_SEQUENCE_NUMBER – Read from a specified sequence number (exclusive).
  • AT_TIMESTAMP – Read from the first record at or after a given timestamp.
  • TRIM_HORIZON – Read from the oldest record in the shard (“earliest” in Apache Kafka).
  • LATEST – Read from the latest record in the shard (“latest” in Apache Kafka).

Depending on the type, a sequence number or a timestamp must also be passed. The ShardIterator created in this way is passed to the GetRecordsRequest Builder and the maximum number of records to be retrieved is specified. The maximum is 10,000 records per call. Listing 3 shows the associated code.

Listing 3

KinesisClient kinesisClient = KinesisClient.builder()
        .region(Region.EU_CENTRAL_1)
        .credentialsProvider(() ->
                AwsBasicCredentials.create("myKey", "mySecret"))
        .build();

GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
        .shardIteratorType(ShardIteratorType.LATEST)
        .shardId("shardId-000000000000")
        .streamName("myStream")
        .build();

String currentShardIterator = kinesisClient
        .getShardIterator(shardIteratorRequest)
        .shardIterator();

while (true) {
    GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
            .shardIterator(currentShardIterator)
            .build();

    GetRecordsResponse records = kinesisClient.getRecords(recordsRequest);

    records.records().forEach(record ->
            System.out.println(record.data().asUtf8String()));

    // The response contains the iterator for the next call
    currentShardIterator = records.nextShardIterator();

    try {
        Thread.sleep(1_000); // stay below the limit of 5 GetRecords calls/second
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}

A list of all shards with their associated IDs can be retrieved dynamically for the respective stream using the listShards method of the KinesisClient.

The call to the getRecords method of the KinesisClient, with the GetRecordsRequest object as argument, returns a GetRecordsResponse which, in addition to the records (or no records – see the section “Pitfalls from the field”), contains a new ShardIterator pointing to the next position, to be used the next time getRecords is called and the GetRecordsRequest object is created.

The position of the previous record cannot be determined. This is an obstacle because, as mentioned in the technical description of Kinesis Data Streams, sequence numbers do not provide any information about the preceding records and cannot be calculated on their own. In Apache Kafka, on the other hand, you simply subtract the desired number from the current offset to arrive at a particular earlier record (e.g., offset – 1 for the previous record). If knowledge about previous records is required (for example, to jump to the beginning of a batch), this information must either be carried in the records themselves or stored externally.

The actual number of records returned by +getRecords depends on:

  • the size of the records in combination with the limit of 2 MiB/second per shard,
  • how many consumers read from the shard in parallel (the 2 MiB/second limit is shared by all “simple” consumers per shard),
  • whether the limit of 5 calls per second per shard has been exceeded.

Despite the limit of 2 MiB/s per shard, a GetRecordsRequest can return up to 10 MiB. After that, however, the next successful call is only possible after another 5 seconds, so the limit of 2 MiB/s still holds. This seemingly contradictory feature is due to the internal, technical design of Kinesis Data Streams.

Writing data with the Kinesis Producer Library

There are also several options for writing data. Once again, the integration of other AWS services should be mentioned, as well as Kinesis Data Analytics.

The Kinesis Producer Library (KPL) exists as a counterpart to the Kinesis Client Library (KCL) already presented. It, too, offers several conveniences and allows the developer to concentrate on the essential application logic. The supplied functionality includes:

  • Configuration of an error-handling strategy.
  • Collecting write requests for records destined for multiple shards and combining them into one request.
  • Aggregating (micro-batching) records to increase payload size and data throughput. The batches are automatically disaggregated on the consumer side in interaction with the Kinesis Client Library.
  • A CloudWatch connection for performance metrics (analogous to the KCL).

The Kinesis Producer Library is a Java library that includes a C++ daemon available for all major platforms. The binaries are part of the package available through Maven Central.

Aggregating records into a (micro)batch is a feature that can greatly increase write throughput. The amount of data that can be written to a shard per second is limited to 1,000 records or 1 MiB. If the producer writes more than 1,000 smaller records per second, this limit throttles the producer even though the 1 MiB limit is far from being reached due to the small record size. It therefore makes sense to group smaller records into a single payload to achieve the most efficient write workload. The Kinesis Producer Library does this automatically.
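To see why aggregation pays off, a quick back-of-the-envelope calculation helps. The following sketch is plain Java, independent of the KPL; the record size and aggregation factor are made-up example values, and per-aggregate protocol overhead is ignored for simplicity:

```java
public class AggregationMath {
    // Per-shard write limits of Kinesis Data Streams.
    static final int MAX_RECORDS_PER_SECOND = 1_000;
    static final int MAX_BYTES_PER_SECOND = 1_024 * 1_024; // 1 MiB

    /**
     * How many user records per second fit into one shard, given the
     * record size and the number of user records packed into one
     * aggregated Kinesis record.
     */
    static long userRecordsPerSecond(int recordSizeBytes, int recordsPerAggregate) {
        // The 1,000-records/s limit counts aggregated records, not user records.
        long byRecordLimit = (long) MAX_RECORDS_PER_SECOND * recordsPerAggregate;
        // The 1 MiB/s limit counts raw bytes, so aggregation does not relax it.
        long byByteLimit = MAX_BYTES_PER_SECOND / recordSizeBytes;
        return Math.min(byRecordLimit, byByteLimit);
    }

    public static void main(String[] args) {
        // 100-byte records, no aggregation: the record-count limit bites first.
        System.out.println(userRecordsPerSecond(100, 1));  // 1000
        // Ten 100-byte records per aggregate: ten times the throughput.
        System.out.println(userRecordsPerSecond(100, 10)); // 10000
    }
}
```

With small records, the record-count limit dominates, which is exactly the gap that aggregation closes.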

Listing 4 shows the code for a simple producer using the Kinesis Producer Library.

Listing 4

KinesisProducer kinesis = new KinesisProducer();
ByteBuffer data = ByteBuffer.wrap("myData".getBytes(StandardCharsets.UTF_8));
kinesis.addUserRecord("myStream", "myPartitionKey", data);

Since the KPL buffers records before sending them, higher latencies may occur. For use cases where low latency is critical, direct use of the Kinesis Data Streams API of the AWS SDK is recommended; it is discussed in the next section.

Writing data with the Kinesis Data Streams API

The Kinesis Producer Library takes a lot of work out of frequently occurring use cases. In return, however, you also lose some freedom: many aspects can no longer be controlled independently. Therefore, the implementation of a Producer with the already known AWS SDK or the Kinesis Data Streams API is presented below.

Via the static KinesisClient.builder method, a KinesisClient object is created, analogous to the creation of a consumer with the Kinesis Data Streams API. Instead of the synchronous KinesisClient shown in the example, a KinesisAsyncClient can again be used for asynchronous operations. The name of the stream and a list of records to be published are passed to a PutRecordsRequest object; the builder pattern is used here again.

This list contains the individual records as PutRecordsRequestEntry objects. A PutRecordsRequestEntry consists of the data as bytes and the partition key.

Once the list of PutRecordsRequestEntry objects has been passed to the PutRecordsRequest builder via its records method, the putRecords method of the KinesisClient object is called with the PutRecordsRequest object as an argument.

Calling the putRecords method of the KinesisClient returns an object of type PutRecordsResponse; in case of error, the method throws an exception. All records that were processed, successfully or not, can be accessed through the records method of the PutRecordsResponse object. The individual PutRecordsResultEntry objects contain the error code and the associated message.

If you want to transfer a single record to Kinesis Data Streams, the singular is used instead of the plural, i.e., a PutRecordRequest, which is passed to the putRecord method of the KinesisClient object.

If multiple records are sent, as shown with putRecords of the KinesisClient in Listing 5, only a single request containing all the records is sent. This way, up to 500 records or a maximum of 5 MiB can be transferred in one call. When the single-record method is used instead, one request is executed per record. Needless to say, each request carries a certain overhead, which in total can lead to enormous latencies. For this reason, combining several records in one request is always preferable to sending them individually – if the use case permits.

Listing 5

KinesisClient kinesisClient = KinesisClient.builder()
				.region(Region.EU_CENTRAL_1)
				.credentialsProvider(() ->
					AwsBasicCredentials.create("myKey","mySecret"))
				.build();

List<PutRecordsRequestEntry> records =
	Stream.generate(() -> 
		PutRecordsRequestEntry.builder()
			.data(SdkBytes.fromUtf8String("myData"))
			.partitionKey("myPartitionKey")
			.build())
		.limit(100)
		.collect(Collectors.toList());

PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
				.records(records)
				.streamName("myStream")
				.build();

PutRecordsResponse result = kinesisClient.putRecords(putRecordsRequest);
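If more than 500 records (or more than 5 MiB) are pending, the list has to be split across several putRecords calls. The following SDK-free sketch shows one way to chunk payloads under these limits; the split method is a hypothetical helper, not part of the AWS SDK:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Per-call limits of the PutRecords API.
    static final int MAX_RECORDS_PER_CALL = 500;
    static final long MAX_BYTES_PER_CALL = 5L * 1024 * 1024; // 5 MiB

    /** Splits payloads into chunks that each respect both PutRecords limits. */
    static List<List<byte[]>> split(List<byte[]> payloads) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] payload : payloads) {
            boolean full = current.size() == MAX_RECORDS_PER_CALL
                    || currentBytes + payload.length > MAX_BYTES_PER_CALL;
            if (full && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(payload);
            currentBytes += payload.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting chunk can then be turned into its own PutRecordsRequest, as shown in Listing 5.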

Error handling

Even if the name of the putRecords method suggests otherwise: the write operation it initiates is not atomic. Thus, in the event of an error, only part of the batch may have been written. One possible reason is reaching the maximum data throughput per second and shard.

If an error occurs, an exception is thrown so that a new attempt can be started. It is advisable to follow established practices such as exponential backoff with a random component (jitter), in order to avoid synchronized retries. Especially when multiple processes write in parallel, a failure affecting all of them can otherwise cause load spikes because the processes retry at the same moments. There is a detailed article on this on the AWS Architecture Blog.
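As a sketch, the “full jitter” variant of this strategy fits in a few lines of plain Java; the base and cap constants below are illustrative, not prescribed values:

```java
import java.util.concurrent.ThreadLocalRandom;

public class Backoff {
    static final long BASE_MILLIS = 100;   // delay window for the first retry
    static final long CAP_MILLIS = 20_000; // upper bound for the window

    /**
     * "Full jitter": pick a random delay between 0 and an exponentially
     * growing (but capped) window, so parallel writers do not retry in
     * lockstep after a shared failure.
     */
    static long fullJitterDelay(int attempt) {
        long window = Math.min(CAP_MILLIS, BASE_MILLIS * (1L << Math.min(attempt, 30)));
        return ThreadLocalRandom.current().nextLong(window + 1);
    }
}
```

A caller would sleep for fullJitterDelay(attempt) milliseconds before re-submitting the failed part of the batch.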

Pitfalls from the field

While Amazon Kinesis Data Streams is serverless and, especially in combination with the other Amazon Kinesis components or other AWS services, in-house development and operations can be kept to a minimum, there are one or two pitfalls in practical deployments.

As already mentioned, shards can be reconfigured at any time (“resharding”), so the resources consumed can be adapted very flexibly to the actual needs. In data integration scenarios, Amazon Kinesis Data Firehose manages scaling autonomously and automatically adapts the necessary resources to the existing data throughput. For use cases that cannot be implemented via Amazon Kinesis Data Firehose, however, you have to work directly with the streams again, and that is where things get complicated: auto-scaling for data streams themselves is not offered. To achieve it, Amazon suggests in a blog article a combination of five components: CloudWatch, Application Auto Scaling, API Gateway, a Lambda function, and the Parameter Store.

CloudWatch monitors the load (and other KPIs) and sends a notification to Application Auto Scaling when a certain threshold is exceeded. Application Auto Scaling in turn communicates with the Lambda function via REST through the API Gateway. The Lambda function then calls the UpdateShardCount API of Amazon Kinesis and increases or decreases the number of shards for the respective stream. The Parameter Store is needed to keep the state (the number of shards). Provided as a CloudFormation template, this system of five components is relatively easy to put into operation.

However, there are certain restrictions on changing the number of shards. For example, only a maximum of 10 changes are possible within 24 hours, when scaling down the target must not be less than half of the current number of shards, and so on. The full list of restrictions is also given in the article.
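The halving/doubling rule can be checked client-side before calling UpdateShardCount at all. The following is a simplified validation sketch covering only this one rule; the 10-changes-per-24-hours quota and the other restrictions are deliberately not modeled:

```java
public class ShardCountCheck {
    /**
     * Checks the halving/doubling restriction of UpdateShardCount:
     * the target may be at most double and at least half of the
     * current shard count.
     */
    static boolean isValidTarget(int current, int target) {
        if (current < 1 || target < 1) {
            return false;
        }
        boolean notBelowHalf = 2 * target >= current;
        boolean notAboveDouble = target <= 2 * current;
        return notBelowHalf && notAboveDouble;
    }
}
```

For a stream with 10 shards, targets between 5 and 20 pass; anything outside that range would be rejected by the API anyway.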

If replication of a stream to multiple AWS regions or another environment is required, this is also not possible without using a Lambda function. However, there is already a project on GitHub that provides all the necessary information.

A final point concerns the limitation of API calls, which was briefly touched on above: if you want to jump to a point in time in the past and work your way forward from there to the current record, you may be in for a surprise. Coming from Apache Kafka in particular, one expects that any call to GetRecords (in Apache Kafka: poll) returns records as long as there are records in the stream. Because of the internal design of Amazon Kinesis, however, an empty array may be returned even though more records exist in the stream.

Amazon Kinesis does not store individual records sequentially in one continuous log at the top level. Instead, records are aggregated and written to a “section” of the log of the respective shard. This is similar to the log files of Apache Kafka partitions, which are also split when they reach a certain size. However, because several clients are involved, a given section does not necessarily contain data belonging to one’s own account. Despite multiple clients, the data is of course strictly separated (into separate sections) and cannot be read by another account.

Thus, if no record is returned, this only means that there is no record in or near the section the shard pointer currently points to. With a further call, the shard pointer moves on and encounters the next section within the shard, which may again contain one’s own records.

In addition, there is a limit on how many GetRecords calls can be executed per second and shard. To come back to our use case, this means that working one’s way from the past to the present can take considerably longer than with Apache Kafka – and the way is lined with countless empty response objects.
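The following toy simulation (plain Java, no AWS SDK) models a shard as a sequence of log sections and shows why a consumer must not treat an empty GetRecords result as the end of the stream:

```java
import java.util.ArrayList;
import java.util.List;

public class EmptyPollSimulation {
    /** WRONG for Kinesis: stops at the first empty response. */
    static List<String> stopOnEmpty(List<List<String>> sections) {
        List<String> out = new ArrayList<>();
        for (List<String> section : sections) {
            if (section.isEmpty()) {
                break; // an empty section does not mean the stream is drained
            }
            out.addAll(section);
        }
        return out;
    }

    /** Correct: keep advancing the shard pointer until the iterator ends. */
    static List<String> drainAll(List<List<String>> sections) {
        List<String> out = new ArrayList<>();
        for (List<String> section : sections) {
            out.addAll(section); // may add nothing - poll on regardless
        }
        return out;
    }
}
```

With a shard whose sections are [a], [], [b], the naive consumer only ever sees record a, while the correct one collects both.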

Conclusion

The Amazon Kinesis family of products, particularly through the integration of the various other services provided by AWS, offers a wealth of capabilities through which a large number of use cases can be implemented completely serverless and with relatively little overhead.

My daily driver has always been Apache Kafka – with all its strengths and weaknesses. I first discovered Amazon Kinesis through the integration of Amazon Kinesis into our data exploration / management solution. Since the integration of Amazon Kinesis into KaDeck was done in close cooperation with the Amazon Kinesis team, I gained insight into many technical details and was able to test Amazon Kinesis extensively. In the process, I encountered one or two stumbling blocks and noticed some peculiarities (especially coming from Apache Kafka), which I have pointed out in the appropriate section of this article.

I was very impressed with the wealth of capabilities that Amazon Kinesis offers, among them the managed Apache Flink environment and the Lambda and DynamoDB integrations. The prerequisite, of course, is using AWS. It is definitely worth taking a look at this cloud, especially in the context of real-time data streaming: running your own infrastructure components for such use cases is often a major challenge, and the AWS solutions can lead to significant cost savings with greater stability. If the cloud in general or AWS in particular is out of the question for you, then Amazon Kinesis is simply not an option.

Let’s get to the elephant in the room: is Amazon Kinesis better than Apache Kafka? It depends on the use case, the environment, and personal taste. When implementing real-world use cases with Apache Kafka, it is easy to get lost in technical details. Not only when there are high requirements for guarantees (e.g., no record may be lost, no duplicate processing), but also for simpler use cases, the developer needs in-depth knowledge of Apache Kafka’s technical details to develop applications efficiently.

Amazon Kinesis, on the other hand, offers a larger number of suitable out-of-the-box solutions for simple use cases. If one’s use case cannot fall back on these functions, a deep technical understanding is required here as well. In particular, the various limits hidden in the documentation (API calls, data throughput, etc.) can drive up the complexity enormously.

At Xeotek, we now use Amazon Kinesis for internal processes, but we have also worked a lot with AWS before. Since Amazon Kinesis is unfortunately not yet as well known as Apache Kafka, I hope that this article has shown you an alternative to the top dog Apache Kafka.

In the end, as always, make up your own mind and share your experience via Twitter at @benjaminbuick. If you have any questions, please don’t hesitate to contact me.

Ben


Benjamin Buick is the CEO of Xeotek and an iSAQB certified software architect. Xeotek is the company behind KaDeck, a Data Exploration & Management solution for working with Apache Kafka and Amazon Kinesis.