Library Lifecycle

This guide covers the complete lifecycle of Pulse MQTT, from initialization to shutdown, so that resources are managed properly and the connection stays reliable.

Overview

The Pulse MQTT library has a well-defined lifecycle that includes:

  • Initialization: Setting up the library with dependencies
  • Connection: Establishing MQTT broker connection
  • Operation: Submitting commands and handling events
  • Monitoring: Health and network monitoring
  • Shutdown: Graceful cleanup of resources

Lifecycle Stages

stateDiagram-v2
    [*] --> Uninitialized
    Uninitialized --> Initialized: initialize()
    Initialized --> Connecting: submitCommand(ConnectCommand)
    Connecting --> Connected: Connection Success
    Connecting --> Disconnected: Connection Failure
    Connected --> Disconnected: Connection Lost / Disconnect
    Disconnected --> Connecting: Reconnect
    Connected --> Operating: Commands & Messages
    Operating --> Connected: Continuous Operation
    Initialized --> ShutDown: shutDown()
    Connected --> ShutDown: shutDown()
    Disconnected --> ShutDown: shutDown()
    ShutDown --> [*]
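
The diagram can also be read as a transition table. The following is a minimal Kotlin sketch of that table, useful for reasoning about or unit-testing which calls are valid in which state. The names `LifecycleState`, `LifecycleEvent`, and `nextState` are hypothetical and not part of the Pulse MQTT API, and the `Operating` sub-state is folded into `CONNECTED`.

```kotlin
// Hypothetical model of the state diagram; these names are illustrative only
// and are not part of the Pulse MQTT API.
enum class LifecycleState { UNINITIALIZED, INITIALIZED, CONNECTING, CONNECTED, DISCONNECTED, SHUT_DOWN }

enum class LifecycleEvent {
    INITIALIZE, CONNECT, CONNECT_SUCCESS, CONNECT_FAILURE,
    CONNECTION_LOST, DISCONNECT, RECONNECT, SHUT_DOWN
}

// Returns the next state, or null when the diagram defines no such transition.
fun nextState(state: LifecycleState, event: LifecycleEvent): LifecycleState? = when (state) {
    LifecycleState.UNINITIALIZED -> when (event) {
        LifecycleEvent.INITIALIZE -> LifecycleState.INITIALIZED
        else -> null
    }
    LifecycleState.INITIALIZED -> when (event) {
        LifecycleEvent.CONNECT -> LifecycleState.CONNECTING
        LifecycleEvent.SHUT_DOWN -> LifecycleState.SHUT_DOWN
        else -> null
    }
    LifecycleState.CONNECTING -> when (event) {
        LifecycleEvent.CONNECT_SUCCESS -> LifecycleState.CONNECTED
        LifecycleEvent.CONNECT_FAILURE -> LifecycleState.DISCONNECTED
        else -> null
    }
    LifecycleState.CONNECTED -> when (event) {
        LifecycleEvent.CONNECTION_LOST, LifecycleEvent.DISCONNECT -> LifecycleState.DISCONNECTED
        LifecycleEvent.SHUT_DOWN -> LifecycleState.SHUT_DOWN
        else -> null
    }
    LifecycleState.DISCONNECTED -> when (event) {
        LifecycleEvent.RECONNECT -> LifecycleState.CONNECTING
        LifecycleEvent.SHUT_DOWN -> LifecycleState.SHUT_DOWN
        else -> null
    }
    LifecycleState.SHUT_DOWN -> null // terminal state
}
```

A `null` result marks a transition the diagram does not define, such as shutting down while a connection attempt is still in progress.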

Initialization Phase

1. Basic Initialization

class MyApplication : Application() {

    lateinit var pulseMqttKit: PulseMqttKit
        private set

    override fun onCreate() {
        super.onCreate()

        // Create instance
        pulseMqttKit = PulseMqttKit()

        // Initialize with bridge
        val bridge = MyMqttBridge(this)
        pulseMqttKit.initialize(this, bridge)

        // Add listeners
        pulseMqttKit.addListener(MyMqttListener())
    }
}

2. Initialization Requirements

Context: Android application context

pulseMqttKit.initialize(applicationContext, bridge)

Bridge Implementation: Provides dependencies

class MyMqttBridge(private val context: Context) : PulseMqttKitBridge {
    // Create shared dependencies once so repeated getter calls return the same instances
    private val logger: Logger = MyLogger()
    private val gson: Gson = GsonBuilder().create()
    private val scope: CoroutineScope = MainScope()

    override fun getLogger(): Logger = logger
    override fun getAppGson(): Gson = gson
    override fun getCustomCoroutineScope(): CoroutineScope = scope
    override fun getHealthMonitoringConfig(): HealthMonitoringConfig =
        HealthMonitoringConfig(monitoringFreqSeconds = 30)
    override fun getNetworkConfig(): NetworkMonitoringConfig =
        NetworkMonitoringConfig(enabled = true)
}

Event Listeners: Register before connecting

pulseMqttKit.addListener(object : MqttUpdatesListener {
    // Implement callback methods
})

Connection Phase

1. Establishing Connection

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "unique-client-id",
    username = "user",
    password = "pass",
    cleanSession = false,
    automaticReconnect = true
)

val connectCommand = ConnectCommand(
    connectionOptions = connectionOptions,
    retryPolicy = RetryPolicy.exponential(maxRetries = 5)
)

pulseMqttKit.submitCommand(connectCommand)
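
To give a feel for what an exponential retry policy with `maxRetries = 5` implies, here is a small sketch that computes a doubling delay schedule. The helper `backoffDelaysMillis` and its base and cap values are assumptions for illustration; the delays that `RetryPolicy.exponential` actually uses are not stated in this guide.

```kotlin
// Hypothetical helper: the base delay and the cap are assumptions, not values
// taken from the Pulse MQTT library.
fun backoffDelaysMillis(
    maxRetries: Int,
    baseMillis: Long = 1_000L,
    capMillis: Long = 30_000L
): List<Long> {
    require(maxRetries >= 0) { "maxRetries must not be negative" }
    // Start at the base delay and double on every retry, never exceeding the cap.
    return generateSequence(minOf(baseMillis, capMillis)) { previous ->
        minOf(previous * 2, capMillis)
    }.take(maxRetries).toList()
}
```

With these assumed defaults, five retries wait 1, 2, 4, 8, and 16 seconds; later retries would level off at the 30-second cap.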

2. Connection Lifecycle Events

override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
    if (reconnect) {
        Log.i(TAG, "Reconnected to broker")
        // Re-subscribe or restore state if needed
    } else {
        Log.i(TAG, "Initial connection established")
        // Perform initial setup
    }
}

override fun onMqttConnectionLost(cause: Throwable?) {
    Log.w(TAG, "Connection lost: ${cause?.message}")
    // Handle disconnection
}

3. Connection States

| State | Description | Next Actions |
|---|---|---|
| Disconnected | No active connection | Submit ConnectCommand |
| Connecting | Connection in progress | Wait for result |
| Connected | Active connection | Submit other commands |
| Reconnecting | Attempting reconnection | Wait for recovery |
Operation Phase

1. Command Submission

// Connected state - ready for operations
if (isConnected()) {
    // Subscribe to topics
    pulseMqttKit.submitCommand(SubscribeCommand(topicConfigs))

    // Publish messages
    pulseMqttKit.submitCommand(PublishCommand(message, qos))

    // Unsubscribe from topics
    pulseMqttKit.submitCommand(UnsubscribeCommand(topics))
}
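
The guard above silently skips commands while the client is disconnected. If that work should not be lost, one option is to buffer it and replay it after the connection comes back. The sketch below assumes a hypothetical `PendingCommandBuffer` class; it is not part of the Pulse MQTT API, and the `submit` lambda stands in for `pulseMqttKit.submitCommand`.

```kotlin
// Hypothetical buffer, not part of the Pulse MQTT API: holds commands submitted
// while disconnected and replays them in order once the connection is up.
class PendingCommandBuffer<C>(private val submit: (C) -> Unit) {
    private val pending = ArrayDeque<C>()
    private var connected = false

    // Call from the connect-complete and connection-lost callbacks.
    @Synchronized
    fun onConnectionChanged(isConnected: Boolean) {
        connected = isConnected
        if (isConnected) {
            while (pending.isNotEmpty()) submit(pending.removeFirst())
        }
    }

    // Submits immediately when connected, otherwise queues the command.
    @Synchronized
    fun enqueue(command: C) {
        if (connected) submit(command) else pending.addLast(command)
    }

    @Synchronized
    fun pendingCount(): Int = pending.size
}
```

Whether buffering is appropriate depends on the command: replaying a stale publish may be worse than dropping it, so an application might buffer subscriptions only.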

2. Message Handling

override fun onMqttMessageReceived(
    topic: String?,
    payload: String?,
    topicMessage: TopicMessage<*>?
) {
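    // Process incoming messages
    when (topicMessage) {
        is TopicMessage.Deserialized<*> -> handleTypedMessage(topicMessage.data)
        is TopicMessage.Plain -> handlePlainMessage(payload)
        is TopicMessage.Error -> handleError(topicMessage.error)
        // topicMessage is nullable, so cover null and any other case explicitly
        else -> Log.w(TAG, "Unhandled message on topic $topic")
    }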
}
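
The dispatch logic in this callback is easy to unit-test if it is separated from the listener. The sketch below uses a hypothetical stand-in type, `ParsedMessage`, with the same three cases as the handler above; the library's real `TopicMessage` class may be defined differently.

```kotlin
// Hypothetical stand-in for the library's TopicMessage type, so the dispatch
// logic can be exercised in a plain unit test. The real class may differ.
sealed class ParsedMessage {
    data class Deserialized<T>(val data: T) : ParsedMessage()
    data class Plain(val text: String) : ParsedMessage()
    data class Error(val error: Throwable) : ParsedMessage()
}

// Using `when` as an expression makes the compiler verify that every case,
// including null, is covered.
fun describe(message: ParsedMessage?): String = when (message) {
    is ParsedMessage.Deserialized<*> -> "typed: ${message.data}"
    is ParsedMessage.Plain -> "plain: ${message.text}"
    is ParsedMessage.Error -> "error: ${message.error.message}"
    null -> "no parsed message"
}
```

Because the hierarchy is sealed, adding a fourth case later turns any unhandled `when` into a compile error instead of a silent gap.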

3. Continuous Operation Pattern

class MqttService : Service() {

    private lateinit var mqttKit: PulseMqttKit

    override fun onCreate() {
        super.onCreate()
        mqttKit = (application as MyApplication).pulseMqttKit

        // Connect when service starts
        connectToMqtt()

        // Start monitoring
        mqttKit.startHealthMonitoring()
    }

    override fun onDestroy() {
        // Stop monitoring
        mqttKit.stopHealthMonitoring()

        // Don't disconnect - keep connection alive
        super.onDestroy()
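    }

    // Required by Service; this service does not support binding
    override fun onBind(intent: Intent?): IBinder? = null
}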

Monitoring Phase

1. Health Monitoring

// Start health monitoring
pulseMqttKit.startHealthMonitoring()

// Monitoring runs continuously checking connection health
// Automatically reconnects if connection is lost
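
Conceptually, a health monitor is a periodic check that triggers a reconnect when the connection looks dead. The sketch below models a single pass of such a check. `HealthCheck` and its `failuresBeforeReconnect` threshold are hypothetical; the library's own monitor is internal and may behave differently.

```kotlin
// Hypothetical sketch of one health-check pass; not the library's implementation.
class HealthCheck(
    private val isConnected: () -> Boolean,
    private val reconnect: () -> Unit,
    private val failuresBeforeReconnect: Int = 2
) {
    private var consecutiveFailures = 0

    // Call once per monitoring interval (for example every 30 seconds).
    // Returns true when this pass triggered a reconnect.
    fun tick(): Boolean {
        if (isConnected()) {
            consecutiveFailures = 0
            return false
        }
        consecutiveFailures++
        // Tolerate brief drops: only reconnect after several failed checks in a row.
        if (consecutiveFailures < failuresBeforeReconnect) return false
        consecutiveFailures = 0
        reconnect()
        return true
    }
}
```

Keeping the check separate from the timer means it can be driven by any scheduler, or called directly in a test without waiting for real time to pass.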

2. Network Monitoring

override fun onInternetConnectionStatusChanged(isConnected: Boolean) {
    if (isConnected) {
        Log.i(TAG, "Network available")
        // Network restored - library will auto-reconnect
    } else {
        Log.w(TAG, "Network unavailable")
        // Pause operations that require connectivity
    }
}

3. Monitoring Lifecycle

class MainActivity : AppCompatActivity() {

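    // Resolve the shared instance lazily; an unassigned lateinit would crash in onStart()
    private val mqttKit by lazy { (application as MyApplication).pulseMqttKit }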

    override fun onStart() {
        super.onStart()
        // Start monitoring when app is visible
        mqttKit.startHealthMonitoring()
    }

    override fun onStop() {
        super.onStop()
        // Stop monitoring when app goes to background
        mqttKit.stopHealthMonitoring()
    }
}

Disconnection Phase

1. Graceful Disconnection

// Graceful disconnect with quiesce timeout
val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L // Allow 5 seconds for pending operations
)

pulseMqttKit.submitCommand(disconnectCommand)

2. Handling Disconnection

override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    when (command) {
        is DisconnectCommand -> {
            Log.i(TAG, "Successfully disconnected")
            // Perform cleanup if needed
        }
        // Other command types need no handling here
        else -> Unit
    }
}

3. Connection Loss vs Disconnect

// Intentional disconnect - stop reconnection attempts
fun intentionalDisconnect() {
    mqttKit.stopHealthMonitoring() // Stop health checks
    mqttKit.submitCommand(DisconnectCommand())
}

// Connection loss - automatic recovery
override fun onMqttConnectionLost(cause: Throwable?) {
    // If automaticReconnect = true, library handles this
    // Just update UI or log the event
    updateConnectionStatus(false)
}
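
When an application runs its own recovery logic alongside the library's, it helps to record whether a disconnect was requested, so an unexpected drop can be told apart from a deliberate one. A minimal sketch, assuming a hypothetical `ReconnectGate` helper that is not part of the Pulse MQTT API:

```kotlin
// Hypothetical helper, not part of the Pulse MQTT API.
class ReconnectGate {
    // Volatile because callbacks may arrive on a different thread than UI calls.
    @Volatile
    private var disconnectRequested = false

    // Call just before submitting a disconnect command.
    fun onDisconnectRequested() { disconnectRequested = true }

    // Call just before submitting a connect command.
    fun onConnectRequested() { disconnectRequested = false }

    // True only for unexpected drops; false after an intentional disconnect.
    fun shouldReconnect(): Boolean = !disconnectRequested
}
```

In `onMqttConnectionLost`, an application could consult `shouldReconnect()` before scheduling any recovery of its own.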

Shutdown Phase

1. Graceful Shutdown

class MyApplication : Application() {

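    // Note: Android calls onTerminate() only in emulated process environments,
    // never on production devices, so do not rely on it as the only shutdown path
    override fun onTerminate() {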
        super.onTerminate()

        // Shutdown MQTT - cleans up all resources
        pulseMqttKit.shutDown {
            Log.i(TAG, "MQTT shutdown complete")
        }
    }
}

2. Shutdown Process

// What shutDown() does:
// 1. Stops health monitoring
// 2. Stops network monitoring
// 3. Disconnects from broker
// 4. Clears all listeners
// 5. Cancels all pending commands
// 6. Releases all resources
// 7. Invokes completion callback

pulseMqttKit.shutDown {
    // Called when shutdown is complete
    Log.i(TAG, "All resources released")
}
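
The ordering listed above matters: monitoring stops first so nothing triggers a reconnect while the connection is being torn down, and the callback fires last. The sketch below models an ordered, idempotent shutdown with a hypothetical `ShutdownSequence` class; it illustrates the pattern and is not the library's actual implementation.

```kotlin
// Hypothetical model of an ordered, idempotent shutdown. Step names are supplied
// by the caller; none of this is the library's actual implementation.
class ShutdownSequence(private val steps: List<Pair<String, () -> Unit>>) {
    private var finished = false

    // Names of the steps that were run, in order, including any that threw.
    val attemptedSteps = mutableListOf<String>()

    @Synchronized
    fun shutDown(onComplete: () -> Unit) {
        if (!finished) {
            for ((name, action) in steps) {
                // A failing step must not prevent later resources from being released.
                runCatching(action)
                attemptedSteps += name
            }
            finished = true
        }
        // Always invoked, even on a repeated call, so callers are never left waiting.
        onComplete()
    }
}
```

Calling `shutDown` a second time skips the steps and only invokes the callback, which keeps repeated shutdown requests (for example from logout and from a lifecycle callback) harmless.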

3. Shutdown Best Practices

// ✓ Good - Shut down under memory pressure
class MyApplication : Application() {
    override fun onLowMemory() {
        super.onLowMemory()
        // Consider shutting down MQTT to free memory
        pulseMqttKit.shutDown {}
    }
}

// ✓ Good - Shutdown when user logs out
fun logout() {
    mqttKit.shutDown {
        clearUserData()
        navigateToLogin()
    }
}

Lifecycle Management Patterns

1. Application-Level Management

class MqttApplication : Application() {

    val mqttManager by lazy { MqttManager(this) }

    // Convenience accessor used by activities, services, and view models
    val mqttKit: PulseMqttKit
        get() = mqttManager.mqttKit

    override fun onCreate() {
        super.onCreate()
        mqttManager.initialize()
    }

    // Only invoked in emulated process environments, never on production devices
    override fun onTerminate() {
        super.onTerminate()
        mqttManager.shutdown()
    }
}

class MqttManager(private val context: Context) {

    // Exposed so callers can submit commands and register listeners
    val mqttKit = PulseMqttKit()
    private var isInitialized = false

    fun initialize() {
        if (!isInitialized) {
            mqttKit.initialize(context, MyMqttBridge(context))
            mqttKit.addListener(MyMqttListener())
            isInitialized = true
        }
    }

    fun shutdown() {
        mqttKit.shutDown {
            isInitialized = false
        }
    }
}
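
The plain `isInitialized` flag in `MqttManager` is not safe if `initialize()` can be reached from more than one thread. One way to make the guard atomic is sketched below, assuming a hypothetical `LifecycleGuard` class whose two lambdas stand in for the library's initialize and shut-down calls.

```kotlin
// Hypothetical wrapper; the lambdas stand in for the library's initialize and
// shutDown calls. Not part of the Pulse MQTT API.
class LifecycleGuard(
    private val initialize: () -> Unit,
    private val shutDown: () -> Unit
) {
    // Fully qualified so the snippet needs no import line.
    private val initialized = java.util.concurrent.atomic.AtomicBoolean(false)

    // Runs initialize at most once until shutDownOnce() succeeds.
    // Returns true if this call did the work.
    fun initializeOnce(): Boolean {
        if (!initialized.compareAndSet(false, true)) return false
        initialize()
        return true
    }

    // Runs shutDown only if the guard is currently initialized.
    fun shutDownOnce(): Boolean {
        if (!initialized.compareAndSet(true, false)) return false
        shutDown()
        return true
    }
}
```

The compare-and-set ensures only one caller wins each transition. A caller that loses the race returns immediately, possibly before the winner has finished initializing, so code that needs a fully initialized instance should still synchronize on completion.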

2. Activity-Level Management

class MainActivity : AppCompatActivity() {

    private val mqttKit by lazy {
        (application as MqttApplication).mqttKit
    }

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Connect if not connected (isConnected() and connectToMqtt() are app-defined helpers)
        if (!isConnected()) {
            connectToMqtt()
        }
    }

    override fun onStart() {
        super.onStart()
        // Start monitoring when visible
        mqttKit.startHealthMonitoring()
    }

    override fun onStop() {
        super.onStop()
        // Stop monitoring when not visible
        mqttKit.stopHealthMonitoring()
    }
}

3. Service-Level Management

class MqttForegroundService : Service() {

    private lateinit var mqttKit: PulseMqttKit

    override fun onCreate() {
        super.onCreate()

        mqttKit = (application as MqttApplication).mqttKit

        // Start foreground service
        startForeground(NOTIFICATION_ID, createNotification())

        // Connect and monitor
        connectToMqtt()
        mqttKit.startHealthMonitoring()
    }

    override fun onDestroy() {
        // Stop monitoring but keep connection
        mqttKit.stopHealthMonitoring()

        super.onDestroy()
    }

    override fun onBind(intent: Intent?): IBinder? = null
}

4. ViewModel Integration

class MqttViewModel(application: Application) : AndroidViewModel(application) {

    private val mqttKit = (application as MqttApplication).mqttKit

    private val _connectionState = MutableLiveData<Boolean>()
    val connectionState: LiveData<Boolean> = _connectionState

    private val listener = object : MqttUpdatesListener {
        override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
            _connectionState.postValue(true)
        }

        override fun onMqttConnectionLost(cause: Throwable?) {
            _connectionState.postValue(false)
        }
    }

    init {
        mqttKit.addListener(listener)
    }

    override fun onCleared() {
        super.onCleared()
        mqttKit.removeListener(listener)
    }
}

Best Practices

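  1. Initialize Once: Create and initialize a single PulseMqttKit instance in the Application class
  2. Persistent Connection: Keep the connection alive across activity and service lifecycles
  3. Monitor Wisely: Start health monitoring in onStart() and stop it in onStop()
  4. Graceful Shutdown: Always call shutDown() to release resources, and wait for its callback
  5. State Tracking: Track connection state through listener callbacks to drive UI updates
  6. Resource Awareness: Adapt to device constraints, for example by shutting down on low memory
  7. Error Handling: Handle failures at every lifecycle transition, not only on connect
  8. Testing: Test initialization, connection loss, reconnection, and shutdown scenarios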

Next Steps