AMQP Queues

What is a queue?

An AMQP queue is a buffer where messages are held until a consuming client retrieves them for processing. Messages remain in the queue until consumed or removed due to expiration, purging, deletion, or conditions such as reaching a size limit. In LavinMQ, all messages sent to a queue are persistently stored on disk. Queues deliver messages in a first-in, first-out (FIFO) order.

What features do queues have?

A queue serves as a temporary storage area for messages, acting as a buffer before they are consumed. When created, queues can be configured with various attributes that determine their lifecycle and behavior. For instance, an auto-delete queue is automatically removed when its last consumer disconnects, while an exclusive queue is restricted to a single connection and is deleted once that connection ends.

LavinMQ supports two types of queues: standard queues and stream queues. Standard queues are designed for short-term message storage, where messages are discarded once handled. When messages need to be processed more than once, LavinMQ offers stream queues. In stream queues, messages are not deleted after consumption, so multiple consumers can read the same message and messages can be replayed. Read the Stream queues documentation for more information.
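As a rough illustration, a stream queue could be declared from Python along the following lines. This is a minimal sketch, not a definitive implementation: the helper names are hypothetical, channel is assumed to be an open pika channel, and the argument names x-queue-type and x-max-age are the ones listed under queue arguments further down this page.

```python
def stream_queue_arguments(max_age=None):
    """Build the arguments dict for declaring a stream queue."""
    arguments = {"x-queue-type": "stream"}
    if max_age is not None:
        # Retention period for messages in the stream. The exact string
        # format is an assumption; check the stream queue documentation.
        arguments["x-max-age"] = max_age
    return arguments


def declare_stream_queue(channel, name, max_age=None):
    """Declare a durable stream queue on an open pika channel."""
    return channel.queue_declare(
        queue=name,
        durable=True,
        arguments=stream_queue_arguments(max_age),
    )
```

Calling declare_stream_queue(channel, 'events') would then declare a stream queue named events on that channel.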

Using a queue

When setting up a message queue, begin by declaring it with a name. If the queue does not already exist, it will be created.

A service declares the queue, and the LavinMQ server confirms the declaration by responding with declare-ok. A consumer then subscribes to the queue. Messages can now be sent through the queue from one service (the producer) to another (the consumer). The consumer can disconnect from the queue explicitly, or implicitly by closing the channel or the connection.

Example: Declare a durable queue.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a durable queue
channel.queue_declare(queue='test', durable=True)

connection.close()
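The producer and consumer sides of this flow could look roughly like the sketch below. The function names are hypothetical, and channel is assumed to be a pika channel opened as in the example above; messages are published through the default exchange, where the routing key is the queue name.

```python
def publish(channel, queue_name, body):
    """Send one message to a queue through the default exchange."""
    # With the default exchange (""), the routing key is the queue name.
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)


def handle_message(ch, method, properties, body):
    """Callback that pika invokes for every delivered message."""
    print(f"received: {body!r}")
    # Acknowledge the delivery so the broker can remove the message.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume(channel, queue_name):
    """Subscribe to a queue and block until consuming is interrupted."""
    channel.queue_declare(queue=queue_name, durable=True)
    consumer_tag = channel.basic_consume(
        queue=queue_name, on_message_callback=handle_message
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Explicitly disconnect the consumer from the queue.
        channel.stop_consuming()
    return consumer_tag
```

Declaring the queue again in consume is harmless as long as the properties match the earlier declaration, and it lets the consumer start before the producer.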

Queue properties

Queue properties are set when the queue is declared. They cannot be applied with policies and must be set per queue. These properties are available in LavinMQ:

  • Name
  • Durable
  • Exclusive
  • Auto-delete
  • Arguments

Queue Name

A queue can be given a name. If no name is specified, a random name is generated for the queue (most client libraries expose this by accepting an empty name). Queue names starting with the amq. prefix are reserved for internal use and cannot be used for user-defined queues.

Example: Declare a queue with the queue name the_queue_name.

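import pika

# Connect to a LavinMQ broker on localhost and open a channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a durable queue named the_queue_name
channel.queue_declare(queue='the_queue_name', durable=True)

connection.close()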
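The naming rules can also be handled in code. The sketch below uses hypothetical helper names and assumes channel is an open pika channel. It rejects reserved names before contacting the broker, and when the name is left empty it reads the generated name back from the declare-ok response.

```python
RESERVED_PREFIX = "amq."


def is_reserved_queue_name(name):
    """Return True if the name uses the prefix reserved for internal use."""
    return name.startswith(RESERVED_PREFIX)


def declare_queue(channel, name=""):
    """Declare a queue and return the name it ended up with.

    An empty name asks for a generated name, which is read back from the
    declare-ok response.
    """
    if is_reserved_queue_name(name):
        raise ValueError(
            f"queue names starting with {RESERVED_PREFIX!r} are reserved"
        )
    result = channel.queue_declare(queue=name)
    return result.method.queue
```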

Durable queues

A queue can be marked as durable, which specifies whether it should survive a LavinMQ restart. To create a durable queue, set the durable property to True when declaring the queue, as in the example below.

Example: Declare a durable queue.

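import pika

# Connect to a LavinMQ broker on localhost and open a channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# durable=True makes the queue survive a broker restart
channel.queue_declare(queue='the_queue_name', durable=True)

connection.close()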

Exclusive queues

Declaring a queue as exclusive is useful when the queue should be limited to a single connection. Exclusive queues can only be used by one connection, meaning that all actions to and from that queue happen over, and follow the settings of, that specific connection. When the connection closes, the exclusive queue is deleted.

Example: Declare an exclusive queue.

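import pika

# Connect to a LavinMQ broker on localhost and open a channel
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# exclusive=True restricts the queue to this connection
channel.queue_declare(queue='test', exclusive=True)

# Closing the connection also deletes the exclusive queue
connection.close()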

Auto-delete queues

A queue can be declared with the auto-delete property. This is useful when a queue should not remain after its last consumer has disconnected. The queue must have had at least one consumer for auto-delete to take effect; a queue that never had a consumer is not deleted.

Note: Different consumer and broker communication methods can impact the auto-delete property. For example, the basic.get method gives direct access to a queue instead of setting up a consumer, so fetching a message with basic.get will not affect auto-delete.

Example: Declare an auto-delete queue.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test', auto_delete=True)

connection.close()
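The difference between a registered consumer and basic.get can be sketched as follows. The helper names are hypothetical and channel is assumed to be an open pika channel. Only the first function registers a consumer, so only it can trigger the automatic deletion.

```python
def on_message(ch, method, properties, body):
    """Callback for delivered messages; just prints the body."""
    print(f"received: {body!r}")


def consume_then_cancel(channel, queue_name):
    """Register a consumer on an auto-delete queue, then cancel it."""
    channel.queue_declare(queue=queue_name, auto_delete=True)
    consumer_tag = channel.basic_consume(
        queue=queue_name, on_message_callback=on_message, auto_ack=True
    )
    # Cancelling the last consumer is what lets the broker delete the queue.
    channel.basic_cancel(consumer_tag)
    return consumer_tag


def poll_once(channel, queue_name):
    """Fetch a single message with basic.get, without becoming a consumer."""
    method, properties, body = channel.basic_get(queue=queue_name, auto_ack=True)
    # body is None when the queue is empty.
    return body
```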

Queue arguments (optional properties)

The AMQP 0.9.1 specification allows optional features to be configured through so-called arguments. Depending on the argument, its setting can be changed after the queue has been declared.
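With pika, arguments are passed as a dictionary when the queue is declared. The sketch below uses hypothetical helper names and assumes channel is an open pika channel. The argument names come from the table in this section, and the choice of reject-publish is only for illustration.

```python
def bounded_queue_arguments(max_length, message_ttl_ms, dead_letter_exchange=None):
    """Build an arguments dict for a length- and time-limited queue."""
    arguments = {
        "x-max-length": max_length,        # maximum number of messages
        "x-message-ttl": message_ttl_ms,   # message lifetime in milliseconds
        "x-overflow": "reject-publish",    # reject new messages when full
    }
    if dead_letter_exchange is not None:
        arguments["x-dead-letter-exchange"] = dead_letter_exchange
    return arguments


def declare_bounded_queue(channel, name, max_length, message_ttl_ms,
                          dead_letter_exchange=None):
    """Declare a durable queue with the arguments built above."""
    return channel.queue_declare(
        queue=name,
        durable=True,
        arguments=bounded_queue_arguments(
            max_length, message_ttl_ms, dead_letter_exchange
        ),
    )
```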

It is recommended that queue arguments are set using policies to configure multiple queues simultaneously and ensure they are updated automatically when the policy definition changes.

Read more about policies.
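As a rough sketch, a policy could be created over the HTTP API from Python as shown below. Several details are assumptions and should be checked against the policies documentation: the /api/policies/vhost/name endpoint, the pattern, definition and apply-to fields in the payload, and the max-length key used in the usage note. The function name is hypothetical. It only builds the request; sending it is left to urllib.request.urlopen.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_policy_request(base_url, vhost, policy_name, pattern, definition,
                         user="guest", password="guest"):
    """Build (but do not send) a PUT request that creates or updates a policy."""
    url = "{}/api/policies/{}/{}".format(
        base_url.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),        # "/" becomes "%2F"
        urllib.parse.quote(policy_name, safe=""),
    )
    # Assumed payload shape: a queue-name regex plus the settings to apply.
    payload = {"pattern": pattern, "definition": definition, "apply-to": "queues"}
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
```

For example, build_policy_request('http://localhost:15672', '/', 'limit-orders', '^orders', {'max-length': 1000}) would describe a policy that limits every queue whose name starts with orders.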

| Argument | Description |
| --- | --- |
| Auto Expire (x-expires : Int) | Sets the time (in milliseconds) after which an idle queue (one without consumers) is automatically deleted. |
| Max Length (x-max-length : Int) | Limits the number of messages a queue can hold. If exceeded, messages are dropped based on the overflow policy. |
| Message TTL (x-message-ttl : Int) | Defines how long (in milliseconds) a message can stay in the queue before being automatically removed. |
| Delivery Limit (x-delivery-limit : Int) | Specifies the maximum number of delivery attempts for a message before it is discarded or sent to a dead-letter exchange. |
| Overflow Behaviour (x-overflow : String) | Determines what happens when a queue reaches its max length. The default is "drop-head" (removes oldest messages). Also available is "reject-publish" (rejects new messages). |
| Dead Letter Exchange (x-dead-letter-exchange : String) | Specifies an exchange where messages are sent when they expire, exceed delivery attempts, or are rejected. |
| Dead Letter Routing Key (x-dead-letter-routing-key : String) | Defines a custom routing key for messages sent to the dead-letter exchange. |
| Max Priority (x-max-priority : Int) | Enables message prioritization with a priority range between 1 and 255. Read more about message priority here. |
| Stream Queue (x-queue-type=stream) | Turns the queue into a stream queue, storing messages for history-based retrieval instead of traditional message queuing. |
| Max Age (x-max-age : String) | Limits how long messages are retained in a stream queue. Older messages are automatically deleted. |
| Message Deduplication (x-deduplication-enabled : Bool) | Enables deduplication to avoid storing duplicate messages. |
| Deduplication Cache Size (x-cache-size : Int) | Sets how many messages to store for deduplication checks. A larger cache uses more memory. |
| Deduplication Cache TTL (x-cache-ttl : Int) | Cache entry time-to-live in milliseconds (default: unlimited). |
| Deduplication Header (x-deduplication-header : String) | Specifies which header to check for deduplication. Defaults to x-deduplication-header. |
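To make the two overflow behaviours concrete, here is a toy in-memory model of a length-limited FIFO queue. It is only an illustration of the semantics described for x-max-length and x-overflow, not how LavinMQ is implemented, and the ToyQueue class is hypothetical.

```python
from collections import deque


class ToyQueue:
    """In-memory FIFO buffer with an optional length limit."""

    def __init__(self, max_length=None, overflow="drop-head"):
        if max_length is not None and max_length < 1:
            raise ValueError("max_length must be at least 1")
        if overflow not in ("drop-head", "reject-publish"):
            raise ValueError(f"unknown overflow behaviour: {overflow!r}")
        self.messages = deque()
        self.max_length = max_length
        self.overflow = overflow

    def publish(self, message):
        """Add a message; return False if it was rejected."""
        if self.max_length is not None and len(self.messages) >= self.max_length:
            if self.overflow == "reject-publish":
                return False
            # drop-head: discard the oldest message to make room.
            self.messages.popleft()
        self.messages.append(message)
        return True

    def consume(self):
        """Remove and return the oldest message, or None if the queue is empty."""
        return self.messages.popleft() if self.messages else None
```

With a limit of two messages and drop-head, publishing a, b and c leaves b and c in the queue. With reject-publish, the third publish is refused and a and b remain.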

Pausing a queue

Pausing a queue will pause all consumers. All clients will remain connected but receive no messages from the queue. The “pause queue” option can be found in queue details in the LavinMQ management interface or via the HTTP API.

Pausing a queue using the HTTP API:

curl -X PUT http://guest:guest@localhost:15672/api/queues/vhost_name/queue_name/pause

Resuming a queue using the HTTP API:

curl -X PUT http://guest:guest@localhost:15672/api/queues/vhost_name/queue_name/resume
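The same two calls can be made from Python using only the standard library. This is a minimal sketch with hypothetical function names. The URL layout and the guest credentials mirror the curl commands above; the vhost is URL-encoded, so the default vhost / becomes %2F.

```python
import base64
import urllib.parse
import urllib.request


def queue_action_request(base_url, vhost, queue, action,
                         user="guest", password="guest"):
    """Build a PUT request that pauses or resumes a queue."""
    if action not in ("pause", "resume"):
        raise ValueError("action must be 'pause' or 'resume'")
    url = "{}/api/queues/{}/{}/{}".format(
        base_url.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),
        urllib.parse.quote(queue, safe=""),
        action,
    )
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url, method="PUT", headers={"Authorization": f"Basic {token}"}
    )


def send(request):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status
```

For example, send(queue_action_request('http://localhost:15672', '/', 'test', 'pause')) would pause the queue test in the default vhost, assuming a broker is listening on that address.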

Management Interface pause queue

