Part 2 - Publishing and reading from a stream

As noted earlier, client applications can interact with a Stream using an AMQP client library, just like with traditional queues in LavinMQ. Working with LavinMQ Streams involves three simple steps:

  1. Declare/Instantiate a Stream
  2. Publish (write) messages to the Stream
  3. Consume (read) messages from the Stream

The code snippets in this section are based on amqp-client.js, the JavaScript AMQP client library. Before we proceed, let’s set up our development environment.

Node.js development environment

  • Confirm that Node.js is available from your command line with: node --version
  • If you do not have Node.js installed, download it from nodejs.org
  • Run mkdir lavinmq_stream_tutorials && cd lavinmq_stream_tutorials to create a directory for your Stream tutorials and navigate into it.
  • Run npm init in the lavinmq_stream_tutorials directory to initialize it as a Node.js project. Accept the defaults by pressing Enter through all the prompts.
  • Add @cloudamqp/amqp-client and dotenv as dependencies to your package.json file. Run: npm install --save @cloudamqp/amqp-client dotenv.
  • Create a .env file in the root directory.
  • Add LAVINMQ_URL="lavinmq_url" to the .env file. Replace lavinmq_url with the URL of your LavinMQ server. You can create a free LavinMQ instance here.
  • In order for amqp-client to work, you need to add "type": "module" to your package.json, right under "name": "node_tutorials", like so:
    {
      "name": "node_tutorials",
      "type": "module"
    }
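
After these steps, your package.json should look roughly like the sketch below. The version numbers are placeholders that depend on when you install the dependencies, your project name may differ, and some fields generated by npm init are omitted here.

{
  "name": "node_tutorials",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "@cloudamqp/amqp-client": "^3.0.0",
    "dotenv": "^16.0.0"
  }
}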

1. Declaring a stream

In LavinMQ, creating a Stream is as simple as creating any other queue. The only difference? You set the queue type to stream using the x-queue-type argument when you declare it. Also, Streams must be durable, meaning you’ll need to set durable=true at declaration time—LavinMQ doesn’t allow non-durable Streams (who wants a stream that does not survive a node restart?).

In our use-case, the user activity tracking app will publish all activity data to a Stream queue named users_activity. This Stream will hold a history of all user events, ready for our downstream services to consume. Because streams are immutable, they have the potential to grow indefinitely until the machine runs out of disk space. As this is undesirable behaviour, LavinMQ allows configuring streams to discard old messages based on retention policies; this will be covered in more detail in part 4 of this series.

You can make the stream declaration with the appropriate arguments in one of two ways:

  1. Programmatically, from the client code
  2. From the management interface

Programmatically

You can declare a stream queue in your client code by setting x-queue-type=stream and durable=true, as shown in the snippet below:

import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

// Access the LAVINMQ_URL environment variable
const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startProducer() {
  try {
    // Set up a connection to the LavinMQ server
    const connection = new AMQPClient(LAVINMQ_URL)
    await connection.connect()
    const channel = await connection.channel()

    console.log("[✅] Connection over channel established")

    // Declare a Stream, named users_activity
    const q = await channel.queue(
      "users_activity",
      { durable: true },
      { "x-queue-type": "stream" }
    )
  } catch (error) {
    console.error(error)
  }
}

startProducer()
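
As a brief, hedged preview of the retention policies mentioned earlier (they are covered properly in part 4 of this series), a stream can also be capped at declaration time by passing extra queue arguments. In this sketch only the queue declaration changes; the x-max-length-bytes and x-max-age argument names and the duration format are assumptions based on LavinMQ’s documented stream retention settings, so verify them against the documentation for your LavinMQ version.

// Hedged sketch: declare the stream with size- and age-based retention.
// The argument names and the "7d" duration format are assumptions; check
// the LavinMQ stream retention documentation before relying on them.
const cappedStream = await channel.queue(
  "users_activity",
  { durable: true },
  {
    "x-queue-type": "stream",
    "x-max-length-bytes": 10_000_000_000, // keep roughly the last 10 GB
    "x-max-age": "7d"                     // discard messages older than ~7 days
  }
)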

Management interface

Alternatively, a Stream can be created using the LavinMQ Management UI, under the Queues tab, like so:

Creating a stream queue from the LavinMQ management interface

Make sure to tick the “Durable” checkbox, as shown in the image above.

2. Publishing a message to a stream

Imagine a user clicking a “Buy Now” button. We’ll need to publish an event like this to the users_activity stream:

{
  "user_id": "12345",
  "event_type": "click",
  "page": "/products/56789",
  "timestamp": "2025-01-20T12:34:56Z"
}

Publishing messages to a Stream is no different from publishing messages to a regular queue. In the example below, the previous snippet has been extended to publish messages to the users_activity stream declared above.

//Dependencies
import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startProducer() {
  try {
    // Set up a connection to the LavinMQ server
    const connection = new AMQPClient(LAVINMQ_URL)
    await connection.connect()
    const channel = await connection.channel()

    console.log("[✅] Connection over channel established")

    // Declare a Stream, named users_activity
    const q = await channel.queue(
      "users_activity",
      { durable: true },
      { "x-queue-type": "stream" }
    )

    // Publish a message to the stream via the default exchange
    async function sendToQueue(routingKey, body) {
      await channel.basicPublish("", routingKey, JSON.stringify(body))
      console.log("[📥] Message sent to stream: ", body)
    }

    // Send some messages to the stream
    await sendToQueue(
      "users_activity",
      {
        "user_id": "12345",
        "event_type": "click",
        "page": "/products/56789",
        "timestamp": "2025-01-20T12:34:56Z"
      }
    )
    await sendToQueue(
      "users_activity",
      {
        "user_id": "12346",
        "event_type": "page_view",
        "page": "/products/56790",
        "timestamp": "2025-01-20T12:34:56Z"
      }
    )

    setTimeout(() => {
      // Close the connection
      connection.close()
      console.log("[❎] Connection closed")
      process.exit(0)
    }, 500)
  } catch (error) {
    console.error(error)
    // Retry after 3 seconds
    setTimeout(() => {
      startProducer()
    }, 3000)
  }
}

startProducer()

3. Consuming from Streams in LavinMQ

Messages are consumed from a Stream in much the same way as from a regular queue, but with a few notable differences:

  • Consuming messages from a LavinMQ Stream requires setting the QoS prefetch.
  • LavinMQ does not allow consuming messages from a Stream with automatic acknowledgements (auto_ack=true, or noAck: true in amqp-client.js).
  • You can specify an offset to start consuming from any point in the Stream; this will be covered in the next part of this series.

The consumer below puts the first two points into practice, reading from the users_activity stream:

import { AMQPClient } from '@cloudamqp/amqp-client'
import 'dotenv/config'

const LAVINMQ_URL = process.env.LAVINMQ_URL

async function startConsumer() {
  // Set up a connection to the LavinMQ server
  const connection = new AMQPClient(LAVINMQ_URL)
  await connection.connect()
  const channel = await connection.channel()

  console.log("[✅] Connection over channel established")
  console.log("[❎] Waiting for messages. To exit press CTRL+C ")
  
  // Set the prefetch
  await channel.prefetch(100)

  const q = await channel.queue(
    "users_activity", 
    { durable: true}, 
    {"x-queue-type": "stream"}
  )

  let counter = 0;

  // Read messages from the users_activity stream
  await q.subscribe({ noAck: false }, async (msg) => {
    try {
      console.log(`[📤] Message received (${++counter})`, msg.bodyToString())
      // Acknowledge the message so delivery can continue within the prefetch window
      await msg.ack()
    } catch (error) {
      console.error(error)
    }
  })

  //When the process is terminated, close the connection
  process.on('SIGINT', async () => {
    await channel.close()
    await connection.close()
    console.log("[❎] Connection closed")
    process.exit(0)
  });
}

startConsumer().catch(console.error);

In the snippet above, no offset has been specified, so if the Stream already contains messages, the consumer will begin reading from the Stream’s starting point.
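
To start reading from a different position, you can pass the x-stream-offset consumer argument when subscribing. The sketch below is only a hedged preview of what the next part of this series covers in detail; it assumes that the subscribe options in amqp-client.js forward an args object as consumer arguments and that the "next" offset value behaves as described, so double-check both against the documentation.

// Hedged preview: only receive messages published after this consumer subscribes.
// Assumes subscribe() accepts an `args` object for consumer arguments and that
// the "next" offset value behaves as described; see the next part for details.
await q.subscribe(
  { noAck: false, args: { "x-stream-offset": "next" } },
  async (msg) => {
    console.log("[📤] Message received", msg.bodyToString())
    await msg.ack()
  }
)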

Non-destructive consumer semantics with Streams

We have, at this point, established that Streams let consumers read messages non-destructively: the same event stays available to every consumer without being duplicated, and each service can process the same data at its own pace. This makes it super easy to implement fan-out architectures with Streams.

To demonstrate this, run three instances of the consumer script in separate terminals to simulate our use case with three consumers: a real-time analytics service, a recommendation engine, and a session analysis service. Each terminal will log the same set of messages, confirming that all three consumers receive every event.

Conclusion

As mentioned earlier, when consuming from a Stream in LavinMQ, clients can specify a starting point by using an offset. The x-stream-offset consumer argument controls this behaviour. In the next part of this series, we will see how to work with offsets and read from a specific point in a stream.

What’s next?