AMQP 0-9-1 Overview
- The AMQP Protocol
- Connections & Channels
- Publishers and Consumers
More Exchange Types
More Consumer Features
Reliable Message Delivery
High Availability and Backup
Management HTTP API
Python direct exchange
Previous Tutorial: Pub/Sub with fanout exchange
In our previous tutorial, we developed a basic notification system that sends notifications to users for new activities in all Slack channels.
However, in an ideal scenario, users would only want to receive notifications for activities in the channels they prioritize. But since our current notification system is not very smart, users end up receiving notifications for activities across all channels.
In this tutorial, we will improve our notification system.
Use case: An improved notfication system
Let’s improve our Slack clone. The number of channels still hasn’t changed:
- And Support
Now, we are going to add a new feature that allows users to customize their notification settings and choose which channels they receive notifications from.
To do that, we only need to make just one trivial change to the previous implementation of our notification system - we will switch from using a fanout exchange to a direct exchange.
Basically, we’ve done most of the work in the previous tutorial.
But how is using the direct exchange going to make all the difference here?
In our previous implementation of the notification system we: - Declared a fanout exchange named slack_notifications - We then created three queues, hr_queue, marketing_queue and support_queue, and bind them to the exchange - Those queues were then bound to the fanout exchange with the binding keys: hr, marketing and support respectively - Three messages were then published to the exchange with the routing keys: hr, marketing and support respectively
The direct exchange would make all the difference here because it routes messages to the queue where there is a direct match between the queues binding key and the message’s routing key.
So in our case here, the message with the routing key, hr, will go to the hr_queue and same applies to all the other messages.
The image below gives a visual representation of the flow of messages from a direct exchange to three queues we mentioned earlier - given three messages A, B, C, with routing keys: hr, support and marketing respectively
Now that we fully understand our use-case and all the implementation details let’s implement our notfication system.
Publish subscribe channel with Pika, Python client
Note: we are just going to repeat everything we’ve done in the previous tutorial. Only that here, we are going to use a direct exchange in place of the fanout exchange.
Again we are going to have a producer and a consumer applications.
like we’ve mentioned in the previous tutorial, ideally, we would have three consumer applications, each bound to a separate queue. This would mimic the behavior of having different consumer applications responsible for sending notifications to different user groups such as hr, marketing, and support.
However, instead of creating three consumers, we will create a single Python script that acts as a consumer. The script will accept a command line argument specifying which queue to bind to. For example, if the argument is “hr,” the script will bind to the “hr_queue.”
This approach allows us to have a dynamic script that can function as an hr, marketing, or support consumer based on the received argument, eliminating the need for separate consumer applications.
To test that our direct exchange will route messages correctly to the appropriate queues we will run three instances of our consumer application.
Next, let’s create our producer, but first we need to set up our development environment.
Python development environment
We will re-use the development environment from the previous tutorial - same requirements, environment variables and directory structure.
Creating the producer - sending messages
The producer will: - Declare a direct exchange - Declare three queues and bind those queues to the exchange - Publish three messages to the exchange
To do that:
- First, create a file
direct_exchange_producer.py in the
- Paste snippet below in the new file
We merely copied the producer code from the previous tutorial and created a direct exchange instead of a fanout exchange.
Creating the consumer - consuming messages
As mentioned earlier, we will create one consumer application and run it three times with different arguments: hr, support and marketing.
The application will: - Declare a direct exchange - Create a queue whose name will be generated from the argument received. For example, if the argument is hr, then consumer will create a hr_queue for that running instance - Create a binding between the exchange and the queue with the argument received as the binding key.
To do that:
- First, create a file
direct_exchange_consumer.py in the
- Paste the snippet below in the new file
Again, we merely copied the consumer code from the previous tutorial and created a direct exchange instead of a fanout exchange.
Now that we have our producer application and the consumer, we can now test our pub-sub set up.
Testing our appplications
Spin up four terminals. In the first three terminal, run three instances of your consumer application with:
- Terminal 1 - a consumer instance for hr messages:
python direct_exchange_consumer.py hr
- Terminal 2 - a consumer instance for support messages:
python direct_exchange_consumer.p support
- Terminal 3 - a consumer instance for marketing messages:
python direct_exchange_consumer.p marketing
In the fourth terminal run your producer application with:
If everything goes well, you should get the following output in the different terminals
Terminal 1 - hr consumer
Terminal 2 - support
Terminal 3 - marketing
Terminal 4 - Producer
In the consumer outputs above we see that each consumer gets a single message. The direct exchange now implements its routing rules that routes the hr message to the hr_queue only and same applies to all the other queues.
In the next tutorial, we will extend this improved implementatin of our notifications system to allow for even more granular notification settings.
But before we proceed to the next tutorial, here is a little something for you to think about.
Use case: Imagine you have a notification system with a fanout exchange that sends messages to multiple queues.
Challenge: One day, you receive a requirement to add a new type of notification called “urgent” that should bypass the fanout exchange and be sent directly to a specific queue called “urgent_queue”.
Theoretical Reflection: How would you modify the existing system to handle this new requirement while still maintaining the functionality of the fanout exchange for other notifications?
Hint: Think about the different options you have to route messages within LavinMQ and how you can leverage them to address the new requirement while ensuring the fanout exchange continues to work as intended for other notifications.
Ready to take the next steps? Here are some things you should keep in mind:
Managed LavinMQ instance on CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.