Python direct exchange

Previous Tutorial: Pub/Sub with fanout exchange

In our previous tutorial, we developed a basic notification system that sends notifications to users for new activities in all Slack channels.

However, in an ideal scenario, users would only want to receive notifications for activities in the channels they prioritize. But since our current notification system is not very smart, users end up receiving notifications for activities across all channels.

In this tutorial, we will improve our notification system.

Use case: An improved notfication system

Let’s improve our Slack clone. The number of channels still hasn’t changed:

  • HR
  • Marketing
  • And Support

Now, we are going to add a new feature that allows users to customize their notification settings and choose which channels they receive notifications from.

To do that, we only need to make just one trivial change to the previous implementation of our notification system - we will switch from using a fanout exchange to a direct exchange.

Basically, we’ve done most of the work in the previous tutorial.

But how is using the direct exchange going to make all the difference here?

In our previous implementation of the notification system we: - Declared a fanout exchange named slack_notifications - We then created three queues, hr_queue, marketing_queue and support_queue, and bind them to the exchange - Those queues were then bound to the fanout exchange with the binding keys: hr, marketing and support respectively - Three messages were then published to the exchange with the routing keys: hr, marketing and support respectively

The direct exchange would make all the difference here because it routes messages to the queue where there is a direct match between the queues binding key and the message’s routing key.

So in our case here, the message with the routing key, hr, will go to the hr_queue and same applies to all the other messages.

The image below gives a visual representation of the flow of messages from a direct exchange to three queues we mentioned earlier - given three messages A, B, C, with routing keys: hr, support and marketing respectively

Message flow in direct exchanges

Now that we fully understand our use-case and all the implementation details let’s implement our notfication system.

Publish subscribe channel with Pika, Python client

Note: we are just going to repeat everything we’ve done in the previous tutorial. Only that here, we are going to use a direct exchange in place of the fanout exchange.

Again we are going to have a producer and a consumer applications.

like we’ve mentioned in the previous tutorial, ideally, we would have three consumer applications, each bound to a separate queue. This would mimic the behavior of having different consumer applications responsible for sending notifications to different user groups such as hr, marketing, and support.

However, instead of creating three consumers, we will create a single Python script that acts as a consumer. The script will accept a command line argument specifying which queue to bind to. For example, if the argument is “hr,” the script will bind to the “hr_queue.”

This approach allows us to have a dynamic script that can function as an hr, marketing, or support consumer based on the received argument, eliminating the need for separate consumer applications.

To test that our direct exchange will route messages correctly to the appropriate queues we will run three instances of our consumer application.

Next, let’s create our producer, but first we need to set up our development environment.

Python development environment

We will re-use the development environment from the previous tutorial - same requirements, environment variables and directory structure.

Creating the producer - sending messages

The producer will: - Declare a direct exchange - Declare three queues and bind those queues to the exchange - Publish three messages to the exchange

To do that: - First, create a file direct_exchange_producer.py in the python_tutorials sub-directory - Paste snippet below in the new file

# producer.py

# producer.py

import pika, os
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel

exchange_name = "slack_notifications_fanout"
channel.exchange_declare(
  exchange=exchange_name,
  exchange_type='direct'
) # Declare an exchange


queues = {
  "hr": "hr_queue", 
  "support": "support_queue", 
  "marketing": "marketing_queue"
} # binding_key: queue_name

# Declare three queues for each user groups
for _, queue_name in queues.items():
    channel.queue_declare(
        queue=queue_name,
        durable=True
    )


# Create bindings between the exchange and queues
for binding_key, queue_name in queues.items():
    channel.queue_bind(
        exchange=exchange_name,
        queue=queue_name,
        routing_key=binding_key
    )

def send_to_queue(channel, routing_key, body):
  channel.basic_publish(
        exchange=exchange_name,
        routing_key=routing_key,
        body=body,
        properties=pika.BasicProperties(
            delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
        )
  )
  print(f"[📥] Message sent to queue #{body}")

# Publish messages - one message for each user group
send_to_queue(
    channel=channel, routing_key="hr", body="New message in HR"
)
send_to_queue(
    channel=channel, routing_key="support", body="New message in Support"
)
send_to_queue(
    channel=channel, routing_key="marketing", body="New message in Marketing "
)

try:
  connection.close()
  print("[❎] Connection closed")
except Exception as e:
  print(f"Error: #{e}")
  

We merely copied the producer code from the previous tutorial and created a direct exchange instead of a fanout exchange.

Creating the consumer - consuming messages

As mentioned earlier, we will create one consumer application and run it three times with different arguments: hr, support and marketing.

The application will: - Declare a direct exchange - Create a queue whose name will be generated from the argument received. For example, if the argument is hr, then consumer will create a hr_queue for that running instance - Create a binding between the exchange and the queue with the argument received as the binding key.

To do that: - First, create a file direct_exchange_consumer.py in the python_tutorials sub-directory - Paste the snippet below in the new file

import pika, os, sys
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel

user_groups = ["hr", "marketing", "support"]

# Get user group argument from commandline
user_group = sys.argv[1]
if not user_group:
    sys.stderr.write("Usage: %s [hr] [marketing] [support]\n" % sys.argv[0])
    sys.exit(1)

if user_group not in user_groups:
    sys.stderr.write("Invalid argument - allowed arguments: %s [hr] [marketing] [support]\n" % sys.argv[0])
    sys.exit(1)

queue_name = user_group + "_queue"
queue_binding_key = user_group
exchange_name = "slack_notifications_fanout"

channel.exchange_declare(
  exchange=exchange_name,
  exchange_type='direct'
) # Declare an exchange

channel.queue_declare(
  queue=queue_name,
  durable=True
) # Declare a queue

# Create a binding
channel.queue_bind(
  exchange=exchange_name,
  queue=queue_name,
  routing_key=queue_binding_key
)

def callback(ch, method, properties, body):
    print(f"[✅] Received #{ body }")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(
    queue_name,
    callback,
)

try:
  print("\n[❎] Waiting for messages. To exit press CTRL+C \n")
  channel.start_consuming()
except Exception as e:
  print(f"Error: #{e}")
  try:
    sys.exit(0)
  except SystemExit:
    os._exit(0)

Again, we merely copied the consumer code from the previous tutorial and created a direct exchange instead of a fanout exchange.

Now that we have our producer application and the consumer, we can now test our pub-sub set up.

Testing our appplications

Spin up four terminals. In the first three terminal, run three instances of your consumer application with:

  • Terminal 1 - a consumer instance for hr messages: python direct_exchange_consumer.py hr
  • Terminal 2 - a consumer instance for support messages: python direct_exchange_consumer.p support
  • Terminal 3 - a consumer instance for marketing messages: python direct_exchange_consumer.p marketing

In the fourth terminal run your producer application with: python direct_exchange_producer.py

If everything goes well, you should get the following output in the different terminals

Terminal 1 - hr consumer

$ python direct_exchange_consumer.py hr
[] Connection over channel established

[] Waiting for messages. To exit press CTRL+C 

[] Received #b'New message in HR'

Terminal 2 - support

$ python direct_exchange_consumer.py support
[] Connection over channel established

[] Waiting for messages. To exit press CTRL+C 

[] Received #b'New message in Support'

Terminal 3 - marketing

$ python direct_exchange_consumer.py marketing
[] Connection over channel established

[] Waiting for messages. To exit press CTRL+C 

[] Received #b'New message in Marketing'

Terminal 4 - Producer

$ python direct_exchange_producer.py
[✅] Connection over channel established
[📥] Message sent to queue #New message in HR
[📥] Message sent to queue #New message in Support
[📥] Message sent to queue #New message in Marketing 
[❎] Connection closed

In the consumer outputs above we see that each consumer gets a single message. The direct exchange now implements its routing rules that routes the hr message to the hr_queue only and same applies to all the other queues.

In the next tutorial, we will extend this improved implementatin of our notifications system to allow for even more granular notification settings.

But before we proceed to the next tutorial, here is a little something for you to think about.

Learning lab:

  • Use case: Imagine you have a notification system with a fanout exchange that sends messages to multiple queues.

  • Challenge: One day, you receive a requirement to add a new type of notification called “urgent” that should bypass the fanout exchange and be sent directly to a specific queue called “urgent_queue”.

  • Theoretical Reflection: How would you modify the existing system to handle this new requirement while still maintaining the functionality of the fanout exchange for other notifications?

  • Hint: Think about the different options you have to route messages within LavinMQ and how you can leverage them to address the new requirement while ensuring the fanout exchange continues to work as intended for other notifications.

What’s next?

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.