<- Back to blog archive
Guide

How to implement RPC in LavinMQ: callback queues and direct reply-to

How to implement RPC in LavinMQ: callback queues and direct reply-to

Not every message is fire-and-forget. When a service needs to call out to another and wait for a result, like a price check, a data lookup, or a health ping, that’s the Remote Procedure Call (RPC) pattern.

But first, what exactly is RPC?

In an RPC, a client sends a request to a remote server, which processes it and sends back a response. The initiating program waits until it receives a response before moving on.

How does RPC work in LavinMQ?

In LavinMQ, there is no stub, no auto-generated proxy that makes a remote call look like a local function. Instead, there are two approaches:

  • Callback queue: the publisher includes a queue address in the message and the response is delivered there.
  • Direct reply-to: the response is sent straight back to the publisher without creating a queue.

When to use RPC in LavinMQ?

RPC fits any pattern where the caller needs a result before moving on: data lookups, health checks, validation requests, or any workflow that requires a confirmed response rather than fire-and-forget messaging.

Option 1: Callback queue

The publisher sends a request message with the name of a callback queue in the reply_to property. The service handling the request sends the response to that queue:

channel.basic_publish(
  exchange='',
  routing_key='rpc_queue',
  properties=pika.BasicProperties(
    reply_to = response_queue,
  ),
  body=request
)

The code above creates a new callback queue for every request, which works but has a drawback: when multiple requests are in flight, it’s not clear which response belongs to which request. Adding a correlation_id, a unique value per request, solves that:

channel.basic_publish(
  exchange='',
  routing_key='rpc_queue',
  properties=pika.BasicProperties(
    reply_to = response_queue,
    correlation_id = correlation_id
  ),
  body=request
)

Workflow of RPC callback queues in LavinMQ:

  • The publisher creates an exclusive callback_queue.
  • The publisher sends a message to rpc_queue with the reply_to property set to the callback queue and a unique correlation_id.
  • The consumer (the service handling the request) picks up the message and processes it.
  • The result is published back to the queue specified in reply_to, using the correlation_id to match the response to the original request.
  • The publisher picks up the response from the callback queue.

RPC callback queue workflow in LavinMQ

callback_queue = channel.queue_declare(queue='', exclusive=True)

channel.basic_publish(
  exchange='',
  routing_key='rpc_queue',
  properties=pika.BasicProperties(
    reply_to = callback_queue,
    correlation_id = correlation_id
  ),
  body=request
)

Option 2: Direct reply-to

Direct reply-to sends the response straight back to the publisher without creating a queue. To use it, set reply-to to amq.direct.reply-to and send the message in no-ack mode. LavinMQ generates a special routing key that the handling service uses to publish the result back via the default exchange. No queue is created, which avoids both long-lived and short-lived queues that consume memory.

Using direct reply-to in LavinMQ

The example below demonstrates a simple ping request. Start by consuming from amq.direct.reply-to before publishing, to avoid missing the response:

def client_consumer_callback(ch, method, properties, body):
  msg = body.decode('utf-8')

  if msg in "Hello from Consumer":
    print("TARGET MACHINE IS ACTIVE")
    global RECEIVED_HELLO
    RECEIVED_HELLO = True
  else:
    print("RECEIVED UNEXPECTED MESSAGE")
    ch.close()
    channel.consume("amq.direct.reply-to", on_message_callback=client_consumer_callback)

This consumer checks that the response contains the expected greeting.

Next, a second consumer mimics the remote service:

def consumer_callback(ch, method, properties, body):
  msg = body.decode('utf-8')

  if msg in "Hello World":
    basic_props = BasicProperties()
    ch.basic_publish(exchange='', routing_key=properties.reply_to, properties=basic_props, body="Hello from Consumer")
    ch.basic_ack(delivery_tag=method.delivery_tag)
  else:
    print("RECEIVED UNEXPECTED MESSAGE")
    channel.consume("amq.direct.reply-to", on_message_callback=consumer_callback)

This consumer receives the request, checks the message contains “Hello World”, and sends back a greeting. Both consumers subscribe to amq.direct.reply-to.

Finally, publish the request with the reply-to header set:

basic_props = BasicProperties(
    reply_to="amq.direct.reply-to"
)

channel.basic_publish(
    exchange='',
    routing_key='',
    properties=basic_props,
    body="Hello World"
)

This example implements a simple health check, but the same pattern works for any request-response flow: data lookups, user registration, or remote task execution.

Unlike the callback queue approach, direct reply-to sends information straight back to the publisher with no queue involved.

When using RPC this way:

  • Try to establish a connection to the client using the generated name on a disposable channel to see if the client still exists
  • Set the immediate flag to false when publishing
  • Start consuming from amq.lavinmq.reply-to before publishing the message
  • Set the mandatory flag if using amq.lavinmq.reply-to to create error logs
  • Do not set the mandatory flag when using a direct-reply with amq.lavinmq.reply-to.* as the queue

These tips help detect when something goes wrong and avoid losing messages.

NOTE: LavinMQ supports the use of amq.rabbitmq.reply-to to allow for compatibility with other brokers.

Wrap up

RPC in LavinMQ comes down to a choice: a callback queue when correlation matters, direct reply-to when simplicity does.

Author

Annie Blomgren

More good reads
Topics MQTT
Security MQTT