Stream Queues
Queues in LavinMQ are great! However, there are scenarios where queues are limited:
- They deliver the same message to multiple consumers by binding a dedicated queue for each consumer - this makes scaling more resource-intensive.
- Once a message is read from a queue and acknowledged, it is erased. Thus, you can’t re-read the same message multiple times or go back in time and start reading from a point in the queue.
Stream Queues or simply Streams in LavinMQ mitigate the above-listed challenges.
What are LavinMQ Streams?
LavinMQ Streams perform a similar task to traditional queues - they buffer messages from producers for consumers to read. However, Streams in LavinMQ differ from traditional queues in one significant way:
- Streams are immutable: messages written to a Stream cannot be erased, they can only be read. Even though this behavior is configurable to some extent (see retention), Streams are primarily designed to persist messages permanently. As a result, consumers can subscribe to a Stream and read the same message as many times as needed.
Consumers interact with Streams in LavinMQ using AMQP-based clients and the AMQP protocol. Currently, LavinMQ does not support any dedicated Stream protocol.
It is important to note that the Stream queue in LavinMQ wasn’t created to replace the traditional queues but rather, to complement them. Streams open up new possibilities for LavinMQ use cases.
Let’s explore these use cases in more detail.
When can I use LavinMQ Streams?
The use cases where Streams shine include:
- Fan-out architectures: Where many consumers need to read the same message.
- Replay & time-travel: Where consumers need to re-read the same message or start reading from any point in the Stream.
Fan-out Architectures
Implementing a fan-out architecture, where multiple consumers need to read the same message, can be tricky with traditional queues. Implementing fan-outs with queues will require creating a dedicated queue for each new consumer - this is not very scalable.
Thankfully, LavinMQ’s Streams offer a streamlined solution to this issue. Consumers can read messages from a Stream in a non-destructive manner, ensuring that the message remains accessible for subsequent consumers.
Implementing a fan-out arrangement with LavinMQ Streams is remarkably straightforward. Simply declare a Stream and subscribe as many consumers as required.
The accompanying illustration provides a visual depiction of how implementing a fan-out using a Stream would appear:
In contrast, achieving the same result with queues would involve a more complex setup, as shown in the following image:
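As a rough sketch of the Stream-based fan-out, the helper below attaches several consumers to one Stream. It assumes `ch` is an already-open amqp-client.js channel; `fanOut` and `handlers` are hypothetical names used only for illustration, and the prefetch value of 100 is an arbitrary choice.

```javascript
// Sketch only: every handler gets its own consumer on the SAME Stream.
// `ch` is assumed to be an open amqp-client.js channel (hypothetical helper).
async function fanOut(ch, streamName, handlers) {
  await ch.prefetch(100); // consuming from a Stream requires a prefetch
  const q = await ch.queue(streamName, { durable: true }, { "x-queue-type": "stream" });
  const consumers = [];
  for (const handle of handlers) {
    // Reads are non-destructive, so each consumer sees every message.
    const consumer = await q.subscribe({ noAck: false }, async (msg) => {
      handle(msg);
      await msg.ack(); // Streams require explicit acknowledgements
    });
    consumers.push(consumer);
  }
  return consumers;
}
```

In a real deployment each consumer would usually live in its own application; a single channel is used here only to keep the sketch short.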
Replay & Time Travel
LavinMQ Streams are also suitable for use cases where consumers need to re-read
messages, which is not possible with queues. In addition to re-consuming
messages, Streams also allow consumers to point to any position within the
Stream and commence message consumption from that point.
This unique ability to replay and traverse through time within Streams is facilitated by the concept of offsets.
What are offsets?
Offsets in Streams serve a similar purpose to indexes in arrays: every message in a Stream has an offset. To start reading from a specific position in a Stream, you simply specify an offset as a consumer argument.
For instance, the following image illustrates how messages and their corresponding offsets would appear in a given Stream:
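To make the array analogy concrete, here is a toy in-memory model. It is not LavinMQ code: `ToyStream` is a hypothetical class, and numbering offsets from 1 is an assumption made for this illustration.

```javascript
// Toy model only: a plain array standing in for a Stream's log.
class ToyStream {
  constructor() {
    this.log = [];
  }

  // Appends a message and returns the offset assigned to it.
  append(body) {
    const offset = this.log.length + 1; // assumption: offsets start at 1
    this.log.push({ offset, body });
    return offset;
  }

  // Non-destructive read: returns every message at or after `offset`.
  readFrom(offset) {
    return this.log.filter((m) => m.offset >= offset);
  }
}

const stream = new ToyStream();
["a", "b", "c"].forEach((body) => stream.append(body));
const replay = stream.readFrom(2).map((m) => m.body);
```

Because reading never removes anything, calling `readFrom(2)` a second time returns the same messages, which is the replay behavior described above.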
Usage
As mentioned earlier, client applications can talk to a Stream via an AMQP client library, just as they do with queues.
Like queues, there are three steps to working with LavinMQ Streams via an AMQP client library:
- Declare/Instantiate a Stream
- Publish (write) messages to the Stream
- Consume (read) messages from the Stream
The code snippets in this section are based on amqp-client.js, the JavaScript client library.
Declaring a stream
Streams are declared with the AMQP client libraries the same way queues are created. Set the x-queue-type queue argument to stream, and provide this argument at declaration time.
Also make sure to declare the Stream with durable=true - LavinMQ does not allow the creation of non-durable Streams.
See the code snippet below:
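A minimal sketch with amqp-client.js might look like this. The `declareStream` helper is a hypothetical name, and the `amqp://localhost` URL in `main` is a placeholder. `main` is defined but not called, since it needs a running broker and the `@cloudamqp/amqp-client` package.

```javascript
// Declares a durable Stream on an open amqp-client.js channel.
async function declareStream(ch, name = "test_stream") {
  return ch.queue(
    name,
    { durable: true },            // Streams must be durable
    { "x-queue-type": "stream" }  // marks the queue as a Stream
  );
}

// Not invoked here: requires a reachable broker (placeholder URL).
async function main() {
  const { AMQPClient } = await import("@cloudamqp/amqp-client");
  const conn = await new AMQPClient("amqp://localhost").connect();
  const ch = await conn.channel();
  const q = await declareStream(ch);
  console.log(`Declared stream: ${q.name}`);
  await conn.close();
}
```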
Alternatively, a Stream can be created using the LavinMQ Management UI, under the Queues tab, like so:
Make sure to tick the “Durable” checkbox, as shown in the image above.
Publishing a message to a stream
Publishing messages to a Stream is no different from publishing messages to a queue.
As an example, below, the previous snippet has been extended to publish a message to the test_stream declared.
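A sketch of the publishing step, assuming `ch` is an open amqp-client.js channel; `publishToStream` is a hypothetical helper name.

```javascript
// Declares (or re-declares) the Stream, then publishes one message to it.
// `ch` is assumed to be an open amqp-client.js channel.
async function publishToStream(ch, body, name = "test_stream") {
  const q = await ch.queue(name, { durable: true }, { "x-queue-type": "stream" });
  await q.publish(body); // same call as publishing to a regular queue
  return q;
}
```

For example, `await publishToStream(ch, "Hello, Stream!")` would append one message to test_stream.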
You can find the complete producer script here
In addition to the x-queue-type argument we saw, Streams support 3 additional queue arguments that can be specified at queue declaration or via a policy.
- x-max-length: Sets the maximum number of messages allowed in the Stream at any given time. See retention. Default: not set.
- x-max-length-bytes: Sets the maximum size of the Stream in bytes. See retention. Default: not set.
- x-max-age: This argument will control how long a message survives in a LavinMQ Stream. The unit of this configuration could either be in years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s). See retention. Default: not set.
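The arguments above can be assembled into a single argument table before declaring the Stream. The helper below is a sketch: `streamArguments` is a hypothetical name, and it assumes x-max-age is written as a whole number followed by one of the listed unit letters (for example `7D`).

```javascript
// Builds the queue-argument table for a Stream declaration (hypothetical helper).
function streamArguments({ maxLength, maxLengthBytes, maxAge } = {}) {
  const args = { "x-queue-type": "stream" };
  if (maxLength !== undefined) args["x-max-length"] = maxLength;
  if (maxLengthBytes !== undefined) args["x-max-length-bytes"] = maxLengthBytes;
  if (maxAge !== undefined) {
    // Assumption: the value is "<number><unit>" with unit in Y, M, D, h, m, s.
    if (!/^\d+[YMDhms]$/.test(maxAge)) {
      throw new Error(`Invalid x-max-age value: ${maxAge}`);
    }
    args["x-max-age"] = maxAge;
  }
  return args;
}
```

The result can be passed as the third parameter of `ch.queue(name, { durable: true }, args)` in amqp-client.js.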
Next, let's look at how to consume this message.
Consuming from Streams in LavinMQ
Messages can be consumed from a Stream in much the same way as from a queue, but with three known differences:
- You can specify an offset to start reading/consuming from any point in the Stream.
- Consuming messages in LavinMQ Streams requires setting the QoS prefetch.
- LavinMQ does not allow consuming messages from a Stream with auto_ack=True; consumers must acknowledge messages.
As mentioned, when consuming from a Stream in LavinMQ, clients have the ability to specify a starting point by using an offset. The x-stream-offset consumer argument controls this behavior.
Note: When you connect a consumer to a Stream that already contains messages, without specifying an offset, the consumer will begin reading from the Stream’s starting point.
With that being said, LavinMQ supports the following offset values:
- first: This starts reading from the beginning of the Stream, offset 1.
- last: This starts reading from the beginning of the last segment file.
- next: Initiates reading from the latest offset once the consumer is initiated - essentially, the next offset won't attempt to read all the messages present in the Stream prior to the consumer's activation.
- A specific numerical offset value: Clients can specify an exact offset to attach to the log. If the specified offset does not exist, it will be adjusted to either the start or end of the log accordingly.
- Timestamp: This value represents a specific point in time to attach to the log.
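The offset kinds listed above can be turned into consumer arguments with a small helper. This is a sketch: `offsetArgs` is a hypothetical name, and representing the timestamp variant as a JavaScript `Date` is an assumption about how the client library encodes AMQP timestamps.

```javascript
// Builds the consumer-argument table for a starting offset (hypothetical helper).
function offsetArgs(offset) {
  const named = ["first", "last", "next"];
  const valid =
    named.includes(offset) ||                                  // named positions
    (Number.isInteger(offset) && offset >= 0) ||               // numerical offset
    (offset instanceof Date && !Number.isNaN(offset.getTime())); // timestamp (assumption)
  if (!valid) {
    throw new TypeError(`Unsupported stream offset: ${String(offset)}`);
  }
  return { "x-stream-offset": offset };
}
```

With amqp-client.js, the returned object would be passed as the `args` option of `subscribe()`.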
The snippet below sets the consumer's prefetch to 100 and reads from the test_stream.
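A minimal sketch of such a consumer, assuming `ch` is an open amqp-client.js channel; `consumeStream` and `onMessage` are hypothetical names.

```javascript
// Consumes from a Stream with a prefetch of 100 and explicit acks.
// `ch` is assumed to be an open amqp-client.js channel.
async function consumeStream(ch, onMessage, name = "test_stream") {
  await ch.prefetch(100); // required before consuming from a Stream
  const q = await ch.queue(name, { durable: true }, { "x-queue-type": "stream" });
  // No offset argument is passed to subscribe() here.
  return q.subscribe({ noAck: false }, async (msg) => {
    onMessage(msg.bodyToString());
    await msg.ack(); // noAck: true is rejected for Streams
  });
}
```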
Notice how in the snippet above, the subscribe() function does not specify an offset? As a result, the consumer will begin reading from the beginning of the stream.
Alternatively, we can decide to grab a message with an offset of 5000 or the very first message in the Stream - the snippet below demonstrates how we can achieve both.
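One way to sketch both cases is a helper that forwards the chosen offset through the `args` option of `subscribe()`. It assumes `ch` is an open amqp-client.js channel; `consumeFrom` is a hypothetical name.

```javascript
// Consumes from a Stream starting at the given offset (hypothetical helper).
// `ch` is assumed to be an open amqp-client.js channel.
async function consumeFrom(ch, offset, onMessage, name = "test_stream") {
  await ch.prefetch(100);
  const q = await ch.queue(name, { durable: true }, { "x-queue-type": "stream" });
  return q.subscribe(
    { noAck: false, args: { "x-stream-offset": offset } },
    async (msg) => {
      onMessage(msg.bodyToString());
      await msg.ack();
    }
  );
}

// Usage, given an open channel `ch`:
//   await consumeFrom(ch, 5000, console.log);    // start at offset 5000
//   await consumeFrom(ch, "first", console.log); // start at the very first message
```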
You can find the complete consumer script here
Data retention
Since LavinMQ Streams are immutable, they inherently tend to grow indefinitely until the disk space runs out. As this is undesirable, a Stream can be configured to discard old messages through a data retention configuration.
Data retention arguments generally allow you to configure a Stream to truncate its messages once it reaches a given size or a specified age.
Truncating messages entails deleting an entire segment file. But what is a segment file?
Generally, LavinMQ does not persist messages in a big single file. Instead, Streams and Queues in general persist messages in small 8MiB files called segment files. A Stream truncates its size by deleting a segment file and all its messages.
Note: Retention is evaluated on a per-segment basis. Essentially, Streams only apply the retention limits you set whenever an existing segment file has reached its maximum size (by default 8MiB) and is closed in favor of a new one.
To configure a Stream's retention strategy, you can adopt a size-based or time-based approach (or even both).
Size-based retention strategy
Here, the Stream is set up to discard segment files once the total size or number of messages in the Stream reaches a specified upper limit.
Setting up the size-based retention strategy requires providing any of the following arguments when declaring the Stream. As mentioned earlier, this can also be done through a policy:
x-max-length-bytes
x-max-length
For example:
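A sketch of a size-based declaration, capping the Stream at 1000 messages. It assumes `ch` is an open amqp-client.js channel; `declareSizeLimitedStream` is a hypothetical name.

```javascript
// Declares a Stream that keeps at most 1000 messages (size-based retention).
// `ch` is assumed to be an open amqp-client.js channel.
async function declareSizeLimitedStream(ch, name = "test_stream") {
  return ch.queue(
    name,
    { durable: true },
    {
      "x-queue-type": "stream",
      "x-max-length": 1000, // maximum number of messages in the Stream
    }
  );
}
```

AMQP brokers typically reject a redeclaration whose arguments differ from the existing queue's, so a Stream previously declared without this limit may need a policy or a new name instead.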
In the example above, if the Stream exceeds the 1000-message limit, old segment files are deleted until the Stream is back within the limit.
Time-based retention strategy
The Stream is configured to truncate segment files that have surpassed a specific age threshold.
Setting up the time-based retention strategy requires providing the following argument when declaring the Stream or via a policy:
x-max-age
Again, the unit of this configuration could either be in years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s).
For example:
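A sketch of a time-based declaration with a 30-day limit. It assumes `ch` is an open amqp-client.js channel and that the value is written as `30D`, a number followed by the unit letter; `declareAgeLimitedStream` is a hypothetical name.

```javascript
// Declares a Stream whose segments are discarded after 30 days (time-based retention).
// `ch` is assumed to be an open amqp-client.js channel.
async function declareAgeLimitedStream(ch, name = "test_stream") {
  return ch.queue(
    name,
    { durable: true },
    {
      "x-queue-type": "stream",
      "x-max-age": "30D", // assumption: "<number><unit>", D = days
    }
  );
}
```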
In the snippet provided, we employ a time-based retention strategy, whereby segment files that are 30 days old or older are discarded. This process involves comparing the timestamp of the last message in each segment with the specified max-age.
When the last message in a segment is older than the max age, the entire segment is discarded. Because messages are stored chronologically, every other message in the segment is at least as old as the last one, so the whole segment has expired.
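The per-segment check described above can be illustrated with a toy model. This is not LavinMQ's implementation: `maxAgeToMs`, `expiredSegments`, and the segment shape are hypothetical, and treating a month as 30 days and a year as 365 days is an assumption.

```javascript
// Toy model of time-based retention; not LavinMQ's actual code.
const DAY_MS = 24 * 60 * 60 * 1000;
const UNIT_MS = {
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  D: DAY_MS,
  M: 30 * DAY_MS,  // assumption: 1 month = 30 days
  Y: 365 * DAY_MS, // assumption: 1 year = 365 days
};

// Converts a value such as "30D" into milliseconds.
function maxAgeToMs(maxAge) {
  const match = /^(\d+)([YMDhms])$/.exec(maxAge);
  if (!match) throw new Error(`Invalid max-age: ${maxAge}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

// A segment expires when its LAST (newest) message is older than max-age.
function expiredSegments(segments, maxAge, now = Date.now()) {
  const limit = maxAgeToMs(maxAge);
  return segments.filter((seg) => now - seg.lastMessageTimestamp > limit);
}
```

Only the newest message in each segment is inspected, since every earlier message in that segment is at least as old.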
LavinMQ does not provide a default value for this.
Feature comparison with traditional queues in LavinMQ
Streams are different from regular queues, so they don't exactly match the way AMQP 0.9.1 queues work. Because Streams were designed with specific use cases in mind, many things that other types of queues can do are not available on Streams.
The table below presents some of the features in LavinMQ and how they compare across queues and streams.
Features | Queue | Stream |
---|---|---|
Non-durability | Queues can be non-durable | A stream must be durable. durable: false will fail. |
Exclusivity | Supported | Not supported |
Consumer priority | Supported | Not supported. x-priority argument will fail. |
Single Active Consumer | Supported | Not supported. x-single-active-consumer argument will fail. |
Consumer acknowledgement | Not required | Consumers must acknowledge messages when reading from Streams. noAck: true will fail. |
Dead Letter Exchange | Supported | Not supported. x-dead-letter-exchange argument will fail. |
Per-message TTL | Supported | Not supported. x-expires will fail. |
Delivery limit | Supported | Not supported. x-delivery-limit will fail. |
Reject on overflow | Supported | Not supported. x-overflow will fail. |
Channel Prefetch | Not required | A consumer must specify its channel prefetch |
Global Prefetch | Supported | Not supported |
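
The declaration-related rows of the table can be captured in a small pre-flight check. This is a sketch based only on the table above: `streamDeclarationProblems` is a hypothetical helper, and the broker remains the authority on what it accepts.

```javascript
// Queue arguments the table above lists as failing on Streams.
const UNSUPPORTED_STREAM_ARGS = [
  "x-single-active-consumer",
  "x-dead-letter-exchange",
  "x-expires",
  "x-delivery-limit",
  "x-overflow",
];

// Returns a list of problems with a planned Stream declaration (hypothetical helper).
function streamDeclarationProblems({ durable = true, exclusive = false } = {}, args = {}) {
  const problems = [];
  if (!durable) problems.push("a stream must be durable");
  if (exclusive) problems.push("exclusive streams are not supported");
  for (const key of UNSUPPORTED_STREAM_ARGS) {
    if (key in args) problems.push(`${key} is not supported on streams`);
  }
  return problems;
}
```

An empty array means the declaration does not conflict with any row in the table.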
Wrap up
This piece explored the fundamentals of LavinMQ Streams: from when to use Streams to how to get started with them. We took a step further to cover retention configurations that help prevent Streams from growing indefinitely.
Overall, Streams weren’t created to replace traditional queues in LavinMQ, but to complement them. Streams open up new possibilities for LavinMQ use cases.
Ready to take the next steps? Here are some things you should keep in mind:
Managed LavinMQ instance on CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.
Help and feedback
We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.