Connection Management¶
Pulse MQTT provides robust connection management with automatic reconnection, session persistence, and graceful handling of network disruptions.
Connection Lifecycle¶
stateDiagram-v2
[*] --> Disconnected
Disconnected --> Connecting: Connect Command
Connecting --> Connected: Success
Connecting --> Disconnected: Failure
Connected --> Disconnected: Disconnect Command
Connected --> Disconnected: Connection Lost
Disconnected --> Connecting: Auto Reconnect
Connected --> Connected: Keep-Alive
Basic Connection¶
Simple Connection¶
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-app-${userId}",
username = "mqtt_user",
password = "secure_password"
)
val connectCommand = ConnectCommand(
connectionOptions = connectionOptions,
retryPolicy = RetryPolicy.exponential(
maxRetries = 3,
baseDelayMillis = 2000
)
)
pulseMqttKit.submitCommand(connectCommand)
Monitor Connection State¶
class ConnectionMonitor : MqttUpdatesListener {
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
if (reconnect) {
logger.info("Reconnected to $serverUri")
showReconnectionNotification()
} else {
logger.info("Initial connection to $serverUri successful")
onFirstConnection()
}
}
override fun onMqttConnectionLost(cause: Throwable?) {
logger.warning("Connection lost: ${cause?.message}")
showOfflineIndicator()
pauseOperations()
}
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
if (command is ConnectCommand) {
hideOfflineIndicator()
resumeOperations()
}
}
}
pulseMqttKit.addListener(ConnectionMonitor())
Connection Options¶
Server URI Formats¶
// TCP (unencrypted)
serverUri = "tcp://broker.example.com:1883"
// WebSocket
serverUri = "ws://broker.example.com:9001"
// WebSocket Secure
serverUri = "wss://broker.example.com:9443"
Client ID¶
Unique identifier for the MQTT client:
// Static ID
clientId = "mobile-app-production"
// Dynamic ID with user context
clientId = "rider-app-${userId}-${deviceId}"
// UUID-based
clientId = "app-${UUID.randomUUID()}"
Best Practices: - Keep client IDs under 23 characters for MQTT 3.1 compatibility - Make IDs unique across all clients - Include context (app, user, device) for debugging - Avoid special characters
Authentication¶
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-client",
// Username/password authentication
username = "mqtt_user",
password = "secure_password"
)
Keep-Alive¶
Maintains connection by periodic pinging:
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-client",
// Send keep-alive ping every 30 seconds
keepAliveIntervalSeconds = 30
)
Recommendations: - Mobile: 30-60 seconds (balance between responsiveness and battery) - IoT devices: 60-120 seconds (conserve power) - Always-on services: 15-30 seconds (quick failure detection)
Connection Timeout¶
Maximum time to wait for connection establishment:
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-client",
// Wait up to 30 seconds for connection
connectionTimeoutSeconds = 30
)
Session Management¶
Clean Session¶
Controls session persistence across connections:
// Clean session = false: Persistent session
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-client",
cleanSession = false // Remember subscriptions and queued messages
)
Clean Session = false (Persistent): - ✅ Subscriptions preserved across disconnections - ✅ Queued QoS 1 and 2 messages delivered after reconnect - ✅ Useful for mobile apps with intermittent connectivity - ❌ Requires stable client ID - ❌ More broker resources
Clean Session = true (Temporary): - ✅ Fresh start on each connection - ✅ Lower broker resource usage - ✅ Good for testing and development - ❌ Subscriptions lost on disconnect - ❌ Queued messages discarded
Session Expiry¶
MQTT 5.0 feature for fine-grained session control:
Automatic Reconnection¶
Built-in Automatic Reconnection¶
Eclipse Paho provides automatic reconnection:
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "my-client",
automaticReconnect = true // Enable auto-reconnect
)
How it works:
1. Connection lost detected
2. onMqttConnectionLost callback invoked
3. Automatic reconnection attempts begin
4. Exponential backoff between attempts
5. onMqttConnectComplete(reconnect=true) on success
Manual Reconnection¶
For more control, handle reconnection manually:
class ManualReconnectHandler : MqttUpdatesListener {
private var reconnectJob: Job? = null
override fun onMqttConnectionLost(cause: Throwable?) {
logger.warning("Connection lost, scheduling reconnect")
scheduleReconnect()
}
private fun scheduleReconnect() {
reconnectJob?.cancel()
reconnectJob = coroutineScope.launch {
var attempt = 0
while (attempt < 5) {
delay(getBackoffDelay(attempt))
val result = tryReconnect()
if (result) {
logger.info("Reconnection successful")
return@launch
}
attempt++
}
logger.error("Reconnection failed after $attempt attempts")
}
}
private fun getBackoffDelay(attempt: Int): Long {
return min(1000L * (2.0.pow(attempt)).toLong(), 30000L)
}
private suspend fun tryReconnect(): Boolean {
return try {
pulseMqttKit.submitCommand(
ConnectCommand(
connectionOptions = savedConnectionOptions,
retryPolicy = RetryPolicy.none()
)
)
true
} catch (e: Exception) {
logger.error("Reconnect attempt failed", e)
false
}
}
}
Last Will and Testament (LWT)¶
Configure a message to be published when client disconnects unexpectedly:
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "rider-app-${riderId}",
// LWT configuration
willTopic = "riders/${riderId}/status",
willMessage = """{"status": "offline", "timestamp": ${System.currentTimeMillis()}}""",
willQos = 1,
willRetain = true
)
Use cases: - Presence detection (online/offline status) - Device health monitoring - Emergency notifications - Session cleanup triggers
Example: Presence System
// Connect with online LWT
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "user-${userId}",
willTopic = "users/${userId}/presence",
willMessage = """{"status": "offline"}""",
willQos = 1,
willRetain = true
)
// After connection, publish online status
val onlineMessage = ZMqttMessage(
topic = "users/${userId}/presence",
payload = """{"status": "online"}""",
retained = true
)
pulseMqttKit.submitCommand(
PublishCommand(
message = onlineMessage,
qos = QOSLevel.QOS_1
)
)
Graceful Disconnection¶
Normal Disconnect¶
val disconnectCommand = DisconnectCommand(
quiesceTimeout = 5000L, // Wait 5 seconds for pending operations
retryPolicy = RetryPolicy.sequential(
maxRetries = 2,
delayMillis = 1000
)
)
pulseMqttKit.submitCommand(disconnectCommand)
Shutdown Sequence¶
Complete cleanup when app closes:
class MqttLifecycleManager(private val pulseMqttKit: PulseMqttKit) {
fun onAppPause() {
// Optional: Disconnect on pause
pulseMqttKit.submitCommand(DisconnectCommand(quiesceTimeout = 3000L))
}
fun onAppStop() {
// Stop health monitoring
pulseMqttKit.stopHealthMonitoring()
}
fun onAppDestroy() {
// Complete shutdown
pulseMqttKit.shutDown {
logger.info("MQTT client shutdown complete")
}
}
}
Connection Monitoring¶
Health Checks¶
Monitor connection health programmatically:
class ConnectionHealthMonitor {
private var lastMessageTime = System.currentTimeMillis()
fun startMonitoring() {
scope.launch {
while (isActive) {
delay(10_000) // Check every 10 seconds
checkConnectionHealth()
}
}
}
private fun checkConnectionHealth() {
val isConnected = pulseMqttKit.isConnected()
val timeSinceLastMessage = System.currentTimeMillis() - lastMessageTime
when {
isConnected == false -> {
logger.warning("Not connected to broker")
showDisconnectedUI()
}
timeSinceLastMessage > 60_000 -> {
logger.warning("No messages received for 60 seconds")
// Possible connection issue even though client reports connected
verifyConnection()
}
else -> {
showConnectedUI()
}
}
}
private fun verifyConnection() {
// Publish to a test topic to verify connectivity
val testMessage = ZMqttMessage(
topic = "system/ping",
payload = """{"client_id": "${clientId}", "timestamp": ${System.currentTimeMillis()}}"""
)
pulseMqttKit.submitCommand(PublishCommand(testMessage, QOSLevel.QOS_0))
}
}
Connection Metrics¶
Track connection statistics:
class ConnectionMetrics : MqttUpdatesListener {
private var connectionCount = 0
private var disconnectionCount = 0
private var lastConnectTime = 0L
private var totalUptime = 0L
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
connectionCount++
lastConnectTime = System.currentTimeMillis()
metrics.record("mqtt.connection.established", mapOf(
"reconnect" to reconnect,
"server" to serverUri
))
}
override fun onMqttConnectionLost(cause: Throwable?) {
disconnectionCount++
val uptime = System.currentTimeMillis() - lastConnectTime
totalUptime += uptime
metrics.record("mqtt.connection.lost", mapOf(
"uptime_ms" to uptime,
"cause" to (cause?.javaClass?.simpleName ?: "unknown")
))
}
fun getConnectionStats(): ConnectionStats {
return ConnectionStats(
connections = connectionCount,
disconnections = disconnectionCount,
averageUptime = if (disconnectionCount > 0)
totalUptime / disconnectionCount else 0,
isCurrentlyConnected = pulseMqttKit.isConnected() ?: false
)
}
}
Best Practices¶
1. Always Use Retry Policies¶
// ✅ Good - Handles transient failures
ConnectCommand(
connectionOptions = options,
retryPolicy = RetryPolicy.exponential(maxRetries = 3, baseDelayMillis = 2000)
)
// ❌ Avoid - Single attempt only
ConnectCommand(
connectionOptions = options,
retryPolicy = RetryPolicy.none()
)
2. Monitor Connection Events¶
// ✅ Good - React to connection changes
class ConnectionAwareListener : MqttUpdatesListener {
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
updateUI(connected = true)
}
override fun onMqttConnectionLost(cause: Throwable?) {
updateUI(connected = false)
}
}
3. Use Clean Session Appropriately¶
// ✅ Mobile app - persistent session
cleanSession = false
// ✅ Testing/debugging - clean session
cleanSession = true
// ✅ Stateless service - clean session
cleanSession = true
4. Configure Appropriate Timeouts¶
// ✅ Good - Reasonable timeouts
ConnectionOptions(
connectionTimeoutSeconds = 30,
keepAliveIntervalSeconds = 60
)
// ❌ Avoid - Too short (false failures)
connectionTimeoutSeconds = 5
// ❌ Avoid - Too long (slow failure detection)
keepAliveIntervalSeconds = 300
5. Handle Disconnection Gracefully¶
// ✅ Good - Clean shutdown
pulseMqttKit.shutDown {
releaseResources()
logger.info("Clean shutdown completed")
}
Troubleshooting¶
Problem: Frequent disconnections¶
Possible causes: - Network instability - Keep-alive too short - Broker resource constraints
Solutions:
// Increase keep-alive
keepAliveIntervalSeconds = 90
// Enable automatic reconnect
automaticReconnect = true
// Use health monitoring
pulseMqttKit.startHealthMonitoring()
Problem: Cannot reconnect after network change¶
Solution: Use network monitoring:
class NetworkAwareReconnect : MqttUpdatesListener {
override fun onInternetConnectionStatusChanged(isConnected: Boolean) {
if (isConnected && pulseMqttKit.isConnected() == false) {
// Network restored, trigger reconnect
pulseMqttKit.submitCommand(ConnectCommand(connectionOptions))
}
}
}
Problem: Session not persisting¶
Check:
- cleanSession = false
- Same client ID used
- Broker supports session persistence
Problem: Connection timeout¶
Solutions:
// Increase timeout
connectionTimeoutSeconds = 60
// Check network connectivity
// Verify broker is reachable
// Confirm firewall rules allow connection
Next Steps¶
- Learn about Auto-Subscription
- Explore Health Monitoring
- See Network Monitoring