LavinMQ with Python

The recommended library for Python to access LavinMQ servers is Pika.

Before we jump right into writing our code, let’s set up the development environment.

Python development environment

  • First make sure that you have Python and that it’s available from your command line. You can confirm this by running: python --version
  • If you get a NameError: name 'python' is not defined, then you do not have Python. Download Python from python.org or you could even use pyenv to manage multiple Python versions
  • Run python3 -m venv env-name on Unix and macOS or python -m venv env-name on Windows to create a virtual environment where we install the dependencies for our project. Replace env-name with whatever name you chose for your virtual environment.
  • Run source env-name/bin/activate on Unix and macOS or .\env-name\Scripts\activate on Windows to activate the virtual environment.
  • Open your project directory in your favourite text-editor and create a requirements.txt file. Add pika==1.1.0 and python-dotenv to requirements.txt file.
  • Install dependencies with pip install -r requirements.txt
  • Create a .env file in in the root directory
  • Add CLOUDAMQP_URL="lavinmq_url" to the `.env’ file. Replace lavinmq_url with your correct server url

Publishing messages

# producer.py

import pika, os
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel
channel.queue_declare(queue="hello_world") # Declare a queue

def send_to_queue(channel, routing_key, body):
  channel.basic_publish(
        exchange='',
        routing_key=routing_key,
        body=body
  )
  print(f"[📥] Message sent to queue #{body}")

# Publish messages
send_to_queue(
    channel=channel, routing_key="hello_world", body="Hello World"
)
send_to_queue(
    channel=channel, routing_key="hello_world", body="Hello World"
)
send_to_queue(
    channel=channel, routing_key="hello_world", body="Hello World"
)

try:
  connection.close()
  print("[❎] Connection closed")
except Exception as e:
  print(f"Error: #{e}")

Consume messages

import pika, os, sys
from dotenv import load_dotenv

load_dotenv()

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')

# Create a connection
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
print("[✅] Connection over channel established")

channel = connection.channel() # start a channel
channel.queue_declare(queue="hello_world") # Declare a queue

def callback(ch, method, properties, body):
    print(f"[✅] Received #{ body }")

channel.basic_consume(
    "hello_world",
    callback,
    auto_ack=True,
)

try:
  print("\n[❎] Waiting for messages. To exit press CTRL+C \n")
  channel.start_consuming()
except Exception as e:
  print(f"Error: #{e}")
  try:
    sys.exit(0)
  except SystemExit:
    os._exit(0)

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.