Part 5 - Server-side offset tracking and stream filtering

In the previous part of this series, we looked at how LavinMQ handles data retention, touching on both size-based and time-based approaches. This part focuses on two features of LavinMQ streams:

  • Server-side offset tracking
  • Stream filtering

Server-side offset tracking

When consuming from a stream, you can configure LavinMQ to track offsets for you on the server side. Offset tracking helps a consumer remember where it left off in a stream. Think of it as a bookmark for event processing.

For example, if a consumer has processed messages up to and including position 5000, it has successfully handled everything up to that point. If it stops and later comes back online, it can resume from position 5001 instead of starting over.
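To make the bookmark idea concrete, here is a minimal in-memory sketch. The `OffsetBookmarks` class and its methods are hypothetical names used only for illustration; this models the concept, not how LavinMQ actually stores offsets.

```javascript
// Hypothetical in-memory model of a per-consumer "bookmark".
// This only illustrates the idea; it is NOT how LavinMQ stores offsets.
class OffsetBookmarks {
  constructor() {
    // consumer tag -> offset of the last message that consumer fully processed
    this.lastProcessed = new Map();
  }

  // Record that the consumer with `tag` has processed the message at `offset`.
  commit(tag, offset) {
    this.lastProcessed.set(tag, offset);
  }

  // Offset to resume from: one past the last processed offset,
  // or null if there is no bookmark yet (i.e. start from the beginning).
  resumeFrom(tag) {
    const last = this.lastProcessed.get(tag);
    return last === undefined ? null : last + 1;
  }
}

const bookmarks = new OffsetBookmarks();
console.log(bookmarks.resumeFrom("recommendation-engine")); // null (no bookmark yet)
bookmarks.commit("recommendation-engine", 5000);
console.log(bookmarks.resumeFrom("recommendation-engine")); // 5001
```

Keying the bookmark by consumer tag is what lets several consumers read the same stream independently, each with its own position.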

How it works

Offset tracking can be enabled in two ways:

  1. By setting x-stream-offset to null.
  2. By setting x-stream-automatic-offset-tracking to true.

In both cases, you must also set a consumer tag.
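The two ways of enabling tracking can be sketched as consumer option objects in the `{ noAck, tag, args }` shape used with `q.subscribe(...)` from `@cloudamqp/amqp-client` in the consumer script in this part. The helper name `serverTrackedSubscribeOptions` and the mode strings are hypothetical. Sending `null` relies on the client encoding a JavaScript `null` as an AMQP void field, which is an assumption worth checking against your client version.

```javascript
// Hypothetical helper: builds subscribe options that ask LavinMQ to track
// offsets server-side. Both modes require a consumer tag.
function serverTrackedSubscribeOptions(tag, mode) {
  if (!tag) {
    throw new Error("Server-side offset tracking requires a consumer tag");
  }
  switch (mode) {
    case "null-offset":
      // Way 1: x-stream-offset set to null
      // (assumes the client sends null as an AMQP void field)
      return { noAck: false, tag, args: { "x-stream-offset": null } };
    case "automatic":
      // Way 2: x-stream-automatic-offset-tracking set to true
      return {
        noAck: false,
        tag,
        args: { "x-stream-automatic-offset-tracking": true },
      };
    default:
      throw new Error(`Unknown mode: ${mode}`);
  }
}
```

Usage would look like `await q.subscribe(serverTrackedSubscribeOptions("recommendation-engine", "automatic"), handler)`, where `handler` is your message callback.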

When x-stream-offset is set to null, the consumer starts reading the stream from the beginning on its first connection. On later reconnects, it automatically resumes from where it left off.

You can also combine automatic tracking with any valid x-stream-offset value by providing that x-stream-offset and setting x-stream-automatic-offset-tracking = true. In this case, the consumer starts reading the stream from the provided x-stream-offset on its first connection. On later reconnects, it automatically resumes from where it left off.

You can disable server-side tracking at any point by reconnecting your consumer with a valid value for x-stream-offset and without x-stream-automatic-offset-tracking.
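Combining a starting offset with automatic tracking, and later opting out, could be expressed like this. It is a sketch with hypothetical helper names (`trackedFrom`, `untrackedFrom`). The example values `"first"` and `42` are illustrative; any valid x-stream-offset value could be used.

```javascript
// Start at `startOffset` on the first connection, then let the server
// resume from the stored offset on later reconnects.
function trackedFrom(tag, startOffset) {
  return {
    noAck: false,
    tag,
    args: {
      "x-stream-offset": startOffset,
      "x-stream-automatic-offset-tracking": true,
    },
  };
}

// Disable server-side tracking: give a concrete (non-null) offset and
// leave out x-stream-automatic-offset-tracking entirely.
function untrackedFrom(tag, startOffset) {
  if (startOffset === null || startOffset === undefined) {
    // A null offset would switch tracking back on instead of off.
    throw new Error("Provide a valid, non-null x-stream-offset to opt out");
  }
  return { noAck: false, tag, args: { "x-stream-offset": startOffset } };
}
```

The opt-out helper rejects `null` on purpose, because a null x-stream-offset is itself one of the ways to turn tracking on.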

Behind the scenes of offset-tracking

Offsets are saved on the server for all consumers that meet the requirements:

  • Consumer tag set
  • And x-stream-offset == null OR x-stream-automatic-offset-tracking == true.
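The rule in the list above can be written as a small predicate. This is a sketch that restates the stated requirements and is not LavinMQ's source. It assumes that a missing x-stream-offset argument does not count as null.

```javascript
// True if the server will track offsets for a consumer with this tag and
// these consumer arguments: tag set AND (offset is null OR automatic = true).
function isServerTracked(tag, args = {}) {
  const hasTag = typeof tag === "string" && tag.length > 0;
  // Strict equality: an absent argument (undefined) is NOT treated as null.
  const nullOffset = args["x-stream-offset"] === null;
  const automatic = args["x-stream-automatic-offset-tracking"] === true;
  return hasTag && (nullOffset || automatic);
}

console.log(isServerTracked("recommendation-engine", { "x-stream-automatic-offset-tracking": true })); // true
console.log(isServerTracked("recommendation-engine", { "x-stream-offset": "first" })); // false
console.log(isServerTracked("", { "x-stream-offset": null })); // false
```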

The offsets are stored in a separate file, and there’s one file per stream. The stored offsets are replicated to all followers. A new entry is stored for each message delivered, making sure that the tracking is always correct. The file is set to an initial size on startup, and it will be compacted/expanded as needed.
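The store-every-delivery-then-compact behaviour can be modelled with an append-only log. This is an illustrative model only. The `OffsetLog` class, its entry format, the capacity and the doubling growth policy are all assumptions and do not describe LavinMQ's real on-disk file.

```javascript
// Illustrative model (NOT LavinMQ's file format): an append-only log of
// { tag, offset } entries, one per delivered message, compacted down to the
// newest entry per tag when it fills up.
class OffsetLog {
  constructor(capacity = 8) {
    this.capacity = capacity; // hypothetical initial size, in entries
    this.entries = [];        // append-only list of { tag, offset }
  }

  // Called once per delivered message.
  record(tag, offset) {
    if (this.entries.length >= this.capacity) this.compact();
    this.entries.push({ tag, offset });
  }

  // Keep only the newest entry for each tag; double the capacity if
  // compaction alone did not free any room.
  compact() {
    const latest = new Map();
    for (const { tag, offset } of this.entries) latest.set(tag, offset);
    this.entries = [...latest].map(([tag, offset]) => ({ tag, offset }));
    if (this.entries.length >= this.capacity) this.capacity *= 2;
  }

  // Most recent offset stored for `tag`, or undefined if there is none.
  lookup(tag) {
    for (let i = this.entries.length - 1; i >= 0; i--) {
      if (this.entries[i].tag === tag) return this.entries[i].offset;
    }
    return undefined;
  }
}
```

Writing one entry per delivery keeps the stored offset exact at all times, and the cost of that is the extra work discussed in the next paragraph.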

Performance-wise, consuming from a stream is slightly slower when LavinMQ handles offset tracking, because the server has to store an offset entry for every delivered message.

Offset tracking in action

The snippet below shows a consumer script with x-stream-automatic-offset-tracking set to true and consumer tag set to recommendation-engine.

import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startConsumer() {
  // Set up a connection to the LavinMQ server
  const connection = new AMQPClient(LAVINMQ_URL)
  await connection.connect()
  const channel = await connection.channel()

  console.log("[✅] Connection over channel established")
  console.log("[❎] Waiting for messages. To exit press CTRL+C ")

  // Set the prefetch (prefetch() returns a promise, so await it)
  await channel.prefetch(1000000)

  // Declare the durable users_activity stream
  const q = await channel.queue(
    "users_activity",
    { durable: true },
    { "x-queue-type": "stream" }
  )

  let counter = 0

  // Read messages from the users_activity stream. The consumer tag together
  // with x-stream-automatic-offset-tracking = true enables server-side tracking.
  await q.subscribe(
    {
      noAck: false,
      tag: "recommendation-engine",
      args: { "x-stream-automatic-offset-tracking": true },
    },
    async (msg) => {
      try {
        console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
        // Acknowledge the message so it no longer counts against the prefetch
        await msg.ack()
      } catch (error) {
        console.error(error)
      }
    }
  )

  // When the process is terminated, close the channel and the connection
  process.on('SIGINT', async () => {
    await channel.close()
    await connection.close()
    console.log("[❎] Connection closed")
    process.exit(0)
  })
}

startConsumer().catch(console.error)

Stream filtering

Let’s revisit our user activity tracking app. Imagine a users_activity stream collecting events like clicks, page views, and searches from all users. Multiple consumers process these events, but not every consumer needs every message. For example:

  • The real-time analytics service wants all events to keep dashboards up to date.
  • The recommendation engine is only interested in search queries, since those help refine personalised suggestions.
  • The session analysis service only cares about page views to build user journey timelines.

Before LavinMQ 2.2, every consumer received all user activity events—even the ones they didn’t care about. Each consumer had to filter and discard irrelevant messages on its own. This worked, but it meant a huge amount of unnecessary data was being sent over the network—especially as the stream grew to thousands or millions of events.

For example, if the users_activity stream holds three messages, all three are sent to every consumer regardless of what it needs. Not exactly efficient, right?

Figure: Streaming without filtering
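The pre-2.2 situation can be simulated with the three example consumers and three events. This sketch runs entirely in memory. The event type names and consumer predicates are illustrative and are not a LavinMQ API.

```javascript
// Every message goes to every consumer, and each consumer discards what it
// does not need on the client side.
const events = [
  { type: "click" },
  { type: "page_view" },
  { type: "search" },
];

const consumers = {
  "real-time-analytics": () => true,                   // wants everything
  "recommendation-engine": (e) => e.type === "search", // search queries only
  "session-analysis": (e) => e.type === "page_view",   // page views only
};

let delivered = 0;
let used = 0;
for (const wants of Object.values(consumers)) {
  for (const event of events) {
    delivered++;              // sent over the network regardless
    if (wants(event)) used++; // kept after client-side filtering
  }
}
console.log({ delivered, used }); // { delivered: 9, used: 5 }
```

Four of the nine deliveries are thrown away on arrival. That waste grows with the size of the stream, and broker-side filtering is meant to remove it.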

Now, with stream filtering, LavinMQ handles this on the broker side before messages reach consumers. Instead of sending everything and making consumers do the work, only relevant messages are delivered. The result? Less bandwidth usage.

How it works

Publishers tag messages for filtering by setting the x-stream-filter-value message header when publishing to a stream.

For example, to tag all search queries, set x-stream-filter-value = 'sq' when publishing. Likewise, for page view events, use x-stream-filter-value = 'pg'.

Figure: Setting a message filter value
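On the publishing side, a small mapping from event type to filter value is usually enough. This sketch uses the `sq` and `pg` values from the text. The `publishPropertiesFor` helper is hypothetical. It returns a properties object with `headers`, the shape accepted by `publish(body, properties)` in `@cloudamqp/amqp-client`.

```javascript
// Hypothetical mapping from event type to the filter values used above.
const FILTER_VALUES = new Map([
  ["search", "sq"],    // search queries
  ["page_view", "pg"], // page views
]);

// Build publish properties carrying x-stream-filter-value as a header.
// Event types without a mapping are published without a filter value.
function publishPropertiesFor(eventType) {
  const filterValue = FILTER_VALUES.get(eventType);
  return filterValue === undefined
    ? {}
    : { headers: { "x-stream-filter-value": filterValue } };
}
```

With the `q` stream handle from the consumer script, publishing an event might look like `await q.publish(JSON.stringify(event), publishPropertiesFor(event.type))`.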

When a consumer sets the x-stream-filter argument, it only receives messages whose filter value matches. The filter can be a single value (a string) or multiple values separated by commas. If a message matches any of the filters, it gets delivered.
A filtering consumer can also receive messages that were published without any filter value by setting x-stream-match-unfiltered as a consumer argument.

Figure: Streaming with filtering

If you do not provide x-stream-filter when consuming, all messages in the stream will be returned as if no filters existed.
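The consumer-side rules above fit in one small decision function. This is a simplified model for illustration and not LavinMQ's implementation. `shouldDeliver` is a hypothetical name, and the comma split does not trim whitespace, since the text does not say whether LavinMQ does.

```javascript
// Decide whether a message with `messageFilterValue` (undefined if the
// message has none) reaches a consumer with these consumer arguments.
function shouldDeliver(messageFilterValue, consumerArgs = {}) {
  const filter = consumerArgs["x-stream-filter"];
  // No x-stream-filter on the consumer: every message is delivered.
  if (filter === undefined) return true;
  // Message without a filter value: delivered only with x-stream-match-unfiltered.
  if (messageFilterValue === undefined) {
    return consumerArgs["x-stream-match-unfiltered"] === true;
  }
  // A single value or comma-separated values; matching any one is enough.
  return String(filter).split(",").includes(messageFilterValue);
}
```

In this model, the recommendation engine would subscribe with `{ "x-stream-filter": "sq" }`, the session analysis service with `{ "x-stream-filter": "pg" }`, and the real-time analytics service with no filter at all.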

Conclusion

Offset tracking and stream filtering make LavinMQ Streams smarter and more efficient. With server-side offset tracking, consumers don’t have to manually keep track of where they left off—LavinMQ handles it for them. Stream filtering takes efficiency a step further by reducing unnecessary data transmission, allowing consumers to receive only the messages they actually need.

Throughout this series, we covered the different features of LavinMQ Streams, positioning it as an ideal choice for message streaming. But how does it stack up against other streaming technologies like Kafka and RabbitMQ Streams? In the next and final part of this series, we'll look at the similarities and differences between these technologies.

What’s next?