Introduction
Configuration
Language Support
AMQP 0-9-1 Overview
More Exchange Types
More Consumer Features
Queue Deep-dive
Other Features
Reliable Message Delivery
High Availability
Monitoring
Management HTTP API
Tutorials
Networking
LavinMQ CLI
Part 2 - Publishing and reading from a stream
As noted earlier, client applications can interact with a Stream using an AMQP client library, just like with traditional queues in LavinMQ. Working with LavinMQ Streams involves three simple steps:
- Declare/Instantiate a Stream
- Publish(write) messages to the Stream
- Consume(read) messages from the Stream
The code snippets in this section are based on amqp-client.js, The JavaScript AMQP client library. Before we proceed, let’s set up our development environment.
Node.js development environment
- Confirm that Node.js is available from your command line with:
node --version
- If you do not have node.js installed, go to nodejs.org
- Run
mkdir lavinmq_stream_tutorials && cd lavinmq_stream_tutorials
to create a directory for your Stream tutorials and navigate into it. - Run
npm init
in thelavinmq_stream_tutorials
directory to initialize it as a Node.js project. Leave all the questions unanswered by pressing enter all the way. - Add
@cloudamqp/amqp-client
anddotenv
as a dependencies to your package.json file. Run:npm install --save @cloudamqp/amqp-client dotenv
. - Create a
.env
file in in the root directory. - Add
CLOUDAMQP_URL="lavinmq_url"
to the.env
file. Replacelavinmq_url
with your correct server url. You can create a free LavinMQ instance here. - In order for amqp-client to work, you need to add
“type”: “module”
to your package.json just right under"name": "node_tutorials"
like so:
{
"name": "node_tutorials",
"type": "module",
}
1. Declaring a stream
In LavinMQ, creating a Stream is as simple as creating any other queue.
The only difference? You set the queue type to stream
using the x-queue-type
argument when you declare it. Also, Streams must be durable, meaning
you’ll need to set durable=true
at declaration time—LavinMQ doesn’t allow
non-durable Streams (who wants a stream that does not survive a node restart?).
In our use-case, the user activity tracking app will publish all activity
data to a Stream queue named users_activity
. This Stream will hold a
history of all user events, ready for our downstream services to consume.
Because streams are immutable, they have the potential to grow infinitely
until the machine runs out of disk space. As this is an undesirable behaviour,
LavinMQ allows configuring streams to discard old messages based on some
retention policies — This will be covered in more detail in part 4 of this series.
You can make the stream declaration with the appropriate arguments in one of two ways:
- Programmatically, from the client code
- From the management interface
Programmatically
You can declare a stream queue in your client code with the x-queue-type=stream
and durable=true
as shown in the snippet below:
import { AMQPClient } from '@cloudamqp/amqp-client'
import {} from 'dotenv/config'
// Access the LAVINMQ_URL environment variable*
const LAVINMQ_URL = process.env.LAVINMQ_URL
async function startProducer() {
try {
//Setup a connection to the LavinMQ server*
const connection = new AMQPClient("LAVINMQ_URL")
await connection.connect()
const channel = await connection.channel()
console.log("[✅] Connection over channel established")
// Declare a Stream, named users_activity
const q = await channel.queue(
"users_activity",
{ durable: true},
{"x-queue-type": "stream"}
)
}
}
Management interface
Alternatively, a Stream can be created using the LavinMQ Management UI, under the Queues tab, like so:
Make sure to tick the “Durable” checkbox, as shown in the image above.
2. Publishing a message to a stream
Imagine a user clicking a “Buy Now” button. We’ll need to publish
an event like this to the users_activity
stream:
{
"user_id": "12345",
"event_type": "click",
"page": "/products/56789",
"timestamp": "2025-01-20T12:34:56Z"
}
Publishing messages to a Stream is no different from publishing messages
to a queue. As an example, below, the previous snippet has been extended
to publish a message to the users_activity
stream queue declared.
//Dependencies
import { AMQPClient } from '@cloudamqp/amqp-client'
import {} from 'dotenv/config'
const LAVINMQ_URL = process.env.LAVINMQ_URL
async function startProducer() {
try {
//Setup a connection to the LavinMQ server
const connection = new AMQPClient(LAVINMQ_URL)
await connection.connect()
const channel = await connection.channel()
console.log("[✅] Connection over channel established")
// Declare a Stream, named users_activity
const q = await channel.queue(
"users_activity",
{ durable: true},
{"x-queue-type": "stream"}
)
//Publish a message to the exchange
async function sendToQueue(routingKey, body) {
await channel.basicPublish("", routingKey, JSON.stringify(body))
console.log("[📥] Message sent to stream: ", body)
}
//Send some messages to the queue
sendToQueue(
"users_activity",
{
"user_id": "12345",
"event_type": "click",
"page": "/products/56789",
"timestamp": "2025-01-20T12:34:56Z"
}
);
sendToQueue(
"users_activity",
{
"user_id": "12346",
"event_type": "page_view",
"page": "/products/56790",
"timestamp": "2025-01-20T12:34:56Z"
}
);
setTimeout(() => {
//Close the connection
connection.close()
console.log("[❎] Connection closed")
process.exit(0)
}, 500);
} catch (error) {
console.error(error)
//Retry after 3 second
setTimeout(() => {
startProducer()
}, 3000)
}
}
startProducer()
3. Consuming from Streams in LavinMQ
Messages can be consumed from a Stream the same way queues accomplish this task, more or less, but with some known differences.
- Consuming messages in LavinMQ Streams requires setting the QoS prefetch.
- LavinMQ does not allow consuming messages from a Stream with
auto_ack=True
- You can specify an offset to start consuming from any point in the Stream — This will be covered in the next part in this series.
import { AMQPClient } from '@cloudamqp/amqp-client'
import {} from 'dotenv/config'
const LAVINMQ_URL = process.env.LAVINMQ_URL
async function startConsumer() {
//Setup a connection to the RabbitMQ server
const connection = new AMQPClient(LAVINMQ_URL)
await connection.connect()
const channel = await connection.channel()
console.log("[✅] Connection over channel established")
console.log("[❎] Waiting for messages. To exit press CTRL+C ")
// Set the prefetch
channel.prefetch(100)
const q = await channel.queue(
"users_activity",
{ durable: true},
{"x-queue-type": "stream"}
)
let counter = 0;
// Read messages from the users_activity stream
await q.subscribe({ noAck: false }, async (msg) => {
try {
console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
} catch (error) {
console.error(error)
}
})
//When the process is terminated, close the connection
process.on('SIGINT', () => {
channel.close()
connection.close()
console.log("[❎] Connection closed")
process.exit(0)
});
}
startConsumer().catch(console.error);
In the snippet above, if the queue already contains messages, since no offset has been specified, the consumer will begin reading from the Stream’s starting point.
Non destructive consumer semantics with Streams
We have at this point, established that Streams let consumers read messages non-destructively. Consequently, the same event stays available for all consumers without duplication. Each service can process the same data at its own pace. This makes it super easy to implement fan-out architectures with Streams.
To demonstrate this, run three instances of the consumer script in separate terminals to simulate our use case with three consumers: a real-time analytics service, a recommendation engine, and a session analysis service. Each terminal will log the same set of messages, showing that all three consumers received the same messages.
Conclusion
As mentioned earlier, when consuming from a Stream in LavinMQ, clients have
the ability to specify a starting point by using an offset. The x-stream-offset
consumer argument controls this behaviour. In the next part in this series,
we will see how to work with offsets and read from a specific point in a stream.
What’s next?
- Next Tutorial: Replay(time-travel) with offsets
- Previous Tutorial: Overview of LavinMQ streams