Lessons from building a Serverless Data Pipeline with AWS Kinesis and Lambda

Erica Windisch
Published in IOpipe Blog
Aug 29, 2018

At IOpipe, we monitor users’ Lambda functions, receiving runtime data for each function invocation. This means ingesting a lot of data, sometimes billions of records per day, which we do in real time using Kinesis and Lambda.

Introducing Kinesis Streams

We picked Kinesis Streams to process this data. It is a hosted service similar to Kafka, but different in important ways: it’s managed, it scales readily, and when combined with Lambda, the workers that consume it scale automatically.

Kinesis accepts data via a “PutRecord” API call. It then invokes a Lambda function we have registered as often as once per second per shard, as long as there are records. (Code example: SAM Template Kinesis to Lambda)

Lambda Invocations

While Kinesis may be traditionally consumed by the Kinesis SDK with a long-running process, the serverless approach is to consume the stream with Lambda. This was one of the features that attracted us to Kinesis over alternatives such as Kafka.

Once we wrote and uploaded our Lambda function, we configured a trigger for Kinesis. We can specify the number of records it will process, where to set the shard iterator, and from which Kinesis Stream to consume.

Configuring a Kinesis trigger for AWS Lambda in the AWS Console.

We picked 1,000 records per invocation, which is reasonable if you can process that many records fast enough. If you receive only 1 record in 1 second, you should expect one Lambda to be invoked and passed a single record. If you have 1,000 records in 1 second, a single invocation will be passed all 1,000 records. If you have 2,000 records, two invocations will be passed 1,000 records each.
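To make the batch concrete, here is a minimal sketch of a handler that unpacks whatever batch it is given. It assumes each record’s payload is JSON, which is an assumption for illustration; the function names are hypothetical. Kinesis record data arrives base64-encoded under `Records[].kinesis.data` in the Lambda event.

```python
import base64
import json


def decode_records(event):
    """Return the decoded JSON payload of every Kinesis record in the batch."""
    payloads = []
    for record in event.get("Records", []):
        # Kinesis delivers each record's data base64-encoded.
        raw = base64.b64decode(record["kinesis"]["data"])
        # Assumption: producers wrote JSON documents.
        payloads.append(json.loads(raw))
    return payloads


def handler(event, context):
    payloads = decode_records(event)
    # Real processing would happen here; this sketch only counts the batch.
    return {"processed": len(payloads)}
```

The same handler works whether the batch holds 1 record or 1,000; only the length of `event["Records"]` changes.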

At IOpipe, we dogfood our own product and utilize a feature that automatically observes the number of records processed per invocation, the duration of our function, and the duration of API calls to our backend databases. This gives us good insight into the health and performance of the stream.

Lambda Durations

Because shard capacity is based on reads/second, to maximize the utilization of shards, aim to keep your Lambda invocations under 1s with a batch size of 1,000. This isn’t always possible, but smaller batch sizes or longer-running Lambda invocations will require additional Kinesis shards. Every shard and every additional Lambda invocation carries a cost, and there is also a maximum number of shards both per stream and per account.
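As a rough back-of-the-envelope model of that trade-off, the sketch below estimates how many shards are needed to keep up with an incoming record rate. It assumes one invocation at a time per shard, started at most once per second, as described earlier; the function name and parameters are hypothetical, and real throughput also depends on record size and retries.

```python
import math


def shards_needed(records_per_sec, batch_size=1000, invocation_secs=1.0):
    """Estimate shards needed so that Lambda keeps up with the incoming rate.

    Assumes one invocation at a time per shard, started at most once per second.
    """
    # Each shard completes one batch per invocation, no faster than once a second.
    per_shard_rate = batch_size / max(invocation_secs, 1.0)
    return max(1, math.ceil(records_per_sec / per_shard_rate))
```

Under this model, 5,000 records/sec needs 5 shards when invocations finish within a second, but 10 shards if either the batch size drops to 500 or each invocation takes 2 seconds.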

202 Accepted

It can be useful to build asynchronous HTTP APIs on top of Kinesis, as we have. In this case I prefer to return a 202 Accepted status rather than a 200 OK, since it indicates that the request has been accepted for asynchronous processing without guaranteeing to the client that processing has completed. The RFC indicates that you SHOULD also provide a way to check on the status of the request, but it’s not required and there’s no standard for how to provide that indication, although an HTTP Location header is commonly used.
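A minimal sketch of such a response, written as a plain dictionary in the shape of a Lambda proxy-style HTTP response. The `/status` path, the generated request ID, and the body fields are all hypothetical; nothing here describes how IOpipe’s own API responds.

```python
import json
import uuid


def accepted_response(status_base_url="/status"):
    """Build a 202 Accepted response that points at a status resource."""
    # Hypothetical ID the client could use to poll for the outcome.
    request_id = str(uuid.uuid4())
    return {
        "statusCode": 202,
        "headers": {"Location": f"{status_base_url}/{request_id}"},
        "body": json.dumps({"id": request_id, "status": "accepted"}),
    }
```

The Location header is optional; dropping it still leaves a valid 202 response.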

Common solutions to implementing this API:

  • Use API Gateway directly with a mapping-template and a service proxy to Kinesis, which is completely serverless. AWS documents this solution, although I suggest only exposing the PutRecord method with a PartitionKey specified in the mapping template. (AWS Documentation)
  • Deploy EC2 instances or ECS containers to run an HTTP API behind an AWS Elastic Load Balancer. This is not serverless, but offers low-latency, high-throughput networking that does not suffer from cold starts.
  • Implement inside of Lambda@Edge with CloudFront. One advantage of this solution is that, because CloudFront is global, a single application may be able to operate across all regions efficiently without configuring regional endpoints.

None of these solutions is offered out of the box; each requires custom development or complex configuration.

Finally, if latency to your HTTP client is important, it may be best to operate a Kinesis stream in each region your application serves, with separate regional endpoints. Backend processing can then either be distributed across regions or centralized into a global stream using a Kinesis-to-Kinesis transfer. The Lambda@Edge solution might reduce the need for regional streams somewhat, but we have not (yet) benchmarked this.

Kinesis to Kinesis

It’s not uncommon to shuttle data between Kinesis streams. If you write your own code to do this, note that while Lambda can read up to 1,000 records per invocation from Kinesis, a single PutRecords call that inserts records into another Kinesis stream is limited to 500 records! This limitation requires either setting the Kinesis trigger to batch only 500 records or adding logic to make multiple API calls to the receiving Kinesis stream.
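The second option, splitting a batch across several calls, can be sketched as follows. The `put_records` argument is any callable with the keyword signature of the boto3 Kinesis client’s `put_records(StreamName=..., Records=[...])`; the helper names and the stream name are hypothetical. This sketch only counts failures and does not retry them, which production code would need to do.

```python
def chunked(records, size=500):
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def forward_records(records, put_records, stream_name):
    """Forward records to another stream in calls of at most 500 entries.

    Each entry is a dict such as {"Data": b"...", "PartitionKey": "..."}.
    Returns the total number of records the service reported as failed.
    """
    failed = 0
    for batch in chunked(records, 500):
        response = put_records(StreamName=stream_name, Records=batch)
        # PutRecords can partially fail; the count is reported per call.
        failed += response.get("FailedRecordCount", 0)
    return failed
```

With boto3 installed, this could be called as `forward_records(entries, boto3.client("kinesis").put_records, "downstream-stream")`.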

Capacity Planning

The Kinesis stream will consist of one or more shards. Each shard accepts writes of up to 1MB/s or 1,000 records per second, and supports reads of up to 2MB/s. The Lambda trigger may be configured with a batch size of anywhere from 1 to 1,000 records. Importantly, per this math, Lambda seems to run only as many concurrent invocations as there are Kinesis shards. If you build a Kinesis stream with 10 shards, Kinesis will be readable at 10,000 records/sec, and will invoke up to 10 Lambdas concurrently, in 10 separate containers.

Incoming records are assigned to shards at the time of writing, so if a stream is receiving too many records and they cannot be processed quickly enough, increasing the number of shards may not immediately solve the problem without load-balancing writes into the new shards.
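The reason is that Kinesis maps each record’s partition key to a 128-bit hash key using MD5, and each shard owns a range of that key space. The sketch below mimics this mapping, assuming the key space is split evenly across shards (an assumption; real streams may have uneven ranges after splits and merges). The function names are hypothetical.

```python
import hashlib


def hash_key(partition_key):
    """Map a partition key to its 128-bit hash key (MD5, as Kinesis does)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key, shard_count):
    """Pick the shard whose hash range contains the key.

    Assumes the 128-bit key space is split evenly across `shard_count` shards.
    """
    range_size = (2 ** 128) // shard_count
    # Clamp so keys in the leftover top of the range land in the last shard.
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

A producer that always sends the same partition key always hits the same shard, no matter how many shards exist; varied or random partition keys are what spread writes across new shards.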

Database Connections

Our functions communicate with several databases, including one function that writes to Postgres. Traditional relational databases consume significant memory for each connection, and Lambda automatically spins up and disposes of containers (and thus connections) fairly often. Thankfully, Kinesis does help minimize the impact of this problem slightly.

One of the challenges of Lambda is that being such a highly scalable service, it is possible with some event triggers to quickly spawn up to 1,000 concurrent containers, by default. That would mean 1,000 connections to a database before considering any other application (or Lambda) that might also be making connections. With PostgreSQL, including Amazon’s RDS, this can quickly exhaust memory.

Kinesis manages this problem somewhat out of the box because the Lambda concurrency is tied to the number of shards. Thus, a Kinesis stream with 10 shards will run at most 10 concurrent containers, and so hold at most 10 database connections at a time if each container keeps a single connection.

On the application side, the database connection should be kept in a global variable, since globals persist across invocations until a container is recycled. Database pooling should generally be disabled, or configured to persist only a single connection. If your database still experiences memory or connection-count exhaustion, consider an intermediary service such as pgbouncer.
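A minimal sketch of the global-variable pattern follows. It uses Python’s built-in `sqlite3` purely as a stand-in so the example is self-contained; with Postgres, the connect call of your driver would take its place. The function names are hypothetical.

```python
import sqlite3

# Module-level state survives between invocations in a warm container.
_connection = None


def get_connection():
    """Create the connection on first use, then reuse it."""
    global _connection
    if _connection is None:
        # sqlite3 is a stand-in here; a Postgres driver would connect instead.
        _connection = sqlite3.connect(":memory:")
    return _connection


def handler(event, context):
    conn = get_connection()
    row = conn.execute("SELECT 1").fetchone()
    return {"ok": row[0] == 1, "connection_id": id(conn)}
```

Repeated invocations in the same container report the same `connection_id`, so each container holds exactly one connection.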

Iterator Resets

Kinesis readers consume streams based on an iterator. After some failure, you might need to delete your Lambda trigger and recreate it. If that happens, your iterator will be reset, and the stream will be processed from one of three starting points: the oldest record, the latest record, or a specific timestamp.

It’s worth noting that it’s basically impossible to guarantee that stream processing can be stopped by deleting a trigger and then restarted from the same point, because records may not be read in a guaranteed order. Each shard has its own iterator and its own records, but operators can only configure their Lambda trigger with a single timestamp; there’s no per-shard iterator configuration available.
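The three starting points correspond to the `StartingPosition` values `TRIM_HORIZON`, `LATEST`, and `AT_TIMESTAMP` of Lambda’s CreateEventSourceMapping API. The sketch below only builds the arguments for that call; the helper name, the ARN, and the function name are hypothetical placeholders.

```python
from datetime import datetime, timezone


def mapping_params(stream_arn, function_name, start="TRIM_HORIZON",
                   timestamp=None, batch_size=1000):
    """Build keyword arguments for Lambda's CreateEventSourceMapping call."""
    if start not in ("TRIM_HORIZON", "LATEST", "AT_TIMESTAMP"):
        raise ValueError(f"unsupported starting position: {start}")
    params = {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,
        "StartingPosition": start,
    }
    if start == "AT_TIMESTAMP":
        if timestamp is None:
            raise ValueError("AT_TIMESTAMP requires a timestamp")
        # One timestamp applies to every shard; there is no per-shard setting.
        params["StartingPositionTimestamp"] = timestamp
    return params


# Hypothetical ARN and function name, for illustration only.
params = mapping_params(
    "arn:aws:kinesis:us-east-1:123456789012:stream/example",
    "example-consumer",
    start="AT_TIMESTAMP",
    timestamp=datetime(2018, 8, 29, tzinfo=timezone.utc),
)
```

With boto3 installed, the result could be passed as `boto3.client("lambda").create_event_source_mapping(**params)`.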

Monitoring & Observability

Maintaining the health of both the consuming Lambda and the Kinesis stream itself is important. Ingestion and processing should be real-time, and when they aren’t, you can run into a number of problems. To keep tabs on this, Amazon CloudWatch provides metrics for your Kinesis streams:

Kinesis monitoring dashboard inside the AWS Console

We also use the IOpipe service itself to observe this from inside the Lambda invocations. We can see how many Kinesis records an invoked Lambda received, and step through the stream, looking at all invocations of all Lambdas triggered by a single stream. This is simplified by both the search and prev/next invocation features!

Give us your feedback!

We’d love to hear your own stories and lessons learned from using AWS Kinesis Streams, especially with Lambda. If you chose Kafka or Kinesis Firehose instead, we’d love to hear those stories too!

To get instant observability into the most granular behavior of your serverless applications, try IOpipe for free today or schedule a demo for your team.

Building Streaming AI/ML | Cloud Computing Pioneer | Serverless Architect | Observability Founder & CTO