Python publish subscribe

Previous Tutorial: Improved task queue

In our previous tutorials, we’ve mostly focused on working with a point-to-point channel, where a message can only be received and processed by one consumer. In reality, though, there are times when we’d want a single message to be broadcast to multiple consumers.

In scenarios like that, the point-to-point channel we are now accustomed to would not work. In this tutorial, we will see how to set up a publish-subscribe channel with LavinMQ - in this arrangement, a message can be broadcast to and processed by multiple consumers.

To do that, we are going to build a really dumb notification system.

Use case: A dumb notification system

Let’s reconsider the scenario we’ve already covered in the message routing tutorial - developing a Slack clone for a team that uses three channels:

  • HR
  • Marketing
  • Support

The purpose of our application is to notify channel members about new activities happening within each channel.

To do that, we will:

  • Declare a fanout exchange named slack_notifications
  • Create three queues - hr_queue, marketing_queue and support_queue - and bind them to the exchange

Ideally, each queue should only receive and store data from its corresponding channel. For example, the “hr” queue should only receive and store data from the HR Slack channel.

However, as we mentioned earlier, it’s a pretty dumb notification system at this point, so all messages published to the slack_notifications exchange are broadcast to all the bound queues - this is the inherent behaviour of fanout exchanges.

The image below provides a visual representation of the flow of three messages - A, B and C - from our fanout exchange to the queues bound to it.

Message flow in fanout exchanges

Now that we understand our use case and some of the implementation details, let’s implement our notification system.

Publish-subscribe channel with Pika, the Python client

We will build a basic notification system using two Python applications. The first application, called the producer, will set up a fanout exchange, create queues, and publish three messages to the exchange.

Ideally, we would have three consumer applications, each bound to a separate queue. This would mimic the behavior of having different consumer applications responsible for sending notifications to different user groups such as hr, marketing, and support.

However, instead of creating three consumers, we will create a single Python script that acts as a consumer. The script will accept a command line argument specifying which queue to bind to. For example, if the argument is “hr,” the script will bind to the “hr_queue.”

This approach allows us to have a dynamic script that can function as an hr, marketing, or support consumer based on the received argument, eliminating the need for separate consumer applications.

If you think about it, in this case, we don’t need three separate copies of our Python script or consumer application. The only difference between the scripts would be the queue they bind to, which determines the type of notification they handle.
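Stripped of everything else, that mapping is tiny. Here is a minimal sketch of the idea (the full consumer script below adds validation and the actual consuming):

import sys

# The user group passed on the command line, e.g. "hr"
user_group = sys.argv[1]

# Derive the queue to bind to from the argument: "hr" -> "hr_queue"
queue_name = f"{user_group}_queue"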

To test that our fanout exchange will broadcast messages to all queues and by extension all consumers regardless of routing keys and bindings, we will run three instances of our consumer application.

Next, let’s create our producer, but first we need to set up our development environment.

Python development environment

We will re-use the development environment from the previous tutorial - same requirements, environment variables and directory structure.
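If you skipped that tutorial, a minimal equivalent setup might look like this - a sketch, where the exact package list and the placeholder URL are assumptions, and CLOUDAMQP_URL should point at your own instance:

# python_tutorials/requirements.txt
pika
python-dotenv

# python_tutorials/.env
CLOUDAMQP_URL=<your LavinMQ/CloudAMQP connection URL>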

Creating the producer - sending messages

The producer will:

  • Declare a fanout exchange
  • Declare three queues and bind those queues to the exchange
  • Publish three messages to the exchange

To do that:

  • First, create a file pub_sub_producer.py in the python_tutorials sub-directory
  • Paste the snippet below in the new file

# pub_sub_producer.py

import pika, os
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel

exchange_name = "slack_notifications"
channel.exchange_declare(
  exchange=exchange_name,
  exchange_type='fanout'
) # Declare an exchange


queues = {
  "hr": "hr_queue", 
  "support": "support_queue", 
  "marketing": "marketing_queue"
} # binding_key: queue_name

# Declare a queue for each user group
for _, queue_name in queues.items():
    channel.queue_declare(
        queue=queue_name,
        durable=True
    )


# Create bindings between the exchange and queues
for binding_key, queue_name in queues.items():
    channel.queue_bind(
        exchange=exchange_name,
        queue=queue_name,
        routing_key=binding_key
    )

def send_to_queue(channel, routing_key, body):
  channel.basic_publish(
        exchange=exchange_name,
        routing_key=routing_key,
        body=body,
        properties=pika.BasicProperties(
            delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
        )
  )
  print(f"[📥] Message sent to queue #{body}")

# Publish messages - one message for each user group
send_to_queue(
    channel=channel, routing_key="hr", body="New message in HR"
)
send_to_queue(
    channel=channel, routing_key="support", body="New message in Support"
)
send_to_queue(
    channel=channel, routing_key="marketing", body="New message in Marketing "
)

try:
  connection.close()
  print("[❎] Connection closed")
except Exception as e:
  print(f"Error: #{e}")

The snippet above is not very different from the producer snippets we’ve seen before, except that now we explicitly declare a fanout exchange with exchange_declare().

We then created three queues and bound each of them to the fanout exchange with queue_bind().

We created a unique binding for each queue and published messages with routing keys, knowing that none of this makes any difference - our fanout exchange will forward every message to all the queues regardless. So why did we do all that?

We did those things for two reasons:

  • To prove the behaviour of fanout exchanges
  • To make our work easier in the next tutorial - this will make sense eventually

Now that we have the producer, let’s create the consumer.

Creating the consumer - consuming messages

As mentioned earlier, we will create one consumer application and run it three times with different arguments: hr, support and marketing.

The application will:

  • Declare an exchange
  • Create a queue whose name is generated from the argument received. For example, if the argument is hr, the consumer will create hr_queue for that running instance
  • Create a binding between the exchange and the queue, with the argument received as the binding key

To do that:

  • First, create a file pub_sub_consumer.py in the python_tutorials sub-directory
  • Paste the snippet below in the new file

# pub_sub_consumer.py

import pika, os, sys
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel

user_groups = ["hr", "marketing", "support"]

# Get the user group argument from the command line
if len(sys.argv) < 2:
    sys.stderr.write("Usage: %s [hr] [marketing] [support]\n" % sys.argv[0])
    sys.exit(1)

user_group = sys.argv[1]
if user_group not in user_groups:
    sys.stderr.write("Invalid argument - allowed arguments: hr, marketing, support\n")
    sys.exit(1)

queue_name = user_group + "_queue"
queue_binding_key = user_group
exchange_name = "slack_notifications"

channel.exchange_declare(
  exchange=exchange_name,
  exchange_type='fanout'
) # Declare an exchange

channel.queue_declare(
  queue=queue_name,
  durable=True
) # Declare a queue

# Create a binding
channel.queue_bind(
  exchange=exchange_name,
  queue=queue_name,
  routing_key=queue_binding_key
)

def callback(ch, method, properties, body):
    print(f"[✅] Received #{ body }")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback
)

try:
  print("\n[❎] Waiting for messages. To exit press CTRL+C \n")
  channel.start_consuming()
except KeyboardInterrupt:
  channel.stop_consuming()
  connection.close()
except Exception as e:
  print(f"Error: #{e}")
  try:
    sys.exit(0)
  except SystemExit:
    os._exit(0)

In the snippet above, we grabbed the argument passed from the command line, did some basic validation, and then used the argument to create a queue and bind it to the fanout exchange, slack_notifications.

Now that we have both the producer and the consumer, we can test our pub-sub setup.

Testing our applications

Spin up four terminals. In the first three terminals, run three instances of your consumer application with:

  • Terminal 1 - a consumer instance for hr messages: python pub_sub_consumer.py hr
  • Terminal 2 - a consumer instance for support messages: python pub_sub_consumer.py support
  • Terminal 3 - a consumer instance for marketing messages: python pub_sub_consumer.py marketing

In the fourth terminal, run your producer application with: python pub_sub_producer.py

If everything goes well, you should see the following output in the different terminals:

Terminal 1 - hr consumer

$ python pub_sub_consumer.py hr
[✅] Connection over channel established

[❎] Waiting for messages. To exit press CTRL+C 

[✅] Received #b'New message in HR'
[✅] Received #b'New message in Support'
[✅] Received #b'New message in Marketing '

Terminal 2 - support

$ python pub_sub_consumer.py support
[✅] Connection over channel established

[❎] Waiting for messages. To exit press CTRL+C 

[✅] Received #b'New message in HR'
[✅] Received #b'New message in Support'
[✅] Received #b'New message in Marketing '

Terminal 3 - marketing

$ python pub_sub_consumer.py marketing
[✅] Connection over channel established

[❎] Waiting for messages. To exit press CTRL+C 

[✅] Received #b'New message in HR'
[✅] Received #b'New message in Support'
[✅] Received #b'New message in Marketing '

Terminal 4 - Producer

$ python pub_sub_producer.py
[✅] Connection over channel established
[📥] Message sent to queue #New message in HR
[📥] Message sent to queue #New message in Support
[📥] Message sent to queue #New message in Marketing 
[❎] Connection closed

In the consumer outputs above, we see that each consumer gets all three messages - regardless of whether it’s an hr, support or marketing consumer. This is exactly how fanout exchanges behave.

In the next tutorial, we will see how we can route messages to the appropriate queues - such that the hr consumer only receives hr messages and so on.

But before we proceed to the next tutorial, here is a little something for you to think about.

Learning lab:

  • Use case: Imagine you have a notification system with a fanout exchange that sends messages to multiple queues.

  • Challenge: One day, you receive a requirement to add a new type of notification called “urgent” that should bypass the fanout exchange and be sent directly to a specific queue called “urgent_queue”.

  • Theoretical Reflection: How would you modify the existing system to handle this new requirement while still maintaining the functionality of the fanout exchange for other notifications?

  • Hint: Think about the different options you have to route messages within LavinMQ and how you can leverage them to address the new requirement while ensuring the fanout exchange continues to work as intended for other notifications.
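One possible direction to explore (a sketch of one option, not the definitive answer): AMQP brokers like LavinMQ provide a built-in default exchange - a direct exchange with an empty name that routes a message to the queue whose name matches the routing key. Urgent notifications could use it to bypass slack_notifications entirely, for example:

# Sketch: route urgent notifications straight to urgent_queue,
# bypassing the slack_notifications fanout exchange
channel.queue_declare(queue="urgent_queue", durable=True)

# The default exchange ("") routes by queue name, so only
# urgent_queue receives this message
channel.basic_publish(
    exchange="",
    routing_key="urgent_queue",
    body="Urgent: action required"
)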

What’s next?

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.