Language Support
AMQP 0-9-1 Overview
More Exchange Types
More Consumer Features
Queue Deep-dive
Other Features
Reliable Message Delivery
High Availability
Management HTTP API
Python improved task queue
- Previous Tutorial: Task queue with default exchange
In the previous tutorial, we made some improvements around reliable
message delivery by taking out auto_ack=True
and replacing it with
Ensuring reliable message delivery is a good practice that’s even more needed in the task-queues scenario we explored in the previous tutorial - we will explain why as we proceed. For now, let’s make sense of ...
Reliable message delivery
In the hello world tutorial, we consumed messages with the
auto_ack stands for automatic acknowledgement and there are some inherent
limitations to be aware of with this approach:
- Firstly, auto-acknowledgment means that messages are automatically marked as processed as soon as they are delivered to consumers. So for example, if we had set auto_ack to True in the previous tutorial, then messages would be marked as processed as soon as a worker process picks them up.
- But then what happens if a worker crashes or experiences errors before completing the processing?
Since with auto_ack=True
, the broker immediately considers delivered messages as
processed, when a worker crashes, all the messages delivered to it, and the one being
processed are all lost.
But we do not want that, right? If for whatever reason a worker crashes, we’d like the messages delivered to it, to be redelivered to other workers. How then do we ensure that messages are never lost?
Enter publisher confirms and consumer acknowledegments
To overcome the limitations mentioned earlier, LavinMQ offers manual acknowledgements from both the producer and consumer side. But here, we are more concerned with ensuring that our worker processes consume messages more reliably - so we will focus on consumer acknowledgements.
By enabling manual acknowledgements, consumers take explicit responsibility for acknowledging the receipt and processing of messages. This mechanism ensures that messages are not lost in the event of consumer failures, as unacknowledged messages are redelivered.
To have our workers explicitly acknowledge the receipt of messages, we did two things in the previous tutorial:
We set
in thebasic_consume()
function to disable automatic acknowledgement - under the hood, the default setting isauto_ack=False
. So you’d be fine even if you don’t specify the argument -
Next, we manually acknowledged that a consumer has received and processed a message in the consumer’s callback as shown below with the
function. See snippet below:
def callback(ch, method, properties, body):
print(f"[✅] Received #{ body }")
print("[✅] Image resized!")
ch.basic_ack(delivery_tag = method.delivery_tag)
While publisher confirms and consumer acknowledgements enehance message reliability, they still have certain limitations.
Limitations of Publisher Confirms and Consumer Acknowledgements
Publisher confirms and consumer acknowledgements do not totally safeguard against message loss - these mechanisms only prevent against message loss in the event of a consumer crash or restart.
But what happens if the LavinMQ server restarts unexpectedly or undergoes maintenance? Messages in the queues could be lost. This scenario can occur if the messages are not persisted to disk, resulting in potential data loss during server restarts.
To enforce more stringent data safety measures and mitigate the risk of data loss during server restarts, LavinMQ provides the concepts of durable queues and persistent messages.
A durable queue ensures that the queue itself survives server restarts, preserving the message flow. On the other hand, persistent messages are messages explicitly marked as durable, ensuring they are stored on disk and not lost during system failures. By combining durable queues with persistent messages along with acknowledgements, you can achieve a higher level of data durability and reliability.
To use durable queues and publish persistent messages, we need to make a couple of changes to the producer code in the previous tutorial.
First we will rename our queue from image_resize_queue
to image_resize_queue_1
and declare it as durable as shown in the snippet below:
) # Declare a queue
Make sure to effect this change on the consumer side as well,
pay attention to:
- Passing image_resize_queue_1
to the queue_declare()
and basic_consume()
Next, we need to designate our messages as persistent by providing a
attribute set to the value of pika.spec.PERSISTENT_DELIVERY_MODE
as shown in the snippet below:
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
Lastly, we need to update the routing key for all messages published to:
. Generally, the producer code in the previous tutorial
should now look like this:
import pika, os
from dotenv import load_dotenv
# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")
channel = # start a channel
) # Declare a queue
def send_to_queue(channel, routing_key, body):
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
print(f"[📥] Message sent to queue #{body}")
# Publish messages
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 1"
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 2"
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 3"
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 4"
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 5"
channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 6"
print("[❎] Connection closed")
except Exception as e:
print(f"Error: #{e}")
Just in case you are wondering why we had to rename our queue in the previous
tutorial from image_resize_queue
to image_resize_queue_1
, it’s because
you can not redefine an existing queue with a different configuration in LavinMQ -
so the workaround is to create a new queue with the new configs
you need, and in our case, make the queue durable.
In this tutorial, we explored the limitations of consuming messages with auto_ack=True in our worker processes and how manual acknowledgements, along with publisher confirms and consumer acknowledgements, can address these limitations.
However, we also discussed the potential for message loss during server restarts despite these mechanisms. To strengthen data safety, we introduced the concepts of durable queues and persistent messages, which provide more stringent measures for preserving message integrity and mitigating data loss risks.
By implementing these advanced features, you can significantly enhance the reliability and durability of your messaging system in LavinMQ.
What’s next
From our hello world
tutorial to this, we’ve mostly focused on working with a
point-to-point channel where a message can only be received and processed by one
consumer, but in reality, there are times where we’d want a single message to be broadcasted
to multiple consumers.
In scenarios like that, the point-to-point channel we are now accustomed with would not work. In the subsequent tutorials we will explore other communication patterns available in LavinMQ.
But first, here is a little something for you to do
Learning lab:
Use case: Imagine you have a LavinMQ setup with a durable queue named “notifications” and a publisher that sends persistent messages to this queue. There are two consumers, Consumer A and Consumer B, both bound to the “notifications” queue. Consumer A is responsible for sending email notifications, while Consumer B handles SMS notifications.
Observation: Now, Consumer A successfully receives a notification message from LavinMQ but encounters an error while sending the email. However, Consumer B is ready and waiting for its turn to process the next message. What do you think will happen?
Theoretical Reflection: Take a moment to think about it, and consider the concepts we discussed in the article. How will the acknowledgment and redelivery mechanisms, along with the durable queue, come into play in this situation?
What’s next?
- Next Tutorial: Pub/Sub with fanout exchange
- Previous Tutorial: Task queue with default exchange
Ready to take the next steps? Here are some things you should keep in mind:
Managed LavinMQ instance on CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.
Help and feedback
We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.