
Health Monitoring

Health monitoring uses periodic health checks to detect MQTT connection failures and recover from them automatically. It keeps your app's connection reliable even when failures are silent or caused by network issues.

Overview

MQTT connections can fail silently, without triggering any disconnection callback. Common causes include:

- Background network switches (Wi-Fi to cellular)
- Broker maintenance or restarts
- Firewall timeouts
- NAT gateway session expiry

Health monitoring detects these issues and triggers automatic reconnection.

How It Works

sequenceDiagram
    participant App
    participant HealthMonitor
    participant PulseMQTT
    participant Broker

    App->>HealthMonitor: Start monitoring (every 30s)
    loop Health Check Cycle
        HealthMonitor->>PulseMQTT: Check isConnected()
        alt Connected
            PulseMQTT-->>HealthMonitor: true
            Note over HealthMonitor: Connection healthy
        else Not Connected
            PulseMQTT-->>HealthMonitor: false
            HealthMonitor->>PulseMQTT: Trigger reconnection
            PulseMQTT->>Broker: CONNECT
            Broker-->>PulseMQTT: CONNACK
        end
        HealthMonitor->>HealthMonitor: Wait 30s
    end
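The cycle in the diagram can be sketched in plain Kotlin/JVM. This is not PulseMQTT's implementation: `HealthCheckCycle`, `isConnected`, and `reconnect` are hypothetical stand-ins for `isConnected()` and the reconnection step, and a `ScheduledExecutorService` replaces WorkManager or AlarmManager so the logic can run off-device.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Hypothetical sketch of the health check cycle; not the PulseMQTT implementation.
// `isConnected` and `reconnect` stand in for the kit's connection query and reconnect step.
class HealthCheckCycle(
    private val isConnected: () -> Boolean?,
    private val reconnect: () -> Unit
) {
    /** Runs one check; returns true if a reconnection was triggered. */
    fun runOnce(): Boolean {
        // Treat an unknown state (null) the same as "not connected"
        if (isConnected() == true) return false
        reconnect()
        return true
    }

    /** Checks immediately, then again [periodSeconds] after each check completes. */
    fun start(periodSeconds: Long): ScheduledExecutorService {
        val executor = Executors.newSingleThreadScheduledExecutor()
        executor.scheduleWithFixedDelay({ runOnce() }, 0, periodSeconds, TimeUnit.SECONDS)
        return executor
    }
}
```

`scheduleWithFixedDelay` waits the full period after each check finishes, which matches the "Wait 30s" step; call `shutdown()` on the returned executor to end the cycle.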

Configuration

Basic Setup

Enable health monitoring with default settings (30-second intervals, WorkManager):

class MyBridge : PulseMqttKitBridge {
    override fun getHealthMonitoringConfig(): HealthMonitoringConfig {
        return HealthMonitoringConfig(
            monitoringFreqSeconds = 30,
            type = HealthMonitoringType.WORK_MANAGER
        )
    }

    // Other bridge methods...
}

// Start monitoring
pulseMqttKit.startHealthMonitoring()

Custom Configuration

Customize monitoring frequency and strategy:

override fun getHealthMonitoringConfig(): HealthMonitoringConfig {
    return HealthMonitoringConfig(
        monitoringFreqSeconds = 60,  // Check every 60 seconds
        type = HealthMonitoringType.ALARM_MANAGER  // Use AlarmManager instead
    )
}

Custom Health Check Logic

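Provide your own health verification logic through the `healthCheck` parameter. Here, `verifyConnectionHealth()`, `triggerConnectionRecovery()`, and `logger` are placeholders for your app's own code: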

override fun getHealthMonitoringConfig(): HealthMonitoringConfig {
    val customHealthCheck: () -> Unit = {
        // Custom logic to verify connection health
        val isHealthy = verifyConnectionHealth()

        if (!isHealthy) {
            logger.warning("Connection unhealthy, triggering recovery")
            triggerConnectionRecovery()
        } else {
            logger.debug("Connection healthy")
        }
    }

    return HealthMonitoringConfig(
        monitoringFreqSeconds = 45,
        type = HealthMonitoringType.WORK_MANAGER,
        healthCheck = customHealthCheck
    )
}
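If your custom check can throw, consider guarding it. Whether PulseMQTT catches exceptions thrown from `healthCheck` is not stated here, and some schedulers stop a periodic task after its first exception (a `ScheduledExecutorService`, for example, suppresses all later runs). The hypothetical `guardedHealthCheck` helper below reports the error and lets the next cycle run normally:

```kotlin
// Hypothetical helper: wraps a health-check lambda so an exception in one run is
// reported through `onError` instead of escaping into the scheduler.
// Catches Exception (not Throwable) so JVM Errors still propagate.
fun guardedHealthCheck(
    check: () -> Unit,
    onError: (Throwable) -> Unit
): () -> Unit = {
    try {
        check()
    } catch (e: Exception) {
        onError(e)
    }
}
```

With it, the config above could pass `guardedHealthCheck(customHealthCheck) { e -> logger.warning(e.toString()) }` as `healthCheck`.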

Monitoring Strategies

WorkManager (Default)

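Uses Android's WorkManager for background execution. WorkManager's `PeriodicWorkRequest` has a 15-minute minimum repeat interval, so how shorter frequencies such as 30 seconds are honored depends on how PulseMQTT schedules its work; confirm the observed interval on a real device.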

HealthMonitoringConfig(
    monitoringFreqSeconds = 30,
    type = HealthMonitoringType.WORK_MANAGER
)

Advantages:

- ✅ Battery efficient
- ✅ Respects Doze mode
- ✅ Survives app restarts
- ✅ Uses JobScheduler under the hood on API 23+

Best for:

- Production apps
- Battery-conscious applications
- Long-running background monitoring

Requirements:

- WorkManager dependency in build.gradle (WorkManager merges its own manifest entries; `Worker` classes themselves are not declared in AndroidManifest.xml)

AlarmManager

Uses Android's AlarmManager for precise scheduling:

HealthMonitoringConfig(
    monitoringFreqSeconds = 30,
    type = HealthMonitoringType.ALARM_MANAGER
)

Advantages:

- ✅ More precise timing
- ✅ Works on all Android versions
- ✅ Simpler configuration

Considerations:

- ⚠️ Higher battery usage
- ⚠️ May be delayed in Doze mode
- ⚠️ Requires a BroadcastReceiver

Best for:

- Time-sensitive monitoring
- Devices that don't support WorkManager
- Testing and development

Requirements:

- BroadcastReceiver registered in AndroidManifest.xml
- RECEIVE_BOOT_COMPLETED permission, so monitoring can be rescheduled after a reboot (the system clears registered alarms when the device restarts)

Lifecycle Management

Start Monitoring

// Start health monitoring
pulseMqttKit.startHealthMonitoring()

When to start:

- After a successful initial connection
- On app launch (if the app should auto-connect)
- When the app enters the foreground

Stop Monitoring

// Stop health monitoring
pulseMqttKit.stopHealthMonitoring()

When to stop:

- Before app shutdown
- When the user explicitly disconnects
- During planned maintenance windows
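The start and stop guidance above can be wrapped in a small guard so that repeated lifecycle callbacks don't double-start or double-stop monitoring. `MonitoringController` is a hypothetical helper, useful if `startHealthMonitoring()` is not idempotent (this page doesn't say); the kit's start and stop calls are passed in as lambdas:

```kotlin
// Hypothetical guard that makes start/stop idempotent regardless of the underlying kit.
class MonitoringController(
    private val start: () -> Unit,  // e.g. { pulseMqttKit.startHealthMonitoring() }
    private val stop: () -> Unit    // e.g. { pulseMqttKit.stopHealthMonitoring() }
) {
    @Volatile
    private var running = false

    val isRunning: Boolean
        get() = running

    // Synchronized so concurrent callers cannot both see running == false and start twice
    @Synchronized
    fun ensureStarted() {
        if (!running) {
            start()
            running = true
        }
    }

    @Synchronized
    fun ensureStopped() {
        if (running) {
            stop()
            running = false
        }
    }
}
```

For example: `MonitoringController({ pulseMqttKit.startHealthMonitoring() }, { pulseMqttKit.stopHealthMonitoring() })`.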

Complete Example

import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner

class MqttLifecycleManager(
    private val pulseMqttKit: PulseMqttKit,
    private val connectionOptions: ConnectionOptions  // type name assumed; use your connection options type
) : DefaultLifecycleObserver {

    override fun onCreate(owner: LifecycleOwner) {
        // Connect once; health monitoring starts in onStart()
        pulseMqttKit.submitCommand(ConnectCommand(connectionOptions))
    }

    override fun onStart(owner: LifecycleOwner) {
        // Start health monitoring when app is visible
        pulseMqttKit.startHealthMonitoring()
    }

    override fun onStop(owner: LifecycleOwner) {
        // Stop monitoring when app is in background
        pulseMqttKit.stopHealthMonitoring()
    }

    override fun onDestroy(owner: LifecycleOwner) {
        // Complete shutdown
        pulseMqttKit.shutDown()
    }
}

// Register with lifecycle
lifecycle.addObserver(MqttLifecycleManager(pulseMqttKit, connectionOptions))

Advanced Patterns

Adaptive Monitoring Frequency

Adjust frequency based on app state:

class AdaptiveHealthMonitoring(
    private val pulseMqttKit: PulseMqttKit
) {

    fun onAppForeground() {
        pulseMqttKit.stopHealthMonitoring()

        // More frequent checks when app is active
        updateMonitoringConfig(frequencySeconds = 15)
        pulseMqttKit.startHealthMonitoring()
    }

    fun onAppBackground() {
        pulseMqttKit.stopHealthMonitoring()

        // Less frequent checks to save battery
        updateMonitoringConfig(frequencySeconds = 120)
        pulseMqttKit.startHealthMonitoring()
    }

    private fun updateMonitoringConfig(frequencySeconds: Long) {
        // Update bridge to return new config
        // This would require reinitializing with new bridge
    }
}
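One way to fill in `updateMonitoringConfig` without reinitializing is to have the bridge read the interval from a mutable holder. This only helps if PulseMQTT calls `getHealthMonitoringConfig()` again on each `startHealthMonitoring()`, which is an assumption; `AdaptiveFrequency` is a hypothetical name:

```kotlin
// Hypothetical holder for the current interval. ASSUMPTION: the bridge reads
// currentSeconds each time PulseMQTT asks for the config; if the config is read
// only once at init, changing this value will not change the interval.
class AdaptiveFrequency(
    private val foregroundSeconds: Int = 15,   // more frequent while the app is active
    private val backgroundSeconds: Int = 120   // less frequent to save battery
) {
    @Volatile
    var currentSeconds: Int = foregroundSeconds
        private set

    fun onAppForeground() {
        currentSeconds = foregroundSeconds
    }

    fun onAppBackground() {
        currentSeconds = backgroundSeconds
    }
}
```

The bridge would then return `HealthMonitoringConfig(monitoringFreqSeconds = adaptive.currentSeconds, ...)`, converting the value if the field expects a `Long`.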

Network-Aware Monitoring

Adjust based on network conditions:

class NetworkAwareMonitoring(
    private val pulseMqttKit: PulseMqttKit,
    private val connectionOptions: ConnectionOptions  // type name assumed; use your connection options type
) : MqttUpdatesListener {

    override fun onInternetConnectionStatusChanged(isConnected: Boolean) {
        if (isConnected) {
            // Network restored - immediate health check
            performImmediateHealthCheck()
        } else {
            // Network lost - stop monitoring temporarily
            pulseMqttKit.stopHealthMonitoring()
        }
    }

    private fun performImmediateHealthCheck() {
        // `!= true` also treats an unknown (null) state as disconnected
        if (pulseMqttKit.isConnected() != true) {
            logger.info("Network restored but MQTT disconnected, reconnecting")
            pulseMqttKit.submitCommand(ConnectCommand(connectionOptions))
        }

        // Resume normal monitoring
        pulseMqttKit.startHealthMonitoring()
    }
}
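The decision logic in `NetworkAwareMonitoring` can be pulled out into a pure function so it can be unit-tested without a device. `NetworkAction` and `actionsForNetworkChange` are hypothetical names; each action maps to one of the PulseMQTT calls above:

```kotlin
// Hypothetical actions; map each one to the corresponding PulseMQTT call.
enum class NetworkAction { STOP_MONITORING, RECONNECT, START_MONITORING }

// Decides what to do when connectivity changes, given the current MQTT state.
fun actionsForNetworkChange(networkUp: Boolean, mqttConnected: Boolean?): List<NetworkAction> =
    when {
        // Network lost: pause monitoring until connectivity returns
        !networkUp -> listOf(NetworkAction.STOP_MONITORING)
        // Network back but MQTT down or unknown (null): reconnect, then resume monitoring
        mqttConnected != true -> listOf(NetworkAction.RECONNECT, NetworkAction.START_MONITORING)
        // Network back and MQTT still connected: just resume monitoring
        else -> listOf(NetworkAction.START_MONITORING)
    }
```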

Custom Health Verification

Implement sophisticated health checks:

class AdvancedHealthCheck(
    private val pulseMqttKit: PulseMqttKit,
    private val coroutineScope: CoroutineScope,
    private val clientId: String
) {
    // Written from message callbacks and read by the health-check coroutine
    @Volatile
    private var lastMessageTime = System.currentTimeMillis()
    private val healthCheckTopic = "system/health/$clientId"

    fun createHealthCheckFunction(): () -> Unit = {
        coroutineScope.launch {
            verifyConnectionHealth()
        }
    }

    private suspend fun verifyConnectionHealth() {
        val now = System.currentTimeMillis()
        val isConnected = pulseMqttKit.isConnected() == true
        val timeSinceLastMessage = now - lastMessageTime

        when {
            !isConnected -> {
                logger.warning("Health check: Not connected")
                attemptReconnection()
            }

            timeSinceLastMessage > 120_000 -> {
                logger.warning("Health check: No messages for 2 minutes, verifying")
                sendPingMessage()
            }

            else -> {
                logger.debug("Health check: Connection healthy")
            }
        }
    }

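    // Publishing alone does not refresh lastMessageTime: that only happens if the app
    // subscribes to healthCheckTopic and calls onMessageReceived() when the echo arrives
    private fun sendPingMessage() {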
        val pingMessage = ZMqttMessage(
            topic = healthCheckTopic,
            payload = """{"type":"ping","timestamp":${System.currentTimeMillis()}}"""
        )

        pulseMqttKit.submitCommand(
            PublishCommand(
                message = pingMessage,
                qos = QOSLevel.QOS_0
            )
        )
    }

    private suspend fun attemptReconnection() {
        pulseMqttKit.submitCommand(
            ConnectCommand(
                connectionOptions = savedConnectionOptions,
                retryPolicy = RetryPolicy.exponential(
                    maxRetries = 3,
                    baseDelayMillis = 2000
                )
            )
        )
    }

    fun onMessageReceived() {
        lastMessageTime = System.currentTimeMillis()
    }
}
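The decision in `verifyConnectionHealth()` is easier to test if the clock is injected. The sketch below keeps the same three outcomes and the 2-minute threshold; `LivenessTracker` and `LinkState` are hypothetical names, not library types:

```kotlin
// Hypothetical outcomes mirroring the three branches of verifyConnectionHealth()
enum class LinkState { NOT_CONNECTED, STALE, HEALTHY }

// Tracks the time of the last received message; the clock is injectable for tests.
class LivenessTracker(
    private val staleAfterMillis: Long = 120_000,
    private val clock: () -> Long = System::currentTimeMillis
) {
    @Volatile
    private var lastMessageAt: Long = clock()

    fun onMessageReceived() {
        lastMessageAt = clock()
    }

    fun evaluate(isConnected: Boolean?): LinkState = when {
        // Unknown (null) counts as not connected
        isConnected != true -> LinkState.NOT_CONNECTED
        clock() - lastMessageAt > staleAfterMillis -> LinkState.STALE
        else -> LinkState.HEALTHY
    }
}
```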

Monitoring Health Checks

Log Health Check Activity

class HealthCheckLogger : MqttUpdatesListener {

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (command is ConnectCommand && result.attempt > 1) {
            logger.info("""
                Health check triggered reconnection succeeded
                Attempts: ${result.attempt}
                Duration: ${result.totalTimeMillis}ms
            """.trimIndent())

            // Record metric
            analytics.recordHealthCheckSuccess(
                attempts = result.attempt,
                duration = result.totalTimeMillis
            )
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        if (command is ConnectCommand) {
            logger.error("""
                Health check triggered reconnection failed
                Attempts: ${result.attempt}
                Error: ${result.error.message}
            """.trimIndent(), result.error)

            // Send alert for persistent failures
            if (result.attempt >= 3) {
                alerting.sendAlert("MQTT health check failures", result.error)
            }
        }
    }
}
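To reason about the `Duration` values logged here, it helps to know how long retries wait. Assuming `RetryPolicy.exponential` doubles the delay on each retry with no jitter or cap (the actual formula isn't documented on this page), `maxRetries = 3` with `baseDelayMillis = 2000` from the earlier example waits 2 s, 4 s, then 8 s, or 14 s in total. The helper names are hypothetical:

```kotlin
// ASSUMPTION: the delay before retry n (0-based) is baseDelayMillis * 2^n, no jitter, no cap.
// This illustrates exponential backoff in general; it is not PulseMQTT's RetryPolicy code.
fun exponentialDelays(maxRetries: Int, baseDelayMillis: Long): List<Long> =
    (0 until maxRetries).map { n -> baseDelayMillis * (1L shl n) }

// Total time spent waiting across all retries (excludes the connect attempts themselves)
fun totalBackoffMillis(maxRetries: Int, baseDelayMillis: Long): Long =
    exponentialDelays(maxRetries, baseDelayMillis).sum()
```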

Track Health Metrics

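data class HealthStats(
    val totalChecks: Int,
    val failedChecks: Int,
    val recoveries: Int,
    val healthScore: Double
)

class HealthMetrics {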
    private var totalHealthChecks = 0
    private var failedHealthChecks = 0
    private var successfulRecoveries = 0

    fun recordHealthCheck(wasHealthy: Boolean) {
        totalHealthChecks++
        if (!wasHealthy) {
            failedHealthChecks++
        }
    }

    fun recordRecovery(successful: Boolean) {
        if (successful) {
            successfulRecoveries++
        }
    }

    fun getHealthScore(): Double {
        if (totalHealthChecks == 0) return 1.0
        return 1.0 - (failedHealthChecks.toDouble() / totalHealthChecks)
    }

    fun getStats(): HealthStats {
        return HealthStats(
            totalChecks = totalHealthChecks,
            failedChecks = failedHealthChecks,
            recoveries = successfulRecoveries,
            healthScore = getHealthScore()
        )
    }
}
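`HealthMetrics` uses plain `Int` counters, which is fine on a single thread. If checks are recorded from a background worker while another thread reads the score, a variant backed by `AtomicInteger` avoids lost updates. `AtomicHealthMetrics` is a hypothetical name:

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical thread-safe variant of HealthMetrics using atomic counters.
class AtomicHealthMetrics {
    private val totalChecks = AtomicInteger()
    private val failedChecks = AtomicInteger()
    private val recoveries = AtomicInteger()

    fun recordHealthCheck(wasHealthy: Boolean) {
        // total is always incremented before failed
        totalChecks.incrementAndGet()
        if (!wasHealthy) failedChecks.incrementAndGet()
    }

    fun recordRecovery(successful: Boolean) {
        if (successful) recoveries.incrementAndGet()
    }

    fun successfulRecoveries(): Int = recoveries.get()

    fun getHealthScore(): Double {
        // Read failed first: because total is incremented first, failed <= total here,
        // so the score stays within [0, 1] even while other threads are recording
        val failed = failedChecks.get()
        val total = totalChecks.get()
        if (total == 0) return 1.0
        return 1.0 - failed.toDouble() / total
    }
}
```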

Best Practices

1. Choose Appropriate Frequency

// ✅ Good - Balanced monitoring
monitoringFreqSeconds = 30  // Mobile apps
monitoringFreqSeconds = 60  // Background services

// ⚠️ Caution - Too frequent (battery drain)
monitoringFreqSeconds = 10

// ❌ Avoid - Too infrequent (slow failure detection)
monitoringFreqSeconds = 300
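The trade-off behind these values is simple arithmetic: checks per day is 86,400 divided by the interval, and a silent failure can go unnoticed for up to about one full interval (longer if the OS defers the work, for example in Doze). So 10 s means 8,640 checks a day, 30 s means 2,880, 60 s means 1,440, and 300 s means only 288 checks but up to roughly 5 minutes of undetected downtime. A hypothetical helper for the first number:

```kotlin
// Number of health checks per day for a given interval (integer division)
fun checksPerDay(intervalSeconds: Int): Int {
    require(intervalSeconds > 0) { "interval must be positive" }
    return 86_400 / intervalSeconds
}
```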

2. Use WorkManager for Production

// ✅ Good - Battery efficient, production-ready
HealthMonitoringType.WORK_MANAGER

// ⚠️ Use sparingly - Higher battery usage
HealthMonitoringType.ALARM_MANAGER

3. Stop Monitoring When Not Needed

// ✅ Good - Stop monitoring first, then disconnect intentionally
pulseMqttKit.stopHealthMonitoring()
pulseMqttKit.submitCommand(DisconnectCommand())

4. Handle Lifecycle Properly

// ✅ Good - Tie monitoring to app lifecycle (DefaultLifecycleObserver callbacks)
override fun onStart(owner: LifecycleOwner) {
    pulseMqttKit.startHealthMonitoring()
}

override fun onStop(owner: LifecycleOwner) {
    pulseMqttKit.stopHealthMonitoring()
}

5. Monitor Health Check Results

// ✅ Good - Track and alert on persistent issues
class HealthMonitor : MqttUpdatesListener {
    private var consecutiveFailures = 0

    override fun onMqttConnectionLost(cause: Throwable?) {
        consecutiveFailures++
        if (consecutiveFailures >= 3) {
            notifyPersistentConnectionIssues()
        }
    }

    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        consecutiveFailures = 0
    }
}
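As written, `HealthMonitor` calls `notifyPersistentConnectionIssues()` on every failure after the third. If you want one alert per outage instead, track whether the current streak has already alerted. `FailureStreak` is a hypothetical helper; wire `onConnectionLost()` and `onConnected()` to the two listener callbacks above:

```kotlin
// Hypothetical helper: alerts once when a failure streak reaches the threshold,
// and resets both the count and the alert flag on a successful connection.
class FailureStreak(
    private val threshold: Int = 3,
    private val onPersistent: (Int) -> Unit
) {
    private var consecutive = 0
    private var alerted = false

    @Synchronized
    fun onConnectionLost() {
        consecutive++
        if (consecutive >= threshold && !alerted) {
            alerted = true
            onPersistent(consecutive)
        }
    }

    @Synchronized
    fun onConnected() {
        consecutive = 0
        alerted = false
    }
}
```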

Required Permissions

WorkManager Strategy

No permissions are required beyond those your MQTT connection already needs (such as `android.permission.INTERNET`).

AlarmManager Strategy

Add to AndroidManifest.xml:

<!-- For alarm-based health monitoring -->
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />

<application>
    <!-- Register broadcast receiver -->
    <receiver
        android:name="com.zomato.pulsemqtt.health.HealthCheckBroadcastReceiver"
        android:exported="false">
        <intent-filter>
            <action android:name="android.intent.action.BOOT_COMPLETED" />
        </intent-filter>
    </receiver>
</application>

Troubleshooting

Problem: Health monitoring not working

Check that:

1. Health monitoring has been started
2. The WorkManager dependency is included
3. Required permissions are granted

// Make sure monitoring is started once the connection is up
if (pulseMqttKit.isConnected() == true) {
    pulseMqttKit.startHealthMonitoring()
    logger.info("Health monitoring started")
}

Problem: Excessive battery drain

Solutions:

1. Increase the monitoring interval (check less often)
2. Use WorkManager instead of AlarmManager
3. Stop monitoring in the background

// Reduce frequency
monitoringFreqSeconds = 120  // 2 minutes instead of 30 seconds

// Or stop in background
override fun onStop(owner: LifecycleOwner) {
    pulseMqttKit.stopHealthMonitoring()
}

Problem: Health checks not detecting failures

Solution: Implement custom health verification:

// tryPublishTestMessage() and forceReconnection() are placeholders for your own code
val customCheck: () -> Unit = {
    val isConnected = pulseMqttKit.isConnected()
    val canPublish = tryPublishTestMessage()

    if (isConnected == true && !canPublish) {
        logger.warning("Connected but cannot publish, forcing reconnect")
        forceReconnection()
    }
}
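A `tryPublishTestMessage()` check can be built as a round trip with a timeout: send a probe, then wait a bounded time for evidence that it arrived. In MQTT, QoS 0 publishes (like the ping in the advanced example) receive no acknowledgement from the broker, so the evidence would be the probe echoing back on a topic the app subscribes to, or a PUBACK at QoS 1. `probeRoundTrip` and its `send` hook are hypothetical:

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical round-trip probe. `send` must eventually invoke the callback it is
// given, e.g. when a PUBACK arrives or when the echoed probe message is received.
// Returns true if delivery was confirmed within the timeout.
fun probeRoundTrip(timeoutMillis: Long, send: (onDelivered: () -> Unit) -> Unit): Boolean {
    val delivered = CountDownLatch(1)
    send { delivered.countDown() }
    // Blocks the calling thread: run this from a background worker, not the main thread
    return delivered.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
```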

Problem: Health monitoring continues after disconnect

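Solution: Always stop monitoring before disconnecting. Otherwise a health check that runs after the intentional disconnect may see the connection as down and trigger a reconnection: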

fun disconnect() {
    pulseMqttKit.stopHealthMonitoring()
    pulseMqttKit.submitCommand(DisconnectCommand())
}

Performance Impact

Battery Consumption

| Frequency | Strategy | Battery impact |
|-----------|----------|----------------|
| 30s | WorkManager | Low |
| 60s | WorkManager | Very Low |
| 30s | AlarmManager | Medium |
| 15s | AlarmManager | High |

Memory Usage

Health monitoring adds minimal memory overhead:

- WorkManager: ~50KB
- AlarmManager: ~20KB

CPU Usage

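Each health check:

- Duration: <10ms
- CPU: Negligible
- Network: None for the default check, which only inspects internal connection state (custom checks that publish, and any reconnection a check triggers, do use the network)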

Next Steps