Stream Queues

What are LavinMQ Streams?

LavinMQ Streams buffer messages from producers for consumers, like traditional queues. Unlike queues, however, Streams are immutable: messages can't be erased, only read. While retention settings offer some control over storage, Streams are designed for long-term message storage. This allows consumers to subscribe and read the same message multiple times.

When to use LavinMQ Streams?

Streams are great for:

  • Fan-out architectures: where many consumers need to read the same message. Implementing fan-out with LavinMQ Streams is straightforward: declare a Stream and attach as many consumers as required.
  • Replay & time-travel: Where consumers need to re-read the same message or start reading from any point in the Stream.

Client applications talk to a Stream via an AMQP client library, just as they do with queues. As with queues, there are three steps to working with LavinMQ Streams:

  1. Declare a Stream
  2. Publish messages to the Stream
  3. Consume messages from the Stream

1. Declaring a stream

Streams are declared with AMQP client libraries the same way queues are: set the x-queue-type queue argument to stream, and provide this argument at declaration time. Also make sure to declare the Stream with durable=True, since LavinMQ does not allow the creation of non-durable Streams.

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream"}
)

connection.close()

2. Publishing to the stream

As an example, the snippet below extends the previous one to publish a message to the declared test_stream.

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue="test_stream",
    durable=True,
    arguments={"x-queue-type": "stream"}
)

channel.basic_publish(
    exchange="", # Use the default exchange
    routing_key="test_stream",
    body="Hello World"
)

connection.close()

In addition to the x-queue-type argument, Streams support three additional queue arguments that can be specified at queue declaration or via a policy.

  • x-max-length - Sets the maximum number of messages allowed in the stream at any given time. See retention. Default: not set.
  • x-max-length-bytes - Sets the maximum size of the Stream in bytes. See retention. Default: not set.
  • x-max-age - Controls how long a message survives in a LavinMQ Stream. The unit can be years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s). See retention. Default: not set.
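The three limits can also be combined; whichever is reached first triggers truncation. As an illustration (the values below are arbitrary examples), a dictionary like this could be passed as the arguments parameter of channel.queue_declare:

```python
# Illustrative combined retention settings for a Stream declaration.
# Pass this dict as the `arguments` parameter of channel.queue_declare.
stream_arguments = {
    "x-queue-type": "stream",
    "x-max-length": 100_000,                  # at most 100,000 messages
    "x-max-length-bytes": 512 * 1024 * 1024,  # at most 512 MiB of data
    "x-max-age": "7D",                        # discard segments older than 7 days
}
```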

3. Consuming from the stream

Three key things to note about consuming messages from a Stream queue:

  • An offset can be specified to start reading from any point in the Stream.
  • Consuming messages in LavinMQ Streams requires setting the QoS prefetch.
  • LavinMQ does not allow consuming messages from a Stream with auto_ack=True

As mentioned, when consuming from a Stream, clients can specify a starting point by using an offset. The x-stream-offset consumer argument controls this behaviour.

LavinMQ supports the following offset values:

  • first: This starts reading from the beginning of the Stream, offset 1.
  • last: This starts reading from the beginning of the last segment file.
  • next: Starts reading from the next offset to be written - in other words, the consumer skips any messages already present in the Stream when it attaches.
  • A specific numerical offset value: Clients can specify an exact offset to attach to the log. If the specified offset does not exist, it will be adjusted to either the start or end of the log accordingly.
  • Timestamp: This value represents a specific point in time to attach to the log.

Note: Connecting a consumer to a Stream that already contains messages, without specifying an offset, will configure the consumer to read from the beginning of the Stream.
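To make the offset variants concrete, the sketch below builds the consumer arguments for three of them. How a timestamp offset is encoded depends on the client library; the datetime form below is an assumption to verify against your client.

```python
from datetime import datetime, timezone

# Skip history: start at the next message published after the consumer connects.
args_next = {"x-stream-offset": "next"}

# Attach at an absolute position in the log.
args_numeric = {"x-stream-offset": 5000}

# Attach at a point in time (assumes the client library encodes a datetime
# as an AMQP timestamp; adjust for your client).
args_timestamp = {"x-stream-offset": datetime(2024, 1, 1, tzinfo=timezone.utc)}
```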

Example: The two snippets below set the consumer's prefetch to 100 and read from test_stream in two ways:

  • Reading from the first message in the Stream
  • Reading from the 5000th offset

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# Reading from the beginning of the Stream.
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-offset": 1}
)
channel.start_consuming()

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅] {msg}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

# Reading from the 5000th offset
channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    arguments={"x-stream-offset": 5000}
)
channel.start_consuming()

Offsets

Offsets in Streams serve a similar purpose to indexes in arrays. To start reading from a specific position in a Stream, specify an offset in the consumer arguments. Every message in a Stream has an offset. For instance, the following image illustrates how messages and their corresponding offsets would appear in a given Stream:

[Image: messages and their offsets in a Stream]

Data retention

A stream can be configured to discard old messages using data retention settings. Retention settings allow a Stream to automatically remove messages once they exceed a specified size or age.

Message truncation involves deleting an entire segment file. Instead of storing messages in a single large file, LavinMQ uses smaller 8MB files called segment files. When truncation occurs, a segment file and all its messages are deleted.

Note: Retention is evaluated per segment. A Stream applies retention limits only when an existing segment file reaches its maximum size (default: 8MB) and is closed in favour of a new one.
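The per-segment behaviour can be sketched in plain Python. The helper below is an illustration only - it uses 3-message segments instead of 8MB files - but it shows the rule: whole segments are dropped from the head of the Stream until the message count is back under x-max-length, and the open tail segment is never deleted.

```python
# Illustration of per-segment retention under x-max-length.
# Each inner list stands for one segment file (real segments hold 8MB of data).
def apply_max_length(segments, max_length):
    segments = list(segments)
    # Drop whole segments from the head while the total message count
    # exceeds the limit; the last (still open) segment is never removed.
    while len(segments) > 1 and sum(len(s) for s in segments) > max_length:
        segments.pop(0)
    return segments

apply_max_length([[1, 2, 3], [4, 5, 6], [7, 8]], 5)
# -> [[4, 5, 6], [7, 8]]: the oldest segment was deleted in full
```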

A Stream’s retention strategy can be configured using a size-based, time-based, or combined approach.

Size-based retention strategy

Here, the Stream is set up to discard segment files once the total size or number of messages in the Stream reaches a specified upper limit. Setting up the size-based retention strategy requires providing either of the following arguments when declaring the Stream:

  • x-max-length-bytes
  • x-max-length

Example: Setting stream capacity to one thousand messages.

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream", "x-max-length": 1000}
)

connection.close()

In the example above, if the Stream reaches the 1000 message limit, old segment files are deleted until the limit is met.

Time-based retention strategy

The Stream truncates segment files exceeding a set age. To enable time-based retention, specify x-max-age when declaring the Stream or via policy.

Units: years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s).

Example: Expire messages that have been in the queue longer than 30 days

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

channel.queue_declare(
    queue='test_stream',
    durable=True,
    arguments={"x-queue-type": "stream", "x-max-age": "30D"}
)

connection.close()

The snippet demonstrates a time-based retention strategy, where segment files older than 30 days are discarded.

Automatic offset tracking

When consuming from a stream, it is possible to configure LavinMQ to handle tracking of offsets. Offset tracking helps a consumer remember where it left off in a stream.

For example, if a consumer has processed messages up to position 5000, that means it has successfully handled everything before that point. If it stops and later comes back online, it can resume from position 5001 instead of starting over.

How it works

Offset tracking can be enabled in two ways:

  1. By setting x-stream-offset = null.
  2. By setting x-stream-automatic-offset-tracking = true.

In both cases, setting the consumer tag is required.

When setting x-stream-offset = null, the consumer will start reading the stream from the beginning at first connection. At a later reconnect the consumer will resume from where it left off automatically.

It is also possible to combine the automatic tracking with all possible x-stream-offset values by providing both a valid x-stream-offset and setting x-stream-automatic-offset-tracking = true. In this case, the consumer will start reading the stream from the provided x-stream-offset at first connection. At a later reconnect the consumer will resume from where it left off automatically.

The snippet below shows a consumer script with x-stream-automatic-offset-tracking set to true and consumer tag set to test-consumer.

import pika

connection = pika.BlockingConnection(pika.URLParameters('host-url'))
channel = connection.channel()

def callback(ch, method, properties, msg):
    print(f"[✅]  { msg }")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)

channel.basic_consume(
    "test_stream",
    callback,
    auto_ack=False,
    consumer_tag="test-consumer",
    arguments={"x-stream-automatic-offset-tracking": True}
)

channel.start_consuming()

Server side tracking can be disabled at any point by reconnecting the consumer and providing a valid value for x-stream-offset (and not providing x-stream-automatic-offset-tracking).

Stream filtering

LavinMQ Streams support server-side filtering, allowing consumers to receive only the messages they need without scanning the entire stream. This reduces network usage and improves efficiency, making it easier to work with specific data.

How it works

Filter messages by setting x-stream-filter-value when publishing to a stream. For example, to tag all search queries, set x-stream-filter-value = 'sq' when publishing.

When a consumer sets x-stream-filter, only messages matching the filter value are received. The filter can be a single value (a string) or multiple values separated by commas. If a message matches any of the filters, it is delivered. To receive all messages without a filter, provide x-stream-match-unfiltered as an argument on the consumer.

If x-stream-filter is not provided when consuming, all messages in the stream are returned as if no filters existed.
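The delivery rules above can be sketched as a small decision function. This is a plain-Python illustration of the described semantics, not LavinMQ's implementation; it assumes x-stream-filter arrives as a comma-separated string of values.

```python
def should_deliver(message_filter_value, consumer_filter, match_unfiltered=False):
    """Would the server deliver this message to this consumer? (illustrative)"""
    if consumer_filter is None:
        # No x-stream-filter on the consumer: every message is delivered.
        return True
    if message_filter_value is None:
        # Unfiltered messages are only delivered with x-stream-match-unfiltered.
        return match_unfiltered
    # A message is delivered if it matches any of the comma-separated filters.
    return message_filter_value in consumer_filter.split(",")

should_deliver("sq", "sq,clicks")   # True: "sq" matches one of the filters
should_deliver(None, "sq")          # False: message carries no filter value
should_deliver(None, "sq", True)    # True: unfiltered explicitly allowed
```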

Feature comparison with queues

Due to their design and intended use cases, Streams lack some features found in normal queues. The table below compares queues and streams.

Feature                  | Queue                     | Stream
Non-durability           | Queues can be non-durable | A Stream must be durable; durable: false will fail.
Exclusivity              | Supported                 | Not supported
Consumer priority        | Supported                 | Not supported; the x-priority argument will fail.
Single Active Consumer   | Supported                 | Not supported; the x-single-active-consumer argument will fail.
Consumer acknowledgement | Not required              | Required when reading from Streams; noAck: true will fail.
Dead Letter Exchange     | Supported                 | Not supported; the x-dead-letter-exchange argument will fail.
Per-message TTL          | Supported                 | Not supported; x-expires will fail.
Delivery limit           | Supported                 | Not supported; x-delivery-limit will fail.
Reject on overflow       | Supported                 | Not supported; x-overflow will fail.
Channel Prefetch         | Not required              | A consumer must specify its channel prefetch.
Global Prefetch          | Supported                 | Not supported

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.