Quality of Service (QoS) Levels¶
Learn how to use MQTT Quality of Service levels effectively with Pulse MQTT for reliable message delivery.
Overview¶
MQTT provides three Quality of Service (QoS) levels that define the guarantee of message delivery between client and broker:
- QoS 0: At most once delivery (fire and forget)
- QoS 1: At least once delivery (acknowledged delivery)
- QoS 2: Exactly once delivery (assured delivery)
Pulse MQTT fully supports all three QoS levels for both publishing and subscribing.
QoS Levels Explained¶
QoS 0: At Most Once¶
Delivery Guarantee: Message is delivered at most once, or not at all.
sequenceDiagram
participant Client
participant Broker
Client->>Broker: PUBLISH (QoS 0)
Note over Client,Broker: No acknowledgment
Characteristics: - Fastest delivery - No acknowledgment required - No retries - Message may be lost if network fails - No storage required
Use Cases: - Sensor data where missing occasional readings is acceptable - Real-time metrics that are frequently updated - Non-critical notifications - High-frequency data where latest value matters most
Example:
// Temperature sensor updates every second
val message = ZMqttMessage(
topic = "sensors/temperature",
payload = """{"value": 22.5, "unit": "celsius"}"""
)
pulseMqttKit.submitCommand(
PublishCommand(
message = message,
qos = QOSLevel.QOS_0 // Fire and forget
)
)
QoS 1: At Least Once¶
Delivery Guarantee: Message is delivered at least once, possibly more than once.
sequenceDiagram
participant Client
participant Broker
Client->>Broker: PUBLISH (QoS 1)
Broker->>Client: PUBACK
Note over Client,Broker: Acknowledged, may duplicate
Characteristics: - Acknowledged delivery - Message is retried until acknowledged - Duplicates possible if acknowledgment is lost - Moderate overhead - Requires message storage until acknowledged
Use Cases: - Important events that must be received - User notifications - Order status updates - Application state changes - Most business-critical messages
Example:
// Order status update - must be delivered
val message = ZMqttMessage(
topic = "orders/status",
payload = """{"order_id": "12345", "status": "delivered"}"""
)
pulseMqttKit.submitCommand(
PublishCommand(
message = message,
qos = QOSLevel.QOS_1, // Acknowledged delivery
retryPolicy = RetryPolicy.exponential(maxRetries = 3)
)
)
QoS 2: Exactly Once¶
Delivery Guarantee: Message is delivered exactly once, no more, no less.
sequenceDiagram
participant Client
participant Broker
Client->>Broker: PUBLISH (QoS 2)
Broker->>Client: PUBREC
Client->>Broker: PUBREL
Broker->>Client: PUBCOMP
Note over Client,Broker: Guaranteed exactly once
Characteristics: - Highest guarantee - Four-way handshake - No duplicates - Highest overhead - Slowest delivery - Requires state management
Use Cases: - Financial transactions - Payment confirmations - Critical commands - Billing events - Any scenario where duplicates cause problems
Example:
// Payment transaction - must be exactly once
val message = ZMqttMessage(
topic = "payments/transaction",
payload = """{"amount": 99.99, "transaction_id": "TXN123"}"""
)
pulseMqttKit.submitCommand(
PublishCommand(
message = message,
qos = QOSLevel.QOS_2, // Exactly once
retryPolicy = RetryPolicy.exponential(
maxRetries = 5,
baseDelayMillis = 2000
)
)
)
QoS Comparison¶
| Feature | QoS 0 | QoS 1 | QoS 2 |
|---|---|---|---|
| Delivery | At most once | At least once | Exactly once |
| Duplicates | No | Possible | No |
| Message Loss | Possible | No | No |
| Overhead | Lowest | Medium | Highest |
| Performance | Fastest | Fast | Slowest |
| Bandwidth | Minimal | Low | High |
| Battery Impact | Minimal | Low | High |
| Use When | Speed critical | Reliability needed | Duplicates harmful |
Subscribing with QoS¶
Basic Subscription¶
val topics = mapOf(
"sensors/temperature" to TopicTypeConfig(
messageType = Temperature::class.java,
qosLevel = QOSLevel.QOS_0 // Fast updates, occasional loss OK
),
"orders/updates" to TopicTypeConfig(
messageType = OrderUpdate::class.java,
qosLevel = QOSLevel.QOS_1 // Important updates, must receive
),
"payments/confirm" to TopicTypeConfig(
messageType = PaymentConfirmation::class.java,
qosLevel = QOSLevel.QOS_2 // Critical, no duplicates
)
)
pulseMqttKit.submitCommand(
SubscribeCommand(
topicConfigs = topics,
retryPolicy = RetryPolicy.exponential(maxRetries = 3)
)
)
Downgrade Behavior¶
The actual QoS used is the minimum of: - Publisher's QoS - Subscriber's requested QoS - Broker's maximum QoS
// Publisher sends at QoS 2
PublishCommand(message, qos = QOSLevel.QOS_2)
// Subscriber requests QoS 1
SubscribeCommand(topicConfig with qosLevel = QOSLevel.QOS_1)
// Result: Messages delivered at QoS 1 (minimum of 2 and 1)
Publishing with QoS¶
Message Priority¶
class MessagePublisher(private val mqttKit: PulseMqttKit) {
fun publishCritical(topic: String, payload: String) {
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(topic, payload),
qos = QOSLevel.QOS_2,
retryPolicy = RetryPolicy.exponential(maxRetries = 5)
)
)
}
fun publishImportant(topic: String, payload: String) {
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(topic, payload),
qos = QOSLevel.QOS_1,
retryPolicy = RetryPolicy.exponential(maxRetries = 3)
)
)
}
fun publishRealtime(topic: String, payload: String) {
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(topic, payload),
qos = QOSLevel.QOS_0 // No retry needed
)
)
}
}
Retained Messages¶
// Publish with QoS and retain flag
val message = ZMqttMessage(
topic = "app/config",
payload = """{"version": "2.0"}""",
retained = true // New subscribers get latest value
)
pulseMqttKit.submitCommand(
PublishCommand(
message = message,
qos = QOSLevel.QOS_1 // Ensure retention is acknowledged
)
)
Use Case Examples¶
1. IoT Sensor Network¶
class SensorPublisher(private val mqttKit: PulseMqttKit) {
fun publishTemperature(value: Double) {
// High frequency, loss acceptable
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "sensors/temp",
payload = """{"value": $value}"""
),
qos = QOSLevel.QOS_0
)
)
}
fun publishAlert(message: String) {
// Critical alerts, must be delivered
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "sensors/alert",
payload = """{"alert": "$message"}"""
),
qos = QOSLevel.QOS_2
)
)
}
}
2. E-Commerce Application¶
class OrderManager(private val mqttKit: PulseMqttKit) {
fun updateOrderStatus(orderId: String, status: String) {
// Important but duplicates OK (idempotent)
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "orders/$orderId/status",
payload = """{"status": "$status"}"""
),
qos = QOSLevel.QOS_1
)
)
}
fun processPayment(orderId: String, amount: Double) {
// Critical, no duplicates allowed
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "payments/process",
payload = """{"order": "$orderId", "amount": $amount}"""
),
qos = QOSLevel.QOS_2
)
)
}
}
3. Chat Application¶
class ChatManager(private val mqttKit: PulseMqttKit) {
fun sendMessage(roomId: String, message: String) {
// Must be delivered, duplicates handled by client
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "chat/$roomId/messages",
payload = """{"text": "$message", "id": "${UUID.randomUUID()}"}"""
),
qos = QOSLevel.QOS_1 // QoS 1 sufficient with message IDs
)
)
}
fun sendTypingIndicator(roomId: String, isTyping: Boolean) {
// Transient state, loss acceptable
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "chat/$roomId/typing",
payload = """{"typing": $isTyping}"""
),
qos = QOSLevel.QOS_0
)
)
}
}
4. Location Tracking¶
class LocationTracker(private val mqttKit: PulseMqttKit) {
fun updateLocation(lat: Double, lng: Double) {
// Frequent updates, latest is most important
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "location/updates",
payload = """{"lat": $lat, "lng": $lng}"""
),
qos = QOSLevel.QOS_0
)
)
}
fun reportSignificantLocation(lat: Double, lng: Double) {
// Important waypoints, must be recorded
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage(
topic = "location/waypoints",
payload = """{"lat": $lat, "lng": $lng}"""
),
qos = QOSLevel.QOS_1
)
)
}
}
Performance Optimization¶
1. Choose Appropriate QoS¶
// ✓ Good - Use QoS 0 for high-frequency data
fun publishMetrics(metrics: Metrics) {
mqttKit.submitCommand(
PublishCommand(
message = createMessage(metrics),
qos = QOSLevel.QOS_0 // Fast, minimal overhead
)
)
}
// ✗ Bad - Using QoS 2 for high-frequency data
fun publishMetrics(metrics: Metrics) {
mqttKit.submitCommand(
PublishCommand(
message = createMessage(metrics),
qos = QOSLevel.QOS_2 // Slow, high overhead
)
)
}
2. Batch Operations¶
// For QoS 1/2, consider batching multiple updates
fun batchPublish(updates: List<Update>) {
val batchedPayload = gson.toJson(updates)
mqttKit.submitCommand(
PublishCommand(
message = ZMqttMessage("updates/batch", batchedPayload),
qos = QOSLevel.QOS_1 // Single acknowledgment for all
)
)
}
3. Monitor Performance¶
class QoSPerformanceMonitor : MqttUpdatesListener {
private val qos0Times = mutableListOf<Long>()
private val qos1Times = mutableListOf<Long>()
private val qos2Times = mutableListOf<Long>()
override fun onCommandSuccess(
command: MqttCommand,
result: CommandResult.Success
) {
if (command is PublishCommand) {
when (command.qos) {
QOSLevel.QOS_0 -> qos0Times.add(result.totalTimeMillis)
QOSLevel.QOS_1 -> qos1Times.add(result.totalTimeMillis)
QOSLevel.QOS_2 -> qos2Times.add(result.totalTimeMillis)
}
}
}
fun getAverageTime(qos: QOSLevel): Double {
val times = when (qos) {
QOSLevel.QOS_0 -> qos0Times
QOSLevel.QOS_1 -> qos1Times
QOSLevel.QOS_2 -> qos2Times
}
return times.average()
}
}
Best Practices¶
1. Match QoS to Requirements¶
// Critical financial data
qos = QOSLevel.QOS_2
// Important notifications
qos = QOSLevel.QOS_1
// Real-time metrics
qos = QOSLevel.QOS_0
2. Consider Network Conditions¶
class AdaptiveQoSManager {
fun getOptimalQoS(networkQuality: NetworkQuality): QOSLevel {
return when (networkQuality) {
NetworkQuality.EXCELLENT -> QOSLevel.QOS_2
NetworkQuality.GOOD -> QOSLevel.QOS_1
NetworkQuality.POOR -> QOSLevel.QOS_0
}
}
}
3. Handle Idempotency¶
// For QoS 1, ensure operations are idempotent
data class OrderStatusUpdate(
val orderId: String,
val status: OrderStatus,
val timestamp: Long,
val version: Int // Handle duplicates
)
4. Balance Reliability and Performance¶
// Use QoS 1 + message IDs instead of QoS 2 when possible
data class Message(
val id: String = UUID.randomUUID().toString(),
val content: String,
val timestamp: Long = System.currentTimeMillis()
)
// Client deduplicates using message ID
Troubleshooting¶
High Latency with QoS 2¶
Problem: QoS 2 messages are slow
Solution:
// Consider downgrading to QoS 1 with deduplication
// Or increase timeouts
PublishCommand(
message = message,
qos = QOSLevel.QOS_2,
retryPolicy = RetryPolicy.exponential(
baseDelayMillis = 2000, // Longer base delay
maxDelayMillis = 60000
)
)
Duplicate Messages with QoS 1¶
Problem: Receiving duplicate messages
Solution:
// Implement client-side deduplication
class MessageDeduplicator {
private val seen = mutableSetOf<String>()
fun isDuplicate(messageId: String): Boolean {
return !seen.add(messageId)
}
}
Message Loss with QoS 0¶
Problem: Missing critical messages
Solution:
// Upgrade to QoS 1 or 2
PublishCommand(
message = message,
qos = QOSLevel.QOS_1 // Ensure delivery
)
Next Steps¶
- Learn about Command Architecture
- Configure Retry Policies
- Explore Type-Safe Messages