
MQTT with LavinMQ

MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol for machine-to-machine (M2M) and Internet of Things (IoT) communication. It was developed with resource-constrained devices in mind, for situations where network bandwidth is limited and power consumption must be minimized. This makes it well suited to remote sensors, embedded systems, and mobile applications with intermittent connectivity.

LavinMQ is a lightweight message broker, so MQTT's small code footprint and focus on efficiency make the two a natural fit. With MQTT support, LavinMQ can serve as a scalable platform for IoT and mobile applications, such as our [Real-time temperature monitoring with LavinMQ project](https://lavinmq.com/iot). It provides asynchronous communication between devices, which helps when building flexible and resilient messaging systems.

MQTT Protocol Overview

MQTT operates on a client-server model and uses TCP/IP for all communication. By default, the protocol uses specific ports: 1883 for unencrypted connections and 8883 for secure, encrypted connections over TLS/SSL (MQTTS).

To establish a connection to a LavinMQ broker, the following information is needed:

  • Host/broker address: The domain name or IP address of the LavinMQ server.

  • Port: The port number (1883 for unencrypted or 8883 for MQTTS).

  • Credentials: Username and password for authentication.

LavinMQ supports MQTT 3.1.0 and 3.1.1 clients, and client libraries exist for nearly every programming language. They all follow the same protocol, but their syntax and usage differ slightly. The examples that follow are all in Crystal, using the mqtt-client.cr client library.
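The connection details listed above are often packed into a single URI. The following is a minimal sketch of unpacking one, assuming the widespread (but not standardized) mqtt:// and mqtts:// scheme convention. The helper name `mqtt_endpoint` is hypothetical and the code uses only Crystal's standard library, not mqtt-client.cr.

```crystal
require "uri"

# Hypothetical helper: splits an MQTT-style URI into the pieces a client needs.
# The mqtt:// and mqtts:// schemes are a common convention, assumed here.
def mqtt_endpoint(url : String)
  uri = URI.parse(url)
  tls = uri.scheme == "mqtts"
  {
    host:     uri.host || "localhost",
    # Fall back to the protocol's default ports when none is given.
    port:     uri.port || (tls ? 8883 : 1883),
    user:     uri.user,
    password: uri.password,
    tls:      tls,
  }
end

endpoint = mqtt_endpoint("mqtt://guest:guest@localhost")
puts "#{endpoint[:host]}:#{endpoint[:port]} (TLS: #{endpoint[:tls]})"
```

The resulting values could then be passed to whichever client library is in use.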

Connecting to LavinMQ with MQTT

The following table outlines the MQTT packet types and the fields each one requires.

| Packet type | Required fields | Description |
| --- | --- | --- |
| Connect | client_id, username, password | Starts a session by authenticating and connecting the client to the broker. |
| Subscribe | topic, qos | Subscribes to a specific topic. |
| Unsubscribe | topic | Unsubscribes from a specific topic. |
| Publish | topic, qos | Sends a message to a specific topic. |
| Puback | message_id | Acknowledges receipt of a message (QoS 1). |
| Disconnect | N/A | Ends the session gracefully. |

Steps to Connect:

  1. Send a CONNECT packet to LavinMQ’s MQTT listener on port 1883.
  2. Include the following fields in the CONNECT packet:
    • client_id: A unique identifier for the client.
    • username and password: For authentication.
    • clean_session: Boolean to specify whether to start a clean session.
    • will: An optional field for a Last Will and Testament message.
    • qos: Quality of Service level.
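On the wire, MQTT 3.1.1 packs several of these options into a single "connect flags" byte. The sketch below shows that bit layout as defined by the MQTT 3.1.1 specification. It is independent of mqtt-client.cr, and the helper name `connect_flags` is hypothetical.

```crystal
# Hypothetical helper: builds the MQTT 3.1.1 CONNECT flags byte.
# Bit layout per the specification:
#   7 username, 6 password, 5 will retain, 4-3 will QoS, 2 will flag,
#   1 clean session, 0 reserved (always 0).
def connect_flags(username : Bool, password : Bool, clean_session : Bool,
                  will : Bool = false, will_qos : Int32 = 0,
                  will_retain : Bool = false) : UInt8
  raise ArgumentError.new("will_qos must be 0, 1 or 2") unless (0..2).includes?(will_qos)
  flags = 0_u8
  flags |= 0x80_u8 if username
  flags |= 0x40_u8 if password
  flags |= 0x02_u8 if clean_session
  if will
    # Will QoS and will retain are only meaningful when a Will is present.
    flags |= 0x04_u8
    flags |= (will_qos.to_u8 << 3)
    flags |= 0x20_u8 if will_retain
  end
  flags
end

# Credentials, a persistent session, and a QoS 1 Will that is not retained.
flags = connect_flags(username: true, password: true, clean_session: false,
                      will: true, will_qos: 1)
puts "0x#{flags.to_s(16)}"
```

A client library normally builds this byte internally; the sketch only illustrates how the fields above map onto the packet.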

Example Using mqtt-client.cr:

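```crystal
require "mqtt-client"
require "mqtt-protocol"

# Last Will message that the broker publishes if this client disconnects unexpectedly.
will = MQTT::Client::Message.new("connect", "body".to_slice, 1_u8, false)

# Connect with credentials and a persistent session (clean_session: false).
client = MQTT::Client.new(
  "localhost", 1883,
  user: "guest", password: "guest",
  client_id: "hello", clean_session: false, will: will
)

# Application logic goes here.

# End the session gracefully.
client.disconnect
```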

Quality of Service (QoS)

QoS defines the guarantee level for message delivery:

  • 0 (At most once): No acknowledgment is required; the message may be lost.
  • 1 (At least once): Requires an acknowledgment (Puback); may result in duplicate messages.
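To see why QoS 1 can produce duplicates, consider a sender that retransmits until it receives a Puback. The following toy model is not LavinMQ or mqtt-client.cr code; the function name and the `ack_lost` parameter are hypothetical and exist only for illustration.

```crystal
# Toy model: a sender retransmits a QoS 1 message until it sees a Puback.
# `ack_lost` lists the attempt numbers whose Puback never reaches the sender.
# Returns how many copies of the message the receiver ends up with.
def deliver_at_least_once(ack_lost : Array(Int32)) : Int32
  deliveries = 0
  attempt = 0
  loop do
    attempt += 1
    deliveries += 1 # the receiver gets the message on every attempt
    # Stop retrying as soon as a Puback makes it back to the sender.
    break unless ack_lost.includes?(attempt)
  end
  deliveries
end

puts deliver_at_least_once([] of Int32) # prints 1: Puback arrived first time
puts deliver_at_least_once([1, 2])      # prints 3: two lost Pubacks, two duplicates
```

The message always arrives at least once, but every lost acknowledgment costs one duplicate, so receivers of QoS 1 traffic should tolerate repeated messages.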

Will

The Will argument specifies a message to be sent by the broker to a chosen topic if the client disconnects unexpectedly. Fields include:

  • will_topic: Topic for the Will message.
  • will_message: Content of the Will message.
  • will_qos: QoS level for the Will message.
  • will_retain: Whether the Will message should be retained.

Clean session

The clean_session flag determines whether to establish a persistent or transient session:

  • True: The broker will not retain session information, such as subscriptions or undelivered messages, after the client disconnects.
  • False: The session is persistent, and the broker retains subscriptions and undelivered messages, enabling the client to resume where it left off.

Persistent sessions benefit clients that reconnect frequently and need continuity between connections. Transient sessions, in contrast, are lightweight and suit clients that only need real-time interaction without history.
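The difference between the two session types can be sketched with a small in-memory model. This is a toy illustration under simplifying assumptions (it tracks subscriptions only, not undelivered messages) and does not reflect how LavinMQ stores sessions; the class `ToySessionStore` is hypothetical.

```crystal
# Toy model of how a broker treats session state for clean_session.
class ToySessionStore
  def initialize
    @subscriptions = Hash(String, Array(String)).new
  end

  # Returns the subscriptions the client starts (or resumes) with.
  def connect(client_id : String, clean_session : Bool) : Array(String)
    # A clean session discards anything left over from earlier connections.
    @subscriptions.delete(client_id) if clean_session
    @subscriptions[client_id] ||= [] of String
  end

  def subscribe(client_id : String, topic : String)
    @subscriptions[client_id] << topic
  end

  def disconnect(client_id : String, clean_session : Bool)
    # Transient sessions leave nothing behind after disconnecting.
    @subscriptions.delete(client_id) if clean_session
  end
end

store = ToySessionStore.new
store.connect("sensor-1", clean_session: false)
store.subscribe("sensor-1", "my/topic")
store.disconnect("sensor-1", clean_session: false)

p store.connect("sensor-1", clean_session: false) # the subscription survived
p store.connect("sensor-1", clean_session: true)  # the session starts empty
```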

Subscribing and Unsubscribing

Example: Subscribe to a Topic

```crystal
require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

# Register the handler before subscribing.
client.on_message do |msg|
  # msg.body holds raw bytes; convert it to a String for printing.
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
end

client.subscribe("my/topic")

# Application logic goes here.

client.unsubscribe("my/topic")
```

Publishing and Acknowledging Messages

Example: Publish a Message

Note: It is not possible to publish messages to a topic until at least one session has subscribed to it.

```crystal
require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

# Publish a message to the topic.
client.publish("my/topic", "Hello World")
```

Example: Acknowledge a Published Message (Puback)

```crystal
require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

client.on_message do |msg|
  puts "Got a message, on topic #{msg.topic}: #{String.new(msg.body)}"
  # Acknowledge the message so the broker does not redeliver it.
  client.puback("my/topic", msg.id)
end

# QoS 1 subscription: received messages must be acknowledged with a Puback.
client.subscribe("my/topic", qos: 1)
```

Retained Messages

Retained messages are stored by the broker and delivered to new subscribers as soon as they subscribe to the topic. To send one, set the retain flag when publishing.

Example: Send a Retained Message

```crystal
require "mqtt-client"
require "mqtt-protocol"

client = MQTT::Client.new("localhost", 1883, user: "guest", password: "guest", client_id: "hello", clean_session: false)

# retain: true asks the broker to store this message for future subscribers.
client.publish("my/topic", "Hello World", retain: true)
```
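At the protocol level, the retain flag is a single bit in the first byte of the PUBLISH packet, next to the QoS bits. The sketch below follows the MQTT 3.1.1 fixed-header layout and does not use mqtt-client.cr; the helper name `publish_header` is hypothetical.

```crystal
# Hypothetical helper: first byte of an MQTT 3.1.1 PUBLISH packet.
# The upper four bits hold the packet type (3 for PUBLISH); the lower four
# hold DUP (bit 3), QoS (bits 2-1), and RETAIN (bit 0).
# The specification allows QoS 0-2; this document only covers levels 0 and 1.
def publish_header(qos : Int32, retain : Bool, dup : Bool = false) : UInt8
  raise ArgumentError.new("qos must be 0, 1 or 2") unless (0..2).includes?(qos)
  header = 0x30_u8
  header |= 0x08_u8 if dup
  header |= (qos.to_u8 << 1)
  header |= 0x01_u8 if retain
  header
end

puts "0x#{publish_header(0, retain: true).to_s(16)}"  # retained, QoS 0
puts "0x#{publish_header(1, retain: false).to_s(16)}" # not retained, QoS 1
```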

Clustering with MQTT

In a clustered setup, LavinMQ replicates retained messages across nodes. To ensure followers forward MQTT traffic to the leader, configure the following settings:

| Config variable | Description |
| --- | --- |
| mqtt_unix_path | UNIX path to listen for MQTT traffic, e.g., /tmp/lavinmq-mqtt.sock. |

New Configuration Variables

The following variables control MQTT behavior in LavinMQ:

| Variable | Description |
| --- | --- |
| bind | IP address that AMQP, MQTT, and HTTP servers will listen on. |
| port | Port for non-TLS MQTT connections. Default: 1883. |
| tls_port | Port for TLS-enabled MQTT connections. Default: 8883. |
| unix_path | UNIX socket path for MQTT. Example: /tmp/lavinmq-mqtt.sock. |
| max_inflight_messages | Maximum number of messages in flight simultaneously. Default: 65535. |
| default_vhost | Default virtual host (vhost) for MQTT connections. Default: "/". |
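As an illustration, these variables might be set in a configuration file like the fragment below. This is a sketch under assumptions: it presumes an INI-style LavinMQ configuration file with an [mqtt] section, which the text above does not confirm, and the bind address is an arbitrary example value. The remaining values are the defaults and example listed in the table.

```ini
; Illustrative values only. The [mqtt] section name is an assumption;
; check the configuration reference for your LavinMQ version.
[mqtt]
bind = 127.0.0.1
port = 1883
tls_port = 8883
unix_path = /tmp/lavinmq-mqtt.sock
max_inflight_messages = 65535
default_vhost = /
```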


