Auto-Subscription

Auto-subscription restores topic subscriptions automatically after a reconnection, so you do not have to resubscribe manually and message delivery resumes as soon as the subscriptions are back in place.

Overview

Without auto-subscription, you must manually resubscribe after each reconnection:

// ❌ Without auto-subscription - manual resubscription required
override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
    if (reconnect) {
        // Must manually resubscribe to all topics
        pulseMqttKit.submitCommand(SubscribeCommand(topicConfigs))
    }
}

With auto-subscription enabled, the library handles this automatically:

// ✅ With auto-subscription - automatic resubscription
val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = AutoSubscriptionConfig(
        enabled = true
    )
)

How It Works

sequenceDiagram
    participant App
    participant PulseMQTT
    participant Broker

    App->>PulseMQTT: Subscribe to topics
    PulseMQTT->>PulseMQTT: Store subscription in memory
    PulseMQTT->>Broker: SUBSCRIBE
    Broker-->>PulseMQTT: SUBACK

    Note over PulseMQTT,Broker: Connection Lost
    Broker--xPulseMQTT: Connection dropped

    PulseMQTT->>Broker: Reconnect
    Broker-->>PulseMQTT: Connected

    Note over PulseMQTT: Auto-subscription triggered
    PulseMQTT->>Broker: SUBSCRIBE (stored topics)
    Broker-->>PulseMQTT: SUBACK
    PulseMQTT->>App: Messages resume

Process:

1. Library maintains subscription store in memory
2. On connection loss, subscriptions are preserved
3. On successful reconnection, library automatically resubscribes
4. No action required from application
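
The steps above can be illustrated with a small stand-alone sketch. It is not PulseMQTT's internal implementation: `SubscriptionStoreSketch` and its `send` callback are hypothetical names, and QoS is modelled as a plain `Int` to keep the example self-contained.

```kotlin
// Illustrative sketch only; not PulseMQTT's internal implementation.
// SubscriptionStoreSketch and the `send` callback are hypothetical names.
class SubscriptionStoreSketch(
    // Stand-in for "send a SUBSCRIBE packet for these topic filters and QoS levels"
    private val send: (Map<String, Int>) -> Unit
) {
    // Topic filter -> QoS; LinkedHashMap keeps insertion order for predictable replay
    private val store = LinkedHashMap<String, Int>()

    fun subscribe(topic: String, qos: Int) {
        store[topic] = qos                 // step 1: remember the subscription
        send(mapOf(topic to qos))
    }

    fun unsubscribe(topic: String) {
        store.remove(topic)                // removed topics are not replayed later
    }

    // Step 2 is implicit: a dropped connection never touches `store`.

    // Step 3: replay everything in one batch after a reconnection
    fun onConnected(reconnect: Boolean) {
        if (reconnect && store.isNotEmpty()) send(store.toMap())
    }
}
```

Because the store is only held in memory in this sketch, it is lost when the process dies. Subscriptions needed after an app restart would have to be submitted again or supplied up front.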

Configuration

Basic Configuration

Enable with default settings:

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = AutoSubscriptionConfig(
        enabled = true
    )
)

Advanced Configuration

Customize retry behavior for resubscription:

val autoSubConfig = AutoSubscriptionConfig(
    enabled = true,

    // Retry policy for resubscription attempts
    subscribeCommandRetryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000
    ),

    // Timeout for each resubscription attempt
    subscribeCommandTimeout = 30_000L  // 30 seconds
)

val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = autoSubConfig
)
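
To get a feel for what these values mean, the sketch below computes a possible delay schedule. It assumes the delay doubles after every failed attempt and is capped at `maxDelayMillis`, with no jitter. The library's actual formula may differ, and `backoffDelays` is a hypothetical helper rather than part of its API.

```kotlin
// Hypothetical helper, not part of the library: assumes the delay doubles
// after each failed attempt and is capped at maxDelayMillis, with no jitter.
fun backoffDelays(maxRetries: Int, baseDelayMillis: Long, maxDelayMillis: Long): List<Long> =
    generateSequence(minOf(baseDelayMillis, maxDelayMillis)) { previous ->
        minOf(previous * 2, maxDelayMillis)   // double, but never exceed the cap
    }
        .take(maxRetries)                      // one delay per retry
        .toList()
```

Under these assumptions, `backoffDelays(5, 1000, 30000)` yields 1000, 2000, 4000, 8000 and 16000 ms, so the 30-second cap is never reached with only five retries.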

Pre-Populated Subscriptions

Provide initial subscriptions that activate on first connection:

// Define subscriptions upfront
val initialSubscriptions = hashMapOf(
    "orders/+/updates" to TopicTypeConfig(
        messageType = OrderUpdate::class.java,
        qosLevel = QOSLevel.QOS_1
    ),
    "rider/+/location" to TopicTypeConfig(
        messageType = RiderLocation::class.java,
        qosLevel = QOSLevel.QOS_0
    ),
    "notifications/+" to TopicTypeConfig(
        messageType = Notification::class.java,
        qosLevel = QOSLevel.QOS_1
    )
)

val autoSubConfig = AutoSubscriptionConfig(
    enabled = true,
    subscriptionStore = initialSubscriptions
)

// On first connection, these topics will be automatically subscribed
val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = autoSubConfig
)
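
The filters above use the MQTT single-level wildcard `+`, which matches exactly one topic level. The following stand-alone sketch shows which concrete topics a filter would match. `matchesFilter` is an illustrative function, not a library call, and it leaves out the MQTT rule that wildcards do not match topics starting with `$`.

```kotlin
// Sketch of MQTT topic-filter matching; the function name is illustrative.
// Simplification: ignores the rule that wildcards do not match topics starting with '$'.
fun matchesFilter(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    for (i in f.indices) {
        when {
            f[i] == "#" -> return i == f.lastIndex        // multi-level wildcard must be last
            i >= t.size -> return false                   // filter has more levels than the topic
            f[i] != "+" && f[i] != t[i] -> return false   // literal level differs
        }
    }
    return f.size == t.size                               // no unmatched trailing topic levels
}
```

For example, `matchesFilter("orders/+/updates", "orders/42/updates")` is true, while `"orders/42/items/updates"` does not match because `+` covers only a single level.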

Usage Patterns

Pattern 1: Progressive Subscription

Add subscriptions progressively as the user navigates:

class OrderTrackingActivity : AppCompatActivity() {

    // Kept as a property so onDestroy() can rebuild the same topic name
    private var orderId: String? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        // Subscribe to order-specific topic
        orderId = intent.getStringExtra("ORDER_ID")
        val id = orderId ?: return  // Nothing to subscribe to without an order ID
        pulseMqttKit.submitCommand(
            SubscribeCommand(
                topicConfigs = mapOf(
                    "orders/$id/updates" to TopicTypeConfig(
                        messageType = OrderUpdate::class.java,
                        qosLevel = QOSLevel.QOS_1
                    )
                )
            )
        )

        // Subscription automatically preserved across reconnections
    }

    override fun onDestroy() {
        super.onDestroy()

        // Optionally unsubscribe so the topic is no longer restored on reconnect
        orderId?.let { id ->
            pulseMqttKit.submitCommand(
                UnsubscribeCommand(
                    topics = mutableListOf("orders/$id/updates")
                )
            )
        }
    }
}

Pattern 2: Persistent Subscriptions

Maintain subscriptions throughout the app lifecycle:

class MqttInitializer(private val pulseMqttKit: PulseMqttKit) {

    fun initializeSubscriptions() {
        // Subscribe to persistent topics on app start
        val persistentTopics = mapOf(
            "app/config" to TopicTypeConfig(
                messageType = AppConfig::class.java,
                qosLevel = QOSLevel.QOS_1
            ),
            "system/announcements" to TopicTypeConfig(
                messageType = Announcement::class.java,
                qosLevel = QOSLevel.QOS_1
            )
        )

        pulseMqttKit.submitCommand(
            SubscribeCommand(topicConfigs = persistentTopics)
        )

        // These remain subscribed throughout app lifetime
        // Auto-subscription ensures they're restored after reconnections
    }
}

Pattern 3: User-Specific Subscriptions

Subscribe based on user identity:

class UserSessionManager(
    private val pulseMqttKit: PulseMqttKit,
    private val userId: String
) {

    fun onUserLoggedIn() {
        // Subscribe to user-specific topics
        val userTopics = mapOf(
            "users/$userId/notifications" to TopicTypeConfig(
                messageType = UserNotification::class.java,
                qosLevel = QOSLevel.QOS_1
            ),
            "users/$userId/messages" to TopicTypeConfig(
                messageType = ChatMessage::class.java,
                qosLevel = QOSLevel.QOS_1
            ),
            "users/$userId/orders" to TopicTypeConfig(
                messageType = OrderNotification::class.java,
                qosLevel = QOSLevel.QOS_1
            )
        )

        pulseMqttKit.submitCommand(
            SubscribeCommand(topicConfigs = userTopics)
        )

        // Auto-subscription maintains these subscriptions
        // even if network drops temporarily
    }

    fun onUserLoggedOut() {
        // Clean up user-specific subscriptions
        pulseMqttKit.submitCommand(
            UnsubscribeCommand(
                topics = mutableListOf(
                    "users/$userId/notifications",
                    "users/$userId/messages",
                    "users/$userId/orders"
                )
            )
        )
    }
}

Subscription Store Management

Query Active Subscriptions

class SubscriptionManager(private val pulseMqttKit: PulseMqttKit) {

    fun logActiveSubscriptions() {
        // Active subscriptions are tracked internally
        // Monitor through command results
        pulseMqttKit.addListener(object : MqttUpdatesListener {
            override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
                if (command is SubscribeCommand) {
                    logger.info("Active subscriptions updated: ${command.topicConfigs.keys}")
                }
            }
        })
    }
}

Modify Subscriptions

// Add new subscriptions
pulseMqttKit.submitCommand(
    SubscribeCommand(
        topicConfigs = mapOf(
            "new/topic" to TopicTypeConfig(
                messageType = NewMessage::class.java,
                qosLevel = QOSLevel.QOS_1
            )
        )
    )
)

// Remove specific subscriptions
pulseMqttKit.submitCommand(
    UnsubscribeCommand(
        topics = mutableListOf("old/topic")
    )
)

// Remove all subscriptions
pulseMqttKit.submitCommand(
    UnsubscribeCommand()  // Empty list = unsubscribe all
)

Monitoring Auto-Subscription

Track Resubscription Events

class AutoSubscriptionMonitor : MqttUpdatesListener {

    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        if (reconnect) {
            logger.info("Reconnected - auto-subscription will restore topics")
        }
    }

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (command is SubscribeCommand && result.attempt > 1) {
            logger.info("""
                Auto-subscription succeeded after ${result.attempt} attempts
                Topics: ${command.topicConfigs.keys}
                Duration: ${result.totalTimeMillis}ms
            """.trimIndent())
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        if (command is SubscribeCommand) {
            logger.error("""
                Auto-subscription failed after ${result.attempt} attempts
                Topics: ${command.topicConfigs.keys}
                Error: ${result.error.message}
            """.trimIndent())

            // Alert user or trigger manual recovery
            handleSubscriptionFailure(command.topicConfigs)
        }
    }
}

Best Practices

1. Enable Auto-Subscription for Mobile Apps

// ✅ Good - Handles intermittent mobile connectivity
AutoSubscriptionConfig(enabled = true)

2. Use Aggressive Retry Policies

// ✅ Good - Ensures subscriptions are restored
AutoSubscriptionConfig(
    enabled = true,
    subscribeCommandRetryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000
    )
)

3. Pre-Populate Critical Subscriptions

// ✅ Good - Immediate subscription on first connect
val criticalTopics = hashMapOf(
    "emergency/alerts" to TopicTypeConfig(
        messageType = EmergencyAlert::class.java,
        qosLevel = QOSLevel.QOS_1
    )
)

AutoSubscriptionConfig(
    enabled = true,
    subscriptionStore = criticalTopics
)

4. Clean Up Unused Subscriptions

// ✅ Good - Unsubscribe when no longer needed
override fun onDestroy() {
    pulseMqttKit.submitCommand(
        UnsubscribeCommand(topics = mutableListOf("temp/topic"))
    )
    super.onDestroy()
}

5. Monitor Subscription Health

// ✅ Good - Track subscription success/failure
class SubscriptionHealthTracker : MqttUpdatesListener {
    private val subscriptionMetrics = mutableMapOf<String, Int>()

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (command is SubscribeCommand) {
            command.topicConfigs.keys.forEach { topic ->
                subscriptionMetrics[topic] = (subscriptionMetrics[topic] ?: 0) + 1
            }
        }
    }
}

Comparison: With vs Without Auto-Subscription

Without Auto-Subscription

class ManualSubscriptionHandler(private val pulseMqttKit: PulseMqttKit) : MqttUpdatesListener {
    private var subscriptions: Map<String, TopicTypeConfig<*>> = emptyMap()

    override fun onMqttConnectComplete(reconnect: Boolean, serverUri: String?) {
        if (reconnect) {
            // ❌ Must manually track and resubscribe
            pulseMqttKit.submitCommand(
                SubscribeCommand(topicConfigs = subscriptions)
            )
        }
    }

    override fun onMqttConnectionLost(cause: Throwable?) {
        // ❌ Must track subscriptions manually
        // ❌ Risk of losing subscription state
        // ❌ More code to maintain
    }

    fun subscribe(topics: Map<String, TopicTypeConfig<*>>) {
        // ❌ Must update internal state
        subscriptions = subscriptions + topics
        pulseMqttKit.submitCommand(SubscribeCommand(topicConfigs = topics))
    }
}

With Auto-Subscription

// ✅ Simple - library handles everything
val connectionOptions = ConnectionOptions(
    serverUri = "tcp://broker.example.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = AutoSubscriptionConfig(enabled = true)
)

// ✅ Just subscribe - no reconnection handling needed
pulseMqttKit.submitCommand(
    SubscribeCommand(topicConfigs = topics)
)

Troubleshooting

Problem: Subscriptions not restored after reconnection

Check:

1. Auto-subscription is enabled
2. Clean session is false
3. Connection was successful
4. No errors in subscription restoration

class DebugAutoSubscription : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (command is SubscribeCommand) {
            logger.debug("Subscription restored: ${command.topicConfigs.keys}")
        }
    }
}

Problem: Subscription restoration fails

Solution: Check the logs for the failure cause, then relax the retry policy and timeout if the attempts are being exhausted:

AutoSubscriptionConfig(
    enabled = true,
    subscribeCommandRetryPolicy = RetryPolicy.exponential(
        maxRetries = 7,  // Increase retries
        baseDelayMillis = 2000,
        maxDelayMillis = 60000
    ),
    subscribeCommandTimeout = 60_000L  // Increase timeout
)
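
Raising both the retry count and the timeout also lengthens the time before a final failure is reported. A rough upper bound can be estimated as sketched below. It assumes one initial attempt plus `maxRetries` retries, each allowed to run for the full timeout, with doubling delays in between. These semantics are an assumption, and `worstCaseRestoreMillis` is a hypothetical helper, not a library function.

```kotlin
// Hypothetical estimate, not a library API. Assumes one initial attempt plus
// maxRetries retries, each running for the full timeout, with doubling delays.
fun worstCaseRestoreMillis(
    maxRetries: Int,
    baseDelayMillis: Long,
    maxDelayMillis: Long,
    timeoutMillis: Long
): Long {
    var delay = minOf(baseDelayMillis, maxDelayMillis)
    var waiting = 0L
    repeat(maxRetries) {
        waiting += delay                          // pause before the next retry
        delay = minOf(delay * 2, maxDelayMillis)  // double, capped at the maximum
    }
    // Time spent inside attempts plus time spent waiting between them
    return (maxRetries + 1) * timeoutMillis + waiting
}
```

Under these assumptions, the configuration above gives `worstCaseRestoreMillis(7, 2000, 60000, 60000)` = 662000 ms, about 11 minutes, compared with 211000 ms for 5 retries, a 1000 ms base delay, a 30000 ms cap and a 30-second timeout.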

Problem: Too many subscriptions

Solution: Unsubscribe from unused topics:

// Regular cleanup
fun cleanupStaleSubscriptions() {
    val staleTopics = identifyStaleSubscriptions()
    if (staleTopics.isNotEmpty()) {
        pulseMqttKit.submitCommand(
            UnsubscribeCommand(topics = staleTopics.toMutableList())
        )
    }
}
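
The snippet above leaves `identifyStaleSubscriptions()` undefined. One possible approach, sketched here with a hypothetical `TopicActivityTracker` class that the library does not provide, is to record the last activity time per topic and treat topics that have been idle for too long as stale.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical tracker; the library does not provide this class.
class TopicActivityTracker(
    // Injectable clock so the logic can be tested without waiting
    private val clock: () -> Long = { System.currentTimeMillis() }
) {
    private val lastSeen = ConcurrentHashMap<String, Long>()

    // Call when subscribing and whenever a message arrives for the topic
    fun touch(topic: String) {
        lastSeen[topic] = clock()
    }

    // Call after unsubscribing so the topic is no longer reported
    fun forget(topic: String) {
        lastSeen.remove(topic)
    }

    // Topics with no recorded activity for longer than maxIdleMillis
    fun staleTopics(maxIdleMillis: Long): List<String> {
        val now = clock()
        return lastSeen.filterValues { now - it > maxIdleMillis }.keys.sorted()
    }
}
```

With wildcard subscriptions, the tracker should be keyed by the topic filter rather than by the concrete topic of each incoming message, otherwise the filter itself would never appear active.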

Performance Considerations

Memory Usage

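Each stored subscription consumes memory. Track the number of topics and enforce an upper bound:

class SubscriptionLimiter(private val pulseMqttKit: PulseMqttKit) {
    private val maxSubscriptions = 50
    // Track topic names rather than a counter so repeated subscribes are not double-counted
    private val activeTopics = mutableSetOf<String>()

    fun subscribe(topics: Map<String, TopicTypeConfig<*>>) {
        val newTopics = topics.keys - activeTopics
        if (activeTopics.size + newTopics.size > maxSubscriptions) {
            logger.warning("Subscription limit would be exceeded")
            // Clean up or reject
            return
        }

        pulseMqttKit.submitCommand(SubscribeCommand(topicConfigs = topics))
        activeTopics += topics.keys
    }
}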

Reconnection Time

More subscriptions mean a longer restoration time after each reconnection:

✅ Good: 10-20 topics (fast restoration)
⚠️ Caution: 50+ topics (slower restoration)
❌ Avoid: 100+ topics (very slow, consider redesign)

Next Steps