Stream Queues
What are LavinMQ Streams?
LavinMQ Streams buffer messages from producers for consumers, much like traditional queues. Unlike queues, however, Streams are immutable: messages cannot be erased, only read. While retention settings offer some control over how long messages are kept, Streams are designed for long-term message storage, which allows consumers to subscribe and read the same message multiple times.
When to use LavinMQ Streams?
Streams are great for:
- Fan-out architectures: Where many consumers need to read the same message. Implementing a fan-out arrangement with LavinMQ Streams is straightforward: simply declare a Stream and attach as many consumers as required.
- Replay & time-travel: Where consumers need to re-read the same message or start reading from any point in the Stream.
Client applications talk to a Stream via an AMQP client library, just as they do with queues. As with queues, there are three steps to working with LavinMQ Streams:
- Declare a Stream
- Publish messages to the Stream
- Consume messages from the Stream
1. Declaring a stream
Streams are declared with the AMQP client libraries the same way queues are created: set the `x-queue-type` queue argument to `stream` at declaration time. Also make sure to declare the Stream with `durable=true`; LavinMQ does not allow the creation of non-durable Streams.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream"}
)

connection.close()
```
2. Publishing to the stream
As an example, the previous snippet has been extended below to publish a message to the declared `test_stream`.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue="test_stream",
    durable=True,
    arguments={"x-queue-type": "stream"}
)

channel.basic_publish(
    exchange="",  # Use the default exchange
    routing_key="test_stream",
    body="Hello World"
)

connection.close()
```
In addition to the `x-queue-type` argument, Streams support three additional queue arguments that can be specified at queue declaration or via a policy:

- `x-max-length` - Sets the maximum number of messages allowed in the Stream at any given time. See retention. Default: not set.
- `x-max-length-bytes` - Sets the maximum size of the Stream in bytes. See retention. Default: not set.
- `x-max-age` - Controls how long a message survives in a LavinMQ Stream. The unit can be years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s). See retention. Default: not set.
3. Consuming from the stream
Three key things to note about consuming messages from a Stream queue:

- An offset can be specified to start reading from any point in the Stream.
- Consuming messages from a Stream requires setting the QoS prefetch.
- LavinMQ does not allow consuming messages from a Stream with `auto_ack=True`.

As mentioned, when consuming from a Stream, clients can specify a starting point using an offset. The `x-stream-offset` consumer argument controls this behaviour.
LavinMQ supports the following offset values:

- `first`: Starts reading from the beginning of the Stream, offset 1.
- `last`: Starts reading from the beginning of the last segment file.
- `next`: Starts reading from the latest offset once the consumer is started - essentially, the `next` offset won't read any of the messages already present in the Stream prior to the consumer's activation.
- A specific numerical offset: Clients can specify an exact offset to attach to the log. If the specified offset does not exist, it will be adjusted to either the start or end of the log accordingly.
- Timestamp: A specific point in time to attach to the log.
Note: Connecting a consumer to a Stream that already contains messages, without specifying an offset, will configure the consumer to read from the beginning of the Stream.
Example: The snippets below set the consumer's prefetch to 100 and read from `test_stream` via two approaches:

- Reading from the first message in the Stream.
- Reading from the 5000th offset.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Streams require a QoS prefetch to be set before consuming
channel.basic_qos(prefetch_count=100)

# Reading from the beginning of the Stream
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-offset": 1}
)

channel.start_consuming()
```
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Streams require a QoS prefetch to be set before consuming
channel.basic_qos(prefetch_count=100)

# Reading from the 5000th offset
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-offset": 5000}
)

channel.start_consuming()
```
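Offsets such as `first`, `last`, and `next` are passed as strings, and numerical offsets as integers. For a timestamp offset, one approach is to pass a Python `datetime`, which pika encodes as an AMQP timestamp; a minimal sketch (the one-hour look-back is illustrative):

```python
import datetime

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# Start reading from messages stored roughly one hour ago;
# pika encodes datetime values as AMQP timestamps
one_hour_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)

channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-offset": one_hour_ago}
)

channel.start_consuming()
```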
Offsets
Offsets in Streams serve a similar purpose to indexes in arrays: every message in a Stream has an offset, and to start reading from a specific position, you simply specify that offset in the consumer arguments. For instance, the illustration below shows how messages and their corresponding offsets might appear in a given Stream:
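```
offset:    1      2      3      4      5     ...
         [msg1] [msg2] [msg3] [msg4] [msg5]  ...
```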
Data retention
A Stream can be configured to discard old messages using data retention settings, which allow the Stream to automatically remove messages once it exceeds a specified size or the messages exceed a specified age.

Instead of storing messages in a single large file, LavinMQ stores them in smaller 8MB files called segment files. Truncation operates on whole segments: when it occurs, a segment file and all of the messages in it are deleted.
Note: Retention is evaluated per segment. A Stream applies retention limits only when an existing segment file reaches its maximum size (default: 8MB) and is closed in favour of a new one.
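As an illustration: with the default 8MB segment size and `x-max-length-bytes` set to 40MB, the Stream keeps roughly five segments on disk; when a sixth segment fills up and is closed, the oldest segment is deleted. Because retention only runs when a segment is closed, the Stream can temporarily exceed its configured limit by up to about one segment's worth of messages.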
A Stream’s retention strategy can be configured using a size-based, time-based, or combined approach.
Size-based retention strategy
Here, the Stream is set up to discard segment files once the total size or number of messages in the Stream reaches a specified upper limit. Setting up the size-based retention strategy requires providing either of the following arguments when declaring the Stream:

- `x-max-length-bytes`
- `x-max-length`
Example: Setting stream capacity to one thousand messages.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

# Keep at most 1000 messages in the Stream
channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream", "x-max-length": 1000}
)

connection.close()
```
In the example above, once the Stream exceeds the 1000-message limit, the oldest segment files are deleted until the Stream is back within the limit.
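Similarly, the Stream can be capped by total size instead of message count using `x-max-length-bytes`; the 80MB limit below is illustrative:

```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

# Keep the Stream at roughly 80MB on disk (about ten 8MB segments)
channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream", "x-max-length-bytes": 80_000_000}
)

connection.close()
```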
Time-based retention strategy
The Stream truncates segment files exceeding a set age. To enable time-based retention, specify `x-max-age` when declaring the Stream or via a policy.

Units: years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s).
Example: Expiring messages that have been in the Stream longer than 30 days.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

# Discard segment files older than 30 days
channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream", "x-max-age": "30D"}
)

connection.close()
```
The snippet demonstrates a time-based retention strategy, where segment files older than 30 days are discarded.
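The size-based and time-based strategies can also be combined by providing both kinds of arguments, in which case whichever limit is reached first triggers truncation. A minimal sketch (the specific limits are illustrative):

```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

# Combined retention: at most 1000 messages, none older than 30 days
channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={
        "x-queue-type": "stream",
        "x-max-length": 1000,
        "x-max-age": "30D",
    }
)

connection.close()
```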
Automatic offset tracking
When consuming from a stream, it is possible to configure LavinMQ to handle tracking of offsets. Offset tracking helps a consumer remember where it left off in a stream.
For example, if a consumer has processed messages up to position 5000, that means it has successfully handled everything before that point. If it stops and later comes back online, it can resume from position 5001 instead of starting over.
How it works
Offset tracking can be enabled in two ways:

- By setting `x-stream-offset = null`.
- By setting `x-stream-automatic-offset-tracking = true`.

In both cases, setting the consumer tag is required.
When setting `x-stream-offset = null`, the consumer will start reading the stream from the beginning at first connection. At a later reconnect, the consumer will automatically resume from where it left off.

It is also possible to combine automatic tracking with any of the possible `x-stream-offset` values by providing both a valid `x-stream-offset` and setting `x-stream-automatic-offset-tracking = true`. In this case, the consumer will start reading the stream from the provided `x-stream-offset` at first connection. At a later reconnect, the consumer will automatically resume from where it left off.
The snippet below shows a consumer script with `x-stream-automatic-offset-tracking` set to `true` and the consumer tag set to `test-consumer`.
```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# The consumer tag identifies this consumer so LavinMQ can
# track its offset on the server side
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    consumer_tag="test-consumer",
    arguments={"x-stream-automatic-offset-tracking": True}
)

channel.start_consuming()
```
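Combining automatic tracking with an explicit starting offset, as described above, might look like the following sketch: on first connection the consumer starts at offset 5000, and on later reconnects it resumes from wherever it left off.

```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# First connection: start at offset 5000;
# later reconnects: resume from the last tracked offset
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    consumer_tag="test-consumer",
    arguments={
        "x-stream-offset": 5000,
        "x-stream-automatic-offset-tracking": True,
    }
)

channel.start_consuming()
```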
Server-side tracking can be disabled at any point by reconnecting the consumer and providing a valid value for `x-stream-offset` (and not providing `x-stream-automatic-offset-tracking`).
Stream filtering
LavinMQ Streams support server-side filtering, allowing consumers to receive only the messages they need without scanning the entire stream. This reduces network usage and improves efficiency, making it easier to work with specific data.
How it works
Filter messages by setting `x-stream-filter-value` when publishing to a stream. For example, to tag all search queries, set `x-stream-filter-value = 'sq'` when publishing.
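As a sketch of the publishing side, assuming the filter value travels as the `x-stream-filter-value` message header (the `'sq'` tag mirrors the example above):

```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

# Tag the message as a search query; the filter value is assumed
# to be carried in the x-stream-filter-value message header
channel.basic_publish(
    exchange="",
    routing_key="test_stream",
    body="what is a stream?",
    properties=pika.BasicProperties(
        headers={"x-stream-filter-value": "sq"}
    )
)

connection.close()
```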
When a consumer sets `x-stream-filter`, only messages matching the filter value are received. The filter can be a single value (a string) or multiple values separated by commas; if a message matches any of the filters, it is delivered. To also receive messages that were published without a filter value, provide `x-stream-match-unfiltered` as an argument on the consumer.

If `x-stream-filter` is not provided when consuming, all messages in the stream are delivered, as if no filters existed.
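On the consuming side, a sketch of a consumer that only receives messages tagged `sq`:

```python
import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# Receive only messages whose filter value matches "sq".
# Adding "x-stream-match-unfiltered": True would also deliver
# messages that were published without any filter value.
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-filter": "sq"}
)

channel.start_consuming()
```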
Feature comparison with queues
Due to their design and intended use cases, Streams lack some features found in normal queues. The table below compares queues and Streams.
| Features | Queue | Stream |
|---|---|---|
| Non-durability | Queues can be non-durable | A Stream must be durable; `durable: false` will fail. |
| Exclusivity | Supported | Not supported |
| Consumer priority | Supported | Not supported; the `x-priority` argument will fail. |
| Single Active Consumer | Supported | Not supported; the `x-single-active-consumer` argument will fail. |
| Consumer acknowledgement | Not required | Consumers must acknowledge messages when reading from Streams; `noAck: true` will fail. |
| Dead Letter Exchange | Supported | Not supported; the `x-dead-letter-exchange` argument will fail. |
| Per-message TTL | Supported | Not supported; `x-expires` will fail. |
| Delivery limit | Supported | Not supported; `x-delivery-limit` will fail. |
| Reject on overflow | Supported | Not supported; `x-overflow` will fail. |
| Channel Prefetch | Not required | A consumer must specify its channel prefetch. |
| Global Prefetch | Supported | Not supported |
Ready to take the next steps? Here are some things you should keep in mind:
Managed LavinMQ instance on CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.
Help and feedback
We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.