Stream Queues

Queues in LavinMQ are great! However, there are scenarios where queues are limited:

  • To deliver the same message to multiple consumers, a dedicated queue must be bound for each consumer - this makes scaling more resource-intensive.
  • Once a message is read from a queue and acknowledged, it is erased. Thus, you can’t re-read the same message multiple times or go back in time and start reading from a point in the queue.

Stream Queues or simply Streams in LavinMQ mitigate the above-listed challenges.

What are LavinMQ Streams?

LavinMQ Streams perform similar tasks as the traditional queues - they buffer messages from producers for consumers to read. However, Streams in LavinMQ differ from the traditional queues in one significant way:

  • Streams are immutable: messages written to a Stream cannot be erased, they can only be read. Even though this behavior is configurable to some extent (see retention), Streams are primarily designed to persist messages permanently. As a result, consumers can subscribe to a Stream and read the same message as many times as needed.

Consumers interact with Streams in LavinMQ using AMQP-based clients and the AMQP protocol. Currently, LavinMQ does not support any dedicated Stream protocol.

It is important to note that the Stream queue in LavinMQ wasn’t created to replace the traditional queues but rather, to complement them. Streams open up new possibilities for LavinMQ use cases.

Let’s explore these use cases in more detail.

When can I use LavinMQ Streams?

The use cases where Streams shine include:

  • Fan-out architectures: Where many consumers need to read the same message.
  • Replay & time-travel: Where consumers need to re-read the same message or start reading from any point in the Stream.

Fan-out Architectures

Implementing a fan-out architecture, where multiple consumers need to read the same message, can be tricky with traditional queues: each new consumer requires its own dedicated queue, which does not scale well.

Thankfully, LavinMQ’s Streams offer a streamlined solution to this issue. Consumers can read messages from a Stream in a non-destructive manner, ensuring that the message remains accessible for subsequent consumers.

Implementing a fan-out with LavinMQ Streams is straightforward: declare a Stream and attach as many consumers as required.

The illustration below depicts how a fan-out implemented with a Stream would look:

Fanout Architectures with LavinMQ Streams

In contrast, achieving the same result with queues would involve a more complex setup, as shown in the following image:

Fanout Architectures with Traditional Queues

Replay & Time Travel

LavinMQ Streams are also suitable for use cases where consumers need to re-read messages, which is not possible with queues. In addition to re-consuming messages, Streams also allow consumers to point to any position within the Stream and commence message consumption from that point.

This unique ability to replay and traverse through time within Streams is facilitated by the concept of offsets.

What are offsets?

Offsets in Streams serve a similar purpose as indexes in arrays. To start reading from a specific index in a Stream, you simply specify an offset in the consumer query. Essentially, every message in a Stream has an offset.

For instance, the following image illustrates how messages and their corresponding offsets would appear in a given Stream:

Offsets in Stream Queues

Usage

As mentioned earlier, client applications can talk to a Stream via an AMQP client library, just as they do with queues.

Like queues, there are three steps to working with LavinMQ Streams via an AMQP client library:

  • Declare/Instantiate a Stream
  • Publish(write) messages to the Stream
  • Consume(read) messages from the Stream

The code snippets in this section are based on amqp-client.js, the JavaScript client library.

Declaring a stream

Streams are declared with AMQP client libraries the same way queues are: set the x-queue-type queue argument to stream and provide this argument at declaration time.

Also make sure to declare the Stream with durable=true - LavinMQ does not allow the creation of non-durable Streams.

See the code snippet below:

import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

// Access the LAVINMQ_URL environment variable
const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startProducer() {
  try {
    // Setup a connection to the LavinMQ server
    const connection = new AMQPClient(LAVINMQ_URL)
    await connection.connect()
    const channel = await connection.channel()

    console.log("[✅] Connection over channel established")

    // Declare a Stream, named test_stream
    const q = await channel.queue(
      "test_stream",
      { durable: true },
      { "x-queue-type": "stream" }
    )
  } catch (error) {
    console.error("[❌] Error: ", error)
  }
}

Alternatively, a Stream can be created using the LavinMQ Management UI, under the Queues tab, like so:

 Stream from management UI

Make sure to tick the “Durable” checkbox, as shown in the image above.

Publishing a message to a stream

Publishing messages to a Stream is no different from publishing messages to a queue. Below, the previous snippet has been extended to publish messages to the test_stream we declared.

async function startProducer() {
  try {
    // Setup a connection to the LavinMQ server
    const connection = new AMQPClient(LAVINMQ_URL)
    await connection.connect()
    const channel = await connection.channel()

    console.log("[✅] Connection over channel established")

    // Declare a Stream, named test_stream
    const q = await channel.queue(
      "test_stream",
      { durable: true },
      { "x-queue-type": "stream" }
    )

    // Publish a message to the stream via the default exchange
    async function sendToQueue(routingKey, body) {
      await channel.basicPublish("", routingKey, body)
      console.log("[📥] Message sent to stream: ", body)
    }

    // Send some messages to the stream
    await sendToQueue("test_stream", "Hello World 1")
    await sendToQueue("test_stream", "Hello World 2")
    await sendToQueue("test_stream", "Hello World 3")
    await sendToQueue("test_stream", "Hello World 4")
    await sendToQueue("test_stream", "Hello World 5")
  } catch (error) {
    console.error("[❌] Error: ", error)
  }
}

You can find the complete producer script here

In addition to the x-queue-type argument we saw, Streams support three additional queue arguments that can be specified at queue declaration or via a policy.

  • x-max-length - Sets the maximum number of messages allowed in the stream at any given time. See retention. Default: not set.

  • x-max-length-bytes - Sets the maximum size of the Stream in bytes. See retention. Default: not set.

  • x-max-age - This argument will control how long a message survives in a LavinMQ Stream. The unit of this configuration could either be in years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s). See retention. Default: not set.
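As a sketch, all three retention arguments can be supplied together at declaration time (the values below are illustrative examples, not LavinMQ defaults):

```javascript
// Illustrative retention arguments for a Stream declaration
// (values are examples, not defaults):
const streamArgs = {
  "x-queue-type": "stream",
  "x-max-length": 1000000,             // keep at most ~1M messages
  "x-max-length-bytes": 5 * 1024 ** 3, // cap total size at 5 GiB
  "x-max-age": "7D"                    // drop segments older than 7 days
}
// Passed as the third argument at declaration time, e.g.:
// const q = await channel.queue("test_stream", { durable: true }, streamArgs)
```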

Next, let's look at how to consume these messages.

Consuming from Streams in LavinMQ

Messages can be consumed from a Stream in much the same way as from a queue, with a few known differences:

  • You can specify an offset to start reading/consuming from any point in the Stream.
  • Consuming messages in LavinMQ Streams requires setting the QoS prefetch.
  • LavinMQ does not allow consuming messages from a Stream with auto_ack=True

As mentioned, when consuming from a Stream in LavinMQ, clients have the ability to specify a starting point by using an offset. The x-stream-offset consumer argument controls this behavior.

Note: When you connect a consumer to a Stream that already contains messages, without specifying an offset, the consumer will begin reading from the Stream’s starting point.

With that being said, LavinMQ supports the following offset values:

  • first: This starts reading from the beginning of the Stream, offset 1.
  • last: This starts reading from the beginning of the last segment file.
  • next: Starts reading at the next offset written after the consumer starts - essentially, next skips all the messages present in the Stream prior to the consumer’s activation.
  • A specific numerical offset value: Clients can specify an exact offset to attach to the log. If the specified offset does not exist, it will be adjusted to either the start or end of the log accordingly.
  • A timestamp: Represents a specific point in time at which to attach to the log.
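As a sketch, these offset values map onto the x-stream-offset consumer argument like this (the object shapes mirror the subscribe() calls used with amqp-client.js; the timestamp value is illustrative):

```javascript
// Illustrative x-stream-offset values, passed in the consumer
// arguments of a subscribe call:
const fromStart   = { noAck: false, args: { "x-stream-offset": "first" } }
const fromLastSeg = { noAck: false, args: { "x-stream-offset": "last" } }
const newOnly     = { noAck: false, args: { "x-stream-offset": "next" } }
const fromOffset  = { noAck: false, args: { "x-stream-offset": 5000 } }
// A timestamp offset attaches at the first message stored at or after that time
const fromTime    = { noAck: false, args: { "x-stream-offset": new Date("2024-01-01T00:00:00Z") } }
```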

The snippet below sets the consumer's prefetch to 100 and reads from the test_stream.

async function startConsumer() {
  // Setup a connection to the LavinMQ server
  const connection = new AMQPClient(LAVINMQ_URL)
  await connection.connect()
  const channel = await connection.channel()

  // Set the prefetch
  await channel.prefetch(100)

  const q = await channel.queue(
    "test_stream",
    { durable: true },
    { "x-queue-type": "stream" }
  )

  let counter = 0

  // Read messages from the test_stream
  await q.subscribe({ noAck: false }, async (msg) => {
    try {
      console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
      await msg.ack()
    } catch (error) {
      console.error(error)
    }
  })
}

Notice how in the snippet above, the subscribe() function does not specify an offset? As a result, the consumer will begin reading from the beginning of the stream.

Alternatively, we can start consuming from offset 5000, or from the very first message in the Stream - the snippets below demonstrate both.

// Grabbing the first message
await q.subscribe({ noAck: false, args: { "x-stream-offset": "first" }}, async (msg) => {
    try {
      console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
      await msg.ack()
    } catch (error) {
      console.error(error)
    }
})

// Grabbing the message with offset 5000
await q.subscribe({ noAck: false, args: { "x-stream-offset": 5000 }}, async (msg) => {
    try {
      console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
      await msg.ack()
    } catch (error) {
      console.error(error)
    }
})

You can find the complete consumer script here

Data retention

Since LavinMQ Streams are immutable, they grow indefinitely until disk space runs out. As this is usually undesirable, a Stream can be configured to discard old messages through a data retention configuration.

Data retention arguments allow you to configure a Stream to truncate its messages once it reaches a given size or a specified age.

Truncating messages entails deleting an entire segment file. But what is a segment file?

LavinMQ does not persist messages in a single big file. Instead, Streams and Queues in general persist messages in small 8 MiB files called segment files. A Stream truncates its size by deleting a segment file and all its messages.

Note: The retention is evaluated on a per-segment basis. Essentially, Streams only apply the retention limits you set whenever an existing segment file has reached its maximum size (by default 8 MiB) and is closed in favor of a new one.
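Because retention operates on whole segment files, a size limit is effectively rounded to segment granularity. A simplified sketch of that arithmetic (an assumption for illustration, ignoring the currently open segment):

```javascript
const SEGMENT_SIZE = 8 * 1024 * 1024 // default segment file size: 8 MiB

// Rough number of full segments a Stream can retain under a byte limit.
// Simplified: actual truncation also depends on the open segment.
function retainedSegments(maxLengthBytes) {
  return Math.floor(maxLengthBytes / SEGMENT_SIZE)
}
```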

To configure a Stream’s retention strategy, you can adopt a size-based or time-based approach (or even both).

Size-based retention strategy

Here, the Stream is set up to discard segment files once the total size or number of messages in the Stream reaches a specified upper limit.

Setting up the size-based retention strategy requires providing either of the following arguments when declaring the Stream. As mentioned earlier, this can also be done through a policy:

  • x-max-length-bytes
  • x-max-length

For example:

// Setting stream capacity to 1 thousand msgs
const q = await channel.queue(
  "test_stream", 
  { durable: true}, 
  {"x-queue-type": "stream", "x-max-length": 1000}
)

In the example above, once the Stream exceeds the 1,000-message limit, the oldest segment files are deleted until the Stream is back under the limit.

Time-based retention strategy

The Stream is configured to truncate segment files that have surpassed a specific age threshold.

Setting up the time-based retention strategy requires providing the following argument when declaring the Stream, or via a policy:

  • x-max-age

Again, the unit of this configuration could either be in years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s).

For example:

// Expire messages that have been in the queue longer than 30 days
const q = await channel.queue(
  "test_stream", 
  { durable: true}, 
  {"x-queue-type": "stream", "x-max-age": "30D"}
)

In the snippet provided, we employ a time-based retention strategy, whereby segment files that have been present for 30 days or longer are discarded. This process involves comparing the timestamp of the last message in each segment with the specified max-age.

When the timestamp of the last message in a segment exceeds the max age, the entire segment is discarded. This is due to the fact that messages are stored chronologically, ensuring that all other messages within the segment will be older than the last message.
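That per-segment check can be sketched as follows (an assumed simplification for illustration, not LavinMQ's actual implementation):

```javascript
// A segment is eligible for deletion when even its newest (last)
// message is older than the configured max age.
function segmentExpired(lastMessageTimestampMs, maxAgeMs, nowMs = Date.now()) {
  return nowMs - lastMessageTimestampMs > maxAgeMs
}
```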

LavinMQ does not provide a default value for this.

Feature comparison with traditional queues in LavinMQ

Streams differ from regular queues, so they don't exactly match the way AMQP 0.9.1 queues work. Many features other queue types offer are unavailable in Streams by design, given their intended use cases.

The table below presents some of the features in LavinMQ and how they compare across queues and streams.

Feature                  | Queue                     | Stream
Non-durability           | Queues can be non-durable | A Stream must be durable; durable: false will fail
Exclusivity              | Supported                 | Not supported
Consumer priority        | Supported                 | Not supported; the x-priority argument will fail
Single Active Consumer   | Supported                 | Not supported; the x-single-active-consumer argument will fail
Consumer acknowledgement | Not required              | Required; noAck: true will fail
Dead Letter Exchange     | Supported                 | Not supported; the x-dead-letter-exchange argument will fail
Per-message TTL          | Supported                 | Not supported; x-expires will fail
Delivery limit           | Supported                 | Not supported; x-delivery-limit will fail
Reject on overflow       | Supported                 | Not supported; x-overflow will fail
Channel Prefetch         | Not required              | Required; a consumer must specify its channel prefetch
Global Prefetch          | Supported                 | Not supported
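For example, per the table above, declaring a Stream with an unsupported argument such as x-single-active-consumer is expected to be rejected by the broker (a sketch; the exact error surfaced depends on the client library):

```javascript
// Arguments marked unsupported in the table are refused for Streams:
const unsupportedArgs = {
  "x-queue-type": "stream",
  "x-single-active-consumer": true // will fail on a Stream
}
// The declaration below would be rejected by the broker, e.g.:
// await channel.queue("some_stream", { durable: true }, unsupportedArgs) // would throw
```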

Wrap up

This piece explored the fundamentals of LavinMQ Streams: from when to use Streams to how to get started with them. We took a step further to cover retention configurations that help prevent Streams from growing indefinitely.

Overall, Streams weren’t created to replace traditional queues in LavinMQ, but to complement them. Streams open up new possibilities for LavinMQ use cases.

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.