Skip to content

Quality of Service (QoS) Levels

Learn how to use MQTT Quality of Service levels effectively with Pulse MQTT for reliable message delivery.

Overview

MQTT provides three Quality of Service (QoS) levels that define the guarantee of message delivery between client and broker:

  • QoS 0: At most once delivery (fire and forget)
  • QoS 1: At least once delivery (acknowledged delivery)
  • QoS 2: Exactly once delivery (assured delivery)

Pulse MQTT fully supports all three QoS levels for both publishing and subscribing.

QoS Levels Explained

QoS 0: At Most Once

Delivery Guarantee: Message is delivered at most once, or not at all.

sequenceDiagram
    participant Client
    participant Broker
    Client->>Broker: PUBLISH (QoS 0)
    Note over Client,Broker: No acknowledgment

Characteristics: - Fastest delivery - No acknowledgment required - No retries - Message may be lost if network fails - No storage required

Use Cases: - Sensor data where missing occasional readings is acceptable - Real-time metrics that are frequently updated - Non-critical notifications - High-frequency data where latest value matters most

Example:

// Temperature sensor updates every second
val message = ZMqttMessage(
    topic = "sensors/temperature",
    payload = """{"value": 22.5, "unit": "celsius"}"""
)

pulseMqttKit.submitCommand(
    PublishCommand(
        message = message,
        qos = QOSLevel.QOS_0  // Fire and forget
    )
)

QoS 1: At Least Once

Delivery Guarantee: Message is delivered at least once, possibly more than once.

sequenceDiagram
    participant Client
    participant Broker
    Client->>Broker: PUBLISH (QoS 1)
    Broker->>Client: PUBACK
    Note over Client,Broker: Acknowledged, may duplicate

Characteristics: - Acknowledged delivery - Message is retried until acknowledged - Duplicates possible if acknowledgment is lost - Moderate overhead - Requires message storage until acknowledged

Use Cases: - Important events that must be received - User notifications - Order status updates - Application state changes - Most business-critical messages

Example:

// Order status update - must be delivered
val message = ZMqttMessage(
    topic = "orders/status",
    payload = """{"order_id": "12345", "status": "delivered"}"""
)

pulseMqttKit.submitCommand(
    PublishCommand(
        message = message,
        qos = QOSLevel.QOS_1,  // Acknowledged delivery
        retryPolicy = RetryPolicy.exponential(maxRetries = 3)
    )
)

QoS 2: Exactly Once

Delivery Guarantee: Message is delivered exactly once, no more, no less.

sequenceDiagram
    participant Client
    participant Broker
    Client->>Broker: PUBLISH (QoS 2)
    Broker->>Client: PUBREC
    Client->>Broker: PUBREL
    Broker->>Client: PUBCOMP
    Note over Client,Broker: Guaranteed exactly once

Characteristics: - Highest guarantee - Four-way handshake - No duplicates - Highest overhead - Slowest delivery - Requires state management

Use Cases: - Financial transactions - Payment confirmations - Critical commands - Billing events - Any scenario where duplicates cause problems

Example:

// Payment transaction - must be exactly once
val message = ZMqttMessage(
    topic = "payments/transaction",
    payload = """{"amount": 99.99, "transaction_id": "TXN123"}"""
)

pulseMqttKit.submitCommand(
    PublishCommand(
        message = message,
        qos = QOSLevel.QOS_2,  // Exactly once
        retryPolicy = RetryPolicy.exponential(
            maxRetries = 5,
            baseDelayMillis = 2000
        )
    )
)

QoS Comparison

Feature QoS 0 QoS 1 QoS 2
Delivery At most once At least once Exactly once
Duplicates No Possible No
Message Loss Possible No No
Overhead Lowest Medium Highest
Performance Fastest Fast Slowest
Bandwidth Minimal Low High
Battery Impact Minimal Low High
Use When Speed critical Reliability needed Duplicates harmful

Subscribing with QoS

Basic Subscription

val topics = mapOf(
    "sensors/temperature" to TopicTypeConfig(
        messageType = Temperature::class.java,
        qosLevel = QOSLevel.QOS_0  // Fast updates, occasional loss OK
    ),
    "orders/updates" to TopicTypeConfig(
        messageType = OrderUpdate::class.java,
        qosLevel = QOSLevel.QOS_1  // Important updates, must receive
    ),
    "payments/confirm" to TopicTypeConfig(
        messageType = PaymentConfirmation::class.java,
        qosLevel = QOSLevel.QOS_2  // Critical, no duplicates
    )
)

pulseMqttKit.submitCommand(
    SubscribeCommand(
        topicConfigs = topics,
        retryPolicy = RetryPolicy.exponential(maxRetries = 3)
    )
)

Downgrade Behavior

The actual QoS used is the minimum of: - Publisher's QoS - Subscriber's requested QoS - Broker's maximum QoS

// Publisher sends at QoS 2
PublishCommand(message, qos = QOSLevel.QOS_2)

// Subscriber requests QoS 1
SubscribeCommand(topicConfig with qosLevel = QOSLevel.QOS_1)

// Result: Messages delivered at QoS 1 (minimum of 2 and 1)

Publishing with QoS

Message Priority

class MessagePublisher(private val mqttKit: PulseMqttKit) {

    fun publishCritical(topic: String, payload: String) {
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(topic, payload),
                qos = QOSLevel.QOS_2,
                retryPolicy = RetryPolicy.exponential(maxRetries = 5)
            )
        )
    }

    fun publishImportant(topic: String, payload: String) {
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(topic, payload),
                qos = QOSLevel.QOS_1,
                retryPolicy = RetryPolicy.exponential(maxRetries = 3)
            )
        )
    }

    fun publishRealtime(topic: String, payload: String) {
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(topic, payload),
                qos = QOSLevel.QOS_0  // No retry needed
            )
        )
    }
}

Retained Messages

// Publish with QoS and retain flag
val message = ZMqttMessage(
    topic = "app/config",
    payload = """{"version": "2.0"}""",
    retained = true  // New subscribers get latest value
)

pulseMqttKit.submitCommand(
    PublishCommand(
        message = message,
        qos = QOSLevel.QOS_1  // Ensure retention is acknowledged
    )
)

Use Case Examples

1. IoT Sensor Network

class SensorPublisher(private val mqttKit: PulseMqttKit) {

    fun publishTemperature(value: Double) {
        // High frequency, loss acceptable
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "sensors/temp",
                    payload = """{"value": $value}"""
                ),
                qos = QOSLevel.QOS_0
            )
        )
    }

    fun publishAlert(message: String) {
        // Critical alerts, must be delivered
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "sensors/alert",
                    payload = """{"alert": "$message"}"""
                ),
                qos = QOSLevel.QOS_2
            )
        )
    }
}

2. E-Commerce Application

class OrderManager(private val mqttKit: PulseMqttKit) {

    fun updateOrderStatus(orderId: String, status: String) {
        // Important but duplicates OK (idempotent)
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "orders/$orderId/status",
                    payload = """{"status": "$status"}"""
                ),
                qos = QOSLevel.QOS_1
            )
        )
    }

    fun processPayment(orderId: String, amount: Double) {
        // Critical, no duplicates allowed
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "payments/process",
                    payload = """{"order": "$orderId", "amount": $amount}"""
                ),
                qos = QOSLevel.QOS_2
            )
        )
    }
}

3. Chat Application

class ChatManager(private val mqttKit: PulseMqttKit) {

    fun sendMessage(roomId: String, message: String) {
        // Must be delivered, duplicates handled by client
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "chat/$roomId/messages",
                    payload = """{"text": "$message", "id": "${UUID.randomUUID()}"}"""
                ),
                qos = QOSLevel.QOS_1  // QoS 1 sufficient with message IDs
            )
        )
    }

    fun sendTypingIndicator(roomId: String, isTyping: Boolean) {
        // Transient state, loss acceptable
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "chat/$roomId/typing",
                    payload = """{"typing": $isTyping}"""
                ),
                qos = QOSLevel.QOS_0
            )
        )
    }
}

4. Location Tracking

class LocationTracker(private val mqttKit: PulseMqttKit) {

    fun updateLocation(lat: Double, lng: Double) {
        // Frequent updates, latest is most important
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "location/updates",
                    payload = """{"lat": $lat, "lng": $lng}"""
                ),
                qos = QOSLevel.QOS_0
            )
        )
    }

    fun reportSignificantLocation(lat: Double, lng: Double) {
        // Important waypoints, must be recorded
        mqttKit.submitCommand(
            PublishCommand(
                message = ZMqttMessage(
                    topic = "location/waypoints",
                    payload = """{"lat": $lat, "lng": $lng}"""
                ),
                qos = QOSLevel.QOS_1
            )
        )
    }
}

Performance Optimization

1. Choose Appropriate QoS

// ✓ Good - Use QoS 0 for high-frequency data
fun publishMetrics(metrics: Metrics) {
    mqttKit.submitCommand(
        PublishCommand(
            message = createMessage(metrics),
            qos = QOSLevel.QOS_0  // Fast, minimal overhead
        )
    )
}

// ✗ Bad - Using QoS 2 for high-frequency data
fun publishMetrics(metrics: Metrics) {
    mqttKit.submitCommand(
        PublishCommand(
            message = createMessage(metrics),
            qos = QOSLevel.QOS_2  // Slow, high overhead
        )
    )
}

2. Batch Operations

// For QoS 1/2, consider batching multiple updates
fun batchPublish(updates: List<Update>) {
    val batchedPayload = gson.toJson(updates)
    mqttKit.submitCommand(
        PublishCommand(
            message = ZMqttMessage("updates/batch", batchedPayload),
            qos = QOSLevel.QOS_1  // Single acknowledgment for all
        )
    )
}

3. Monitor Performance

class QoSPerformanceMonitor : MqttUpdatesListener {

    private val qos0Times = mutableListOf<Long>()
    private val qos1Times = mutableListOf<Long>()
    private val qos2Times = mutableListOf<Long>()

    override fun onCommandSuccess(
        command: MqttCommand,
        result: CommandResult.Success
    ) {
        if (command is PublishCommand) {
            when (command.qos) {
                QOSLevel.QOS_0 -> qos0Times.add(result.totalTimeMillis)
                QOSLevel.QOS_1 -> qos1Times.add(result.totalTimeMillis)
                QOSLevel.QOS_2 -> qos2Times.add(result.totalTimeMillis)
            }
        }
    }

    fun getAverageTime(qos: QOSLevel): Double {
        val times = when (qos) {
            QOSLevel.QOS_0 -> qos0Times
            QOSLevel.QOS_1 -> qos1Times
            QOSLevel.QOS_2 -> qos2Times
        }
        return times.average()
    }
}

Best Practices

1. Match QoS to Requirements

// Critical financial data
qos = QOSLevel.QOS_2

// Important notifications
qos = QOSLevel.QOS_1

// Real-time metrics
qos = QOSLevel.QOS_0

2. Consider Network Conditions

class AdaptiveQoSManager {

    fun getOptimalQoS(networkQuality: NetworkQuality): QOSLevel {
        return when (networkQuality) {
            NetworkQuality.EXCELLENT -> QOSLevel.QOS_2
            NetworkQuality.GOOD -> QOSLevel.QOS_1
            NetworkQuality.POOR -> QOSLevel.QOS_0
        }
    }
}

3. Handle Idempotency

// For QoS 1, ensure operations are idempotent
data class OrderStatusUpdate(
    val orderId: String,
    val status: OrderStatus,
    val timestamp: Long,
    val version: Int  // Handle duplicates
)

4. Balance Reliability and Performance

// Use QoS 1 + message IDs instead of QoS 2 when possible
data class Message(
    val id: String = UUID.randomUUID().toString(),
    val content: String,
    val timestamp: Long = System.currentTimeMillis()
)

// Client deduplicates using message ID

Troubleshooting

High Latency with QoS 2

Problem: QoS 2 messages are slow

Solution:

// Consider downgrading to QoS 1 with deduplication
// Or increase timeouts
PublishCommand(
    message = message,
    qos = QOSLevel.QOS_2,
    retryPolicy = RetryPolicy.exponential(
        baseDelayMillis = 2000,  // Longer base delay
        maxDelayMillis = 60000
    )
)

Duplicate Messages with QoS 1

Problem: Receiving duplicate messages

Solution:

// Implement client-side deduplication
class MessageDeduplicator {
    private val seen = mutableSetOf<String>()

    fun isDuplicate(messageId: String): Boolean {
        return !seen.add(messageId)
    }
}

Message Loss with QoS 0

Problem: Missing critical messages

Solution:

// Upgrade to QoS 1 or 2
PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1  // Ensure delivery
)

Next Steps