Redelivery with RabbitMQ

Lukasz Lenart
SoftwareMill Tech Blog
7 min readApr 28, 2021

--

When you are developing a service that needs to communicate with other services or has to support integration with customer services, you should consider implementing a proper retry mechanism. Network connections can fail, other services can be temporarily unavailable to do maintenance or rescheduling of Pods to another node. Even running your services on Kubernetes these days doesn’t guarantee 100% availability. Redelivery is the king!

Photo by jesse ramirez on Unsplash

Why do I need this

You can ask an obvious question why do I need this when I can use a built-in backpressure mechanism of my library/framework/etc.

The problem with backpressure is that you are stuck, you are stopping processing other messages, you cannot confirm/acknowledge the message till it isn’t successfully delivered over the wire. The backpressure mechanism is good when one system is overloaded by another to slow down pushing/receiving new messages. In case of a broken network connection, temporary system unavailability, etc., implementing a proper redelivery functionality is the only solution.

Should I care

In the very first step, you should analyse if in a given situation you need to re-deliver your message. Maybe in your case, it won’t be needed, if one message is gone, the following message will deliver the missing information and everything will be fine. Having temporal system instability or eventual consistency can be acceptable and it doesn’t make sense to implement the redelivery functionality.

Requirements in this aspect can vary, sometimes business requires to have a 100% delivery guarantee, but after clarifying the requirements and collecting all the information it can be clear that all they want is to have updated information every 1 minute or so … 60 seconds or 60 000 milliseconds in the computer world is a lot of time ;-)

100% delivery

Purists will oppose that there is nothing like a 100% deliverability guarantee, and they are right. Anyway, I will focus on having 100% deliverability in my service as this the only area I can control.

What does “100% deliverability guarantee” mean? My understanding of this term is that the message was transmitted over the wire to another service and my service has got confirmation that the message got accepted.

That’s all, I have no guarantee that the other system will handle the message properly, will start a proper process based on it, or store the message for future processing. Basically, I did my job and the message was delivered with confirmation.

To the details

In my example implementation, I will use RabbitMQ and its functionality to support the redelivery mechanism. We are using the same concepts in a real application I’m working on right now.

You can use another message broker which is already available in your system, yet some functions can be only available in the old-good queue based brokers.

RabbitMQ provides a very convenient TTL mechanism, which can be applied to a single queue, a set of queues, or even to a single message. The last option can also be used to implement a backpressure mechanism, yet this is out of the scope of this post. I will focus on redelivery implementation and I left implementing the backpressure as homework :)

Diagrams

As they say, a picture is worth 1000 words, so below I present two flows representing our use cases:

Flow with one consumer and one queue
Flow with two consumers backed by two queues

I assume there is nothing complicated in the above diagrams, and these are the most commons use case scenarios in my opinion.

The first diagram represents a very straightforward flow where messages are produced by a Producer (P) and pushed into an Exchange (X) and then forward into a Queue (Q) and finally consumed by a Consumer (C) which then delivers them over the Internet to a Customer service.

The second flow shows an Exchange of type fanout which spreads the messages into all the bounded queues where each queue is consumed by the dedicated Consumers to either store the message into a database or deliver it over the wire to a Customer service.

In both cases, we must apply a bit different redelivery flow, yet the other diagrams should explain this a bit better.

Simple redelivery

Let’s take a look at the very first diagram. In case of a customer system unavailability, all that is needed is to be able to put the message aside where it can wait a bit and when the time runs out push it into the exchange again, so then the consumer can pick it up and try to deliver again.

Redelivery queue with TTL set to 10 seconds

The diagram shows one additional queue (R1) which is used as a temporary store of the undelivered messages. If consumer C1 will nack (Negative Acknowledgement) the message, it will be automatically transferred to the redelivery queue. This happens because the queue Q1 has defined x-dead-letter-routing-key to R1 and this automatically re-routes any negatively acknowledged messages into R1. By setting TTL to 10 seconds and dead-letter-exchange to X for the queue R1 we create a temporary store, which will move any message after reaching the timeout into the exchange X and this will initiate the redelivery.

Note: please be aware that if the message was delivered and your consumer crashed before acknowledging it, it will remain in the queue and another consumer instance will pick it up and try to deliver again.

Implementing such a use case doesn’t require changes in your code, just redefine your queues and exchanges a bit and you’re done!

Split flow and redelivery

In the case of the second flow with an Exchange spreading messages into two different queues, the redelivery flow needs to be a bit different:

Redelivery queue with an extra exchange

This diagram differs from the first redelivery diagram by having an additional exchange RX which is bounded to working queue Q2. We need that additional exchange as I don’t want to redeliver the same message into both working queues as then duplicating the message for Q1/C1 — all I want to is to redeliver the message if the Consumer C2 fails to transmit the message.

You can ask why I’m using an additional exchange and why not use x-dead-letter-routing-key option to move a message on timeout back to the queue Q2?

I like to use exchanges as then it’s easier to bound other queues/flows to monitor the redelivery mechanism. Let’s say you want to raise the alarm if the message was not delivered ten times. Using the above additional exchange is easy — you must implement a service to do this job and connect it to a queue bound to the exchange RX. I wouldn’t do this in C2 because of separation of concerns.

And again implementing such flow doesn’t require changing a line of code in your services. All you have to do is to change queues/exchanges definitions.

Yet, there is one drawback of the presented redelivery flows — a fail message will be redelivered endlessly.

Rejecting the message

Maybe in your case, endless redelivery has some sense, but even then I would implement some monitoring mechanism to notify admins about the looping message.

In our case, we want to try to deliver the message a few times (configurable per Consumer/Service) and then put the message aside and trigger the alert.

To do is, you will have to change consumer code a bit and set up another set of exchanges and queues:

Redelivery flow with rejected queue

In my case, I implemented a redelivery limiter directly in my consumer C2 — if it receives a message with a count header value over the defined limit, it will move the message into a dedicated exchange which is connected with consumer C3 to raise the alarm and then it will ack the original message (to avoid another redelivery loop).

RabbitMQ increases count value automatically of each dead-lettered message, you just need to extract this value, an example Scala code:

if (headers != null && headers.containsKey("x-death")) {
val xDeath = headers.get("x-death")
.asInstanceOf[util.ArrayList[util.Map[String, AnyRef]]].asScala
logger.debug(s"x-death header: $xDeath")
val counter = xDeath.find { header =>
header.asScala.contains("count")
} map { header =>
header.get("count").toString.toInt
}
logger.debug(s"Count value $counter")
counter.getOrElse(0)
} else {
0
}

Final word

I hope you found a solution to your problems after reading this post. There are other options how to implement a redelivery mechanism using RabbitMQ — one of them could be a headers exchange. Also as I mentioned in the beginning, TTL can be set per message and by using this approach you can implement a very flexible backpressure mechanism.

I have prepared a small example application that demonstrates the above redelivery mechanism. It’s written in Scala with Alpakka and I have used Livestub to emulate customer endpoint’s behavior, have fun!

--

--

OSS enthusiast, ASF committer, Apache Struts lead, developer, husband and father and biker :-)