Optimize write throughput for Amazon Kinesis Data Streams


Amazon Kinesis Data Streams is used by many customers to capture, process, and store data streams at any scale. This level of unparalleled scale is enabled by dividing each data stream into multiple shards. Each shard in a stream has a 1 MB per second or 1,000 records per second write throughput limit. Whether your data streaming application is collecting clickstream data from a web application or recording telemetry data from billions of Internet of Things (IoT) devices, streaming applications are highly susceptible to a varying amount of data ingestion. Sometimes such a large and unexpected volume of data arrives when we least expect it. For instance, consider application logic with a retry mechanism when writing records to a Kinesis data stream. In case of a network failure, it's common to buffer data locally and write it when connectivity is restored. Depending on the rate that data is buffered and the duration of the connectivity issue, the local buffer can accumulate enough data to saturate the available write throughput quota of a Kinesis data stream.

When an application attempts to write more data than what is allowed, it will receive write throughput exceeded errors. In some instances, not being able to address these errors in a timely manner can result in data loss, unhappy customers, and other undesirable outcomes. In this post, we explore the typical reasons behind write throughput exceeded errors, along with methods to identify them. We then guide you on swift responses to these events and provide several solutions for mitigation. Finally, we look at how on-demand capacity mode can be valuable in addressing these errors.

Why do we get write throughput exceeded errors?

Write throughput exceeded errors are generally caused by three different scenarios:

  • The simplest is the case where the producer application is generating more data than the throughput available in the Kinesis data stream (the sum of all shards).
  • Next, we have the case where data distribution is not even across all shards, known as the hot shard issue.
  • Write throughput errors can also be caused by an application choosing a partition key and writing records at a rate exceeding the throughput offered by a single shard. This situation is somewhat similar to the hot shard issue, but as we see later in this post, unlike a hot shard issue, you can't solve this problem by adding more shards to the data stream. This behavior is commonly known as a hot key issue.

Before we discuss how to diagnose these issues, let's look at how Kinesis data streams organize data and its relationship to write throughput exceeded errors.

A Kinesis data stream has one or more shards to store data. Each shard is assigned a key range in 128-bit integer space. If you view the details of a data stream using the describe-stream operation in the AWS Command Line Interface (AWS CLI), you can actually see this key range assignment:

$ aws kinesis describe-stream --stream-name my-data-stream
"StreamDescription": {
  "Shards": [
    {
      "ShardId": "shardId-000000000000",
      "HashKeyRange": {
        "StartingHashKey": "0",
        "EndingHashKey": "85070591730234615865843651857942052863"
      }
    },
    {
      "ShardId": "shardId-000000000001",
      "HashKeyRange": {
        "StartingHashKey": "85070591730234615865843651857942052864",
        "EndingHashKey": "170141183460469231731687303715884105727"
      }
    }
  ]
}

When a producer application invokes the PutRecord or PutRecords API, the service calculates an MD5 hash for the PartitionKey specified in the record. The resulting hash is used to determine which shard stores that record. You can take more control over this process by setting the ExplicitHashKey property in the PutRecord request to a hash key that falls within a specific shard's key range. For instance, setting ExplicitHashKey to 0 will guarantee that the record is written to shard ID shardId-000000000000 in the stream described in the preceding code snippet.
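As an illustration, here is a minimal boto3 sketch (the payload and field names are placeholders) showing both routing styles: the first call lets the service hash the partition key, and the second pins the record to the first shard's key range using ExplicitHashKey:

import json
import boto3

kinesis = boto3.client("kinesis")

# Routed by the MD5 hash of the partition key.
kinesis.put_record(
    StreamName="my-data-stream",
    Data=json.dumps({"url": "/checkout", "latency_ms": 42}).encode("utf-8"),
    PartitionKey="/checkout",
)

# Routed explicitly: hash key 0 falls in the range owned by
# shardId-000000000000 in the stream described above. The partition key is
# still stored with the record, but routing uses ExplicitHashKey.
kinesis.put_record(
    StreamName="my-data-stream",
    Data=b"pinned record",
    PartitionKey="stored-but-not-used-for-routing",
    ExplicitHashKey="0",
)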

How partition keys are distributed across available shards plays a vital role in maximizing the available throughput in a Kinesis data stream. When the partition key being used is repeated frequently in a way that some keys are more frequent than others, the shards storing those records will be utilized more. We also get the same net effect if we use ExplicitHashKey and our logic for choosing the hash key is biased towards a subset of shards.

Imagine you have a fleet of web servers logging performance metrics for each web request served into a Kinesis data stream with two shards, and you used the request URL as the partition key. Each time a request is served, the application makes a call to the PutRecord API carrying a 10-byte record. Let's say that you have a total of 10 URLs and each receives 10 requests per second. Under these circumstances, the total throughput required for the workload is 1,000 bytes per second and 100 requests per second. If we assume perfect distribution of the 10 URLs across the two shards, each shard will receive 500 bytes per second and 50 requests per second.

Now imagine one of these URLs went viral and started receiving 1,000 requests per second. Although the situation is positive from a business perspective, you're now on the verge of making users unhappy. After the page gained popularity, you're now counting 1,040 requests per second for the shard storing the popular URL (1,000 + 10 * 4). At this point, you'll receive write throughput exceeded errors from that shard. You're throttled based on the requests per second quota because even with the increased requests, you're still producing roughly 11 KB of data per second, well below the bytes per second limit.

You can solve this problem either by using a UUID for each request as the partition key so that you share the total load across both shards, or by adding more shards to the Kinesis data stream. The method you choose depends on how you want to consume data. Changing the partition key to a UUID would be problematic if you want performance metrics from a given URL to always be processed by the same consumer instance or if you want to maintain the order of records on a per-URL basis.
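As a rough sketch of that trade-off (the helper and payload shape are illustrative), the only change between the two approaches is how the partition key is chosen:

import uuid
import boto3

kinesis = boto3.client("kinesis")

def put_metric(url: str, payload: bytes, spread_load: bool) -> None:
    # URL as partition key keeps per-URL ordering and locality, but a viral
    # URL can saturate a single shard. A random UUID spreads writes evenly
    # across shards at the cost of losing per-URL ordering.
    partition_key = str(uuid.uuid4()) if spread_load else url
    kinesis.put_record(
        StreamName="my-data-stream",
        Data=payload,
        PartitionKey=partition_key,
    )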

Understanding the exact cause of write throughput exceeded errors is an important step in remediating them. In the next sections, we discuss how to identify the root cause and remediate this problem.

Identifying the cause of write throughput exceeded errors

The first step in solving a problem is understanding that it exists. You can use the WriteProvisionedThroughputExceeded metric in Amazon CloudWatch in this case. You can correlate the spikes in the WriteProvisionedThroughputExceeded metric with the IncomingBytes and IncomingRecords metrics to identify whether an application is getting throttled due to the size of data or the number of records written.
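One way to pull these metrics for correlation (the stream name and time range are placeholders) is with the GetMetricStatistics API via boto3:

from datetime import datetime, timedelta, timezone
import boto3

cloudwatch = boto3.client("cloudwatch")
end = datetime.now(timezone.utc)
start = end - timedelta(hours=3)

for metric in ["WriteProvisionedThroughputExceeded", "IncomingBytes", "IncomingRecords"]:
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/Kinesis",
        MetricName=metric,
        Dimensions=[{"Name": "StreamName", "Value": "my-data-stream"}],
        StartTime=start,
        EndTime=end,
        Period=60,  # stream-level metrics are published at 1-minute intervals
        Statistics=["Sum"],
    )
    for point in sorted(response["Datapoints"], key=lambda d: d["Timestamp"]):
        print(metric, point["Timestamp"], point["Sum"])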

Let's look at a few tests we performed on a stream with two shards to illustrate various scenarios. In this instance, with two shards in our stream, the total throughput available to our producer application is either 2 MB per second or 2,000 records per second.

In the first test, we ran a producer to write batches of 30 records, each being 100 KB, using the PutRecords API. As you can see in the graph on the left of the following figure, our WriteProvisionedThroughputExceeded error count went up. The graph on the right shows that we're reaching the 2 MB per second limit, but our incoming records rate is much lower than the 2,000 records per second limit (Kinesis metrics are published at 1-minute intervals, hence 125.8 MB and 120,000 as upper limits).

Record size based throttling example

The following figures show how the same three metrics changed when we modified the producer to write batches of 500 records, each being 50 bytes, in the second test. This time, we exceeded the 2,000 records per second throughput limit, but our incoming bytes rate is well under the limit.

Record count based throttling

Now that we know the problem exists, we should look for clues to see if we're exceeding the overall throughput available in the stream or if we're having a hot shard issue due to an imbalanced partition key distribution, as discussed earlier. One approach to this is to use enhanced shard-level metrics. Prior to our tests, we enabled enhanced shard-level metrics, and we can see in the following figure that both shards equally reached their quota in our first test.

Enhanced shard level metrics
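If you want shard-level metrics available before an incident happens, you can enable them on the stream ahead of time; the following is a small sketch using the EnableEnhancedMonitoring API (the metric list is trimmed to what this post uses):

import boto3

kinesis = boto3.client("kinesis")

# Enables per-shard IncomingBytes, IncomingRecords, and throttling metrics.
# Note that shard-level metrics incur additional CloudWatch charges.
kinesis.enable_enhanced_monitoring(
    StreamName="my-data-stream",
    ShardLevelMetrics=[
        "IncomingBytes",
        "IncomingRecords",
        "WriteProvisionedThroughputExceeded",
    ],
)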

We have seen Kinesis data streams containing thousands of shards harnessing the power of infinite scale in Kinesis data streams. However, plotting enhanced shard-level metrics on such a large stream may not provide an easy way to find out which shards are over-utilized. In that instance, it's better to use CloudWatch Metrics Insights to run queries to view top-n items, as shown in the following code (adjust the LIMIT 5 clause accordingly):

-- Show top 5 shards with highest incoming bytes
SELECT
SUM(IncomingBytes)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

-- Show top 5 shards with highest incoming records
SELECT
SUM(IncomingRecords)
FROM "AWS/Kinesis"
GROUP BY ShardId, StreamName
ORDER BY MAX() DESC
LIMIT 5

Enhanced shard-level metrics are not enabled by default. If you didn't enable them and you want to perform root cause analysis after an incident, this option isn't very helpful. In addition, you can only query the most recent 3 hours of data. Enhanced shard-level metrics also incur additional costs for CloudWatch metrics, and it may be cost prohibitive to have them always on for data streams with a large number of shards.

One interesting scenario is when the workload is bursty, which can make the resulting CloudWatch metrics graphs rather baffling. This is because Kinesis publishes CloudWatch metric data aggregated at 1-minute intervals. Consequently, although you can see write throughput exceeded errors, your incoming bytes/records graphs may still be within the limits. To illustrate this scenario, we changed our test to create a burst of writes exceeding the limits and then sleep for a few seconds. We repeated this cycle for several minutes to yield the graphs in the following figure, which show write throughput exceeded errors on the left, but the IncomingBytes and IncomingRecords graphs on the right seem fine.

Effect of data aggregated at 1-minute intervals

To enhance the process of identifying write throughput exceeded errors, we developed a CLI tool called Kinesis Hot Shard Advisor (KHS). With KHS, you can view shard utilization when shard-level metrics are not enabled. This is particularly useful for investigating an issue retrospectively. It can also show the most frequently written keys for a particular shard. KHS reports shard utilization by reading records and aggregating them over per-second intervals based on the ApproximateArrivalTimestamp in the record. Because of this, you can also understand shard utilization drivers during bursty write workloads.

By running the following command, we can get KHS to inspect the data that arrived in 1 minute during our first test and generate a report:

khs -stream my-data-stream -from "2023-06-22 17:35:00" -to "2023-06-22 17:36:00"

For the given time window, the summary section in the generated report shows the maximum bytes per second rate observed, total bytes ingested, maximum records per second rate observed, and the total number of records ingested for each shard.

KHS report summary

Choosing a shard ID in the first column will display a graph of incoming bytes and records for that shard. This is similar to the graph you get in CloudWatch metrics, except the KHS graph reports on a per-second basis. For instance, in the following figure, we can see how the producer was going through a series of bursty writes followed by a throttling event during our test case.

KHS shard level metrics display

Running the same command with the -aggregate-key option enables partition key distribution analysis. It generates an additional graph for each shard showing the key distribution, as shown in the following figure. For our test scenario, we can only see each key being used one time because we used a new UUID for each record.

KHS key distribution graph

Because KHS reports based on data stored in streams, it creates an enhanced fan-out consumer at startup to avoid using the read throughput quota available for other consumers. When the analysis is complete, it deletes that enhanced fan-out consumer.

Due to its nature of reading data streams, KHS can transfer a lot of data during analysis. For instance, assume you have a stream with 100 shards. If all of them are fully utilized during a one-minute window specified using the -from and -to arguments, the host running KHS will receive at least 1 MB * 100 * 60 = 6,000 MB = approximately 6 GB of data. To avoid this kind of excessive data transfer and speed up the analysis process, we recommend first using the WriteProvisionedThroughputExceeded CloudWatch metric to identify a time period when you experienced throttling and then using a small window (such as 10 seconds) with KHS. You can also run KHS on an Amazon Elastic Compute Cloud (Amazon EC2) instance in the same AWS Region as your Kinesis data stream to minimize network latency during reads.

KHS is designed to run on a single machine to diagnose large-scale workloads. Using a naive in-memory counting algorithm (such as a hash map storing the partition key and count) for partition key distribution analysis could easily exhaust the available memory on the host system. Therefore, we use a probabilistic data structure called a count-min sketch to estimate the number of times a key has been used. As a result, the number you see in the report should be taken as an approximate value rather than an absolute value. After all, with this report, we just want to find out if there's an imbalance in the keys written to a shard.
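To give a feel for the idea (this is a generic illustration, not the actual KHS implementation), a count-min sketch keeps a few small rows of counters indexed by independent hashes and answers "how many times was this key seen?" with the minimum across the rows, so it never underestimates but may overestimate:

import hashlib

class CountMinSketch:
    # Approximate per-key counters in fixed memory.
    def __init__(self, width: int = 2048, depth: int = 4) -> None:
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, key: str, row: int) -> int:
        digest = hashlib.md5(f"{row}:{key}".encode("utf-8")).hexdigest()
        return int(digest, 16) % self.width

    def add(self, key: str) -> None:
        for row in range(self.depth):
            self.rows[row][self._index(key, row)] += 1

    def estimate(self, key: str) -> int:
        return min(self.rows[row][self._index(key, row)] for row in range(self.depth))

# Count partition keys without holding every distinct key in memory.
sketch = CountMinSketch()
for key in ["/checkout", "/checkout", "/home"]:
    sketch.add(key)
print(sketch.estimate("/checkout"))  # 2 (or slightly more under hash collisions)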

Now that we understand what causes hot shards and how to identify them, let's look at how to handle this in producer applications and what remediation steps to take.

Remediation steps

Having producers retry writes is a step towards making our producers resilient to write throughput exceeded errors. Consider our earlier sample application logging performance metrics data for each web request served by a fleet of web servers. When implementing this retry mechanism, you should remember that records not yet written to the Kinesis data stream remain in the host system's memory. The first issue with this is that if the host crashes before the records can be written, you'll experience data loss. Scenarios such as tracking web request performance data might be more forgiving of this kind of data loss than scenarios like financial transactions. You should evaluate the durability guarantees required for your application and employ techniques to achieve them.

The second issue is that records waiting to be written to the Kinesis data stream consume the host system's memory. When you start getting throttled and have some retry logic in place, you should notice that your memory utilization goes up. A retry mechanism should have a way to avoid exhausting the host system's memory.
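The following is a minimal sketch of that idea (the batch shape, attempt limit, and backoff values are illustrative, not a production implementation): it retries only the records that PutRecords reports as failed, backs off between attempts, and returns whatever is still unwritten after a bounded number of attempts so the caller can decide what durability to apply:

import time
import boto3

kinesis = boto3.client("kinesis")
MAX_ATTEMPTS = 5

def put_with_retry(records):
    # records: [{"Data": b"...", "PartitionKey": "..."}], at most 500 per call
    pending = records
    for attempt in range(MAX_ATTEMPTS):
        response = kinesis.put_records(StreamName="my-data-stream", Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Keep only the entries that failed (throttled or internal errors).
        pending = [
            record
            for record, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        time.sleep(min(0.1 * (2 ** attempt), 5))  # capped exponential backoff
    # Still unwritten: spill to disk, forward to a queue, or drop, depending
    # on the durability guarantees your application needs.
    return pending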

With the appropriate retry logic in place, if you receive write throughput exceeded errors, you can use the techniques we discussed earlier to identify the cause. After you identify the root cause, you can choose the appropriate remediation step:

  • If the producer application is exceeding the overall stream's throughput, you can add more shards to the stream to increase its write throughput capacity. When adding shards, the Kinesis data stream makes the new shards available incrementally, minimizing the time that producers experience write throughput exceeded errors. To add shards to a stream, you can use the Kinesis console, the update-shard-count operation in the AWS CLI, the UpdateShardCount API through the AWS SDK, or the ShardCount property in the AWS CloudFormation template used to create the stream (see the sketch following this list).
  • If the producer application is exceeding the throughput limit of some shards (hot shard issue), pick one of the following options based on consumer requirements:
    • If locality of data is required (records with the same partition key are always processed by the same consumer) or an order based on partition key is required, use the split-shard operation in the AWS CLI or the SplitShard API in the AWS SDK to split those shards.
    • If locality or order based on the current partition key is not required, change the partition key scheme to increase its distribution.
  • If the producer application is exceeding the throughput limit of a shard due to a single partition key (hot key issue), change the partition key scheme to increase its distribution.
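A rough boto3 sketch of the first two options (the target shard count is arbitrary, and the split point is simply the midpoint of the hot shard's hash key range; in practice you would run only the option that matches your diagnosis):

import boto3

kinesis = boto3.client("kinesis")

# Option 1: overall stream throughput exceeded -- increase the shard count.
kinesis.update_shard_count(
    StreamName="my-data-stream",
    TargetShardCount=4,
    ScalingType="UNIFORM_SCALING",
)

# Option 2: hot shard -- split it at the midpoint of its hash key range.
description = kinesis.describe_stream(StreamName="my-data-stream")["StreamDescription"]
hot_shard = description["Shards"][0]  # the shard identified as hot earlier
start = int(hot_shard["HashKeyRange"]["StartingHashKey"])
end = int(hot_shard["HashKeyRange"]["EndingHashKey"])
kinesis.split_shard(
    StreamName="my-data-stream",
    ShardToSplit=hot_shard["ShardId"],
    NewStartingHashKey=str((start + end) // 2),
)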

Kinesis Data Streams also has an on-demand capacity mode. In on-demand capacity mode, Kinesis Data Streams automatically scales streams when needed. Additionally, you can switch between on-demand and provisioned capacity modes without causing an outage. This can be particularly useful when you're experiencing write throughput exceeded errors but need to respond immediately to keep your application available to your users. In such instances, you can switch a provisioned capacity mode data stream to an on-demand data stream and let Kinesis Data Streams handle the required scale appropriately. You can then perform root cause analysis in the background and take corrective actions. Finally, if necessary, you can change the capacity mode back to provisioned.
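Switching capacity modes is a single API call; here is a short sketch (the stream name is a placeholder) using the UpdateStreamMode API:

import boto3

kinesis = boto3.client("kinesis")

stream_arn = kinesis.describe_stream_summary(StreamName="my-data-stream")[
    "StreamDescriptionSummary"
]["StreamARN"]

# Switch to on-demand while investigating; switch back later by calling the
# same API with StreamMode="PROVISIONED" if necessary.
kinesis.update_stream_mode(
    StreamARN=stream_arn,
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)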

Conclusion

You should now have a solid understanding of the common causes of write throughput exceeded errors in Kinesis data streams, how to diagnose them, and what actions to take to appropriately deal with them. We hope that this post helps you make your Kinesis Data Streams applications more robust. If you are just getting started with Kinesis Data Streams, we recommend referring to the Developer Guide.

If you have any questions or feedback, please leave them in the comments section.


About the Authors

Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.

Nihar Sheth is a Senior Product Manager at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.
