Implement the Priority Queue Pattern with SQS and Lambda

SQS - Advanced (300)
A queue is a buffer for constrained resources. A priority queue is an additional queue that should be emptied first, even when the main queue has millions of items on it. This allows important messages to be processed quickly, regardless of system load. In this Bite, we will show how to implement the priority queue pattern with SQS and Lambda.
Physical resources, such as printers, paint mixers, and hot dog stands, but also CPUs, hard drives, and networks, are good examples of constrained resources. They can only handle so many operations in a given time frame. Send more instructions or orders than a resource can handle, and its instruction queue starts to back up.
Every performance problem starts with a queue backing up somewhere. Maybe it's a socket's listening queue. Maybe it's the OS's run queue or the database's I/O queue.
The quote above is from the chapter Stability Patterns - Create Back Pressure in Michael T. Nygard's seminal book Release It!. The chapter explains how back pressure can help avoid performance problems. But sometimes we just want to bypass the full queue without dropping any items from the main queue. Bypassing the main queue with a priority queue can, for example, allow a system to trigger a scale-out event, which in turn helps to empty the main queue.
Another common use case for priority queues is customers in tiered payment plans: the "premium plan" customers' orders should be processed first, followed by "basic plan" customers' orders, and only when there is spare capacity should the "free plan" customers' orders be processed.

Queues in an Event-Driven System

Queues are generally accessed through a polling mechanism. The constrained resource (let's say, a hot dog stand) polls the queue, then handles the work. When the resource has finished the job, it polls the queue for its next task. Implementing the priority queue pattern is easy in a polling environment: all we have to do is poll the priority queue first. If it has work available, process it. If the priority queue is empty, poll the main queue.
AWS Lambda and many other serverless technologies are event-driven. Something happens, and this triggers something else. Polling is generally considered an anti-pattern in serverless environments. But how does this work for SQS and Lambda? Since SQS Queues need to be polled, but Lambda Functions only fire based on events, there has to be some secret glue. This glue is called Event Source Mappings. Introduced in June 2018, Event Source Mappings are a free Lambda component that continuously polls SQS Queues (and other queues and streams) for messages, and invokes a Lambda Function when messages are found. Simply put, they convert a pull-based system into a push-based system.
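Under the hood, an Event Source Mapping is simply a resource that ties a queue ARN to a function. To make this concrete, here is a minimal sketch with boto3 (the queue ARN and function name are made up for illustration):

import boto3

lambda_client = boto3.client("lambda")

# Hypothetical queue ARN and function name, for illustration only.
mapping = lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:eu-west-1:123456789012:main-queue",
    FunctionName="queue-processor",
    BatchSize=1,
)
print(mapping["UUID"])  # the identifier of the new mapping

From this point on, Lambda polls the queue on your behalf and invokes the function whenever messages arrive.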

Event Source Mappings complicate Priority Queues

Event Source Mappings are extremely useful and powerful. They enable use cases that would be very hard and expensive to build yourself. But for all their benefits, they do complicate the priority queue pattern. We can configure multiple SQS Queues and Event Source Mappings, and point them to a single Lambda Function, but they will push their messages in arbitrary order. There is no way to prioritize one queue over another.
The log below is the result of 10 messages placed on the main queue and 10 messages on the priority queue, at the same time. As you can see, there is no prioritization between the two queues.
successfully processed main msg 2
successfully processed main msg 4
successfully processed HIGH PRIORITY msg 3
successfully processed main msg 1
successfully processed main msg 3
successfully processed main msg 10
successfully processed HIGH PRIORITY msg 6
successfully processed HIGH PRIORITY msg 5
successfully processed main msg 8
successfully processed HIGH PRIORITY msg 7
successfully processed main msg 5
successfully processed HIGH PRIORITY msg 8
successfully processed HIGH PRIORITY msg 9
successfully processed main msg 6
successfully processed HIGH PRIORITY msg 10
successfully processed HIGH PRIORITY msg 2
successfully processed HIGH PRIORITY msg 1
successfully processed HIGH PRIORITY msg 4
successfully processed main msg 9
successfully processed main msg 7

Use Cases for Priority Queues processed by Lambda Functions

AWS Lambda Functions are often associated with massive parallelism and scalability. You can throw almost any load at them, and they will scale horizontally to cover your requests. This is discussed in great detail in Vlad Ionescu's post Scaling containers on AWS in 2022. However, massive parallelism is not the only use case for Lambda. Using Reserved Concurrency, a Lambda Function can be limited to a small number of parallel executions, which protects downstream resources like physical devices, non-scalable systems, or rate-limited APIs. These throttled Lambda Functions are a perfect match for SQS, which buffers requests when no capacity is available. And these use cases are also a perfect fit for priority queues, which allow us to fast-lane important messages to our constrained downstream systems.
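For example, capping a function at a single concurrent execution takes one API call. A minimal sketch, assuming a hypothetical function name:

import boto3

lambda_client = boto3.client("lambda")

# Cap the (hypothetical) function at one concurrent execution to protect
# a constrained downstream resource.
lambda_client.put_function_concurrency(
    FunctionName="queue-processor",
    ReservedConcurrentExecutions=1,
)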

How to Build Priority Queues for Lambda: Check the Priority Queue for Messages

When designing the serverless priority queue solution, we followed three requirements:
  1. No polling, except for Event Source Mappings.
  2. Works for any message pattern, including no messages on the main queue, no messages on the priority queue, and any mix of messages in between.
  3. Avoid additional components, such as Parameter Store or Step Functions, as much as possible.
The implementation matching these requirements is surprisingly simple. We point both queues to the same Lambda Function. When the function is invoked, it checks whether the message's source is the priority queue. If it is, it processes the message. If it isn't, it checks whether messages are available on the priority queue. If there are, it returns the current message to its queue. If there aren't, it processes the message. Repeat.

Priority Queues in Action

In our first demo, we have two standard (non-FIFO) queues: a main queue and a priority queue. Both queues are configured with an Event Source Mapping pointing to a single Lambda Function. The Lambda Function has a reserved concurrency of 1, which means only one message will be processed at a time. This protects downstream resources from overloading.
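The demo infrastructure might be wired up roughly as follows in CDK. This is a sketch, not the exact project code: construct names, the runtime version, and the timeouts are assumptions.

from aws_cdk import Duration, Stack, aws_lambda as lambda_, aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from constructs import Construct


class PriorityQueueStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        main_queue = sqs.Queue(self, "MainQueue", visibility_timeout=Duration.seconds(90))
        priority_queue = sqs.Queue(self, "PriorityQueue", visibility_timeout=Duration.seconds(90))

        function = lambda_.Function(
            self,
            "Processor",
            runtime=lambda_.Runtime.PYTHON_3_9,
            handler="index.event_handler",
            code=lambda_.Code.from_asset("lambda"),
            reserved_concurrent_executions=1,  # one message at a time
            timeout=Duration.seconds(15),
            environment={
                "PRIORITY_QUEUE_URL": priority_queue.queue_url,
                "PRIORITY_QUEUE_ARN": priority_queue.queue_arn,
            },
        )

        # Both Event Source Mappings point to the same function.
        # report_batch_item_failures enables Partial Batch Response.
        function.add_event_source(
            SqsEventSource(main_queue, batch_size=1, report_batch_item_failures=True)
        )
        function.add_event_source(
            SqsEventSource(priority_queue, batch_size=1, report_batch_item_failures=True)
        )

        # The function checks the priority queue's attributes before
        # processing main queue messages.
        priority_queue.grant(function, "sqs:GetQueueAttributes")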
The solution uses Event Source Mapping's Partial Batch Response to return messages to the queue. When the Lambda Function returns a message to the queue, the function immediately becomes available for the next message. The event handler and partial batch response are shown in the code below. The full source of this file, and all other code in this Bite, can be found on GitHub.
def event_handler(event, _context):
    returned_messages = []
    for record in event["Records"]:
        try:
            _process_record(record)
        except PriorityMessageAvailable:
            print("Yielding to priority message")
            returned_messages.append(record["messageId"])
        except Exception as exc:
            print(f"Got unexpected error: {type(exc)} - {exc}")
            returned_messages.append(record["messageId"])

    return {
        "batchItemFailures": [
            {"itemIdentifier": msg_id} for msg_id in returned_messages
        ]
    }
The _process_record() function called in the event handler looks like this:
def _process_record(record):
    """Handle the record or raise an error when higher priority messages are available."""
    is_priority_msg = record["eventSourceARN"] == PRIORITY_QUEUE_ARN
    if not is_priority_msg:
        _check_priority_messages_available()

    _handle_record(record)
The _handle_record() function simply sleeps for five seconds and prints the record's body. The _check_priority_messages_available() function is covered below.
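The actual _handle_record() lives in the GitHub repository; a minimal stand-in matching that description could look like this:

import time


def _handle_record(record):
    """Stand-in for a constrained resource: each message takes five seconds."""
    time.sleep(5)
    print(f"successfully processed {record['body']}")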
When we put 10 messages on the main queue, wait a few seconds, and then send 5 messages to the priority queue, the output looks like this:
successfully processed main msg 1
successfully processed main msg 7
successfully processed HIGH PRIORITY msg 1
successfully processed main msg 6
Yielding to priority message
Yielding to priority message
successfully processed main msg 10
Yielding to priority message
Yielding to priority message
Yielding to priority message
successfully processed HIGH PRIORITY msg 4
successfully processed HIGH PRIORITY msg 2
Yielding to priority message
Yielding to priority message
Yielding to priority message
successfully processed HIGH PRIORITY msg 3
successfully processed HIGH PRIORITY msg 5
Yielding to priority message
Yielding to priority message
successfully processed main msg 9
Yielding to priority message
Yielding to priority message
successfully processed main msg 4
Yielding to priority message
successfully processed main msg 8
successfully processed main msg 3
successfully processed main msg 2
successfully processed main msg 5
Removing the "Yielding to priority message" lines, we can clearly see the priority queue at work:
successfully processed main msg 1
successfully processed main msg 7
successfully processed HIGH PRIORITY msg 1
successfully processed main msg 6
successfully processed main msg 10
successfully processed HIGH PRIORITY msg 4
successfully processed HIGH PRIORITY msg 2
successfully processed HIGH PRIORITY msg 3
successfully processed HIGH PRIORITY msg 5
successfully processed main msg 9
successfully processed main msg 4
successfully processed main msg 8
successfully processed main msg 3
successfully processed main msg 2
successfully processed main msg 5
You'll note that the prioritization is not perfect. This is due to the eventual consistency of the SQS.GetQueueAttributes API call, which we use to determine if the priority queue has messages available:
def _check_priority_messages_available():
    response = sqs_client.get_queue_attributes(
        QueueUrl=PRIORITY_QUEUE_URL,
        AttributeNames=[
            "ApproximateNumberOfMessages",
            "ApproximateNumberOfMessagesNotVisible",
        ],
    )
    for key, value in response["Attributes"].items():
        # Check if any attribute is > 0, raise error if
        # messages are in any way available on the priority queue.
        if int(value) > 0:
            raise PriorityMessageAvailable(key)
The documentation clearly defines this behavior:
The ApproximateNumberOfMessagesDelayed, ApproximateNumberOfMessagesNotVisible, and ApproximateNumberOfMessagesVisible metrics may not achieve consistency until at least 1 minute after the producers stop sending messages. This period is required for the queue metadata to reach eventual consistency.
In our use case the eventual consistency manifests in two ways:
  1. When messages are placed on the priority queue, it generally takes a few seconds before the ApproximateNumberOfMessages value becomes larger than 0.
  2. When all messages on the priority queue have been processed, it generally takes a few seconds before the ApproximateNumberOfMessages value resets to 0.
Eventual consistency is less of a problem with FIFO queues, as we'll see in the next example.

FIFO Queues

In the previous section, we saw that messages are processed in loose order and the SQS.GetQueueAttributes API is eventually consistent. Let's see what happens when we repeat the same experiment, but with FIFO queues (github source).
successfully processed main msg 1
successfully processed main msg 2
Yielding to priority message
successfully processed HIGH PRIORITY msg 1
successfully processed HIGH PRIORITY msg 2
successfully processed HIGH PRIORITY msg 3
successfully processed HIGH PRIORITY msg 4
successfully processed HIGH PRIORITY msg 5
successfully processed main msg 3
successfully processed main msg 4
successfully processed main msg 5
successfully processed main msg 6
successfully processed main msg 7
successfully processed main msg 8
successfully processed main msg 9
successfully processed main msg 10
Much better! With FIFO queues we see the messages being processed in perfect order, and very efficiently at that. Only once did the function reject a message to yield to the priority queue. Repeating the test with various configurations (more messages, shorter processing times) shows this is a consistent pattern.
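For reference, the switch to FIFO is mostly an infrastructure change. In CDK, the queue declarations from the earlier sketch might become something like this (assuming the same stack; with fifo=True and no explicit queue_name, CDK generates a name ending in the required .fifo suffix):

main_queue = sqs.Queue(
    self,
    "MainQueue",
    fifo=True,
    content_based_deduplication=True,
    visibility_timeout=Duration.seconds(90),
)
priority_queue = sqs.Queue(
    self,
    "PriorityQueue",
    fifo=True,
    content_based_deduplication=True,
    visibility_timeout=Duration.seconds(90),
)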

Multiple Levels of Priorities

The priority queue pattern can be extended to as many priority levels as needed. For example, you could have a HIGH > MEDIUM > MAIN setup, where the queues have to be emptied in order. This requires a small change in the Lambda Function: it now needs to check if messages are present in queues with a higher priority than itself (github source).
def _process_record(record):
    is_high_priority_msg = record["eventSourceARN"] == HIGH_PRIORITY_QUEUE_ARN
    is_medium_priority_msg = record["eventSourceARN"] == MEDIUM_PRIORITY_QUEUE_ARN
    is_main_msg = not is_medium_priority_msg and not is_high_priority_msg
    if is_medium_priority_msg:
        # Medium Prio messages should yield to High Prio messages
        _check_high_priority_messages_available()
    if is_main_msg:
        # Messages on the main queue should yield to High and Medium Prio messages
        _check_high_priority_messages_available()
        _check_medium_priority_messages_available()

    _handle_record(record)
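The two checker functions are not shown in the snippet above; they mirror _check_priority_messages_available() from the first demo. One plausible way to share the logic, assuming queue URL constants for both priority queues:

def _check_messages_available(queue_url):
    """Raise PriorityMessageAvailable if the queue (approximately) holds messages."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",
            "ApproximateNumberOfMessagesNotVisible",
        ],
    )
    for key, value in response["Attributes"].items():
        if int(value) > 0:
            raise PriorityMessageAvailable(key)


def _check_high_priority_messages_available():
    _check_messages_available(HIGH_PRIORITY_QUEUE_URL)


def _check_medium_priority_messages_available():
    _check_messages_available(MEDIUM_PRIORITY_QUEUE_URL)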
Running this setup with three FIFO queues yields the following results:
successfully processed main msg 1
successfully processed main msg 2
successfully processed main msg 3
Yielding to priority message
successfully processed Medium Prio msg 1
successfully processed Medium Prio msg 2
successfully processed Medium Prio msg 3
successfully processed Medium Prio msg 4
Yielding to priority message
successfully processed HIGH PRIO msg 1
successfully processed HIGH PRIO msg 2
successfully processed HIGH PRIO msg 3
successfully processed Medium Prio msg 5
successfully processed main msg 4
successfully processed main msg 5
successfully processed main msg 6
successfully processed main msg 7
successfully processed main msg 8
successfully processed main msg 9
successfully processed main msg 10

Higher Levels of Concurrency

The examples above are based on a concurrency of one. But a hot dog stand often employs three people, each of whom independently takes orders from the queue. So let's take the FIFO example with a single priority queue and change the Lambda Function's concurrency to three (github source). Because we're using FIFO queues, we need to send messages with multiple group IDs - otherwise the messages would still be processed with a concurrency of one.
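A test driver for this scenario might look like the sketch below. MAIN_QUEUE_URL is an assumed constant holding the FIFO main queue's URL; the group assignment matches the pattern visible in the output.

import boto3

sqs_client = boto3.client("sqs")

# Spread 20 main queue messages across three Message Group IDs so that
# up to three messages can be in flight at the same time.
for i in range(1, 21):
    group_id = str(i % 3)
    sqs_client.send_message(
        QueueUrl=MAIN_QUEUE_URL,  # assumption: the FIFO main queue's URL
        MessageBody=f"main msg {i} (Group ID {group_id})",
        MessageGroupId=group_id,
        MessageDeduplicationId=f"main-{i}",
    )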
Running the test with 20 messages on the main queue and 10 messages on the priority queue, spread across three Message Group IDs, yields the following results:
successfully processed main msg 1 (Group ID 1)
successfully processed main msg 2 (Group ID 2)
successfully processed main msg 3 (Group ID 0)
successfully processed main msg 4 (Group ID 1)
successfully processed main msg 5 (Group ID 2)
successfully processed main msg 6 (Group ID 0)
successfully processed main msg 7 (Group ID 1)
successfully processed main msg 8 (Group ID 2)
successfully processed main msg 9 (Group ID 0)
successfully processed main msg 10 (Group ID 1)
Yielding to priority message
successfully processed main msg 11 (Group ID 2)
Yielding to priority message
successfully processed main msg 12 (Group ID 0)
Yielding to priority message
successfully processed HIGH PRIO msg 1 (Group ID 1)
successfully processed HIGH PRIO msg 2 (Group ID 2)
successfully processed HIGH PRIO msg 3 (Group ID 0)
successfully processed HIGH PRIO msg 4 (Group ID 1)
successfully processed HIGH PRIO msg 5 (Group ID 2)
successfully processed HIGH PRIO msg 6 (Group ID 0)
successfully processed HIGH PRIO msg 7 (Group ID 1)
successfully processed HIGH PRIO msg 8 (Group ID 2)
successfully processed HIGH PRIO msg 9 (Group ID 0)
successfully processed HIGH PRIO msg 10 (Group ID 1)
successfully processed main msg 13 (Group ID 1)
successfully processed main msg 14 (Group ID 2)
successfully processed main msg 15 (Group ID 0)
successfully processed main msg 16 (Group ID 1)
successfully processed main msg 17 (Group ID 2)
successfully processed main msg 18 (Group ID 0)
successfully processed main msg 20 (Group ID 2)
successfully processed main msg 19 (Group ID 1)
This time we see a few "Yielding to priority message" lines. These come from the three concurrent Lambda executions, each of which detects messages on the priority queue. Message processing is still very reliable and perfectly in order within each Group ID.

Conclusion

In this Bite, we have covered use cases for priority queues. We have seen that implementing the priority queue pattern is easy in polling systems. In AWS Lambda, polling is performed by Event Source Mappings, which makes implementing the priority queue pattern more difficult. However, by having the Lambda Function reject messages when higher priority messages are available, we can achieve the same result. It would be nice if AWS provided built-in support for Event Source Mapping prioritization. This could be implemented as a priority integer per Event Source Mapping, where ESMs with a higher number are processed first. But for now, our solution gets the job done.
We have also seen that the priority queue pattern works best with FIFO queues. This is mostly due to the highly distributed nature of standard queues: the messages are stored across many partitions and processed by Event Source Mappings in batches, which does not combine well with a prioritization solution. FIFO queues, on the other hand, are specifically built for strict ordering and play very nicely with the priority queue pattern.
Finally, the priority queue pattern is only useful for constrained resources. If your resource is not constrained, you're probably better off building two queues with two separate Lambda Functions.

CDK Project

The services and code described in this Bite are available as a Python AWS Cloud Development Kit (CDK) project. Within the project, execute cdk synth to generate the CloudFormation templates, then deploy them to your AWS account with cdk deploy. For your convenience, ready-to-use CloudFormation templates are also available in the cdk.out folder. For further instructions on how to use the CDK, see Getting started with the AWS CDK.
