Persistent Exchange

Rewindable queues are a popular feature in message queueing. They allow messages to be re-read and re-consumed, which covers a wide range of use cases. Sometimes you simply want the option to consume a message again. LavinMQ is an AMQP broker that offers this ability through the persistent exchange.

What is a persistent exchange?

You can add the x-persist-messages argument when creating an exchange of any type (direct, topic, fanout, headers). This creates a persistent exchange, which makes it possible to re-read messages. The persistent exchange ensures that a reference is kept to every message sent through that exchange.

What is the benefit of a persistent exchange?

Messages sent via a persistent exchange can be consumed multiple times. This is useful when you want to ensure that messages are still stored in the message store so that they can be consumed again.

How should I use a persistent exchange?

Use a persistent exchange when you want to be able to read messages sent to an exchange more than once. This feature can be compared to how Apache Kafka stores messages in the commit log, where messages are not removed once they have been consumed or handled.

A persistent exchange also stores all messages coming into the exchange even if there are no queue bindings on that exchange, ensuring that messages are not lost.

The Persistent Exchange

All exchanges in LavinMQ can be marked as persistent exchanges with the x-persist-messages argument. A persistent exchange stores all messages coming into that exchange even if there are no queue bindings on that exchange. The persistent exchange ensures that messages sent through that exchange are kept in an internal queue and remain available even after they have been consumed.

The number of messages that can be stored in the persistent exchange is set when the exchange is created. The code example shows how to mark an exchange as a persistent exchange that retains 1 message:

# ch is an open channel; x_name holds the exchange name
x_args = AMQP::Client::Arguments.new({"x-persist-messages" => 1})
x = ch.exchange(x_name, "topic", args: x_args)

The persistent exchange can also store messages for a given period of time. The code example shows how to retain messages for 1 millisecond:

x_args = AMQP::Client::Arguments.new({"x-persist-ms" => 1})
x = ch.exchange(x_name, "topic", args: x_args)

The exchange uses an internal queue, which is visible to users in the management interface. The name of the queue is amq.persistent followed by the exchange name. Any existing queue policy, such as max-length, can be applied to this queue. Make sure the queue is not filled with more messages than the disk can store.

Bind a queue to the persistent exchange

Messages sent through a persistent exchange are routed as normal messages, with the difference that they are also stored in an internal queue, so that they can be consumed again. When setting up a new binding between the exchange and a queue, additional arguments can be applied to decide if the stored messages should be routed to the new queue or not. There are different ways to select messages that should be re-consumed.

Message selection

Messages are stored in persistent exchanges in order. The first message published to the exchange is message 0, at the head of the queue:

9 8 7 6 5 4 3 2 1 0

Three arguments can be used: x-head, x-tail, and x-from.

With x-head argument

Supplying x-head as an argument to the binding selects that number of messages, starting with the oldest message. The value for x-head can be either positive or negative. If you bind a queue with the argument x-head=3, messages 0, 1, and 2 will be routed to your queue.

9 8 7 6 5 4 3 2 1 0
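The x-head=3 binding described above could look roughly like this with amqp-client.cr, the client used in the other examples. This is a minimal sketch: the broker URL, the exchange name events, the queue name replay, and the routing key are made up for illustration, and a running LavinMQ instance is assumed.

```crystal
require "amqp-client"

# Hypothetical broker URL and names; adjust them to your setup.
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    # Persistent exchange that retains up to 10 messages.
    x_args = AMQP::Client::Arguments.new({"x-persist-messages" => 10})
    x = ch.exchange("events", "topic", args: x_args)

    # Publish before any queue is bound; the exchange stores the messages.
    5.times { |i| x.publish "event #{i}", "events.demo" }

    # Bind a new queue and ask for the 3 oldest stored messages.
    q = ch.queue("replay")
    bind_args = AMQP::Client::Arguments.new({"x-head" => 3})
    q.bind("events", "#", args: bind_args)

    # Drain whatever was routed to the new queue.
    while msg = q.get(no_ack: true)
      puts msg.body_io.to_s
    end
  end
end
```

The selection arguments are passed on the binding, not on the queue or the exchange, so each new binding can request a different slice of the stored messages.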

With x-tail argument

x-tail counts from the newest message instead of the oldest.

With x-from argument

x-from lets you be very specific about which messages to get. Instead of counting from the oldest or newest message like x-head and x-tail, x-from specifies the message from which routing to the queue should start. Each message consumed from a persistent exchange has an additional argument called x-offset, which you can use to request a specific message again. In the example, if you want to get the message with x-offset set to 7, you specify x-from=7.

9 8 7 6 5 4 3 2 1 0

If you specify x-from=0 or an offset that doesn’t exist, you will get all messages stored in the exchange.
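To make the selection rules easier to compare, here is a toy model that applies them to a plain array of offsets. It is not LavinMQ code and needs no broker. The function names are hypothetical, the x-tail behaviour is an assumption (it is taken to mirror x-head from the newest end), and negative x-head values are left out.

```crystal
# Toy model of the selection rules on an array of offsets, oldest first.
# This is not LavinMQ code; it only illustrates the rules in this section.

# x-head=n: the n oldest stored messages.
def select_head(offsets : Array(Int32), n : Int32) : Array(Int32)
  offsets.first(n)
end

# x-tail=n: assumed to mirror x-head, counting from the newest message.
def select_tail(offsets : Array(Int32), n : Int32) : Array(Int32)
  offsets.last(n)
end

# x-from=offset: everything from that offset onwards; 0 or an unknown
# offset falls back to all stored messages.
def select_from(offsets : Array(Int32), from : Int32) : Array(Int32)
  return offsets if from == 0 || !offsets.includes?(from)
  offsets.select { |o| o >= from }
end

stored = (0..9).to_a
p select_head(stored, 3)  # => [0, 1, 2]
p select_from(stored, 7)  # => [7, 8, 9]
p select_from(stored, 42) # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```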

Example

Suppose that, while consuming messages from a queue bound to a persistent exchange, some messages fail to be processed and are never re-queued. If you have been logging x-offset for each message, you can bind a new queue to the exchange and supply the logged value as x-from for that binding. The new queue then gets all messages from that offset.
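A recovery along these lines might be sketched as follows with amqp-client.cr. Several things here are assumptions: a persistent exchange named events already exists, the logged offset is 7, the queue name events-replay is made up, and x-offset is assumed to arrive as a message header.

```crystal
require "amqp-client"

# Hypothetical value: the x-offset logged for the first message that failed.
failed_offset = 7

AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    # New queue that should receive the stored messages from that offset on.
    # The exchange "events" is assumed to exist already as a persistent exchange.
    q = ch.queue("events-replay")
    bind_args = AMQP::Client::Arguments.new({"x-from" => failed_offset})
    q.bind("events", "#", args: bind_args)

    while msg = q.get(no_ack: true)
      # Assumption: x-offset is delivered as a message header.
      offset = msg.properties.headers.try &.["x-offset"]?
      puts "offset=#{offset} body=#{msg.body_io.to_s}"
    end
  end
end
```

Logging the offset next to the body, as in the loop above, is what makes a later replay with x-from possible.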

Frequently Asked Questions

Q: How are messages stored in the persistent exchange?

Messages sent to exchanges marked as persistent exchanges are stored in an internal queue within LavinMQ. LavinMQ always stores a reference to messages, and a message sent to the internal queue always has an active reference.

See Message Store in LavinMQ.

Q: How long are messages stored in the persistent exchange?

Messages sent via a persistent exchange are stored in an internal queue. This queue retains as many messages as specified in the argument. Messages are stored indefinitely unless a maximum time is specified.