Connection Management

Pulse MQTT provides robust connection management with automatic reconnection, session persistence, and graceful handling of network disruptions.

Connection Lifecycle

stateDiagram-v2
    [*] --> Disconnected
    Disconnected --> Connecting: Connect Command
    Connecting --> Connected: Success
    Connecting --> Disconnected: Failure
    Connected --> Disconnected: Disconnect Command
    Connected --> Disconnected: Connection Lost
    Disconnected --> Connecting: Auto Reconnect
    Connected --> Connected: Keep-Alive
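
The diagram above can also be read as a transition table. The sketch below is illustrative only: `ConnState`, `ConnEvent`, and `nextState` are hypothetical names chosen for this example and are not part of the Pulse MQTT API.

```kotlin
// Hypothetical model of the lifecycle diagram; not Pulse MQTT API.
enum class ConnState { DISCONNECTED, CONNECTING, CONNECTED }

enum class ConnEvent {
    CONNECT_COMMAND, AUTO_RECONNECT, SUCCESS, FAILURE,
    DISCONNECT_COMMAND, CONNECTION_LOST, KEEP_ALIVE
}

// Returns the next state, or null when the diagram defines no such transition.
fun nextState(state: ConnState, event: ConnEvent): ConnState? = when (state) {
    ConnState.DISCONNECTED -> when (event) {
        ConnEvent.CONNECT_COMMAND, ConnEvent.AUTO_RECONNECT -> ConnState.CONNECTING
        else -> null
    }
    ConnState.CONNECTING -> when (event) {
        ConnEvent.SUCCESS -> ConnState.CONNECTED
        ConnEvent.FAILURE -> ConnState.DISCONNECTED
        else -> null
    }
    ConnState.CONNECTED -> when (event) {
        ConnEvent.DISCONNECT_COMMAND, ConnEvent.CONNECTION_LOST -> ConnState.DISCONNECTED
        ConnEvent.KEEP_ALIVE -> ConnState.CONNECTED
        else -> null
    }
}
```

A table like this can be handy in tests, for example to assert that a sequence of listener callbacks never implies a transition the diagram does not allow.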

Basic Connection

Simple Connection

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-app-${userId}",
    username = "mqtt_user",
    password = "secure_password"
)

val connectCommand = ConnectCommand(
    connectionOptions = connectionOptions,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000
    )
)

pulseMqttKit.submitCommand(connectCommand)

Monitor Connection State

class ConnectionMonitor : MqttUpdatesListener {
    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        if (reconnect) {
            logger.info("Reconnected to $serverUri")
            showReconnectionNotification()
        } else {
            logger.info("Initial connection to $serverUri successful")
            onFirstConnection()
        }
    }

    override fun onMqttConnectionLost(cause: Throwable?) {
        logger.warning("Connection lost: ${cause?.message}")
        showOfflineIndicator()
        pauseOperations()
    }

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (command is ConnectCommand) {
            hideOfflineIndicator()
            resumeOperations()
        }
    }
}

pulseMqttKit.addListener(ConnectionMonitor())

Connection Options

Server URI Formats

// TCP (unencrypted)
serverUri = "tcp://broker.example.com:1883"


// WebSocket
serverUri = "ws://broker.example.com:9001"

// WebSocket Secure
serverUri = "wss://broker.example.com:9443"
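
Because the URI scheme decides whether traffic is encrypted, it can be worth checking it before connecting. The following is a minimal sketch using only `java.net.URI`; `Transport` and `transportOf` are hypothetical names, and the `ssl` scheme is an assumption (it is the scheme Eclipse Paho uses for TLS over TCP, but it is not listed above).

```kotlin
import java.net.URI

// Hypothetical classification of the URI schemes; not Pulse MQTT API.
enum class Transport(val encrypted: Boolean) {
    TCP(false), TLS(true), WEBSOCKET(false), WEBSOCKET_SECURE(true)
}

fun transportOf(serverUri: String): Transport {
    val scheme = URI(serverUri).scheme?.lowercase()
        ?: throw IllegalArgumentException("Server URI has no scheme: $serverUri")
    return when (scheme) {
        "tcp" -> Transport.TCP
        "ssl" -> Transport.TLS  // Assumption: Paho-style TLS scheme
        "ws" -> Transport.WEBSOCKET
        "wss" -> Transport.WEBSOCKET_SECURE
        else -> throw IllegalArgumentException("Unsupported scheme: $scheme")
    }
}
```

A production build could, for instance, refuse to connect when `transportOf(uri).encrypted` is false.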

Client ID

Unique identifier for the MQTT client:

// Static ID
clientId = "mobile-app-production"

// Dynamic ID with user context
clientId = "rider-app-${userId}-${deviceId}"

// UUID-based
clientId = "app-${UUID.randomUUID()}"

Best Practices:

- Keep client IDs to 23 characters or fewer for MQTT 3.1 compatibility (the UUID-based example above is longer than that)
- Make IDs unique across all clients
- Include context (app, user, device) for debugging
- Avoid special characters
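
The length and character guidance can be checked in code. The sketch below is an illustration under stated assumptions: `isPortableClientId` and `shortClientId` are hypothetical helpers, not part of Pulse MQTT. The check reflects the MQTT 3.1.1 rule that every broker must accept IDs of 1 to 23 characters drawn from `0-9`, `a-z`, and `A-Z`; other characters, such as the hyphens used in the examples above, are accepted by many brokers but are not guaranteed.

```kotlin
import java.security.MessageDigest

// Upper bound on client ID length in the MQTT 3.1 specification.
const val MAX_PORTABLE_CLIENT_ID_LENGTH = 23

// True when every MQTT 3.1.1 broker is required to accept the ID.
fun isPortableClientId(id: String): Boolean =
    id.length in 1..MAX_PORTABLE_CLIENT_ID_LENGTH &&
        id.all { it in '0'..'9' || it in 'a'..'z' || it in 'A'..'Z' }

// Hypothetical helper: derives a stable, short ID by hashing user and device context.
fun shortClientId(prefix: String, userId: String, deviceId: String): String {
    val room = MAX_PORTABLE_CLIENT_ID_LENGTH - prefix.length
    require(room >= 8) { "prefix leaves too little room for a unique suffix" }
    val digest = MessageDigest.getInstance("SHA-256")
        .digest("$userId:$deviceId".toByteArray())
    val hex = digest.joinToString("") { "%02x".format(it) }
    return prefix + hex.take(room)
}
```

Hashing keeps the ID stable across launches, which persistent sessions depend on, at the cost of the human-readable context that helps with debugging. Logging the mapping from ID to user and device is one way to recover it.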

Authentication

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",

    // Username/password authentication
    username = "mqtt_user",
    password = "secure_password"
)

Keep-Alive

Maintains the connection by sending periodic pings:

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",

    // Send keep-alive ping every 30 seconds
    keepAliveIntervalSeconds = 30
)

Recommendations:

- Mobile: 30-60 seconds (balance between responsiveness and battery)
- IoT devices: 60-120 seconds (conserve power)
- Always-on services: 15-30 seconds (quick failure detection)
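
The keep-alive value also bounds how quickly the broker notices a dead client. Under MQTT 3.1.1, a broker that hears nothing from a client for one and a half times the keep-alive interval must close the connection, and a value of 0 disables the mechanism. The helper below is a hypothetical illustration of that arithmetic, not a Pulse MQTT function.

```kotlin
// Hypothetical helper: seconds of silence after which an MQTT 3.1.1 broker
// must drop the client, or null when keep-alive is disabled (interval 0).
fun brokerTimeoutSeconds(keepAliveIntervalSeconds: Int): Double? {
    // Keep-alive is a 16-bit field in the CONNECT packet.
    require(keepAliveIntervalSeconds in 0..65_535) { "keep-alive out of range" }
    return if (keepAliveIntervalSeconds == 0) null else keepAliveIntervalSeconds * 1.5
}
```

With the recommended mobile range of 30-60 seconds, the broker would therefore detect a silent client after roughly 45-90 seconds.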

Connection Timeout

Maximum time to wait for connection establishment:

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",

    // Wait up to 30 seconds for connection
    connectionTimeoutSeconds = 30
)

Session Management

Clean Session

Controls session persistence across connections:

// Clean session = false: Persistent session
val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    cleanSession = false  // Remember subscriptions and queued messages
)

Clean Session = false (Persistent):

- ✅ Subscriptions preserved across disconnections
- ✅ Queued QoS 1 and 2 messages delivered after reconnect
- ✅ Useful for mobile apps with intermittent connectivity
- ❌ Requires stable client ID
- ❌ More broker resources

Clean Session = true (Temporary):

- ✅ Fresh start on each connection
- ✅ Lower broker resource usage
- ✅ Good for testing and development
- ❌ Subscriptions lost on disconnect
- ❌ Queued messages discarded
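
To make the difference concrete, here is a toy in-memory model of how a broker treats subscriptions under each setting. It is a simplified sketch for illustration only (`ToyBroker` is hypothetical, ignores queued messages, and does not reflect any real broker's implementation).

```kotlin
// Toy model of broker-side session handling; illustration only.
class ToyBroker {
    private class Session(val clean: Boolean, val topics: MutableSet<String> = mutableSetOf())

    private val sessions = mutableMapOf<String, Session>()

    // Returns true when an existing persistent session was resumed
    // (the "session present" flag in MQTT's CONNACK).
    fun connect(clientId: String, cleanSession: Boolean): Boolean {
        val existing = sessions[clientId]
        val resumed = !cleanSession && existing != null && !existing.clean
        if (!resumed) sessions[clientId] = Session(cleanSession)
        return resumed
    }

    fun subscribe(clientId: String, topic: String) {
        sessions.getValue(clientId).topics += topic
    }

    // A clean session is discarded on disconnect; a persistent one is kept.
    fun disconnect(clientId: String) {
        if (sessions[clientId]?.clean == true) sessions.remove(clientId)
    }

    fun subscriptions(clientId: String): Set<String> =
        sessions[clientId]?.topics?.toSet() ?: emptySet()
}
```

Reconnecting with the same client ID and `cleanSession = false` returns `true` from `connect` and keeps the earlier subscriptions, while reconnecting with `cleanSession = true` starts from an empty set.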

Session Expiry

MQTT 5.0 feature for fine-grained session control:

// Session expires after 1 hour of disconnection
sessionExpiryInterval = 3600
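
In MQTT 5.0 the interval is a 32-bit number of seconds with two special values: 0 ends the session as soon as the network connection closes, and 0xFFFFFFFF means the session never expires. The function below is a hypothetical sketch of that rule, not part of Pulse MQTT; how a broker treats the exact moment of expiry is an assumption here.

```kotlin
// MQTT 5.0 value meaning "the session does not expire".
const val SESSION_NEVER_EXPIRES = 0xFFFF_FFFFL

// Hypothetical helper: would the broker still hold the session after the
// client has been offline for offlineSeconds?
fun sessionSurvives(sessionExpiryInterval: Long, offlineSeconds: Long): Boolean {
    require(sessionExpiryInterval in 0L..SESSION_NEVER_EXPIRES) { "interval out of range" }
    require(offlineSeconds >= 0) { "offlineSeconds must not be negative" }
    return when (sessionExpiryInterval) {
        0L -> false                      // Session ends with the connection
        SESSION_NEVER_EXPIRES -> true    // Session is kept indefinitely
        else -> offlineSeconds < sessionExpiryInterval  // Assumption: expired at the boundary
    }
}
```

With the one-hour setting above, a client that returns after 30 minutes finds its session intact, while one that returns after two hours starts fresh.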

Automatic Reconnection

Built-in Automatic Reconnection

Eclipse Paho provides automatic reconnection:

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    automaticReconnect = true  // Enable auto-reconnect
)

How it works:

1. Connection lost detected
2. onMqttConnectionLost callback invoked
3. Automatic reconnection attempts begin
4. Exponential backoff between attempts
5. onMqttConnectComplete(reconnect=true) on success
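
The exponential backoff in step 4 means each wait is double the previous one, up to a ceiling. The sketch below shows the shape of such a schedule; `backoffDelaysMillis` is a hypothetical helper, and the 1-second base and 30-second cap are illustrative values, not the delays the built-in reconnect actually uses.

```kotlin
// Hypothetical helper: capped exponential backoff schedule, one delay per attempt.
fun backoffDelaysMillis(
    attempts: Int,
    baseDelayMillis: Long = 1_000,   // Illustrative default
    maxDelayMillis: Long = 30_000    // Illustrative default
): List<Long> {
    require(attempts >= 0) { "attempts must not be negative" }
    require(baseDelayMillis in 1..maxDelayMillis) { "base must be in 1..max" }
    var delay = baseDelayMillis
    return List(attempts) {
        val current = delay
        // Double for the next attempt, stopping at the cap without overflowing.
        delay = if (delay > maxDelayMillis / 2) maxDelayMillis else delay * 2
        current
    }
}
```

The cap keeps a long outage from pushing the wait between attempts to many minutes, while the doubling avoids hammering a broker that is struggling to recover.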

Manual Reconnection

For more control, handle reconnection manually:

class ManualReconnectHandler : MqttUpdatesListener {
    private var reconnectJob: Job? = null

    override fun onMqttConnectionLost(cause: Throwable?) {
        logger.warning("Connection lost, scheduling reconnect")
        scheduleReconnect()
    }

    private fun scheduleReconnect() {
        reconnectJob?.cancel()
        reconnectJob = coroutineScope.launch {
            var attempt = 0
            while (attempt < 5) {
                delay(getBackoffDelay(attempt))

                val result = tryReconnect()
                if (result) {
                    logger.info("Reconnection successful")
                    return@launch
                }

                attempt++
            }
            logger.error("Reconnection failed after $attempt attempts")
        }
    }

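    // Exponential backoff: 1s, 2s, 4s, ... capped at 30s.
    // Needs kotlin.math.min and kotlin.math.pow.
    private fun getBackoffDelay(attempt: Int): Long {
        return min(1000L * (2.0.pow(attempt)).toLong(), 30000L)
    }

    private suspend fun tryReconnect(): Boolean {
        return try {
            pulseMqttKit.submitCommand(
                ConnectCommand(
                    connectionOptions = savedConnectionOptions,
                    retryPolicy = RetryPolicy.none()
                )
            )
            // True only means the command was submitted without throwing. If
            // submitCommand completes asynchronously, confirm the connection
            // in onMqttConnectComplete before treating the reconnect as done.
            true
        } catch (e: CancellationException) {
            throw e  // Let coroutine cancellation propagate
        } catch (e: Exception) {
            logger.error("Reconnect attempt failed", e)
            false
        }
    }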
}

Last Will and Testament (LWT)

Configure a message for the broker to publish when the client disconnects unexpectedly:

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "rider-app-${riderId}",

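    // LWT configuration. The payload is fixed at connect time, so this
    // timestamp records when the client connected, not when it went offline.
    willTopic = "riders/${riderId}/status",
    willMessage = """{"status": "offline", "timestamp": ${System.currentTimeMillis()}}""",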
    willQos = 1,
    willRetain = true
)

Use cases:

- Presence detection (online/offline status)
- Device health monitoring
- Emergency notifications
- Session cleanup triggers

Example: Presence System

// Connect with an offline LWT
val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "user-${userId}",
    willTopic = "users/${userId}/presence",
    willMessage = """{"status": "offline"}""",
    willQos = 1,
    willRetain = true
)

// After connection, publish online status
val onlineMessage = ZMqttMessage(
    topic = "users/${userId}/presence",
    payload = """{"status": "online"}""",
    retained = true
)

pulseMqttKit.submitCommand(
    PublishCommand(
        message = onlineMessage,
        qos = QOSLevel.QOS_1
    )
)
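
To see how the retained presence topic evolves, the toy model below mimics the broker side of this pattern. It is a hypothetical sketch (`ToyPresenceBroker` is not a real broker or a Pulse MQTT class) and assumes the will is registered with retain enabled, as in the example above. It also shows one detail of MQTT worth planning for: the will is discarded on a graceful disconnect, so the broker does not publish it.

```kotlin
// Toy model of retained messages and wills; illustration only.
class ToyPresenceBroker {
    private val retained = mutableMapOf<String, String>()
    private val wills = mutableMapOf<String, Pair<String, String>>()  // clientId -> (topic, payload)

    // Registers the client's will at connect time.
    fun connect(clientId: String, willTopic: String, willMessage: String) {
        wills[clientId] = willTopic to willMessage
    }

    fun publishRetained(topic: String, payload: String) {
        retained[topic] = payload
    }

    // Unexpected loss: the broker publishes the will (retained, by assumption).
    fun connectionLost(clientId: String) {
        wills.remove(clientId)?.let { (topic, payload) -> retained[topic] = payload }
    }

    // Graceful disconnect: the will is dropped without being published.
    fun gracefulDisconnect(clientId: String) {
        wills.remove(clientId)
    }

    fun retainedMessage(topic: String): String? = retained[topic]
}
```

In this model the topic stays at "online" after a graceful disconnect, so a client that wants accurate presence would publish its own retained offline message before disconnecting.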

Graceful Disconnection

Normal Disconnect

val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L,  // Wait 5 seconds for pending operations
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(disconnectCommand)

Shutdown Sequence

Perform a complete cleanup when the app closes:

class MqttLifecycleManager(private val pulseMqttKit: PulseMqttKit) {

    fun onAppPause() {
        // Optional: Disconnect on pause
        pulseMqttKit.submitCommand(DisconnectCommand(quiesceTimeout = 3000L))
    }

    fun onAppStop() {
        // Stop health monitoring
        pulseMqttKit.stopHealthMonitoring()
    }

    fun onAppDestroy() {
        // Complete shutdown
        pulseMqttKit.shutDown {
            logger.info("MQTT client shutdown complete")
        }
    }
}

Connection Monitoring

Health Checks

Monitor connection health programmatically:

class ConnectionHealthMonitor {
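    // Update this from the message-arrival callback; if it is never refreshed,
    // the 60-second staleness check below fires on every pass.
    private var lastMessageTime = System.currentTimeMillis()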

    fun startMonitoring() {
        scope.launch {
            while (isActive) {
                delay(10_000)  // Check every 10 seconds
                checkConnectionHealth()
            }
        }
    }

    private fun checkConnectionHealth() {
        val isConnected = pulseMqttKit.isConnected()
        val timeSinceLastMessage = System.currentTimeMillis() - lastMessageTime

        when {
            isConnected == false -> {
                logger.warning("Not connected to broker")
                showDisconnectedUI()
            }
            timeSinceLastMessage > 60_000 -> {
                logger.warning("No messages received for 60 seconds")
                // Possible connection issue even though client reports connected
                verifyConnection()
            }
            else -> {
                showConnectedUI()
            }
        }
    }

    private fun verifyConnection() {
        // Publish to a test topic to verify connectivity
        val testMessage = ZMqttMessage(
            topic = "system/ping",
            payload = """{"client_id": "${clientId}", "timestamp": ${System.currentTimeMillis()}}"""
        )
        pulseMqttKit.submitCommand(PublishCommand(testMessage, QOSLevel.QOS_0))
    }
}

Connection Metrics

Track connection statistics:

class ConnectionMetrics : MqttUpdatesListener {
    private var connectionCount = 0
    private var disconnectionCount = 0
    private var lastConnectTime = 0L
    private var totalUptime = 0L

    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        connectionCount++
        lastConnectTime = System.currentTimeMillis()

        metrics.record("mqtt.connection.established", mapOf(
            "reconnect" to reconnect,
            "server" to serverUri
        ))
    }

    override fun onMqttConnectionLost(cause: Throwable?) {
        disconnectionCount++
        val uptime = System.currentTimeMillis() - lastConnectTime
        totalUptime += uptime

        metrics.record("mqtt.connection.lost", mapOf(
            "uptime_ms" to uptime,
            "cause" to (cause?.javaClass?.simpleName ?: "unknown")
        ))
    }

    fun getConnectionStats(): ConnectionStats {
        return ConnectionStats(
            connections = connectionCount,
            disconnections = disconnectionCount,
            averageUptime = if (disconnectionCount > 0) 
                totalUptime / disconnectionCount else 0,
            isCurrentlyConnected = pulseMqttKit.isConnected() ?: false
        )
    }
}

Best Practices

1. Always Use Retry Policies

// ✅ Good - Handles transient failures
ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(maxRetries = 3, baseDelayMillis = 2000)
)

// ❌ Avoid - Single attempt only
ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.none()
)

2. Monitor Connection Events

// ✅ Good - React to connection changes
class ConnectionAwareListener : MqttUpdatesListener {
    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        updateUI(connected = true)
    }

    override fun onMqttConnectionLost(cause: Throwable?) {
        updateUI(connected = false)
    }
}

3. Use Clean Session Appropriately

// ✅ Mobile app - persistent session
cleanSession = false

// ✅ Testing/debugging - clean session
cleanSession = true

// ✅ Stateless service - clean session
cleanSession = true

4. Configure Appropriate Timeouts

// ✅ Good - Reasonable timeouts
ConnectionOptions(
    connectionTimeoutSeconds = 30,
    keepAliveIntervalSeconds = 60
)

// ❌ Avoid - Too short (false failures)
connectionTimeoutSeconds = 5

// ❌ Avoid - Too long (slow failure detection)
keepAliveIntervalSeconds = 300

5. Handle Disconnection Gracefully

// ✅ Good - Clean shutdown
pulseMqttKit.shutDown {
    releaseResources()
    logger.info("Clean shutdown completed")
}

Troubleshooting

Problem: Frequent disconnections

Possible causes:

- Network instability
- Keep-alive too short
- Broker resource constraints

Solutions:

// Increase keep-alive
keepAliveIntervalSeconds = 90

// Enable automatic reconnect
automaticReconnect = true

// Use health monitoring
pulseMqttKit.startHealthMonitoring()

Problem: Cannot reconnect after network change

Solution: Use network monitoring:

class NetworkAwareReconnect : MqttUpdatesListener {
    override fun onInternetConnectionStatusChanged(isConnected: Boolean) {
        if (isConnected && pulseMqttKit.isConnected() == false) {
            // Network restored, trigger reconnect
            pulseMqttKit.submitCommand(ConnectCommand(connectionOptions))
        }
    }
}

Problem: Session not persisting

Check:

- cleanSession = false
- Same client ID used
- Broker supports session persistence

Problem: Connection timeout

Solutions:

// Increase timeout
connectionTimeoutSeconds = 60

// Check network connectivity
// Verify broker is reachable
// Confirm firewall rules allow connection
