Part 5 - Server-side offset tracking and stream filtering
In the previous part of this series, we looked at how LavinMQ handles data retention, covering both size-based and time-based approaches. This part focuses on two features of LavinMQ streams:
- Server-side offset tracking
- Stream filtering
Server-side offset tracking
When consuming from a stream, you can configure LavinMQ to track each consumer's offset for you on the server. This is called server-side offset tracking. Essentially, offset tracking helps a consumer remember where it left off in a stream. Think of it as a bookmark for event processing.
For example, if a consumer has processed messages up to and including offset 5000 and then stops, it can resume from offset 5001 when it comes back online instead of starting over.
How it works
Offset tracking is enabled through consumer arguments, in one of two ways:
- By setting x-stream-offset = null.
- Or by setting x-stream-automatic-offset-tracking = true.
In both cases, you must also set a consumer tag, which LavinMQ uses to identify the consumer whose offset it stores.
With x-stream-offset = null, the consumer starts reading from the beginning of the stream on its first connection. On later reconnects, it automatically resumes from where it left off.
You can also combine automatic tracking with any valid x-stream-offset value by providing both an x-stream-offset and x-stream-automatic-offset-tracking = true. In this case, the consumer starts reading from the given x-stream-offset on its first connection, and on later reconnects it automatically resumes from where it left off.
You can disable server-side tracking at any point by reconnecting your consumer with a valid, non-null x-stream-offset and without x-stream-automatic-offset-tracking.
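To make the rules above concrete, here is a small, hypothetical JavaScript model of how a broker could pick a consumer's starting position. It is not LavinMQ's source code: the function names isTracked and resolveStart, the Map of stored offsets keyed by consumer tag, and the 'default' placeholder for the broker's default start position (used when auto-tracking is on but no x-stream-offset is given) are all assumptions made for illustration, and offsets are plain integers.

```javascript
// Simplified, hypothetical model of the offset-tracking rules above.
// Not LavinMQ's source code: offsets are plain integers and stored
// offsets live in a Map keyed by consumer tag.

// Tracking applies only when a consumer tag is set AND either
// x-stream-offset is null or automatic tracking is switched on.
function isTracked(consumerTag, args) {
  const offsetIsNull = 'x-stream-offset' in args && args['x-stream-offset'] === null
  const autoTrack = args['x-stream-automatic-offset-tracking'] === true
  return Boolean(consumerTag) && (offsetIsNull || autoTrack)
}

// Decide where a (re)connecting consumer starts reading.
function resolveStart(consumerTag, args, storedOffsets) {
  // Reconnect with tracking: resume right after the last stored offset.
  if (isTracked(consumerTag, args) && storedOffsets.has(consumerTag)) {
    return storedOffsets.get(consumerTag) + 1
  }
  // x-stream-offset = null: start from the beginning of the stream.
  if (args['x-stream-offset'] === null) return 'first'
  // Otherwise use the requested offset, or 'default' as a placeholder for
  // the broker's default start position (not modeled here).
  return 'x-stream-offset' in args ? args['x-stream-offset'] : 'default'
}
```

With a stored offset of 5000 for recommendation-engine, reconnecting with { 'x-stream-offset': null } yields 5001, while reconnecting with { 'x-stream-offset': 100 } and no auto-tracking flag yields 100, mirroring how tracking is disabled.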
Behind the scenes of offset tracking
Offsets are saved on the server for every consumer that meets both of these requirements:
- A consumer tag is set.
- Either x-stream-offset == null or x-stream-automatic-offset-tracking == true.
The offsets are stored in a separate file, one per stream, and are replicated to all followers. A new entry is written for every message delivered to a tracked consumer, so the stored offset always reflects the last delivered message. The file is given an initial size on startup and is compacted or expanded as needed.
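The storage behavior described above (one entry per delivered message, with the file compacted or expanded as needed) can be sketched as an in-memory model. This is a hypothetical illustration, not LavinMQ's on-disk format: the OffsetStore class, the array standing in for the file, the capacity value standing in for the file size, and the "keep only the newest entry per consumer tag" compaction rule are all assumptions. Replication to followers is not modeled.

```javascript
// Hypothetical in-memory model of a per-stream offset file.
// Not LavinMQ's implementation: an array stands in for the file and
// `capacity` stands in for the file's current size.
class OffsetStore {
  constructor(capacity = 8) {
    this.entries = []         // append-only log of [consumerTag, offset] pairs
    this.capacity = capacity  // initial "file size", in entries
  }

  // Called for every message delivered to a tracked consumer.
  record(consumerTag, offset) {
    if (this.entries.length >= this.capacity) {
      this.compact()
      // Still full after compaction (many distinct consumers): grow the "file".
      if (this.entries.length >= this.capacity) this.capacity *= 2
    }
    this.entries.push([consumerTag, offset])
  }

  // The newest entry for a consumer tag is its current position.
  lastOffset(consumerTag) {
    for (let i = this.entries.length - 1; i >= 0; i--) {
      if (this.entries[i][0] === consumerTag) return this.entries[i][1]
    }
    return undefined
  }

  // Keep only the newest entry per consumer tag.
  compact() {
    const latest = new Map()
    for (const [tag, offset] of this.entries) latest.set(tag, offset)
    this.entries = [...latest]
  }
}
```

Recording offsets 1 through 20 for a single consumer keeps the log within its initial capacity, because compaction repeatedly collapses older entries, and lastOffset returns 20.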
Performance-wise, consuming from a stream is slightly slower when LavinMQ handles offset tracking, because of the extra work involved, such as writing an offset entry for every delivered message.
Offset tracking in action
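The snippet below shows a Node.js consumer script, built on the @cloudamqp/amqp-client library, with x-stream-automatic-offset-tracking set to true and the consumer tag set to recommendation-engine. It reads the broker URL from the LAVINMQ_URL environment variable.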
```javascript
import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config' // loads LAVINMQ_URL from a .env file

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startConsumer() {
  // Set up a connection to the LavinMQ server
  const connection = new AMQPClient(LAVINMQ_URL)
  await connection.connect()
  const channel = await connection.channel()
  console.log("[✅] Connection over channel established")
  console.log("[❎] Waiting for messages. To exit press CTRL+C ")

  // Set the prefetch (max number of unacknowledged messages in flight)
  await channel.prefetch(1000000)

  // Declare the users_activity stream (a queue with x-queue-type = stream)
  const q = await channel.queue(
    "users_activity",
    { durable: true },
    { "x-queue-type": "stream" }
  )

  let counter = 0
  // Read messages from the users_activity stream. The consumer tag identifies
  // this consumer, and automatic offset tracking lets LavinMQ store its position.
  await q.subscribe(
    { noAck: false, tag: "recommendation-engine", args: { "x-stream-automatic-offset-tracking": true } },
    async (msg) => {
      try {
        console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
        // Acknowledge the message so the prefetch window keeps moving
        await msg.ack()
      } catch (error) {
        console.error(error)
      }
    }
  )

  // When the process is terminated, close the channel and the connection
  process.on('SIGINT', async () => {
    await channel.close()
    await connection.close()
    console.log("[❎] Connection closed")
    process.exit(0)
  })
}

startConsumer().catch(console.error)
```
Stream filtering
Let’s revisit our user activity tracking app. Imagine a users_activity stream collecting events like clicks, page views, and searches from all users. Multiple consumers process these events, but not every consumer needs every message.
For example:
- The real-time analytics service wants all events to keep dashboards up to date.
- The recommendation engine is only interested in search queries, since those help refine personalised suggestions.
- The session analysis service only cares about page views to build user journey timelines.
Before LavinMQ 2.2, every consumer received all user activity events, including the ones it didn’t care about. Each consumer had to filter out and discard irrelevant messages on its own. This worked, but it meant a lot of unnecessary data was sent over the network, especially as the stream grew to thousands or millions of events.
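For example, if the users_activity stream holds three messages (say, one click, one page view, and one search), all three are sent to every consumer, even though the recommendation engine only needs the search and the session analysis service only needs the page view. Not exactly efficient, right?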
Now, with stream filtering, LavinMQ handles this on the broker side before messages reach consumers. Instead of sending everything and making consumers do the work, only relevant messages are delivered. The result? Less bandwidth usage.
How it works
To make messages filterable, set the x-stream-filter-value message header when publishing to a stream. For example, to tag all search queries, set x-stream-filter-value = 'sq' when publishing. Likewise, for page view events, use x-stream-filter-value = 'pg'.
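A minimal publishing sketch, assuming the same @cloudamqp/amqp-client library as the consumer example above, whose AMQPQueue.publish(body, properties) method accepts AMQP properties such as headers. The event shape ({ type, ... }), the FILTER_VALUES mapping, and the helper name publishActivity are hypothetical; 'sq' and 'pg' come from the text, and events without a mapping (such as clicks) are published with no filter value.

```javascript
// Hypothetical mapping from event type to filter value. 'sq' and 'pg' come
// from the text above; other event types (e.g. clicks) get no filter value.
const FILTER_VALUES = new Map([
  ['search', 'sq'],
  ['page_view', 'pg'],
])

// Publish one user-activity event to a stream queue.
// `queue` is assumed to be an AMQPQueue from @cloudamqp/amqp-client, e.g.
// await channel.queue('users_activity', { durable: true }, { 'x-queue-type': 'stream' })
function publishActivity(queue, event) {
  const properties = {}
  const filterValue = FILTER_VALUES.get(event.type)
  if (filterValue !== undefined) {
    // Tag the message so the broker can filter on it
    properties.headers = { 'x-stream-filter-value': filterValue }
  }
  return queue.publish(JSON.stringify(event), properties)
}
```

In a script like the consumer above, this might be called as await publishActivity(q, { type: 'search', userId: 42, query: 'running shoes' }).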
When a consumer sets the x-stream-filter consumer argument, it only receives messages whose filter value matches. The filter can be a single value (a string) or multiple values separated by commas; a message is delivered if it matches any of them.
To also receive messages that were published without a filter value, set x-stream-match-unfiltered = true as a consumer argument.
If you do not provide x-stream-filter when consuming, all messages in the stream are delivered, as if no filters existed.
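The delivery rules above can be summarized as a small predicate. This is a simplified, hypothetical model of the broker-side decision, not LavinMQ's implementation: the function name shouldDeliver is made up, the message's filter value is passed in directly (undefined meaning it was published without one), and comma-separated filters are split without trimming whitespace, which is an assumption. The three argument objects correspond to the consumers in the user activity example.

```javascript
// Simplified, hypothetical model of broker-side stream filtering.
// msgFilterValue: the message's x-stream-filter-value header, or undefined.
// consumerArgs: the consumer arguments passed when subscribing.
function shouldDeliver(msgFilterValue, consumerArgs) {
  const filter = consumerArgs['x-stream-filter']
  // No filter on the consumer: every message is delivered.
  if (filter === undefined) return true
  // Messages published without a filter value are only delivered
  // when the consumer opts in with x-stream-match-unfiltered.
  if (msgFilterValue === undefined) {
    return consumerArgs['x-stream-match-unfiltered'] === true
  }
  // Deliver if the message matches any of the comma-separated values.
  return filter.split(',').includes(msgFilterValue)
}

// Consumer arguments for the three services in the example
const analyticsArgs = {}                                // all events
const recommendationArgs = { 'x-stream-filter': 'sq' }  // search queries only
const sessionArgs = { 'x-stream-filter': 'pg' }         // page views only
```

In a real consumer, one of these objects would be passed as args to q.subscribe(...), the same way the offset-tracking example passes x-stream-automatic-offset-tracking.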
Conclusion
Offset tracking and stream filtering make LavinMQ Streams smarter and more efficient. With server-side offset tracking, consumers don’t have to keep track of where they left off themselves; LavinMQ handles it for them. Stream filtering goes a step further by reducing unnecessary data transmission, so consumers receive only the messages they actually need.
Throughout this series, we covered the different features of LavinMQ Streams, positioning it as an ideal choice for message streaming. But how does it stack up against other streaming technologies like Kafka and RabbitMQ Streams? In the next and final part of this series, we’ll look at the similarities and differences between these technologies.
What’s next?
- Next Tutorial: LavinMQ streams vs Kafka vs RabbitMQ streams
- Previous Tutorial: Stream truncation with retention policies