Bite-Sized Serverless

Filter DynamoDB Event Streams Sent to Lambda

Lambda - Intermediate (200)
At re:Invent 2021, AWS announced that events streamed from DynamoDB to Lambda Functions can now be filtered at the source. This allows us to build cleaner, more efficient and more cost-effective solutions. The feature is powered by Event Source Mappings, the invisible engine that converts streams and queues into batches. In this Bite we will dive deep into Event Source Mappings and their filtering capabilities.

Introducing Event Source Mappings

Event Source Mappings are a free service provided by AWS as part of their Lambda offering. They sit between a source queue or stream and a Lambda Function. They read messages up to a configured maximum, and then forward those messages as a batch to a Lambda Function.
Event Source Mappings can be configured with a BatchSize. This value specifies how many messages the Event Source Mapping should collect before invoking the Lambda Function. Setting the BatchSize to a value of 1 results in a Lambda Function invocation for every message on the source. A lower BatchSize leads to faster event processing but higher cost. When using DynamoDB Streams, Kinesis Data Streams or SQS as a source, you can also configure a value for MaximumBatchingWindowInSeconds. This value specifies how long the Event Source Mapping will wait before forwarding the messages. When either the BatchSize, the MaximumBatchingWindowInSeconds, or Lambda's maximum payload size of 6MB is reached, the Lambda Function is invoked.
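For illustration, a minimal sketch of these two settings in CDK could look like this (the processor_function and queue names are assumptions, mirroring the project code later in this Bite):

# Hypothetical example: invoke the Function once 100 messages have been
# collected, 5 seconds have passed, or the 6MB payload limit is reached,
# whichever comes first.
batching_example = lambda_.CfnEventSourceMapping(
    scope=self,
    id="BatchingExampleEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=queue.queue_arn,
    batch_size=100,
    maximum_batching_window_in_seconds=5,
)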

Additional configuration for streaming sources

Streaming sources (Kinesis Data Streams and DynamoDB Streams) support a number of additional properties:

- StartingPosition - the position in the stream (TRIM_HORIZON or LATEST) from which the Event Source Mapping starts reading
- ParallelizationFactor - the number of batches processed concurrently from each shard
- MaximumRetryAttempts - the number of times a failed batch is retried
- MaximumRecordAgeInSeconds - the maximum age a record may reach before it is discarded
- BisectBatchOnFunctionError - split a failed batch in two and retry each half separately
- DestinationConfig - an on-failure destination, such as an SQS Queue, for discarded records
- TumblingWindowInSeconds - the duration of tumbling windows for streaming analytics

To learn more about these options, see the AWS::Lambda::EventSourceMapping documentation.
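As a rough sketch, a DynamoDB Streams mapping that sets several of these properties could look like the following in CDK (the processor_function, table and failure_destination names are assumptions, mirroring the project code used later in this Bite):

# Hypothetical example: a stream mapping with retry, age and failure settings
tuned_mapping = lambda_.CfnEventSourceMapping(
    scope=self,
    id="TunedStreamEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=table.table_stream_arn,
    starting_position="TRIM_HORIZON",  # start at the oldest available record
    parallelization_factor=2,  # process two batches per shard concurrently
    maximum_retry_attempts=3,  # retry a failed batch at most three times
    maximum_record_age_in_seconds=3600,  # discard records older than one hour
    bisect_batch_on_function_error=True,  # split failed batches to isolate bad records
    destination_config={
        "onFailure": {
            "destination": failure_destination.queue_arn,
        },
    },
)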

Introducing event filtering

In November 2021 AWS introduced event filtering for SQS, DynamoDB Streams and Kinesis Data Streams sources. This feature is part of the Event Source Mapping, which means events are filtered before they are sent to the Lambda Function.
Without event filtering, every single message on the source would be sent to the consuming Lambda Function, which can lead to very inefficient and expensive implementations. Consider, for example, a DynamoDB Table with millions of updates per day. You might only want to process newly inserted user entities, but the unfiltered stream contains the inserts, updates and deletes of every other entity as well. You would have to either split your DynamoDB table into two - one for high traffic and one for low traffic - or drop the irrelevant messages as they are received by your Lambda Function.
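For illustration, that second option - dropping irrelevant messages inside the Function - might look like the sketch below, where every discarded record still costs an invocation (process_new_user is an assumed business-logic function):

# Sketch of the pre-filtering anti-pattern: the Function is invoked for
# every stream record and discards the irrelevant ones in code.
def handler(event, context):
    for record in event["Records"]:
        # Only process newly inserted User entities; skip everything else
        if record["eventName"] != "INSERT":
            continue
        new_image = record["dynamodb"].get("NewImage", {})
        if new_image.get("EntityType", {}).get("S") != "User":
            continue
        process_new_user(new_image)  # assumed business logic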
With event filtering on Event Source Mappings this is a problem of the past. Now you can choose which events are put in the Lambda batch, based on metadata and / or data in the event. In the sections below, we will look at a number of example filters for SQS and DynamoDB sources. All examples are included in a CDK project available for download at the bottom of this page.

SQS filtering

An SQS record delivered to a Lambda Function looks like this:
1{ 2 "messageId": "7f137b23-0ac4-4fde-be3b-23b36584e9a2", 3 "receiptHandle": "AQEBPhWzck21oVX5U....", 4 "body": "Test message.", 5 "attributes": { 6 "ApproximateReceiveCount": "1", 7 "SentTimestamp": "1640892485223", 8 "SenderId": "AROA2YGTX4BEXAMPLE", 9 "ApproximateFirstReceiveTimestamp": "1640892485228" 10 }, 11 "messageAttributes": { 12 "Priority": { 13 "stringValue": "HIGH", 14 "binaryValue": null, 15 "stringListValues": [], 16 "binaryListValues": [], 17 "dataType": "String" 18 } 19 }, 20 "md5OfMessageAttributes": "0eaa197e5037b3185fbdc80819713bc8", 21 "md5OfBody": "1926c8d6aa1900172a0fb95cb4696330", 22 "eventSource": "aws:sqs", 23 "eventSourceARN": "arn:aws:sqs:eu-west-1:123412341234:FilterDynamodb...", 24 "awsRegion": "eu-west-1" 25}
A filter that only forwards events where the metadata message attribute "Priority" matches the value "HIGH" would look like this in CDK (at the time of writing, native support for filter_criteria was not yet available, so we use an escape hatch):
# sqs_high_priority.py in CDK project
high_priority = lambda_.CfnEventSourceMapping(
    scope=self,
    id="HighPriorityEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=queue.queue_arn,
    maximum_batching_window_in_seconds=1,
    batch_size=1,
)

high_priority.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "messageAttributes": {
                            "Priority": {"stringValue": ["HIGH"]}
                        },
                    }
                )
            },
        ],
    },
)
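To try this mapping out, you could send a message that carries a matching attribute, for example with boto3 (the queue URL is a placeholder):

# Send a test message that passes the Priority filter
import boto3

sqs = boto3.client("sqs")
sqs.send_message(
    QueueUrl="https://sqs.eu-west-1.amazonaws.com/123412341234/ExampleQueue",
    MessageBody="Test message.",
    MessageAttributes={
        "Priority": {"DataType": "String", "StringValue": "HIGH"},
    },
)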
In the payload above, the body is a simple string, not a JSON object. If we want to match only strings that start with "Test", we would write the following filter:
# sqs_test_prefix.py in CDK project
test_prefix = lambda_.CfnEventSourceMapping(
    scope=self,
    id="TestPrefixEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=queue.queue_arn,
    maximum_batching_window_in_seconds=1,
    batch_size=1,
)

test_prefix.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {"Pattern": json.dumps({"body": [{"prefix": "Test"}]})},
        ],
    },
)
If the body is a JSON object, we can filter on the fields in the object. For example, if the message looks like this:
1{ 2 "messageId": "7f137b23-0ac4-4fde-be3b-23b36584e9a2", 3 "receiptHandle": "AQEBPhWzck21oVX...", 4 "body": "{\"event_type\": \"NEW_PURCHASE\", \"price\": 201}", 5 "attributes": { 6 "ApproximateReceiveCount": "1", 7 "SentTimestamp": "1640892485223", 8 "SenderId": "AROA2YGTEXAMPLE", 9 "ApproximateFirstReceiveTimestamp": "1640892485228" 10 }, 11 "messageAttributes": { 12 "ExampleAttribute": { 13 "stringValue": "ExampleValue", 14 "binaryValue": null, 15 "stringListValues": [], 16 "binaryListValues": [], 17 "dataType": "String" 18 }, 19 "Priority": { 20 "stringValue": "HIGH", 21 "binaryValue": null, 22 "stringListValues": [], 23 "binaryListValues": [], 24 "dataType": "String" 25 } 26 }, 27 "md5OfMessageAttributes": "0eaa197e5037b3185fbdc80819713bc8", 28 "md5OfBody": "1926c8d6aa1900172a0fb95cb4696330", 29 "eventSource": "aws:sqs", 30 "eventSourceARN": "arn:aws:sqs:eu-west-1:123412341234:FilterDynamodb...", 31 "awsRegion": "eu-west-1" 32}
We could match messages where the event_type is "NEW_PURCHASE" and the price is above 200:
# sqs_new_purchase_price.py in CDK project
new_purchase_price = lambda_.CfnEventSourceMapping(
    scope=self,
    id="NewPurchasePriceEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=queue.queue_arn,
    maximum_batching_window_in_seconds=1,
    batch_size=1,
)

# Multiple patterns in the Filters list are combined with OR logic, so
# both conditions go in a single pattern to get the AND behavior we want.
new_purchase_price.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "body": {
                            "event_type": ["NEW_PURCHASE"],
                            "price": [{"numeric": [">", 200]}],
                        }
                    }
                )
            },
        ],
    },
)
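As a side note, the numeric operator also supports ranges. A pattern matching prices above 200 up to and including 500, for example, would look like this:

{"body": {"price": [{"numeric": [">", 200, "<=", 500]}]}}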

DynamoDB filtering

As with SQS, filters on DynamoDB Streams allow us to match on both metadata and data. The following JSON is an example event from a DynamoDB Stream:
1{ 2 "eventID": "1", 3 "eventVersion": "1.0", 4 "dynamodb": { 5 "Keys": { 6 "Id": { 7 "N": "101" 8 } 9 }, 10 "NewImage": { 11 "Message": { 12 "S": "New item!" 13 }, 14 "Id": { 15 "N": "101" 16 } 17 }, 18 "StreamViewType": "NEW_AND_OLD_IMAGES", 19 "SequenceNumber": "111", 20 "SizeBytes": 26 21 }, 22 "awsRegion": "us-west-2", 23 "eventName": "INSERT", 24 "eventSourceARN": "eventsourcearn", 25 "eventSource": "aws:dynamodb" 26}
If we would like to match only newly added items, we would write this metadata filter:
# ddb_inserts_only.py in CDK project
inserts_only = lambda_.CfnEventSourceMapping(
    scope=self,
    id="InsertsOnlyEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=table.table_stream_arn,
    maximum_batching_window_in_seconds=1,
    starting_position="TRIM_HORIZON",
    batch_size=1,
    destination_config={
        "onFailure": {
            "destination": failure_destination.queue_arn,
        },
    },
)

inserts_only.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "eventName": ["INSERT"],
                    }
                )
            },
        ],
    },
)
And if we only want to receive newly inserted items where the attribute EntityType matches "User":
# ddb_insert_users.py in CDK project
user_inserts_only = lambda_.CfnEventSourceMapping(
    scope=self,
    id="UserInsertsOnlyEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=table.table_stream_arn,
    maximum_batching_window_in_seconds=1,
    starting_position="TRIM_HORIZON",
    batch_size=1,
    destination_config={
        "onFailure": {
            "destination": failure_destination.queue_arn,
        },
    },
)

user_inserts_only.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "eventName": ["INSERT"],
                        "dynamodb": {
                            "NewImage": {"EntityType": {"S": ["User"]}}
                        },
                    }
                )
            },
        ],
    },
)
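To exercise this filter, inserting a matching item is enough, for example with boto3 (the table name is a placeholder, and the Id key mirrors the stream example above):

# Put an item that passes the INSERT + EntityType=User filter
import boto3

dynamodb = boto3.client("dynamodb")
dynamodb.put_item(
    TableName="ExampleTable",
    Item={
        "Id": {"N": "101"},
        "EntityType": {"S": "User"},
    },
)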

Control data processing through item attributes

In our last example we wrote a filter on the attribute EntityType. If this attribute is not present, the Event Source Mapping will not include the message in the batch sent to the Lambda Function. This behavior allows us to control which events should be processed by the Lambda Function through the items' attributes.
Take a financial system processing transactions. Some transactions are flagged for anomalies and should be processed by our Lambda Function. We could add an attribute FlaggedAt, which is only set for a small percentage of items in our table. The following filter would only match newly inserted or updated items where the FlaggedAt attribute is present:
# ddb_new_and_updated_flagged.py in CDK project
ddb_new_and_updated_flagged = lambda_.CfnEventSourceMapping(
    scope=self,
    id="DdbNewAndUpdatedFlaggedEventSourceMapping",
    function_name=processor_function.function.function_name,
    event_source_arn=table.table_stream_arn,
    maximum_batching_window_in_seconds=1,
    starting_position="TRIM_HORIZON",
    batch_size=1,
    destination_config={
        "onFailure": {
            "destination": failure_destination.queue_arn,
        },
    },
)

ddb_new_and_updated_flagged.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "eventName": ["INSERT", "MODIFY"],
                        "dynamodb": {
                            "NewImage": {"FlaggedAt": {"S": [{"exists": True}]}}
                        },
                    }
                )
            },
        ],
    },
)
Note that "exists" matching only works on leaf nodes, which is the "S" in our example. You cannot use the exists filter on an intermediate node like "FlaggedAt". If our FlaggedAt value could be either a string or a number, we would need multiple filters:
# ddb_new_and_updated_flagged.py in CDK project
ddb_new_and_updated_flagged.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        "eventName": ["INSERT", "MODIFY"],
                        "dynamodb": {
                            "NewImage": {"FlaggedAt": {"S": [{"exists": True}]}}
                        },
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {
                        "eventName": ["INSERT", "MODIFY"],
                        "dynamodb": {
                            "NewImage": {"FlaggedAt": {"N": [{"exists": True}]}}
                        },
                    }
                )
            },
        ],
    },
)

Caveat: SQS messages can only be processed once

In the CDK project attached to this Bite you will find multiple Event Source Mappings on the same SQS Queue and DynamoDB Table. You should be aware of a significant caveat in the way multiple Event Source Mappings on a single queue behave.
When multiple Event Source Mappings are configured on the same queue, they poll that queue simultaneously. When a new message arrives, it will be consumed by one of the Event Source Mappings and compared against its filter. If the message matches the filter, it is put in the Lambda Function batch. If it does not pass the filter, it is not returned to the queue. Instead, the message is considered complete, and it will not be available to other consumers, including other Event Source Mappings, even if their filters would have matched. This topic is not clearly documented, but it was discussed on Serverless Office Hours and Twitter.
Given this caveat, a design in which multiple filtered Event Source Mappings each route a subset of messages from a single queue to their own Lambda Function is dangerous and will lead to lost messages.
When multiple Event Source Mappings are configured on streaming sources like DynamoDB Streams and Kinesis, they also read from the same stream simultaneously. Unlike with SQS, however, reading an event does not remove it from the stream, so an event can be processed multiple times by different consumers. An item in DynamoDB with an attribute Priority set to HIGH and an attribute EventType set to NEW_PURCHASE will therefore reliably be processed by both a Lambda Function filtering on Priority and one filtering on EventType.

Event Source Mappings cost

Event Source Mappings are free to use at any scale. The Lambda Function executions they trigger will be billed at standard rates. The consumption of messages at the source might incur costs as well, but this depends on the service the Event Source Mapping is consuming from. For SQS, Event Source Mappings use the ReceiveMessage API, which is priced per million requests. For Kinesis, Event Source Mappings use the GetRecords API, which is free, although at higher scales you might want to configure Enhanced Fan-Out at an additional cost. DynamoDB Streams also exposes a GetRecords API, but the docs state that you are not charged for GetRecords API calls invoked by Lambda as part of DynamoDB triggers, which is nice.

Conclusion

Event Source Mappings have always been a great service, even if they are relatively unknown. Lambda integrations with streams and queues would be very difficult or costly to build without them. With the added filtering functionality, they have become much more powerful and will lead to more efficient and cost-effective solutions.

CDK Project

The services and code described in this Bite are available as a Python AWS Cloud Development Kit (CDK) project. Within the project, execute a cdk synth to generate CloudFormation templates, then deploy these templates to your AWS account with a cdk deploy. For your convenience, ready-to-use CloudFormation templates are also available in the cdk.out folder. For further instructions on how to use the CDK, see Getting started with the AWS CDK.

Click the Download button below for a Zip file containing the project.