An improved task queue in Python

In the previous tutorial, we improved the reliability of message delivery by replacing auto_ack=True with auto_ack=False.

Ensuring reliable message delivery is good practice in general, and it matters even more in the task-queue scenario we explored in the previous tutorial - we will explain why as we proceed. For now, let's make sense of what reliable message delivery actually involves.

Reliable message delivery

In the hello world tutorial, we consumed messages with the auto_ack=True parameter. auto_ack stands for automatic acknowledgement and there are some inherent limitations to be aware of with this approach:

  • Firstly, auto-acknowledgement means that messages are automatically marked as processed as soon as they are delivered to consumers. So, for example, if we had set auto_ack to True in the previous tutorial, messages would have been marked as processed as soon as a worker process picked them up.
  • But then what happens if a worker crashes or experiences errors before completing the processing?

With auto_ack=True, the broker considers a message processed the moment it is delivered. So when a worker crashes, every message that was delivered to it, including the one it was processing, is lost.
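As a reminder, this is roughly how the hello world consumer registered its callback with auto_ack=True (a sketch; the queue name and callback are placeholders, not the exact code from that tutorial):

# Sketch: auto-acknowledging consumer (placeholder names)
channel.basic_consume(
    queue="hello_world_queue",
    on_message_callback=callback,
    auto_ack=True  # the broker marks messages as processed on delivery
)
channel.start_consuming()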

But we do not want to lose messages like that, right? If a worker crashes for whatever reason, we'd like the messages that were delivered to it to be redelivered to other workers. So how do we ensure that messages are never lost?

Enter publisher confirms and consumer acknowledgements

To overcome the limitations mentioned earlier, LavinMQ supports publisher confirms on the producer side and manual acknowledgements on the consumer side. Here, we are mostly concerned with making our worker processes consume messages reliably, so we will focus on consumer acknowledgements.
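For reference, publisher confirms can be enabled on a pika BlockingConnection channel roughly as shown below; this is just a sketch that we won't need for the rest of this tutorial, and the queue name is assumed from the previous tutorial:

# Sketch: enabling publisher confirms on the producer side
channel.confirm_delivery()  # every basic_publish is now confirmed (or nacked) by the broker

try:
    channel.basic_publish(
        exchange='',
        routing_key="image_resize_queue",
        body="Resize an image - 1",
        mandatory=True  # ask the broker to report unroutable messages
    )
    print("[✅] Publish confirmed by the broker")
except pika.exceptions.UnroutableError:
    print("[❌] The message could not be routed to a queue")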

By enabling manual acknowledgements, consumers take explicit responsibility for acknowledging the receipt and processing of messages. This mechanism ensures that messages are not lost in the event of consumer failures, as unacknowledged messages are redelivered.

To have our workers explicitly acknowledge the receipt of messages, we did two things in the previous tutorial:

  1. We set auto_ack=False in the basic_consume() function to disable automatic acknowledgement. Under the hood, the default is already auto_ack=False, so you'd be fine even if you didn't specify the argument.

  2. Next, we manually acknowledged that a consumer has received and processed a message in the consumer's callback, using the basic_ack function as shown in the snippet below:

def callback(ch, method, properties, body):
    print(f"[✅] Received {body}")
    time.sleep(5)  # simulate the time it takes to resize an image
    print("[✅] Image resized!")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge only after the work is done
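For completeness, here is roughly how that callback was wired up in the previous tutorial's worker; this is a sketch, with the queue name assumed from this series, and auto_ack left at its default of False:

# Sketch: registering the callback with manual acknowledgements enabled
channel.basic_consume(
    queue="image_resize_queue",
    on_message_callback=callback,
    auto_ack=False  # the default; messages stay unacknowledged until basic_ack is called
)
channel.start_consuming()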

While publisher confirms and consumer acknowledgements enhance message reliability, they still have certain limitations.

Limitations of Publisher Confirms and Consumer Acknowledgements

Publisher confirms and consumer acknowledgements do not completely safeguard against message loss - they only protect against losses caused by a consumer crash or restart.

But what happens if the LavinMQ server restarts unexpectedly or undergoes maintenance? If messages are not persisted to disk, everything sitting in the queues could be lost across the restart.

To enforce more stringent data safety measures and mitigate the risk of data loss during server restarts, LavinMQ provides the concepts of durable queues and persistent messages.

A durable queue ensures that the queue itself survives server restarts, preserving the message flow. On the other hand, persistent messages are messages explicitly marked as durable, ensuring they are stored on disk and not lost during system failures. By combining durable queues with persistent messages along with acknowledgements, you can achieve a higher level of data durability and reliability.

To use durable queues and publish persistent messages, we need to make a couple of changes to the producer code in the previous tutorial.

First we will rename our queue from image_resize_queue to image_resize_queue_1 and declare it as durable as shown in the snippet below:

channel.queue_declare(
  queue="image_resize_queue_1",
  durable=True
) # Declare a queue

Make sure to apply this change on the consumer side as well. In particular, pay attention to:

  • Passing image_resize_queue_1 (declared with durable=True) to the queue_declare() and basic_consume() functions, as shown in the sketch below
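Here is a rough sketch of those consumer-side changes, assuming the consumer from the previous tutorial with its existing connection and callback:

# consumer.py (excerpt) - sketch of the consumer-side changes
channel.queue_declare(
  queue="image_resize_queue_1",
  durable=True  # must match the producer's declaration
)

channel.basic_consume(
  queue="image_resize_queue_1",
  on_message_callback=callback,
  auto_ack=False
)
channel.start_consuming()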

Next, we need to designate our messages as persistent by providing a delivery_mode attribute set to the value of pika.spec.PERSISTENT_DELIVERY_MODE as shown in the snippet below:

channel.basic_publish(
  exchange='',
  routing_key=routing_key,
  body=body,
  properties=pika.BasicProperties(
    delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE  # mark the message as persistent
  )
)

Lastly, we need to update the routing key of every published message to image_resize_queue_1. With these changes, the producer code from the previous tutorial should now look like this:

# producer.py
import pika, os
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel
channel.queue_declare(
  queue="image_resize_queue_1",
  durable=True
) # Declare a queue

def send_to_queue(channel, routing_key, body):
  channel.basic_publish(
    exchange='',
    routing_key=routing_key,
    body=body,
    properties=pika.BasicProperties(
      delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE  # persist the message to disk
    )
  )
  print(f"[📥] Message sent to queue {body}")

# Publish messages
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 1"
)
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 2"
)
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 3"
)
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 4"
)
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 5"
)
send_to_queue(
    channel=channel, routing_key="image_resize_queue_1", body="Resize an image - 6"
)

try:
  connection.close()
  print("[❎] Connection closed")
except Exception as e:
  print(f"Error: {e}")

In case you are wondering why we renamed the queue from image_resize_queue to image_resize_queue_1: you cannot redeclare an existing queue with a different configuration in LavinMQ. The workaround is to declare a new queue with the configuration you need - in our case, a durable one.
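To illustrate, redeclaring the original non-durable queue as durable would be rejected by the broker with a channel-level error, roughly like this (a sketch; the exact error message may differ):

# Sketch: redeclaring an existing queue with a different configuration fails
try:
    channel.queue_declare(queue="image_resize_queue", durable=True)
except pika.exceptions.ChannelClosedByBroker as err:
    # The broker closes the channel with a 406 PRECONDITION_FAILED error
    print(f"Queue redeclaration failed: {err}")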

In this tutorial, we explored the limitations of consuming messages with auto_ack=True in our worker processes, and how consumer acknowledgements, together with publisher confirms, address those limitations.

However, we also discussed the potential for message loss during server restarts despite these mechanisms. To strengthen data safety, we introduced the concepts of durable queues and persistent messages, which provide more stringent measures for preserving message integrity and mitigating data loss risks.

By implementing these advanced features, you can significantly enhance the reliability and durability of your messaging system in LavinMQ.

What’s next

From our hello world tutorial up to this one, we've mostly focused on a point-to-point channel where a message can only be received and processed by one consumer. In reality, though, there are times when we'd want a single message to be broadcast to multiple consumers.

In scenarios like that, the point-to-point channel we are now accustomed to would not work. In the subsequent tutorials, we will explore the other communication patterns available in LavinMQ.

But first, here is a little something for you to do.

Learning lab:

  • Use case: Imagine you have a LavinMQ setup with a durable queue named “notifications” and a publisher that sends persistent messages to this queue. There are two consumers, Consumer A and Consumer B, both bound to the “notifications” queue. Consumer A is responsible for sending email notifications, while Consumer B handles SMS notifications.

  • Observation: Now, Consumer A successfully receives a notification message from LavinMQ but encounters an error while sending the email. However, Consumer B is ready and waiting for its turn to process the next message. What do you think will happen?

  • Theoretical Reflection: Take a moment to think about it, and consider the concepts we discussed in the article. How will the acknowledgment and redelivery mechanisms, along with the durable queue, come into play in this situation?

Next steps

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.