MQTT with LavinMQ
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol for machine-to-machine (M2M) and Internet of Things (IoT) communication. It was developed with resource-constrained devices in mind, for situations where network bandwidth is limited and power consumption must be minimized. This makes it well suited to remote sensors, embedded systems, and mobile applications with intermittent connectivity.
LavinMQ is a lightweight message broker, so MQTT's small code footprint and focus on efficiency make the two a good match. By supporting MQTT, LavinMQ can serve as a scalable platform for IoT and mobile applications, such as our [Real-time temperature monitoring with LavinMQ project](https://lavinmq.com/iot). It provides asynchronous communication across different devices, which helps when building flexible and resilient messaging systems.
MQTT Protocol Overview
MQTT operates on a client-server model and uses TCP/IP for all communication. By default, the protocol uses specific ports: 1883 for unencrypted connections and 8883 for secure, encrypted connections over TLS/SSL (MQTTS).
To establish a connection to a LavinMQ broker, the following information is needed:
- Host/broker address: The domain name or IP address of the LavinMQ server.
- Port: The port number (1883 for unencrypted or 8883 for MQTTS).
- Credentials: Username and password for authentication.

LavinMQ supports MQTT 3.1.0 and 3.1.1 clients, and client libraries exist for nearly every programming language. They all follow the same protocol, but their syntax and usage differ slightly. The examples that follow all use the Crystal client library mqtt-client.cr.
Connecting to LavinMQ with MQTT
The following table outlines the MQTT packet types and the required fields for each.
Packet type | Required fields | Description |
---|---|---|
Connect | client_id, username, password | Starts a session by authenticating and connecting the client to the broker. |
Subscribe | topic, qos | Subscribes to a specific topic. |
Unsubscribe | topic | Unsubscribes from a specific topic. |
Publish | topic, qos | Sends a message to a specific topic. |
Puback | message_id | Acknowledges receipt of a message (QoS 1). |
Disconnect | N/A | Ends the session gracefully. |
Steps to Connect:
- Send a CONNECT packet to LavinMQ's MQTT listener on port 1883.
- Include the following fields in the CONNECT packet:
  - client_id: A unique identifier for the client.
  - username and password: For authentication.
  - clean_session: Boolean to specify whether to start a clean session.
  - will: An optional field for a Last Will and Testament message.
  - qos: Quality of Service level.
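To make the fields above more concrete, the following sketch assembles the raw bytes of a CONNECT packet using only the Crystal standard library. It follows the MQTT 3.1.1 wire format and leaves out the optional Will. The helper names (`write_mqtt_string`, `write_remaining_length`, `connect_packet`) are made up for this illustration; they are not part of LavinMQ or mqtt-client.cr, which handles this encoding for you.

```crystal
# Illustrative only: hand-encodes an MQTT 3.1.1 CONNECT packet without a Will.

# MQTT strings are UTF-8 bytes prefixed with a 16-bit big-endian length.
def write_mqtt_string(io : IO, value : String) : Nil
  io.write_bytes(value.bytesize.to_u16, IO::ByteFormat::NetworkEndian)
  io.write(value.to_slice)
end

# The "remaining length" field uses 7 bits per byte; the high bit marks
# that another length byte follows.
def write_remaining_length(io : IO, length : Int32) : Nil
  loop do
    byte = (length % 128).to_u8
    length //= 128
    byte |= 0x80_u8 if length > 0
    io.write_byte(byte)
    break if length == 0
  end
end

def connect_packet(client_id : String, username : String, password : String,
                   clean_session : Bool, keepalive : UInt16 = 60_u16) : Bytes
  flags = 0b1100_0000_u8 # username and password are present
  flags |= 0b0000_0010_u8 if clean_session

  body = IO::Memory.new
  write_mqtt_string(body, "MQTT") # protocol name
  body.write_byte(4_u8)           # protocol level 4 means MQTT 3.1.1
  body.write_byte(flags)
  body.write_bytes(keepalive, IO::ByteFormat::NetworkEndian)
  write_mqtt_string(body, client_id)
  write_mqtt_string(body, username)
  write_mqtt_string(body, password)

  packet = IO::Memory.new
  packet.write_byte(0x10_u8) # fixed header: CONNECT packet type
  write_remaining_length(packet, body.bytesize)
  packet.write(body.to_slice)
  packet.to_slice
end

packet = connect_packet("hello", "guest", "guest", clean_session: false)
puts packet.hexstring
```

In practice a client library builds and sends this packet when the client is created, so application code only supplies the field values.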
Example using mqtt-client.cr:
require "mqtt-client"
require "mqtt-protocol"
will = MQTT::Client::Message.new("connect", "body".to_slice, 1_u8, false)
client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false, will: will)
# Application logic goes here.
client.disconnect
Quality of Service (QoS)
QoS defines the guarantee level for message delivery:
- 0 (At most once): No acknowledgment is required; the message may be lost.
- 1 (At least once): Requires acknowledgment (Puback); the message may be delivered more than once.
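The reason QoS 1 can produce duplicates is easier to see in a small model. The sketch below is an in-memory simulation written for this page, with no network or broker involved; `Publish`, `Sender`, and their methods are hypothetical names, not mqtt-client.cr types. The sender keeps each message until a Puback arrives and re-sends anything still unacknowledged.

```crystal
# Illustrative in-memory model of QoS 1 ("at least once") delivery.
record Publish, id : UInt16, payload : String, duplicate : Bool

class Sender
  # Messages sent but not yet acknowledged, keyed by message id.
  getter unacked = {} of UInt16 => String

  def publish(id : UInt16, payload : String) : Publish
    @unacked[id] = payload # keep the message until a Puback arrives
    Publish.new(id, payload, false)
  end

  # Re-sends everything still waiting for a Puback, flagged as a duplicate.
  def retransmit : Array(Publish)
    @unacked.map { |id, payload| Publish.new(id, payload, true) }
  end

  def puback(id : UInt16) : Nil
    @unacked.delete(id)
  end
end

sender = Sender.new
received = [] of Publish

received << sender.publish(1_u16, "21.5") # delivered, but assume the Puback is lost
received.concat(sender.retransmit)        # so the sender delivers it again
sender.puback(1_u16)                      # this time the Puback arrives

puts "deliveries: #{received.size}, last is duplicate: #{received.last.duplicate}"
```

Because the receiver can see the same message twice, consumers of QoS 1 topics are usually written so that handling a message again is harmless.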
Will
The Will argument specifies a message to be sent by the broker to a chosen topic if the client disconnects unexpectedly. Fields include:
- will_topic: Topic for the Will message.
- will_message: Content of the Will message.
- will_qos: QoS level for the Will message.
- will_retain: Whether the Will message should be retained.
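On the wire, will_qos and will_retain travel as bits in the CONNECT flags byte, alongside the clean session flag. The sketch below shows that bit layout as defined by MQTT 3.1.1. The `connect_flags` helper is a hypothetical name for this illustration, and a `nil` will_qos is used here to mean "no Will"; a client library normally sets these bits for you.

```crystal
# Illustrative only: computes the CONNECT flags byte of MQTT 3.1.1.
# Bit 7: username, bit 6: password, bit 5: Will retain,
# bits 4-3: Will QoS, bit 2: Will flag, bit 1: clean session, bit 0: reserved.
def connect_flags(clean_session : Bool, will_qos : UInt8? = nil,
                  will_retain : Bool = false,
                  username : Bool = true, password : Bool = true) : UInt8
  flags = 0_u8
  flags |= 0b0000_0010_u8 if clean_session
  if qos = will_qos
    flags |= 0b0000_0100_u8 # a Will is present
    flags |= (qos << 3)     # Will QoS occupies bits 3 and 4
    flags |= 0b0010_0000_u8 if will_retain
  end
  flags |= 0b0100_0000_u8 if password
  flags |= 0b1000_0000_u8 if username
  flags
end

# Same settings as the connection example: persistent session, Will with QoS 1.
puts connect_flags(clean_session: false, will_qos: 1_u8).to_s(2) # prints 11001100
```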
Clean session
The clean_session flag determines whether to establish a persistent or transient session:
- True: The broker will not retain session information, such as subscriptions or undelivered messages, after the client disconnects.
- False: The session is persistent, and the broker retains subscriptions and undelivered messages, enabling the client to resume where it left off.
Persistent sessions benefit clients that reconnect frequently and need continuity between connections. Transient sessions are more lightweight and suit clients that only need real-time interaction without history.
Subscribing and Unsubscribing
Example: Subscribe to a Topic
require "mqtt-client"
require "mqtt-protocol"
client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)
client.on_message do |msg|
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
end
client.subscribe("my/topic")
# Application logic goes here.
client.unsubscribe("my/topic")
Publishing and Acknowledging Messages
Example: Publish a Message
Disclaimer: It’s not possible to publish messages to a topic that no session has subscribed to yet.
require "mqtt-client"
require "mqtt-protocol"
client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)
client.publish("my/topic", "Hello World")
Example: Acknowledge a Published Message (Puback)
require "mqtt-client"
require "mqtt-protocol"
client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)
client.on_message do |msg|
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
  client.puback("my/topic", msg.id)
end
client.subscribe("my/topic", qos: 1)
Retained Messages
Retained messages are stored by the broker and delivered to new subscribers of a topic as soon as they subscribe.
Example: Send a Retained Message
require "mqtt-client"
require "mqtt-protocol"
client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)
client.publish("my/topic", "Hello World", retain: true)
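The broker-side behaviour can be pictured with a toy model. The sketch below is an in-memory illustration written for this page, not LavinMQ code; `RetainedStore` and its methods are hypothetical names. It keeps the latest retained payload per topic and, following the MQTT 3.1.1 rule, treats a retained message with an empty payload as a request to clear the stored value.

```crystal
# Illustrative in-memory model of a broker's retained-message store.
class RetainedStore
  def initialize
    @retained = {} of String => String
  end

  # Remembers the latest retained payload for a topic. Messages published
  # without the retain flag do not touch the stored value.
  def publish(topic : String, payload : String, retain : Bool = false) : Nil
    return unless retain
    if payload.empty?
      @retained.delete(topic) # empty retained payload clears the entry
    else
      @retained[topic] = payload
    end
  end

  # Returns what a new subscriber to exactly this topic would receive first.
  def subscribe(topic : String) : String?
    @retained[topic]?
  end
end

store = RetainedStore.new
store.publish("my/topic", "Hello World", retain: true)
store.publish("my/topic", "not retained")
first = store.subscribe("my/topic")  # the retained "Hello World"
store.publish("my/topic", "", retain: true)
second = store.subscribe("my/topic") # nothing retained any more
puts "first: #{first.inspect}, second: #{second.inspect}"
```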
Clustering with MQTT
In a clustered setup, LavinMQ replicates retained messages across nodes. To ensure followers forward MQTT traffic to the leader, configure the following setting:
Config variable | Description |
---|---|
mqtt_unix_path | UNIX path to listen for MQTT traffic, e.g., /tmp/lavinmq-mqtt.sock. |
New Configuration Variables
The following variables control MQTT behavior in LavinMQ:
Variable | Description |
---|---|
bind | IP address that AMQP, MQTT, and HTTP servers will listen on. |
port | Port for non-TLS MQTT connections. Default: 1883. |
tls_port | Port for TLS-enabled MQTT connections. Default: 8883. |
unix_path | UNIX socket path for MQTT. Example: /tmp/lavinmq-mqtt.sock. |
max_inflight_messages | Maximum number of messages in flight simultaneously. Default: 65535. |
default_vhost | Specifies the default virtual host (vhost) for MQTT connections. Default: "/". |
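As a rough sketch, these variables could appear in LavinMQ's INI-style configuration file as shown below. The `[mqtt]` section name and the `bind` value are assumptions made for this illustration; the other values are the defaults and examples from the table above. Check the section names against the configuration reference for your LavinMQ version.

```ini
; Illustrative fragment only; the [mqtt] section name is an assumption.
[mqtt]
; Example address, not a documented default.
bind = 127.0.0.1
port = 1883
tls_port = 8883
unix_path = /tmp/lavinmq-mqtt.sock
max_inflight_messages = 65535
default_vhost = /
```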
Ready to take the next steps?
Managed LavinMQ instance via CloudAMQP
LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.
Help and feedback
We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.