How to implement RPC in LavinMQ: callback queues and direct reply-to
Not every message is fire-and-forget. When a service needs to call out to another and wait for a result, like a price check, a data lookup, or a health ping, that’s the Remote Procedure Call (RPC) pattern.
But first, what exactly is RPC?
In an RPC, a client sends a request to a remote server, which processes it and sends back a response. The initiating program waits until it receives a response before moving on.
How does RPC work in LavinMQ?
In LavinMQ, there is no stub, no auto-generated proxy that makes a remote call look like a local function. Instead, there are two approaches:
- Callback queue: the publisher includes a queue address in the message and the response is delivered there.
- Direct reply-to: the response is sent straight back to the publisher without creating a queue.
When to use RPC in LavinMQ?
RPC fits any pattern where the caller needs a result before moving on: data lookups, health checks, validation requests, or any workflow that requires a confirmed response rather than fire-and-forget messaging.
Option 1: Callback queue
The publisher sends a request message with the name of a callback queue in the reply_to property. The service handling the request sends the response to that queue:
channel.basic_publish(
    exchange='',  # default exchange: routes by queue name
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=response_queue,
    ),
    body=request
)

Creating a new callback queue for every request works but is wasteful. Sharing one callback queue across requests is cheaper, but it has a drawback: when multiple requests are in flight, it’s not clear which response belongs to which request. Adding a correlation_id, a unique value per request, solves that:
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=response_queue,
        correlation_id=correlation_id
    ),
    body=request
)

Workflow of RPC callback queues in LavinMQ:
- The publisher creates an exclusive callback_queue.
- The publisher sends a message to rpc_queue with the reply_to property set to the callback queue and a unique correlation_id.
- The consumer (the service handling the request) picks up the message and processes it.
- The result is published back to the queue specified in reply_to, using the correlation_id to match the response to the original request.
- The publisher picks up the response from the callback queue.
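Steps 3 to 5 of the workflow can be sketched as follows. This is a minimal illustration, not LavinMQ's or pika's own API: make_rpc_handler, PendingCalls, and compute are hypothetical names, and properties_cls stands in for the client library's properties class (pika.BasicProperties when using pika). It is passed in as an argument so the sketch does not depend on a specific library.

```python
import uuid


def make_rpc_handler(compute, properties_cls):
    """Build a pika-style callback for the service side (steps 3 and 4).

    compute: hypothetical function mapping request bytes to response bytes.
    properties_cls: the properties class of the client library in use,
        for example pika.BasicProperties (an assumption of this sketch).
    """
    def on_request(ch, method, properties, body):
        response = compute(body)
        # Reply to the queue named in reply_to and echo the correlation_id
        # so the publisher can match the response to its request.
        ch.basic_publish(
            exchange='',
            routing_key=properties.reply_to,
            properties=properties_cls(correlation_id=properties.correlation_id),
            body=response,
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return on_request


class PendingCalls:
    """Publisher-side bookkeeping for a shared callback queue (step 5)."""

    def __init__(self):
        self._results = {}

    def new_id(self):
        """Register a request and return a fresh correlation_id for it."""
        correlation_id = str(uuid.uuid4())
        self._results[correlation_id] = None
        return correlation_id

    def on_response(self, ch, method, properties, body):
        """pika-style callback: keep only replies to requests we sent."""
        if properties.correlation_id in self._results:
            self._results[properties.correlation_id] = body

    def result(self, correlation_id):
        """Return the response body, or None while it is still pending."""
        return self._results.get(correlation_id)
```

With pika, the service would register the handler via channel.basic_consume(queue='rpc_queue', on_message_callback=make_rpc_handler(compute, pika.BasicProperties)). The publisher would use calls.new_id() as the correlation_id of each request and consume its callback queue with calls.on_response. Responses carrying an unknown correlation_id are ignored rather than treated as errors.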

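On the publisher side, the first two steps of the workflow look like this:

# queue='' asks the broker to generate a unique queue name
result = channel.queue_declare(queue='', exclusive=True)
# reply_to needs the queue name, not the declare result itself
callback_queue = result.method.queue

channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,
        correlation_id=correlation_id
    ),
    body=request
)

Option 2: Direct reply-to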
Direct reply-to sends the response straight back to the publisher without creating a queue. To use it, consume from amq.direct.reply-to in no-ack mode and set the reply_to property of the request to amq.direct.reply-to. LavinMQ generates a special routing key that the handling service uses to publish the result back via the default exchange. No queue is created, which avoids both long-lived and short-lived queues that consume memory.
Using direct reply-to in LavinMQ
The example below demonstrates a simple ping request. Start by consuming from amq.direct.reply-to before publishing, to avoid missing the response:
RECEIVED_HELLO = False

def client_consumer_callback(ch, method, properties, body):
    global RECEIVED_HELLO
    msg = body.decode('utf-8')
    if "Hello from Consumer" in msg:
        print("TARGET MACHINE IS ACTIVE")
        RECEIVED_HELLO = True
    else:
        print("RECEIVED UNEXPECTED MESSAGE")
    ch.close()

# auto_ack=True is pika's name for no-ack mode, which direct reply-to requires
channel.basic_consume(
    queue="amq.direct.reply-to",
    on_message_callback=client_consumer_callback,
    auto_ack=True
)

This consumer checks that the response contains the expected greeting.
Next, a second consumer mimics the remote service:
from pika import BasicProperties

def consumer_callback(ch, method, properties, body):
    msg = body.decode('utf-8')
    if "Hello World" in msg:
        basic_props = BasicProperties()
        # properties.reply_to holds the routing key generated for the client
        ch.basic_publish(exchange='', routing_key=properties.reply_to, properties=basic_props, body="Hello from Consumer")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
        print("RECEIVED UNEXPECTED MESSAGE")

# The service consumes from its request queue (rpc_queue, as in Option 1),
# not from amq.direct.reply-to
channel.basic_consume(queue="rpc_queue", on_message_callback=consumer_callback)

This consumer receives the request, checks that the message contains “Hello World”, and sends back a greeting. Only the client subscribes to amq.direct.reply-to; the service consumes from its own request queue.

Finally, publish the request to the service's queue with the reply_to property set:

basic_props = BasicProperties(
    reply_to="amq.direct.reply-to"
)
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=basic_props,
    body="Hello World"
)

This example implements a simple health check, but the same pattern works for any request-response flow: data lookups, user registration, or remote task execution.
Unlike the callback queue approach, direct reply-to sends information straight back to the publisher with no queue involved.
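Because an RPC caller waits for its result, it helps to bound that wait. The sketch below wraps the ping in a single function with a timeout. It is an illustration under assumptions rather than a definitive client: ping is a hypothetical helper, the service is assumed to consume from rpc_queue, channel is assumed to behave like a pika BlockingChannel, and properties_cls stands in for pika.BasicProperties.

```python
import time


def ping(channel, properties_cls, request_queue='rpc_queue', timeout=5.0):
    """Send one request over direct reply-to and wait for the reply.

    channel: a pika BlockingChannel, or an object with the same methods.
    properties_cls: for example pika.BasicProperties (assumed here).
    Returns the reply body, or None if nothing arrives within `timeout` seconds.
    """
    reply = {}

    def on_reply(ch, method, properties, body):
        reply['body'] = body

    # Subscribe before publishing, in no-ack mode, on the same channel
    # that sends the request.
    consumer_tag = channel.basic_consume(
        queue='amq.direct.reply-to',
        on_message_callback=on_reply,
        auto_ack=True,
    )
    channel.basic_publish(
        exchange='',
        routing_key=request_queue,
        properties=properties_cls(reply_to='amq.direct.reply-to'),
        body=b'Hello World',
    )

    deadline = time.monotonic() + timeout
    while 'body' not in reply and time.monotonic() < deadline:
        # Let the connection dispatch incoming deliveries to on_reply.
        channel.connection.process_data_events(time_limit=0.1)

    channel.basic_cancel(consumer_tag)
    return reply.get('body')
```

A caller would treat a None result as "the target did not answer in time", for example ping(channel, pika.BasicProperties, timeout=2.0) is None. Returning None instead of raising keeps the health-check case simple; a lookup-style call might prefer an exception.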
When using RPC this way:
- Try to establish a connection to the client using the generated name on a disposable channel to see if the client still exists
- Set the immediate flag to false when publishing
- Start consuming from amq.direct.reply-to before publishing the message
- Set the mandatory flag if using amq.direct.reply-to to create error logs
- Do not set the mandatory flag when using a direct-reply with amq.direct.reply-to.* as the queue
These tips help detect when something goes wrong and avoid losing messages.
NOTE:
LavinMQ supports the use of amq.rabbitmq.reply-to to allow for compatibility with other brokers.
Wrap up
RPC in LavinMQ comes down to a choice: a callback queue when correlation matters, direct reply-to when simplicity does.
Annie Blomgren