Python task queue

Previous Tutorial: Message routing

Modern web applications frequently encounter time-consuming tasks that are better handled outside the HTTP request-response cycle, so that the application stays responsive.

Let’s take the case of an image processing web application where:

  • Each request in the application is an image resize task.
  • Executing these tasks synchronously within the HTTP request-response cycle can significantly impact application responsiveness and overall user satisfaction.

One way to overcome this challenge is to define a LavinMQ queue and offload these time-consuming image resize tasks to it. A second process, which could be a separate application or just a module in the application that created the tasks, would then pick up and process them.

In messaging parlance, the application that picks up and processes these time-consuming tasks is called the worker process, and the queue it reads from is called a work queue or task queue. So what exactly is a work queue?

Understanding Work/Task queues:

Generally, work queues are, well, just queues: they line things up (in our case, image resize tasks) in the order they arrive.

The idea behind a work queue is simple: rather than executing a time-consuming task immediately, place it in the work queue so it can be executed later.

The work queue then holds these tasks until a process called the worker is ready to process them. Producers can add tasks to the queue without waiting for them to be executed, while workers (consumers) can focus on processing tasks without waiting for them to be generated. This separation keeps the part of the application that accepts requests responsive, even when the tasks themselves are slow.
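To make the producer/worker separation concrete without a broker, here is a minimal in-process analogy using Python's standard-library `queue.Queue` and a worker thread. It is only an illustration of the idea, not how LavinMQ works internally: the producer's enqueue step returns almost immediately, while the (simulated) slow work happens later in the worker.

```python
import queue
import threading
import time

# In-process stand-in for a work queue (an analogy only, not LavinMQ):
# tasks wait here until the worker thread is ready to take them.
work_queue = queue.Queue()
processed = []


def worker():
    while True:
        task = work_queue.get()
        if task is None:  # sentinel value: no more tasks are coming
            work_queue.task_done()
            break
        time.sleep(0.05)  # simulate a slow image resize
        processed.append(task)
        work_queue.task_done()


threading.Thread(target=worker, daemon=True).start()

# The producer only enqueues tasks; it does not wait for them to finish.
start = time.perf_counter()
for i in range(1, 4):
    work_queue.put(f"Resize an image - {i}")
enqueue_seconds = time.perf_counter() - start

work_queue.put(None)  # tell the worker to stop once the queue is drained
work_queue.join()  # wait until every queued item has been marked done
print(processed)
```

Enqueuing the three tasks takes a tiny fraction of the time the worker spends on even one of them, which is the whole point of offloading.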

Taking things a step further, we can get even better throughput by adding a little complexity with the competing consumer pattern.

The Competing Consumer Pattern:

By adopting the competing consumer pattern, we can spin up multiple worker processes or consumers and bind them to the work queue, enabling parallel processing of tasks.

Essentially, all the worker processes or consumers bound to the work queue are potential receivers of a task; think of them as competitors. However, only one worker process gets to receive and process each task. This one-to-one delivery avoids duplicate task processing, which is good for the image resize use case, since we wouldn’t want the same image resized more than once.
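The same standard-library analogy can be stretched to several competing workers. In this sketch (again an in-process illustration, not LavinMQ's implementation), three threads pull from one shared queue. Which worker ends up with which task varies from run to run, but the counter confirms that each of the six tasks was handled exactly once.

```python
import queue
import threading
from collections import Counter

# Six simulated tasks waiting in a shared, in-process queue (not LavinMQ)
tasks = queue.Queue()
for i in range(1, 7):
    tasks.put(f"Resize an image - {i}")

handled = []  # (worker name, task) pairs, in completion order
handled_lock = threading.Lock()


def competing_worker(name):
    # Every worker races for the next task, but Queue.get_nowait()
    # hands each item to exactly one caller.
    while True:
        try:
            task = tasks.get_nowait()
        except queue.Empty:
            return
        with handled_lock:
            handled.append((name, task))


workers = [
    threading.Thread(target=competing_worker, args=(f"worker-{n}",))
    for n in range(1, 4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

deliveries = Counter(task for _, task in handled)
print(len(handled), max(deliveries.values()))  # 6 1
```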

But perhaps the next crucial question is how to achieve a messaging setup like the one described above using LavinMQ.

Work queue and the competing consumer pattern with LavinMQ

Remember, one of the core requirements of the work queue and the competing consumer pattern is a point-to-point channel: a time-consuming task added to the channel, in this case the work queue, is received by just one consumer or worker.

We can achieve this with a direct exchange, more specifically the default exchange in LavinMQ.

We will declare a queue named image_resize_queue. Under the hood, this queue is automatically bound to LavinMQ's default direct exchange with the binding key image_resize_queue. That way, all messages published to the default exchange with the routing key image_resize_queue will be routed to the image_resize_queue queue.
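The default-exchange rule boils down to this: a message goes to the queue whose name equals its routing key. The toy class below is a hypothetical, in-memory model of just that rule; its `declare` and `publish` methods are made up for illustration and are not Pika's API. A message whose routing key matches no queue is dropped, which is what happens by default when publishing without the mandatory flag.

```python
from collections import deque


class ToyDefaultExchange:
    """Hypothetical in-memory model of default-exchange routing (not pika)."""

    def __init__(self):
        self.queues = {}

    def declare(self, queue_name):
        # Declaring a queue implicitly binds it to the default exchange
        # with a binding key equal to the queue's name.
        self.queues.setdefault(queue_name, deque())

    def publish(self, routing_key, body):
        # Deliver to the queue whose name equals the routing key;
        # with no match, the message is dropped and False is returned.
        target = self.queues.get(routing_key)
        if target is None:
            return False
        target.append(body)
        return True


exchange = ToyDefaultExchange()
exchange.declare("image_resize_queue")
print(exchange.publish("image_resize_queue", b"Resize an image - 1"))  # True
print(exchange.publish("thumbnails", b"Resize an image - 2"))  # False
print(list(exchange.queues["image_resize_queue"]))  # [b'Resize an image - 1']
```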

We can then bind one or more consumers to the image_resize_queue queue, and LavinMQ will dispatch the image resize tasks to these consumers in a round-robin fashion.
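Round-robin dispatch simply means cycling through the consumers in order. The hypothetical `round_robin` helper below is a simplified model that assumes every consumer is always ready to receive; in practice the broker's choice also depends on prefetch limits and acknowledgements.

```python
def round_robin(tasks, consumers):
    """Assign each task to one consumer, cycling through them in order."""
    assignments = {c: [] for c in consumers}
    for index, task in enumerate(tasks):
        assignments[consumers[index % len(consumers)]].append(task)
    return assignments


result = round_robin([1, 2, 3, 4, 5, 6], ["consumer-1", "consumer-2"])
print(result)  # {'consumer-1': [1, 3, 5], 'consumer-2': [2, 4, 6]}
```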

The image below visualizes this arrangement.

[Image: Task queues]

Let’s see what this looks like in code.

Work queue with Pika, Python client

We will use Pika to set up connections and channels as was the case in the previous tutorial.

Python development environment

We will re-use the development environment from the previous tutorial - same requirements, environment variables and directory structure.

Creating the producer - sending messages

Here, we will create a basic Python application that publishes plain text messages to LavinMQ. Ideally, we would publish an image to be resized, but we will keep things simple. To simulate resizing an image, a time-consuming task, our workers will sleep for 5 seconds whenever they pick up a new message.
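In a real application, one common option is to publish a small description of the task (for example, where the uploaded image is stored and the target size) rather than the image bytes themselves. The sketch below assumes a hypothetical JSON message format; the field names and the `build_resize_task`/`parse_resize_task` helpers are illustrative only and are not part of Pika or LavinMQ. The resulting bytes could be passed as the `body` argument of `channel.basic_publish`.

```python
import json


def build_resize_task(image_path, width, height):
    # Hypothetical message format: the broker itself only sees bytes.
    payload = {"image": image_path, "width": width, "height": height}
    return json.dumps(payload).encode("utf-8")


def parse_resize_task(body):
    # The worker decodes the bytes back into the task description.
    return json.loads(body.decode("utf-8"))


body = build_resize_task("uploads/cat.jpg", 800, 600)
print(parse_resize_task(body))  # {'image': 'uploads/cat.jpg', 'width': 800, 'height': 600}
```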

Our producer will be very similar to the one we created in the hello world tutorial. Just take the following steps:

  • First, create a file work_queue_producer.py in the python_tutorials sub-directory
  • Paste the snippet below in the new file
# work_queue_producer.py

import os

import pika
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel()  # start a channel
channel.queue_declare(queue="image_resize_queue")  # declare the work queue


def send_to_queue(channel, routing_key, body):
    # exchange='' selects the default exchange, which routes the message
    # to the queue whose name matches routing_key
    channel.basic_publish(
        exchange='',
        routing_key=routing_key,
        body=body
    )
    print(f"[📥] Message sent to queue #{body}")


# Publish six simulated image resize tasks
for i in range(1, 7):
    send_to_queue(
        channel=channel,
        routing_key="image_resize_queue",
        body=f"Resize an image - {i}"
    )

try:
    connection.close()
    print("[❎] Connection closed")
except Exception as e:
    print(f"Error: #{e}")


Note: We are still working with the default exchange. As a result, since our queue is named image_resize_queue and the six messages we will publish have the routing key image_resize_queue, all those messages will be routed to that queue by the default exchange.

Creating the consumer - consuming messages

Here, we will create a basic Python application that receives the published messages. As mentioned earlier, to imitate resizing an image, the consumer will sleep for 5 seconds for each message.

Additionally, to simulate multiple consumers running on multiple servers and bound to the same work queue, we will run two instances of this Python script; we will get to that later.

Our consumer will be very similar to the one we created in the hello world tutorial. Just take the following steps:

  • First, create a file work_queue_consumer.py in the python_tutorials sub-directory
  • Paste the snippet below in the new file
# work_queue_consumer.py

import os
import sys
import time

import pika
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel()  # start a channel
channel.queue_declare(queue="image_resize_queue")  # declare the same work queue as the producer


def callback(ch, method, properties, body):
    print(f"[✅] Received #{ body }")
    time.sleep(5)  # simulate a time-consuming image resize
    print("[✅] Image resized!")
    # Acknowledge the message once the task is done
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Send this consumer at most one unacknowledged message at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    "image_resize_queue",
    callback,
    auto_ack=False
)

try:
    print("\n[❎] Waiting for messages. To exit press CTRL+C \n")
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C raises KeyboardInterrupt, which `except Exception` does not catch
    print("[❎] Interrupted")
    try:
        sys.exit(0)
    except SystemExit:
        os._exit(0)
except Exception as e:
    print(f"Error: #{e}")
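Because the callback only relies on `ch.basic_ack` and `method.delivery_tag`, it can be exercised without a running broker by passing stand-in objects. In the sketch below, `FakeChannel` and `make_callback` are hypothetical helpers, and the sleep is made configurable so the check runs instantly; otherwise the callback body mirrors the one in work_queue_consumer.py.

```python
import time
from types import SimpleNamespace


class FakeChannel:
    """Hypothetical stand-in for a pika channel that records acks."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def make_callback(delay_seconds):
    # Same shape as the tutorial's callback, with a configurable sleep
    def callback(ch, method, properties, body):
        print(f"[✅] Received #{ body }")
        time.sleep(delay_seconds)
        print("[✅] Image resized!")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


fake = FakeChannel()
callback = make_callback(delay_seconds=0)
callback(fake, SimpleNamespace(delivery_tag=7), None, b"Resize an image - 1")
print(fake.acked)  # [7]
```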

Testing our applications

Spin up three terminals. In the first two, run two instances of your consumer application with: python work_queue_consumer.py

In the third terminal run your producer application with: python work_queue_producer.py

If everything goes well, you should see output similar to the following in the different terminals (which consumer receives the odd-numbered tasks may vary):

Terminal 1 - Consumer 1

->$ python work_queue_consumer.py
[✅] Connection over channel established

[❎] Waiting for messages. To exit press CTRL+C 

[✅] Received #b'Resize an image - 1'
[✅] Image resized!
[✅] Received #b'Resize an image - 3'
[✅] Image resized!
[✅] Received #b'Resize an image - 5'
[✅] Image resized!

Terminal 2 - Consumer 2

->$ python work_queue_consumer.py
[✅] Connection over channel established

[❎] Waiting for messages. To exit press CTRL+C 

[✅] Received #b'Resize an image - 2'
[✅] Image resized!
[✅] Received #b'Resize an image - 4'
[✅] Image resized!
[✅] Received #b'Resize an image - 6'
[✅] Image resized!

Terminal 3 - Producer

->$ python work_queue_producer.py
[✅] Connection over channel established
[📥] Message sent to queue #Resize an image - 1
[📥] Message sent to queue #Resize an image - 2
[📥] Message sent to queue #Resize an image - 3
[📥] Message sent to queue #Resize an image - 4
[📥] Message sent to queue #Resize an image - 5
[📥] Message sent to queue #Resize an image - 6
[❎] Connection closed

In the consumer outputs above, we see that each consumer gets 3 messages. But how is LavinMQ able to distribute these messages evenly between the workers?

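LavinMQ dispatches the messages in a queue to its consumers in round-robin order. On top of that, the channel.basic_qos(prefetch_count=1) setting we specified ensures that each worker holds at most one unacknowledged task at a time, so LavinMQ will not send a worker a new task until it has acknowledged the current one.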
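To see why a prefetch limit of one matters, the toy simulation below models a broker that never lets a consumer hold more than one unacknowledged task. It is a simplified, hypothetical model (the function name and per-task durations are made up), not LavinMQ's actual scheduler. With equal processing times it reproduces the even split from the output above; when one worker is slower, the faster worker picks up most of the tasks instead of tasks piling up behind the slow one.

```python
import heapq
from collections import deque


def simulate_prefetch_one(task_count, durations):
    """Toy model: each consumer holds at most one unacknowledged task.

    durations maps consumer name -> seconds per task (hypothetical values).
    Returns consumer name -> list of task numbers it processed.
    """
    pending = deque(range(1, task_count + 1))
    received = {name: [] for name in durations}
    busy_until = []  # heap of (finish time, tie-breaker, consumer name)

    def dispatch(name, now, tie):
        # Send the next pending task, if any, to this (now idle) consumer
        if pending:
            received[name].append(pending.popleft())
            heapq.heappush(busy_until, (now + durations[name], tie, name))

    # Initially, every consumer gets one task, in round-robin order.
    for tie, name in enumerate(durations):
        dispatch(name, 0.0, tie)

    # Whenever a consumer acks its task, it may receive the next one.
    tie = len(durations)
    while busy_until:
        now, _, name = heapq.heappop(busy_until)
        tie += 1
        dispatch(name, now, tie)
    return received


print(simulate_prefetch_one(6, {"consumer-1": 5, "consumer-2": 5}))
# {'consumer-1': [1, 3, 5], 'consumer-2': [2, 4, 6]}
print(simulate_prefetch_one(6, {"fast": 1, "slow": 5}))
# {'fast': [1, 3, 4, 5, 6], 'slow': [2]}
```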

Lastly, in the previous tutorial we consumed messages with auto_ack set to True, but in this tutorial we switched to auto_ack=False and added ch.basic_ack(delivery_tag=method.delivery_tag) to our callback.

In the next tutorial, we will look at what these things mean and explore some of the problems with the current implementation of our work queue.

But before we proceed to the next tutorial, here are some things for you to think about.

Learning lab: The mystery worker

  • Use case: Take the following steps, and remember to revert to your previous code after this test:
    • In the consumer, switch to auto_ack=True (as in the previous tutorial) and remove the ch.basic_ack(delivery_tag=method.delivery_tag) line from the callback
    • In the callback function of the consumer, change the waiting time from 5 to 60 seconds with time.sleep(60). We are simulating a really complex task
    • Run a single instance of the consumer in your terminal
    • Next, update your producer code to publish only three messages
    • In a separate terminal, run your producer to publish the three messages to LavinMQ
    • Wait for about 10 to 15 seconds, then terminate your consumer with CTRL+C. This imitates your consumer crashing while still processing messages
  • Observation:
    • If you check the image_resize_queue from your admin interface, you’d see that it’s empty. But our consumer crashed before it could process all three messages, right?
    • We can be sure the messages weren’t processed because our consumer takes about 60s to process a single message, and we terminated it only about 15s after the messages were published
  • Theoretical Reflection:
    • So if our messages weren’t processed and they are no longer in the queue, then what happened?
    • What happened is, your messages were lost. But what could be the possible cause of this behavior?
    • How can you modify the setup or configuration to ensure that no messages are lost?
  • Hint: messages are currently consumed with auto_ack=True

What’s next?

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.