Part 3 - Replay (time-travel) with offsets

In the previous part of this series, we explored how to publish user activity events (clicks, page views, and searches) to a Stream queue. We also demonstrated how consumers can read from the Stream via the AMQP protocol. One important thing we highlighted was the immutable nature of LavinMQ Streams: once a message is in the stream, it stays there, allowing multiple consumers to process it independently. Lastly, we covered how offsets allow a consumer to attach to a specific point in a Stream. We will look at that in more detail here, starting with the basics.

What are offsets?

In LavinMQ Streams, every message has a unique offset, which represents its position in the stream. Offsets serve a similar purpose to indexes in arrays: the first message in the stream has offset 1, the second has offset 2, and so on. Consumers can use these offsets to pinpoint exactly where in a Stream they want to start consuming.

The following image illustrates how messages and their corresponding offsets would appear in a given Stream:

LavinMQ streams offsets

When are offsets required?

Offsets allow consumers to start reading from a specific point in a LavinMQ stream. But when would this be necessary? Let’s look at some real-world scenarios from our user activity tracking app where offsets are essential.

1. Recovering missed events after downtime

The session analysis service crashes and misses thousands of user events—page views, clicks, and searches. When it restarts, it needs to resume exactly where it left off to avoid gaps in session timelines. Since LavinMQ Streams retain messages, the service can pick up from its last processed offset and catch up on missed events. This can be done manually or automatically using server-side offset tracking (covered in Part 5 of this guide).
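To make that concrete, here is a minimal sketch of manual catch-up, assuming the same channel and queue setup as the consumer code later in this post. The offset file path is illustrative, and the sketch assumes each delivery exposes its position in an x-stream-offset header; adapt it to however you prefer to persist consumer state.

// Sketch: resume from the last processed offset after a restart.
// Assumes `channel` and `q` are already set up as in the consumer code later in this post,
// and that each delivery carries its position in the x-stream-offset header.
import { readFileSync, writeFileSync, existsSync } from 'node:fs'

const OFFSET_FILE = './last_offset.json' // illustrative location

// Resume one message after the stored offset, or from the start on a first run
const startOffset = existsSync(OFFSET_FILE)
  ? JSON.parse(readFileSync(OFFSET_FILE, 'utf8')).offset + 1
  : "first"

await q.subscribe({ noAck: false, args: { "x-stream-offset": startOffset } }, async (msg) => {
  const offset = msg.properties.headers["x-stream-offset"]
  console.log("[📤] Processing offset", offset, msg.bodyToString())
  // Persist progress so the next run knows where to pick up
  writeFileSync(OFFSET_FILE, JSON.stringify({ offset }))
})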

2. Reprocessing historical data for a new algorithm

The recommendation engine gets an upgrade, but the new model needs past user activity to train properly. Instead of waiting for fresh data, it can rewind to an earlier point in the stream and process historical events as if they were happening live. By using a timestamp offset, the engine can jump back a month and feed the model with real-world user behaviour, ensuring it’s well-trained before going live.
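A minimal sketch of that rewind, assuming the same queue setup as the consumer shown later in this post, and assuming the client encodes a JavaScript Date passed as x-stream-offset into an AMQP timestamp:

// Jump back roughly one month and replay historical user activity
const oneMonthAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)

await q.subscribe({ noAck: false, args: { "x-stream-offset": oneMonthAgo } }, async (msg) => {
  // Feed each historical event into the new recommendation model (logging stands in for that here)
  console.log("[📤] Historical event:", msg.bodyToString())
})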

3. Debugging or investigating an anomaly

The real-time analytics service detects a sudden spike in activity at 3 AM last night. Was it a bot attack? A bug? Something else? With a timestamp offset, the service can rewind to 3 AM, step through events in order, and analyze what really happened.
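Here is a sketch of that kind of investigation, again assuming the queue setup from the consumer below and that event payloads carry a timestamp field like the one the producer in this post sends:

// Rewind to 3 AM and step through the suspicious one-hour window
const windowStart = new Date("2025-01-20T03:00:00Z")
const windowEnd = new Date("2025-01-20T04:00:00Z")

await q.subscribe({ noAck: false, args: { "x-stream-offset": windowStart } }, async (msg) => {
  const event = JSON.parse(msg.bodyToString())
  // Only inspect events that fall inside the window of interest
  if (new Date(event.timestamp) <= windowEnd) {
    console.log("[🔍] 3 AM window event:", event)
  }
})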

Consuming with offsets

In the previous part of this series, the subscribe() function of the consumer did not specify an offset, so the consumer read messages from the beginning of the stream. Now, as mentioned earlier, let's say the recommendation engine is testing a new algorithm and needs user activity events from 3 months ago. How can this be done?

LavinMQ Streams allow consuming messages with the x-stream-offset argument which accepts the following values:

  • first: Starts reading from the very beginning of the stream (offset 1).
  • last: Starts from the beginning of the last segment file in the stream.
  • next: Begins processing new events only, skipping historical messages.
  • A specific numerical offset: Lets you jump to an exact position in the stream (e.g., offset 1001). If the offset doesn’t exist, it adjusts to the start or end of the stream.
  • Timestamp: Attaches to the stream at a specific point in time.

The snippet below demonstrates two scenarios:

  • Consuming from the very beginning of the stream.
  • Consuming from offset 5000 onwards.

// Start consuming from the very beginning of the stream
await q.subscribe({ noAck: false, args: { "x-stream-offset": "first" }}, async (msg) => {
    try {
        console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
    } catch (error) {
        console.error(error)
    }
})

// Start consuming from offset 5000 onwards
await q.subscribe({ noAck: false, args: { "x-stream-offset": 5000 }}, async (msg) => {
    try {
        console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
    } catch (error) {
        console.error(error)
    }
})
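
And if a consumer only cares about events published from this moment on, it can pass next instead. A quick sketch with the same setup:

// Skipping historical messages: only new events are delivered
await q.subscribe({ noAck: false, args: { "x-stream-offset": "next" }}, async (msg) => {
    try {
        console.log(`[📤] New event received (${++counter})`, msg.bodyToString())
    } catch (error) {
        console.error(error)
    }
})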

Testing offsets

Here are some ideas for playing around with offsets. Update your producer to publish 1000 messages to the users_activity stream queue. See the code snippet below:

//Dependencies
import { AMQPClient } from '@cloudamqp/amqp-client'
import {} from 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startProducer() {
  try {
     //Setup a connection to the LavinMQ server
     const connection = new AMQPClient(LAVINMQ_URL)
     await connection.connect()
     const channel = await connection.channel()
 
     console.log("[✅] Connection over channel established")
 
     // Declare a Stream, named users_activity
     const q = await channel.queue(
       "users_activity", 
       { durable: true}, 
       {"x-queue-type": "stream"}
     )
 
     //Publish a message to the stream via the default exchange
     async function sendToQueue(routingKey, body) {
       await channel.basicPublish("", routingKey, JSON.stringify(body))
       console.log("[📥] Message sent to stream: ", body)
     }
     
     for (let index = 0; index < 1000; index++) {
      //Send 1000 messages to the queue
      await sendToQueue(
        "users_activity", 
        {
          "user_id":index,
          "event_type": "click",
          "page": `/products/${index}`,
          "timestamp": "2025-01-20T12:34:56Z"
        }
      );
     }
     
     
    setTimeout(() => {
      //Close the connection
      connection.close()
      console.log("[❎] Connection closed")
      process.exit(0)
    }, 100000);
  } catch (error) {
    console.error(error)
    //Retry after 3 seconds
    setTimeout(() => {
      startProducer()
    }, 3000)
  }
}

startProducer()

You can now try consuming with different offset values in your consumer code and see the outcome. The complete consumer code is shown below:

import { AMQPClient } from '@cloudamqp/amqp-client'
import {} from 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startConsumer() {
  //Set up a connection to the LavinMQ server
  const connection = new AMQPClient(LAVINMQ_URL)
  await connection.connect()
  const channel = await connection.channel()

  console.log("[✅] Connection over channel established")
  console.log("[❎] Waiting for messages. To exit press CTRL+C ")
  
  // Set a high prefetch so the stream can deliver many messages before any are acknowledged
  await channel.prefetch(1000000)

  const q = await channel.queue(
    "users_activity", 
    { durable: true}, 
    {"x-queue-type": "stream"}
  )

  let counter = 0;

  // Read messages from the users_activity stream
  await q.subscribe({ noAck: false, args: { "x-stream-offset": 950 } }, async (msg) => {
    try {
      console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
    } catch (error) {
      console.error(error)
    }
  })

  //When the process is terminated, close the connection
  process.on('SIGINT', async () => {
    await channel.close()
    await connection.close()
    console.log("[❎] Connection closed")
    process.exit(0)
  });
}

startConsumer().catch(console.error);

A note on segment files

In LavinMQ, every message sent to a queue (including streams) is written directly to disk into what we call the Message Store. Each queue has its own message store, which consists of multiple segment files. These segments can grow up to 8MB each, with new messages continuously appended to the latest segment.

LavinMQ segment files

When you pass last to the x-stream-offset argument, the consumer starts reading from the beginning of the most recent segment file.
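
In code, that looks just like the earlier examples. A quick sketch using the same consumer setup:

// Start reading from the beginning of the most recent segment file
await q.subscribe({ noAck: false, args: { "x-stream-offset": "last" }}, async (msg) => {
    console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
})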

Conclusion

In this post, we covered what offsets are, when they’re needed, and how different offset values let consumers attach to a specific point in the stream. But there’s one important thing to remember: streams are immutable. Unlike traditional queues, messages aren’t removed once acknowledged, meaning stream queues can grow indefinitely. As this is an undesirable behaviour, LavinMQ Streams provide retention strategies to control growth. The next part of this series will cover the different ways to keep your streams from growing out of control.

What’s next?