Using MQTT with LavinMQ

Documentation work in progress: Support for the MQTT 3.1 protocol will arrive with LavinMQ 2.1, which has not been released yet.

LavinMQ supports the MQTT 3.1 protocol. This documentation provides an overview of the MQTT protocol implementation in LavinMQ, usage examples, and configuration details.

MQTT Protocol Overview

The following table lists the MQTT packet types and the fields each one requires when using mqtt-client.cr, LavinMQ’s MQTT client library (link to come).

Packet Type | Required Fields               | Description
Connect     | client_id, username, password | Starts a session by authenticating and connecting the client to the broker.
Subscribe   | topic, qos                    | Subscribes to a specific topic.
Unsubscribe | topic                         | Unsubscribes from a specific topic.
Publish     | topic, qos                    | Sends a message to a specific topic.
Puback      | message_id                    | Acknowledges receipt of a message (QoS 1).
Disconnect  | N/A                           | Ends the session gracefully.

Connecting to LavinMQ with MQTT

Steps to Connect:

  1. Send a CONNECT packet to LavinMQ’s MQTT listener on port 1883.
  2. Include the following fields in the CONNECT packet:
    • client_id: A unique identifier for your client.
    • username and password: For authentication.
    • clean_session: Boolean to specify whether to start a clean session.
    • will: An optional field for a Last Will and Testament message.
    • qos: Quality of Service level of the Will message, if a Will is set.

Example Using mqtt-client.cr:

require "mqtt-client"
require "mqtt-protocol"

# Last Will message: topic "connect", payload "body", QoS 1, not retained
will = MQTT::Client::Message.new("connect", "body".to_slice, 1_u8, false)

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false, will: will)

# Do whatever you need to do with the MQTT connection
# ...

client.disconnect
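
For readers curious what the CONNECT fields look like on the wire, here is a minimal sketch in Python. It does not use mqtt-client.cr and is not LavinMQ's implementation; it only assembles the bytes by hand. The helper names (encode_string, encode_remaining_length, build_connect) and the 60-second keepalive are assumptions made for illustration. The defaults follow MQTT 3.1, which uses the protocol name MQIsdp and version 3; MQTT 3.1.1 uses MQTT and level 4 instead.

```python
import struct


def encode_string(text: str) -> bytes:
    """MQTT strings are UTF-8 with a 2-byte big-endian length prefix."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def encode_remaining_length(length: int) -> bytes:
    """Variable-length encoding: 7 bits per byte, high bit set if more bytes follow."""
    out = bytearray()
    while True:
        length, digit = divmod(length, 128)
        if length > 0:
            digit |= 0x80
        out.append(digit)
        if length == 0:
            return bytes(out)


def build_connect(client_id, username, password, clean_session=True,
                  will=None, keepalive=60,
                  protocol_name="MQIsdp", protocol_level=3):
    """Build a CONNECT packet. `will` is an optional (topic, payload, qos, retain) tuple."""
    flags = 0x80 | 0x40  # username flag and password flag
    if clean_session:
        flags |= 0x02
    payload = encode_string(client_id)
    if will is not None:
        topic, message, qos, retain = will
        # Will flag, Will QoS (bits 3-4), Will retain (bit 5)
        flags |= 0x04 | (qos << 3) | (0x20 if retain else 0)
        payload += encode_string(topic) + struct.pack("!H", len(message)) + message
    payload += encode_string(username) + encode_string(password)
    variable_header = (encode_string(protocol_name)
                       + bytes([protocol_level, flags])
                       + struct.pack("!H", keepalive))
    remaining = variable_header + payload
    # 0x10 is the fixed-header byte for the CONNECT packet type
    return bytes([0x10]) + encode_remaining_length(len(remaining)) + remaining


# Same values as the Crystal example above
packet = build_connect("hello", "guest", "guest", clean_session=False,
                       will=("connect", b"body", 1, False))
print(packet.hex())
```

A client library performs this encoding for you; the sketch is only meant to show where client_id, clean_session, and the Will fields end up in the packet.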

Quality of Service (QoS)

QoS defines the guarantee level for message delivery:

  • 0 (At most once): No acknowledgment is required; the message may be lost.
  • 1 (At least once): Requires acknowledgment (Puback); the message may be delivered more than once.
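
To illustrate why QoS 1 can produce duplicates, the following Python sketch simulates a sender that retries until it sees a Puback. It is a toy model under stated assumptions, not LavinMQ's delivery logic: the Receiver class, the send_qos0 and send_qos1 functions, and the puback_lost_on_attempts parameter are all hypothetical names invented for this example.

```python
class Receiver:
    """Records every payload it is handed, including duplicates."""

    def __init__(self):
        self.delivered = []

    def on_publish(self, message_id, payload):
        self.delivered.append(payload)
        return message_id  # the Puback carries the same message id


def send_qos0(receiver, payload, lost=False):
    """QoS 0: fire and forget. A lost message is never resent."""
    if not lost:
        receiver.on_publish(None, payload)


def send_qos1(receiver, message_id, payload,
              puback_lost_on_attempts=(), max_attempts=5):
    """QoS 1: resend until a Puback arrives. Returns the number of attempts."""
    for attempt in range(1, max_attempts + 1):
        ack = receiver.on_publish(message_id, payload)
        if attempt in puback_lost_on_attempts:
            continue  # the Puback was lost in transit, so the sender retries
        if ack == message_id:
            return attempt
    raise TimeoutError("no Puback received")


receiver = Receiver()
attempts = send_qos1(receiver, 1, "hello", puback_lost_on_attempts={1})
print(attempts, receiver.delivered)
```

In this run the first Puback is dropped, so the sender publishes again and the receiver sees the same payload twice. Consumers of QoS 1 messages should therefore tolerate duplicates.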

Will

The Will argument specifies a message to be sent by the broker to a chosen topic if the client disconnects unexpectedly. Fields include:

  • will_topic: Topic for the Will message.
  • will_message: Content of the Will message.
  • will_qos: QoS level for the Will message.
  • will_retain: Whether the Will message should be retained.
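
The difference between a graceful and an unexpected disconnect can be sketched with a small in-memory model. This Python example is an illustration only; ToyBroker, connection_lost, and the Will dataclass are hypothetical names, and the model ignores QoS handling and retention of the Will message.

```python
from dataclasses import dataclass


@dataclass
class Will:
    topic: str
    message: bytes
    qos: int = 0
    retain: bool = False


class ToyBroker:
    def __init__(self):
        self.wills = {}      # client_id -> Will registered at connect time
        self.published = []  # (topic, message) pairs the broker has sent out

    def connect(self, client_id, will=None):
        if will is not None:
            self.wills[client_id] = will

    def disconnect(self, client_id):
        # Graceful Disconnect packet: the Will is discarded, nothing is published
        self.wills.pop(client_id, None)

    def connection_lost(self, client_id):
        # Unexpected disconnect: the broker publishes the Will on the client's behalf
        will = self.wills.pop(client_id, None)
        if will is not None:
            self.published.append((will.topic, will.message))


broker = ToyBroker()
broker.connect("sensor-1", Will("status/sensor-1", b"offline", qos=1))
broker.connection_lost("sensor-1")
print(broker.published)
```

A common use of this pattern is a status topic: the client publishes "online" after connecting and lets the Will announce "offline" if the connection drops.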

Clean Session

The clean_session flag determines whether to establish a persistent or transient session:

  • True: The broker will not retain session information, such as subscriptions or undelivered messages, after the client disconnects.
  • False: The session is persistent, and the broker retains subscriptions and undelivered messages, enabling the client to resume where it left off.

Persistent sessions benefit clients that reconnect frequently and need continuity between connections. Transient sessions are lighter and suit clients that only need real-time messages without history.
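
The two session modes can be compared with a small in-memory model. The Python sketch below is a simplification under stated assumptions, not LavinMQ's session handling: SessionStore and its methods are hypothetical, topics are matched by exact name, and delivery to clients that are currently online is not modelled.

```python
class SessionStore:
    def __init__(self):
        self.sessions = {}   # client_id -> session state
        self.online = set()  # client ids that are currently connected

    def connect(self, client_id, clean_session):
        """Connect and return any messages queued while the client was offline."""
        if clean_session:
            self.sessions.pop(client_id, None)  # discard any earlier state
        session = self.sessions.setdefault(
            client_id, {"subscriptions": set(), "pending": [], "clean": clean_session})
        session["clean"] = clean_session
        self.online.add(client_id)
        pending, session["pending"] = session["pending"], []
        return pending

    def subscribe(self, client_id, topic):
        self.sessions[client_id]["subscriptions"].add(topic)

    def disconnect(self, client_id):
        self.online.discard(client_id)
        if self.sessions[client_id]["clean"]:
            del self.sessions[client_id]  # transient session: nothing is kept

    def publish(self, topic, payload):
        # Queue the message for offline clients that still hold a subscription
        for client_id, session in self.sessions.items():
            if topic in session["subscriptions"] and client_id not in self.online:
                session["pending"].append(payload)


store = SessionStore()
store.connect("hello", clean_session=False)
store.subscribe("hello", "my/topic")
store.disconnect("hello")
store.publish("my/topic", "missed while offline")
print(store.connect("hello", clean_session=False))
```

With clean_session set to false the reconnecting client receives the queued message; repeating the same steps with clean_session set to true returns an empty list, because the subscription was dropped at disconnect.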

Subscribing and Unsubscribing

Example: Subscribe to a Topic

require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

client.on_message do |msg|
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
end

client.subscribe("my/topic")

# Do what you want to do with your MQTT-subscription
# ...

client.unsubscribe("my/topic")

Publishing and Acknowledging Messages

Example: Publish a Message

Note: You cannot publish messages to a topic that no session has subscribed to yet.

require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

client.publish("my/topic", "Hello World")

Example: Acknowledge a Published Message (Puback)

require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

client.on_message do |msg|
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
  client.puback("my/topic", msg.id)
end

client.subscribe("my/topic", qos: 1)

Retained Messages

Retained messages are stored by the broker and delivered to new subscribers of a topic when they subscribe. To send a retained message, set retain to true when publishing.

Example: Send a Retained Message

require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

client.publish("my/topic", "Hello World", retain: true)
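
The store-and-deliver behaviour of retained messages can be modelled in a few lines. This Python sketch is an illustration only and not LavinMQ's retained-message store: RetainedStore is a hypothetical class, topics are matched by exact name, and each subscriber is represented by a plain list acting as its inbox.

```python
class RetainedStore:
    def __init__(self):
        self.retained = {}     # topic -> most recent retained payload
        self.subscribers = {}  # topic -> list of subscriber inboxes

    def publish(self, topic, payload, retain=False):
        if retain:
            self.retained[topic] = payload  # replaces any earlier retained message
        for inbox in self.subscribers.get(topic, []):
            inbox.append(payload)

    def subscribe(self, topic):
        """Register a subscriber and return its inbox."""
        inbox = []
        if topic in self.retained:
            inbox.append(self.retained[topic])  # delivered right after subscribing
        self.subscribers.setdefault(topic, []).append(inbox)
        return inbox


store = RetainedStore()
store.publish("my/topic", "Hello World", retain=True)
late_subscriber = store.subscribe("my/topic")
print(late_subscriber)
```

The subscriber joins after the message was published and still receives it. Without the retain flag, the same subscriber would start with an empty inbox.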

Clustering with MQTT

In a clustered setup, LavinMQ replicates retained messages across nodes. To ensure followers forward MQTT traffic to the leader, configure the following in your settings:

Config Variable | Description
mqtt_unix_path  | UNIX path to listen for MQTT traffic, e.g., /tmp/lavinmq-mqtt.sock.

New Configuration Variables

The following variables control MQTT behavior in LavinMQ:

Variable              | Description
mqtt_bind             | IP address that the MQTT server will listen on.
mqtt_port             | Port for non-TLS MQTT connections. Default: 1883.
mqtts_port            | Port for TLS-enabled MQTT connections. Default: 8883.
mqtt_unix_path        | UNIX socket path for MQTT. Example: /tmp/lavinmq-mqtt.sock.
max_inflight_messages | Maximum number of messages in flight simultaneously. Default: 65535.
