AMQP Queues
An AMQP queue is a buffer where messages are held until a consuming client retrieves them for processing. Messages remain in the queue until consumed or removed due to expiration, purging, deletion, or conditions such as reaching a size limit. In LavinMQ, all messages sent to a queue are persistently stored on disk. Queues deliver messages in a first-in, first-out (FIFO) order.
A queue serves as a temporary storage area for messages, acting as a buffer before they are consumed. When created, queues can be configured with various attributes that determine their lifecycle and behavior. For instance, an auto-delete queue is automatically removed when its last connection closes, while an exclusive queue is restricted to a single connection and is deleted once that connection ends.
LavinMQ supports two types of queues: standard queues and stream queues. Standard queues are designed for short-term message storage, where messages are discarded once handled. If multiple message processing times are required, LavinMQ offers stream queues. In stream queues, messages are not deleted after consumption, allowing multiple consumers to read the same message and enabling message replay. Read the Stream queues documentation for more information.
Using a queue
When setting up a message queue, begin by declaring it with a name. If the queue does not already exist, it will be created.
The sending service declares the queue, and the LavinMQ server confirms the declaration by responding declare-ok. The service then connects a consumer to the message queue. Messages can now be sent via the queue from one service (producer) to another (consumer). The consumer can explicitly disconnect from a queue or disconnect by closing the channel and/or connection.
Example: Declare a durable queue.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Declare a durable queue
channel.queue_declare(queue='test', durable=True)
connection.close()
Queue properties
A queue can be created with some given queue properties. Queue properties cannot be applied with policies and must be enabled per queue. These properties are available in LavinMQ:
- Name
- Durable
- Exclusive
- Auto-delete
- Arguments
Queue Name
A queue can have a given name, if no name is specified, most client libraries assign a random name to the queue. Queue names starting with the amq. prefix are reserved for internal use and cannot be used for user-defined queues.
Example: Declare a queue with the queue name the_queue_name.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='the_queue_name', durable=True)
Durable queues
A queue can be marked as durable, which specifies whether it should persist a LavinMQ restart. To create a durable queue, specify the property durable as True during the queue’s creation as done in the example above.
Example: Declare a durable queue.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='the_queue_name', durable=True)
Exclusive queues
Setting the properties to exclusive is useful when limiting a queue to only one consumer. Exclusive queues can only be used by one connection, meaning that all actions to and from that queue happen over and follow the preset settings of one specific connection. When the connection closes, the exclusive queue is deleted.
Example: Declare an exclusive queue.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test', exclusive=True)
Auto-delete queues
A queue can be declared with the auto-delete property. This is useful when queues should not stay open when the final connection to the queue closes. The auto-delete property requires at least one connected consumer to succeed. Otherwise, it will not be deleted.
Note: Different consumer and broker communication methods can impact the auto-delete property. For example, the basic.get method gives direct access to a queue instead of setting up a consumer, so consuming a message with basic.get will not affect auto_delete.
Example: Declare an auto-delete queue.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test', auto_delete=True)
connection.close()
Queue arguments (optional properties)
The specifications of AMQP 0.9.1 enable support for various features called arguments. Depending on the argument, their settings can be changed after the queue declaration.
It is recommended that queue arguments are set using policies to configure multiple queues simultaneously and ensure they are updated automatically when the policy definition changes.
Read more about policies.
| Argument | Description |
|---|---|
| Auto Expire ( x-expires : Int) |
Sets the time (in milliseconds) after which an idle queue (one without consumers) is automatically deleted. |
| Max Length ( x-max-length : Int) |
Limits the number of messages a queue can hold. If exceeded, messages are dropped based on the overflow policy. |
| Message TTL ( x-message-ttl : Int) |
Defines how long (in milliseconds) a message can stay in the queue before being automatically removed. |
| Delivery Limit ( x-delivery-limit : Int) |
Specifies the maximum number of delivery attempts for a message before it is discarded or sent to a dead-letter exchange. |
| Overflow Behaviour ( x-overflow : String) |
Determines what happens when a queue reaches its max length: By default this is set to "drop-head" (removes oldest messages). Also available is "reject-publish" (rejects new messages). |
| Dead Letter Exchange ( x-dead-letter-exchange : String) |
Specifies an exchange where messages are sent when they expire, exceed delivery attempts, or are rejected. |
| Dead Letter Routing Key ( x-dead-letter-routing-key : String) |
Defines a custom routing key for messages sent to the dead-letter exchange. |
| Max Priority ( x-max-priority : Int) |
Enables message prioritization with a priority range between 1 and 255. Read more about message priority here. |
| Stream Queue ( x-queue-type=stream) |
Turns the queue into a stream queue, storing messages for history-based retrieval instead of traditional message queuing. |
| Max Age ( x-max-age : String) |
Limits how long messages are retained in a stream queue. Older messages are automatically deleted. |
| Message Deduplication ( x-deduplication-enabled : Bool) |
Enables deduplication to avoid storing duplicate messages. |
| Deduplication Cache Size ( x-cache-size : Int) |
Sets how many messages to store for deduplication checks. A larger cache uses more memory. |
| Deduplication Cache TTL ( x-cache-ttl : Int) |
Cache entry time-to-live in milliseconds (default: unlimited). |
| Deduplication Header ( x-deduplication-header : String) |
Specifies what header to guard for deduplication, defaults to x-deduplication-header. |
Pausing a queue
Pausing a queue will pause all consumers. All clients will remain connected but receive no messages from the queue. The “pause queue” option can be found in queue details in the LavinMQ management interface or via the HTTP API.
Pausing a queue using RabbitMQ HTTP API:
curl -X PUT [http://guest:guest@localhost:15672/api/queues/vhost_name/queue_name/pause](http://guest:guest@localhost:15672/api/queues/vhost/queue/pause)
Resuming a queue using RabbitMQ HTTP API:
curl -X PUT [http://guest:guest@localhost:15672/api/queues/vhost_name/queue_name/](http://guest:guest@localhost:15672/api/queues/vhost/queue/pause)resume

Restarting a closed queue
A queue can enter a closed state if it encounters an error during operation, such as corrupt data on disk. When a queue closes unexpectedly, the reason is written to the LavinMQ log. Check the log to understand what caused the queue to close before attempting a restart.
In most cases, you need to resolve the underlying issue before the queue can be successfully restarted. For example, if a queue was closed due to a corrupt segment file, you may need to delete the affected file from the queue’s data directory. Restarting a queue without addressing the root cause will likely result in the queue closing again immediately.
Once the issue has been resolved, a closed queue can be restarted without restarting the entire LavinMQ server. This works for both standard queues and stream queues. Only queues in the closed state can be restarted — attempting to restart a running queue will return an error.
Restarting a closed queue using the HTTP API:
curl -X PUT http://guest:guest@localhost:15672/api/queues/vhost_name/queue_name/restart
Restarting a closed queue using lavinmqctl:
lavinmqctl restart_queue queue_nameA “Restart queue” button is also available in the queue details view of the management interface when the queue is in a closed state.
Ready to take the next steps?
Managed LavinMQ instance via CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec . You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.
Get started with CloudAMQP ->Help and feedback
We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.