Library Lifecycle¶
Understand the complete lifecycle of Pulse MQTT from initialization to shutdown, ensuring proper resource management and reliable operation.
Overview¶
The Pulse MQTT library has a well-defined lifecycle that includes:
- Initialization: Setting up the library with dependencies
- Connection: Establishing MQTT broker connection
- Operation: Submitting commands and handling events
- Monitoring: Health and network monitoring
- Shutdown: Graceful cleanup of resources
Lifecycle Stages¶
stateDiagram-v2
[*] --> Uninitialized
Uninitialized --> Initialized: initialize()
Initialized --> Connecting: submitCommand(ConnectCommand)
Connecting --> Connected: Connection Success
Connecting --> Disconnected: Connection Failure
Connected --> Disconnected: Connection Lost / Disconnect
Disconnected --> Connecting: Reconnect
Connected --> Operating: Commands & Messages
Operating --> Connected: Continuous Operation
Initialized --> ShutDown: shutDown()
Connected --> ShutDown: shutDown()
Disconnected --> ShutDown: shutDown()
ShutDown --> [*]
Initialization Phase¶
1. Basic Initialization¶
class MyApplication : Application() {
lateinit var pulseMqttKit: PulseMqttKit
private set
override fun onCreate() {
super.onCreate()
// Create instance
pulseMqttKit = PulseMqttKit()
// Initialize with bridge
val bridge = MyMqttBridge(this)
pulseMqttKit.initialize(this, bridge)
// Add listeners
pulseMqttKit.addListener(MyMqttListener())
}
}
2. Initialization Requirements¶
Context: Android application context
Bridge Implementation: Provides dependencies
class MyMqttBridge(private val context: Context) : PulseMqttKitBridge {
override fun getLogger(): Logger = MyLogger()
override fun getAppGson(): Gson = GsonBuilder().create()
override fun getCustomCoroutineScope(): CoroutineScope = MainScope()
override fun getHealthMonitoringConfig(): HealthMonitoringConfig =
HealthMonitoringConfig(monitoringFreqSeconds = 30)
override fun getNetworkConfig(): NetworkMonitoringConfig =
NetworkMonitoringConfig(enabled = true)
}
Event Listeners: Register before connecting
Connection Phase¶
1. Establishing Connection¶
val connectionOptions = ConnectionOptions(
serverUri = "tcp://broker.example.com:1883",
clientId = "unique-client-id",
username = "user",
password = "pass",
cleanSession = false,
automaticReconnect = true
)
val connectCommand = ConnectCommand(
connectionOptions = connectionOptions,
retryPolicy = RetryPolicy.exponential(maxRetries = 5)
)
pulseMqttKit.submitCommand(connectCommand)
2. Connection Lifecycle Events¶
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
if (reconnect) {
Log.i(TAG, "Reconnected to broker")
// Re-subscribe or restore state if needed
} else {
Log.i(TAG, "Initial connection established")
// Perform initial setup
}
}
override fun onMqttConnectionLost(cause: Throwable?) {
Log.w(TAG, "Connection lost: ${cause?.message}")
// Handle disconnection
}
3. Connection States¶
| State | Description | Next Actions |
|---|---|---|
| Disconnected | No active connection | Submit ConnectCommand |
| Connecting | Connection in progress | Wait for result |
| Connected | Active connection | Submit other commands |
| Reconnecting | Attempting reconnection | Wait for recovery |
Operation Phase¶
1. Command Submission¶
// Connected state - ready for operations
if (isConnected()) {
// Subscribe to topics
pulseMqttKit.submitCommand(SubscribeCommand(topicConfigs))
// Publish messages
pulseMqttKit.submitCommand(PublishCommand(message, qos))
// Unsubscribe from topics
pulseMqttKit.submitCommand(UnsubscribeCommand(topics))
}
2. Message Handling¶
override fun onMqttMessageReceived(
topic: String?,
payload: String?,
topicMessage: TopicMessage<*>?
) {
// Process incoming messages
when (topicMessage) {
is TopicMessage.Deserialized<*> -> handleTypedMessage(topicMessage.data)
is TopicMessage.Plain -> handlePlainMessage(payload)
is TopicMessage.Error -> handleError(topicMessage.error)
}
}
3. Continuous Operation Pattern¶
class MqttService : Service() {
private lateinit var mqttKit: PulseMqttKit
override fun onCreate() {
super.onCreate()
mqttKit = (application as MyApp).pulseMqttKit
// Connect when service starts
connectToMqtt()
// Start monitoring
mqttKit.startHealthMonitoring()
}
override fun onDestroy() {
// Stop monitoring
mqttKit.stopHealthMonitoring()
// Don't disconnect - keep connection alive
super.onDestroy()
}
}
Monitoring Phase¶
1. Health Monitoring¶
// Start health monitoring
pulseMqttKit.startHealthMonitoring()
// Monitoring runs continuously checking connection health
// Automatically reconnects if connection is lost
2. Network Monitoring¶
override fun onInternetConnectionStatusChanged(isConnected: Boolean) {
if (isConnected) {
Log.i(TAG, "Network available")
// Network restored - library will auto-reconnect
} else {
Log.w(TAG, "Network unavailable")
// Pause operations that require connectivity
}
}
3. Monitoring Lifecycle¶
class MainActivity : AppCompatActivity() {
private lateinit var mqttKit: PulseMqttKit
override fun onStart() {
super.onStart()
// Start monitoring when app is visible
mqttKit.startHealthMonitoring()
}
override fun onStop() {
super.onStop()
// Stop monitoring when app goes to background
mqttKit.stopHealthMonitoring()
}
}
Disconnection Phase¶
1. Graceful Disconnection¶
// Graceful disconnect with quiesce timeout
val disconnectCommand = DisconnectCommand(
quiesceTimeout = 5000L // Allow 5 seconds for pending operations
)
pulseMqttKit.submitCommand(disconnectCommand)
2. Handling Disconnection¶
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
when (command) {
is DisconnectCommand -> {
Log.i(TAG, "Successfully disconnected")
// Perform cleanup if needed
}
}
}
3. Connection Loss vs Disconnect¶
// Intentional disconnect - stop reconnection attempts
fun intentionalDisconnect() {
mqttKit.stopHealthMonitoring() // Stop health checks
mqttKit.submitCommand(DisconnectCommand())
}
// Connection loss - automatic recovery
override fun onMqttConnectionLost(cause: Throwable?) {
// If automaticReconnect = true, library handles this
// Just update UI or log the event
updateConnectionStatus(false)
}
Shutdown Phase¶
1. Graceful Shutdown¶
class MyApplication : Application() {
override fun onTerminate() {
super.onTerminate()
// Shutdown MQTT - cleans up all resources
pulseMqttKit.shutDown {
Log.i(TAG, "MQTT shutdown complete")
}
}
}
2. Shutdown Process¶
// What shutDown() does:
// 1. Stops health monitoring
// 2. Stops network monitoring
// 3. Disconnects from broker
// 4. Clears all listeners
// 5. Cancels all pending commands
// 6. Releases all resources
// 7. Invokes completion callback
pulseMqttKit.shutDown {
// Called when shutdown is complete
Log.i(TAG, "All resources released")
}
3. Shutdown Best Practices¶
// ✓ Good - Shutdown in Application termination
class MyApplication : Application() {
override fun onLowMemory() {
super.onLowMemory()
// Consider shutting down MQTT to free memory
mqttKit.shutDown {}
}
}
// ✓ Good - Shutdown when user logs out
fun logout() {
mqttKit.shutDown {
clearUserData()
navigateToLogin()
}
}
Lifecycle Management Patterns¶
1. Application-Level Management¶
class MqttApplication : Application() {
val mqttManager by lazy { MqttManager(this) }
override fun onCreate() {
super.onCreate()
mqttManager.initialize()
}
override fun onTerminate() {
super.onTerminate()
mqttManager.shutdown()
}
}
class MqttManager(private val context: Context) {
private val mqttKit = PulseMqttKit()
private var isInitialized = false
fun initialize() {
if (!isInitialized) {
mqttKit.initialize(context, MyMqttBridge(context))
mqttKit.addListener(MyMqttListener())
isInitialized = true
}
}
fun shutdown() {
mqttKit.shutDown {
isInitialized = false
}
}
}
2. Activity-Level Management¶
class MainActivity : AppCompatActivity() {
private val mqttKit by lazy {
(application as MqttApplication).mqttKit
}
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// Connect if not connected
if (!isConnected()) {
connectToMqtt()
}
}
override fun onStart() {
super.onStart()
// Start monitoring when visible
mqttKit.startHealthMonitoring()
}
override fun onStop() {
super.onStop()
// Stop monitoring when not visible
mqttKit.stopHealthMonitoring()
}
}
3. Service-Level Management¶
class MqttForegroundService : Service() {
private lateinit var mqttKit: PulseMqttKit
override fun onCreate() {
super.onCreate()
mqttKit = (application as MqttApplication).mqttKit
// Start foreground service
startForeground(NOTIFICATION_ID, createNotification())
// Connect and monitor
connectToMqtt()
mqttKit.startHealthMonitoring()
}
override fun onDestroy() {
// Stop monitoring but keep connection
mqttKit.stopHealthMonitoring()
super.onDestroy()
}
override fun onBind(intent: Intent?): IBinder? = null
}
4. ViewModel Integration¶
class MqttViewModel(application: Application) : AndroidViewModel(application) {
private val mqttKit = (application as MqttApplication).mqttKit
private val _connectionState = MutableLiveData<Boolean>()
val connectionState: LiveData<Boolean> = _connectionState
private val listener = object : MqttUpdatesListener {
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
_connectionState.postValue(true)
}
override fun onMqttConnectionLost(cause: Throwable?) {
_connectionState.postValue(false)
}
}
init {
mqttKit.addListener(listener)
}
override fun onCleared() {
super.onCleared()
mqttKit.removeListener(listener)
}
}
Best Practices¶
- Initialize Once: Create PulseMqttKit in Application class
- Persistent Connection: Keep connection alive across app lifecycle
- Monitor Wisely: Start/stop monitoring based on visibility
- Graceful Shutdown: Always use shutDown() for cleanup
- State Tracking: Maintain connection state for UI updates
- Resource Awareness: Adapt to device constraints
- Error Handling: Handle all lifecycle transition errors
- Testing: Test all lifecycle scenarios thoroughly
Next Steps¶
- Understand Connection Management
- Configure Health Monitoring
- Review Command Architecture