Pitfalls to avoid when designing SQS consumers – threads, locks, and self-induced message duplication and processing delays

Background

Some of my previous blog posts on SQS, like Long Polling vs Short Polling and Delivery Delays, covered the basics of leveraging Amazon SQS for messaging, the supported polling models, the message life cycle, and much more. As those posts show, building a simple SQS consumer that reads and processes messages from an Amazon SQS queue is indeed a cakewalk. However, as the saying goes, the devil is in the details. There is much more to writing an efficient consumer that can reliably process messages without causing duplication or processing delays. So, this blog post digs into one of the most important aspects to consider while designing and implementing consumers for Amazon SQS. It also summarizes my experiences reviewing the design and implementation of a few cloud-scale business applications that act as SQS consumers for high-throughput messaging.

Message Visibility Timeout – A Quick Introduction

The message visibility timeout is a mechanism used by Amazon SQS to prevent multiple consumers from consuming and processing the same message concurrently.

Essentially, to avoid processing lags, in most situations one runs multiple application instances and/or multiple threads within a single running instance, all consuming and processing messages from the same queue in parallel. In such a situation, however, the same message should not get processed by more than one consumer (running as a separate process or thread). So, when reading messages, the Amazon SQS API allows consumers to set a time window during which the messages returned by a particular read request are hidden from all subsequent read requests.

One of my earlier blog posts explains the message visibility timeout with examples in the context of the overall message life cycle in SQS. I would suggest taking a quick peek at that post in case you aren't familiar with the SQS message lifecycle. The following diagram provides a nice, quick overview of the message visibility timeout concept itself.

Message Visibility Timeout
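To make the concept concrete, here is a minimal in-memory sketch of visibility-timeout semantics. This is a simulation for illustration only, not the real SQS API; the class name, methods, and timings are all made up, and the timeout is shrunk to fractions of a second:

```python
import time

class TinyQueue:
    """Toy in-memory simulation of SQS visibility-timeout behavior (not real SQS)."""
    def __init__(self):
        self._messages = {}  # body -> time at which it becomes visible (again)

    def send(self, body):
        self._messages[body] = 0.0  # visible immediately

    def receive(self, visibility_timeout):
        now = time.monotonic()
        for body, visible_at in self._messages.items():
            if visible_at <= now:
                # Hide the message from all other receivers for the timeout window.
                self._messages[body] = now + visibility_timeout
                return body
        return None  # nothing currently visible

    def delete(self, body):
        self._messages.pop(body, None)

q = TinyQueue()
q.send("order-42")
assert q.receive(visibility_timeout=0.2) == "order-42"  # first consumer gets it
assert q.receive(visibility_timeout=0.2) is None        # hidden from everyone else
time.sleep(0.25)                                        # timeout expires, no delete happened
assert q.receive(visibility_timeout=0.2) == "order-42"  # the message reappears
```

The last assertion is exactly the duplication scenario discussed below: a message that is read but never deleted in time becomes visible again.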

Now that we understand the basic concept of the message visibility timeout window, let's dig a bit deeper into a few scenarios.

Too Short Message Visibility Timeout

One of the core assumptions while writing SQS consumers is that each consumer deletes its messages once they have been processed, and that the delete call is made within the message visibility timeout window. If the delete call is not made at all, or is made after the message visibility timeout expires, the same message reappears in the queue and becomes available to any subsequent read request (from the same consumer or another one).

In such a case, even if the message has already been processed, it will get reprocessed by a consumer, unless consumers are smart enough to filter out duplicates based on the content of the message. Thus, it is very important to set the message visibility timeout large enough to give consumers sufficient time to process a message and delete it once it has been successfully processed.
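When the timeout cannot be made large enough for every case, consumer-side de-duplication is the fallback. Here is a minimal sketch, assuming each message carries a unique id; in a real system the seen-set would have to live in a store shared by all consumer instances (a database, cache, etc.), not a local set:

```python
processed_ids = set()  # stand-in for a shared de-duplication store

def process_once(message_id, body, handler):
    """Run handler(body) only if this message id has not been handled before."""
    if message_id in processed_ids:
        return False  # redelivered duplicate, skip it
    handler(body)
    processed_ids.add(message_id)
    return True

seen = []
assert process_once("m-1", "payload", seen.append) is True
assert process_once("m-1", "payload", seen.append) is False  # duplicate ignored
assert seen == ["payload"]
```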

Too Long Message Visibility Timeout

Let's consider the exact opposite case – what if the message visibility timeout window is too long? Going by the definition and explanation above, once a message is returned to a consumer, it is hidden from other consumers for the time window defined by the message visibility timeout value used in the read request.

Thus, if the window is too large, everything is still fine as long as the message is successfully processed and deleted by the consumer that read it (within the visibility timeout window). However, if for some reason the consumer cannot process the message, fails while processing it, or shuts down abruptly mid-processing, the message remains hidden from other consumers until the visibility timeout expires (unless the visibility timeout for that message is changed using the ChangeMessageVisibility API).

To summarize, depending on how quickly a message needs to be processed, setting the message visibility timeout to a larger value may not be as bad as setting it to a very small value (as explained in the previous section), although a very large value can sometimes delay message processing.

How threads make it complicated

Prototyping an SQS consumer using the SQS API is obviously much simpler than designing and writing a real-world consumer that runs in a cloud-scale, business-critical enterprise application, continuously polling the SQS queue and reliably consuming and processing its messages. In most such applications, the consumer is multi-threaded: it spawns multiple threads to consume and process messages from SQS so that there is no processing lag. Oftentimes, there are also multiple instances of the same application running, each processing messages from the same queue using multiple threads.

As with any multi-threaded application, there is complexity in processing data across multiple threads, and as we will see, the message visibility timeout makes it all the more complex. Let's understand this in more detail.

A typical multi-threaded SQS consumer

Here is what a typical multi-threaded SQS consumer looks like:

Typical message consumption flow chart using multi-threaded consumers

The flow chart above is mostly self-explanatory, but to summarize it quickly: messages are read in a single (usually background) thread using the ReceiveMessage call. Based on the input parameters and the availability of messages in the queue, SQS returns up to 10 messages per ReceiveMessage call, which are then distributed (or assigned) to one or more threads for processing (using a thread pool). Once a message is successfully processed, it needs to be deleted from the SQS queue to indicate to SQS that processing succeeded.
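The flow above can be sketched in a few lines of Python. This is a skeleton only: the receive loop is stubbed with canned batches instead of real ReceiveMessage calls, and the delete step is replaced with a list append, so the structure (one polling loop feeding a worker pool) is what matters here:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for successive ReceiveMessage results (up to 10 messages per call);
# an empty batch ends the loop in this sketch.
batches = [["m1", "m2", "m3"], ["m4"], []]

deleted = []  # stand-in for DeleteMessage calls made after successful processing

def process(msg):
    # ... business logic for one message would go here ...
    deleted.append(msg)  # "delete" the message once processed

with ThreadPoolExecutor(max_workers=4) as pool:
    for batch in batches:        # single polling loop (a background thread in practice)
        for msg in batch:        # fan each received message out to the worker pool
            pool.submit(process, msg)
# Leaving the with-block waits for all in-flight workers to finish.

assert sorted(deleted) == ["m1", "m2", "m3", "m4"]
```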

As one may note, all of the above looks great as long as everything works as expected. However, as we all know, the world is not ideal :), so let's try to understand what can go wrong…

Self-induced message duplication and processing delays

We already spoke about the scenario where the message visibility timeout is too short in a single-threaded consumer: messages can reappear and hence lead to duplicate processing of the same messages.

Thread scheduling cycles and inherent wait time!

Let's consider the case of a multi-threaded consumer. Here, each processing thread is trying to process one or more messages. In other words, each thread is holding one or more messages in memory while the clock keeps ticking toward the visibility timeout expiry of those messages. Thus, even if the message visibility timeout is set to a reasonable value, many messages sit in memory waiting to be processed across multiple threads, and those threads compete with each other for scheduling cycles. Depending on how the threads get scheduled, the visibility timeout may expire for some messages that are still held in memory, waiting to be processed. Since these messages are still assigned to some thread, they will eventually get processed when that thread gets a chance to run. In the meantime, however, SQS will make such messages reappear in subsequent ReceiveMessage calls (because their visibility timeout has expired), causing duplication, unless the application logic itself de-dupes messages before processing based on message attributes or content.
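A tiny simulation of this failure mode: ten messages are received at once, each with a visibility deadline that starts at receive time, but the worker threads do not get scheduled until after every deadline has passed. The timeout here is deliberately shrunk to fractions of a second for illustration; the names and numbers are made up:

```python
import time

VISIBILITY_TIMEOUT = 0.1  # seconds; unrealistically tiny, for illustration only

# Ten messages arrive in one receive call; the visibility clock starts immediately
# for all of them, even though only a few threads can process them at a time.
received_at = time.monotonic()
deadlines = {f"m{i}": received_at + VISIBILITY_TIMEOUT for i in range(10)}

time.sleep(0.15)  # the messages sat in memory waiting for threads to be scheduled

# By the time a worker thread finally looks at them, every deadline has passed:
expired = [m for m, d in deadlines.items() if time.monotonic() > d]
assert len(expired) == 10  # SQS would now redeliver all of these to other consumers
```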

Locking can make it more complicated!

Another possible variant of the scenario described above is the inappropriate use of locking (across threads) while processing messages. Consider the case where each processing thread has more than one message to process and, as part of the processing, is supposed to call into another service or resource for which it needs to acquire a lock of some kind. On the face of it, this looks like a simple scenario. However, with multiple threads using locks to synchronize access to a shared or dependent resource or service, we can easily run into a long loop of repeatedly processing the same set of messages again and again, potentially with disastrous consequences depending on the domain. This is certainly possible in systems that process hundreds or thousands of messages per second using a multi-consumer, multi-threaded processing model.
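The effect of a shared lock can be seen with a toy example: five threads each hold one lock briefly, and the unluckiest thread's wait time alone approaches the sum of everyone else's hold times. Scale the hold time up to real service-call latencies and that wait can easily eat through a message's visibility timeout before processing even begins. All numbers here are illustrative:

```python
import threading
import time

lock = threading.Lock()  # stand-in for a shared downstream resource
HOLD = 0.05              # seconds each message holds the lock (illustrative)
waits = {}               # message -> time spent just waiting for the lock

def process(msg):
    start = time.monotonic()
    with lock:  # every processing thread serializes on this one lock
        waits[msg] = time.monotonic() - start
        time.sleep(HOLD)  # simulated work done while holding the lock

threads = [threading.Thread(target=process, args=(f"m{i}",)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The last thread to acquire the lock waited for roughly (N-1) * HOLD,
# time that counts against its messages' visibility window:
assert max(waits.values()) > 3 * HOLD
```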

Poorly designed multi-threaded consumers can be dangerous!

Even in a very simple multi-threaded consumer without any complex locking, an inefficient or casual implementation of multi-threaded message consumption may lead to duplication of messages or delays in processing them. Imagine a thread pool with a maximum of ten threads, where all ten threads are busy processing the messages assigned to them or waiting for some resource locks to be released. If the polling thread still continues to poll and hand messages to the thread pool, two issues can result: potential duplication of messages and/or delays in message processing. Since messages queue up in the thread pool waiting to be picked up (before they are even assigned to a thread for processing), we run into the same visibility-window timeout problem discussed earlier, which leads to message duplication because the messages reappear in SQS. Likewise, if multiple instances of the same consumer application are running, the other consumers will not see such messages until they are either processed or released as unprocessed, causing delays in message processing.

Recommendations and Best Practices

As seen in the previous sections, designing a cloud-scale, business-critical application that consumes SQS messages involves a lot of complexity and careful consideration of various aspects, including the message processing algorithm, dependencies on other systems, a well-thought-out strategy, and a carefully chosen, rigorously tested selection of runtime parameters.

Based on the observations above, let's look at some recommendations that can help make such consumers effective, efficient, and error free.

Use a reasonably sufficient message visibility timeout window – Use an appropriate value for this setting while reading messages from the queue. There is a default value at the queue level, and the value can also be specified in each read request. It is recommended to set it per read request, especially if the queue is used to send and receive multiple types of messages whose average processing times differ. Additionally, add a small buffer on top of the time you typically expect a message to take to process; this helps avoid message duplication.

The message visibility timeout can be extended – Yes, the message visibility timeout can be extended for messages that are still being processed and are at risk of their original visibility timeout window expiring soon. In my observation, based on reviews I have performed of applications that use SQS, this is one of the most under-utilized options. So make use of it: it's an excellent way to avoid message duplication and reprocessing, and it helps keep the queue metrics healthy as well. The only catch is to change it before the original visibility timeout window expires, so the recommendation is for the processing thread itself to control this, especially if the processing is long running. Another aspect to consider is using timeouts for blocking calls made while processing messages, so that the processing thread does not block forever waiting on some external call.
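One common way to implement this is a heartbeat thread that keeps pushing the deadline out while processing is still running. The sketch below simulates the deadline locally; in a real consumer the extension step would be a ChangeMessageVisibility call, and all the timing values here are illustrative:

```python
import threading
import time

VISIBILITY_TIMEOUT = 0.2  # seconds; illustrative only
deadline = time.monotonic() + VISIBILITY_TIMEOUT  # when the message becomes visible again
stop = threading.Event()

def heartbeat():
    """Extend the visibility deadline well before it expires.

    In a real consumer, each extension would be a ChangeMessageVisibility call
    for the message's receipt handle instead of a local variable update.
    """
    global deadline
    while not stop.wait(VISIBILITY_TIMEOUT / 2):  # wake at half the window
        deadline = time.monotonic() + VISIBILITY_TIMEOUT

t = threading.Thread(target=heartbeat)
t.start()

time.sleep(0.5)  # long-running processing, much longer than the original window
still_ours = time.monotonic() < deadline  # did we keep the message hidden?

stop.set()
t.join()
assert still_ours  # the message never became visible to other consumers
```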

Use a polling-and-processing thread pool model – When setting up a multi-threaded processing model, use a thread-pool-based design, with a separate polling thread for reading messages that feeds into a fixed-size thread pool for processing the messages.

Number of messages per processing thread – It is always advisable to process a single message per processing thread. This makes processing and deleting a message once it is processed much cleaner and easier to manage, and avoids the complexity of tracking and managing multiple messages per thread. It also simplifies extending the message visibility timeout in case a message requires more time to get processed. If there is a need to process multiple messages together in a single thread, either for business or technical reasons, treat all of those messages as a single unit of processing. In such cases, use the batch APIs to delete the messages that have been processed. Likewise, if the message visibility needs to be extended, use the batch API to extend the visibility of all the required messages in one call.

Read messages only if a processing thread is available – When using the separate polling-and-processing thread pool model, ensure that the polling thread issues an SQS read request only when at least one thread in the pool is idle and available to process an incoming message. Do not queue up messages in the thread pool, since this may lead to duplication (due to the visibility timeout expiring) as well as delays, as seen earlier. When sizing the pool, do some profiling to choose an appropriate number of processing threads.
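One way to enforce this is to gate the polling loop on a semaphore that holds one permit per worker, so a "receive" only happens when a worker is genuinely free. A minimal sketch, with the message source stubbed; in a real consumer the loop body would issue the ReceiveMessage call:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 3
free_slots = threading.BoundedSemaphore(POOL_SIZE)  # one permit per idle worker
processed = []

def process(msg):
    try:
        # ... business logic and the DeleteMessage call would go here ...
        processed.append(msg)
    finally:
        free_slots.release()  # this worker is idle again; polling may fetch more

with ThreadPoolExecutor(max_workers=POOL_SIZE) as pool:
    for msg in [f"m{i}" for i in range(10)]:  # stand-in for the polling loop
        free_slots.acquire()  # block until a worker is free before "receiving"
        pool.submit(process, msg)

assert sorted(processed) == sorted(f"m{i}" for i in range(10))
```

Because `acquire()` blocks the polling loop, at most POOL_SIZE messages are ever in flight, so no message sits in an internal queue burning through its visibility window.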

Use the batch APIs for deleting and changing visibility timeout – Although there are batch APIs to delete multiple messages and/or change the visibility timeout of already-read messages in a single call, the messages are still operated upon individually on the server, and a detailed response containing the per-message status of the call is returned. Thus, it is important to parse the response and take appropriate action for each message even when using the batch APIs.
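For example, a DeleteMessageBatch response contains separate Successful and Failed lists that should be inspected entry by entry. A sketch using a hand-written response in that shape (the ids and error code here are made-up sample values):

```python
# Hand-written sample in the shape of a DeleteMessageBatch response:
# every message gets its own entry in either Successful or Failed.
response = {
    "Successful": [{"Id": "m1"}, {"Id": "m2"}],
    "Failed": [{"Id": "m3", "Code": "ReceiptHandleIsInvalid", "SenderFault": True}],
}

succeeded = {entry["Id"] for entry in response.get("Successful", [])}
failed = {entry["Id"]: entry["Code"] for entry in response.get("Failed", [])}

assert succeeded == {"m1", "m2"}
assert failed == {"m3": "ReceiptHandleIsInvalid"}  # these need a retry or a log entry
```

A batch call returning HTTP 200 does not mean every message was deleted; only the per-entry results tell you that.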

Alright, we have covered a lot in this blog post. Hopefully it has given you some more insight into the gotchas you may run into while designing SQS consumers, and will help you design and implement better consuming applications when using Amazon SQS for messaging in the future.

Feedback, suggestions are most welcome as always.

Happy learning, Happy sharing!!

Cheers,

-Amit
