Part 4 - Stream truncation with retention policies

In the last part of this series, we looked at how Stream queues’ immutability makes them perfect for message replay. But that same immutability comes with a trade-off—messages stick around even after being consumed. Left unchecked, Streams could grow endlessly, eating up disk space. To avoid this, you can set up data retention rules to automatically discard old messages — that will be the crux of this part of the series.

Data retention configuration

Data retention settings let you control when a Stream trims old messages, either after reaching a set size or a set age. Instead of deleting messages one by one, LavinMQ removes entire segment files. If you missed it earlier, Streams don’t store messages in one giant file; they use smaller 8 MiB segment files. When a Stream needs to free up space, it simply deletes a whole segment along with its messages.

To configure a Stream’s retention strategy, you can adopt a size-based or a time-based approach, or even combine both (a combined example appears later in this post).

Size-based retention strategy

Here, the Stream is set up to discard segment files once the total size or number of messages in the Stream reaches a specified upper limit.

Setting up the size-based retention strategy requires providing either of the following arguments when declaring the Stream. The same can also be done through a policy, as shown a little further down:

  • x-max-length-bytes
  • x-max-length

For example:

// Setting stream capacity to 1,000 messages
const q = await channel.queue(
  "users_activity",
  { durable: true },
  { "x-queue-type": "stream", "x-max-length": 1000 }
)

In the example above, once the Stream exceeds the 1,000-message limit, the oldest segment files are deleted until the Stream is back under the limit.
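The same limit can also be applied with a policy instead of a queue argument. Below is a minimal sketch using LavinMQ’s HTTP API; it assumes a RabbitMQ-compatible PUT /api/policies/{vhost}/{name} endpoint on a local default setup (localhost:15672, guest:guest credentials), so adjust the URL and credentials for your deployment:

// Hypothetical sketch: apply max-length retention through a policy.
// Assumes LavinMQ's management API on localhost:15672 with default
// guest:guest credentials, and Node 18+ for the global fetch.
const auth = Buffer.from("guest:guest").toString("base64")

await fetch("http://localhost:15672/api/policies/%2F/stream-retention", {
  method: "PUT",
  headers: {
    "Authorization": `Basic ${auth}`,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    pattern: "^users_activity$",  // queues the policy applies to
    "apply-to": "queues",
    definition: { "max-length": 1000 }
  })
})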

Alternatively, you could set the x-max-length-bytes argument like so:

// Setting stream capacity to 100,000,000 bytes (100 MB)
const q = await channel.queue(
  "users_activity",
  { durable: true },
  { "x-queue-type": "stream", "x-max-length-bytes": 100000000 }
)

When this is set, LavinMQ deletes segment files from the beginning of the Stream. The deletion happens once the Stream’s total size reaches the value of x-max-length-bytes.

For example, in the snippet above, the Stream discards its oldest messages when its disk usage hits 100,000,000 bytes (100 MB). LavinMQ does not provide a default value for this argument.

Caveat

Retention is evaluated on a per-segment basis. Streams enforce retention limits only when a segment reaches its maximum size (8 MiB by default) and is closed in favour of a new one.

For example, if each segment holds 800 messages and the max length is set to 2000, nothing happens when the first or second segment fills up (1600 total messages). But once the third segment is full (2400 total messages), the first segment (800 messages) is deleted. This keeps the total message count hovering around 2000, with a slight fluctuation of ±400 messages.
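To make the fluctuation concrete, here’s a small illustrative simulation; it is not LavinMQ code, just the arithmetic of per-segment trimming under the numbers above:

// Illustrative only: 800-message segments with a 2,000-message limit
const SEGMENT_SIZE = 800
const MAX_LENGTH = 2000

const segments = []  // message count of each closed segment, oldest first

for (let closed = 1; closed <= 5; closed++) {
  segments.push(SEGMENT_SIZE)  // a segment fills up and is closed
  let total = segments.reduce((sum, s) => sum + s, 0)
  console.log(`segment ${closed} closed, total: ${total}`)
  // Retention only runs now, deleting whole segments from the front
  while (total > MAX_LENGTH) {
    total -= segments.shift()
    console.log(`  oldest segment deleted, total: ${total}`)
  }
}
// total swings between 1600 and 2400, i.e. 2000 ± 400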

Time-based retention strategy

With the time-based retention strategy, the Stream truncates segment files that have surpassed a specific age threshold. Setting this up requires providing the following argument when declaring the Stream, or via a policy:

  • x-max-age

The value of this argument takes a unit suffix: years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s).

For example:

// Expire messages that have been in the queue longer than 30 days
const q = await channel.queue(
  "users_activity",
  { durable: true },
  { "x-queue-type": "stream", "x-max-age": "30D" }
)

In the snippet above, we employ a time-based retention strategy: segment files that have been around for 30 days or longer are discarded. To decide when, LavinMQ compares the timestamp of the last message in each segment with the specified max age.

When the timestamp of the last message in a segment exceeds the max age, the entire segment is discarded. Because messages are stored chronologically, every other message in the segment is guaranteed to be older than the last one. LavinMQ does not provide a default value for this argument either.

Caveat

Again, the time-based retention is evaluated on a per-segment basis. To discard old messages, LavinMQ checks the timestamp of the last message in each segment; if it’s older than the max age, the entire segment gets deleted. Since messages are stored in order, everything else in that segment is guaranteed to be even older.
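As mentioned at the start, the two strategies can also be combined by supplying both arguments; whichever limit a segment hits first triggers the truncation. For example:

// Combining both strategies: cap the Stream at 100 MB and 30 days
const q = await channel.queue(
  "users_activity",
  { durable: true },
  {
    "x-queue-type": "stream",
    "x-max-length-bytes": 100000000,
    "x-max-age": "30D"
  }
)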

Putting it all together

Here’s the full producer script with x-max-length set to 1000. It publishes 60,000 messages to the users_activity queue. Since the messages are lightweight, a single segment can hold up to 53,000 of them. Even with the 1,000-message limit, the queue may temporarily show up to 53,000 messages before the segment is closed and deleted.

// Dependencies
import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startProducer() {
  try {
    const connection = new AMQPClient(LAVINMQ_URL)
    await connection.connect()
    const channel = await connection.channel()

    console.log("[✅] Connection over channel established")

    // Declare a Stream named users_activity, capped at 1,000 messages
    const q = await channel.queue(
      "users_activity",
      { durable: true },
      { "x-queue-type": "stream", "x-max-length": 1000 }
    )

    async function sendToQueue(routingKey, body) {
      await channel.basicPublish("", routingKey, JSON.stringify(body))
      console.log("[📥] Message sent to stream: ", body)
    }

    for (let index = 0; index < 60000; index++) {
      await sendToQueue(
        "users_activity",
        {
          "user_id": index,
          "event_type": "click",
          "page": `/products/${index}`,
          "timestamp": "2025-01-20T12:34:56Z"
        }
      )
    }

    // Keep the connection open for a while so the Stream can be
    // inspected before the script shuts down
    setTimeout(() => {
      connection.close()
      console.log("[❎] Connection closed")
      process.exit(0)
    }, 1000000)
  } catch (error) {
    console.error(error)
    // Retry the producer after 3 seconds on failure
    setTimeout(() => {
      startProducer()
    }, 3000)
  }
}

startProducer()
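To see the retention in action, you could attach a throwaway consumer after the producer finishes and tally what’s left. The sketch below is hypothetical rather than part of the original script; it reuses the connection settings above, sets the prefetch that Streams require, and reads from the first offset:

// Hypothetical checker: counts the messages that survived retention
import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

const connection = new AMQPClient(process.env.LAVINMQ_URL)
await connection.connect()
const channel = await connection.channel()

// Streams require a prefetch limit before consuming
await channel.prefetch(100)

let count = 0
await channel.basicConsume(
  "users_activity",
  { noAck: false, args: { "x-stream-offset": "first" } },
  async (msg) => {
    count++
    await msg.ack()
  }
)

// Give the consumer a few seconds to drain, then report
setTimeout(() => {
  console.log(`[🔎] Messages still in the Stream: ${count}`)
  connection.close()
  process.exit(0)
}, 5000)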

Conclusion

In this post, we broke down how LavinMQ handles data retention. We covered both size-based and time-based approaches, showing how old segments are automatically cleared to keep things running smoothly without hogging disk space.

What’s next?

Next in this series: stream filtering and automatic offset tracking.