
Retry Policies

Pulse MQTT provides flexible retry mechanisms to handle transient network failures and connection issues. Retry policies automatically re-attempt failed operations with configurable delays and limits.

Overview

Network operations can fail for various reasons:
- Temporary network unavailability
- Broker overload or maintenance
- Timeout due to slow connections
- Transient authentication issues

Retry policies allow you to:
- Automatically retry failed operations
- Control retry behavior with different strategies
- Exclude specific errors from retries
- Limit retry attempts to prevent infinite loops
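The behaviors listed above can be pictured as a single loop. The following is a minimal, library-independent sketch, not Pulse MQTT's implementation: `OperationException`, `RetryConfig`, and `runWithRetry` are hypothetical names, and the `delayForRetry` function stands in for whichever strategy (sequential, exponential, jitter) a policy uses. As in the schedules on this page, `maxRetries` counts retries after the first attempt.

```kotlin
// Library-independent sketch of a policy-driven retry loop. OperationException,
// RetryConfig, and runWithRetry are hypothetical names, not Pulse MQTT APIs.

class OperationException(val code: String, message: String) : Exception(message)

data class RetryConfig(
    val maxRetries: Int,                         // retries allowed after the first attempt
    val delayForRetry: (retry: Int) -> Long,     // 1-based retry number -> wait in ms
    val excludedCodes: Set<String> = emptySet()  // codes that must fail fast
)

fun <T> runWithRetry(
    config: RetryConfig,
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    operation: (attempt: Int) -> T
): T {
    require(config.maxRetries >= 0) { "maxRetries must be non-negative" }
    // Attempt 1 is the initial call; attempts 2..maxRetries + 1 are retries.
    for (attempt in 1..config.maxRetries + 1) {
        try {
            return operation(attempt)
        } catch (e: OperationException) {
            val budgetExhausted = attempt > config.maxRetries
            // Excluded codes and an exhausted budget both surface the error immediately.
            if (e.code in config.excludedCodes || budgetExhausted) throw e
            // Wait before retry number `attempt`, i.e. before attempt + 1.
            sleep(config.delayForRetry(attempt))
        }
    }
    error("unreachable: the loop always returns or throws")
}
```

The `sleep` parameter is injectable so the loop can be exercised without real waiting; a coroutine-based client would use a suspending `delay` instead of blocking the thread.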

Policy Types

No Retry (None)

Default policy that doesn't retry failed operations.

val policy = RetryPolicy.none()

val command = ConnectCommand(
    connectionOptions = options,
    retryPolicy = policy
)

Use when:
- Operation failures should be immediate
- Manual retry handling is preferred
- Fast-fail behavior is desired

Sequential Retry

Retries with a fixed delay between attempts.

val policy = RetryPolicy.sequential(
    maxRetries = 3,
    delayMillis = 2000 // 2 seconds between retries
)

Retry Schedule:
- Attempt 1: Immediate
- Attempt 2: After 2000ms
- Attempt 3: After 2000ms
- Attempt 4: After 2000ms

Use when:
- Predictable retry intervals are needed
- Error conditions are expected to resolve quickly
- Testing and debugging (consistent timing)
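A fixed-delay schedule is simple to compute. This sketch uses a hypothetical `sequentialDelays` helper (not a Pulse MQTT API) that returns the wait before each retry; with `maxRetries = 3` and `delayMillis = 2000` it yields the three 2000ms waits in the schedule above, 6000ms of waiting in total.

```kotlin
// Hypothetical helper (not part of Pulse MQTT): the wait before each retry
// under a fixed-delay policy. Element i is the delay before attempt i + 2,
// because attempt 1 always runs immediately.
fun sequentialDelays(maxRetries: Int, delayMillis: Long): List<Long> {
    require(maxRetries >= 0 && delayMillis >= 0) { "arguments must be non-negative" }
    return List(maxRetries) { delayMillis }
}
```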

Exponential Backoff

Retries with exponentially increasing delays.

val policy = RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000
)

Retry Schedule:
- Attempt 1: Immediate
- Attempt 2: After 1000ms (1s)
- Attempt 3: After 2000ms (2s)
- Attempt 4: After 4000ms (4s)
- Attempt 5: After 8000ms (8s)
- Attempt 6: After 16000ms (16s; still below the 30000ms maxDelayMillis cap)

Use when:
- Recovering from broker overload
- Avoiding retry storms across multiple clients
- Handling extended network outages
- Production environments (recommended)
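The doubling rule can be written as delay = min(baseDelayMillis × 2^(retry - 1), maxDelayMillis). The sketch below assumes that formula, which reproduces the schedule above; the helper names are hypothetical, and the library's exact rounding or capping may differ. With a larger `maxRetries` (say 7), the sixth and seventh retries would hit the 30000ms cap.

```kotlin
// Hypothetical helpers (not part of Pulse MQTT) that reproduce the
// exponential schedule above: the delay doubles with each retry,
// starting at baseDelayMillis, and never exceeds maxDelayMillis.
fun exponentialDelay(retry: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long {
    require(retry >= 1) { "retry is 1-based: retry 1 is the second attempt" }
    // baseDelayMillis * 2^(retry - 1); the shift is bounded so that, for
    // realistic base delays, large retry numbers cannot overflow a Long.
    val uncapped = baseDelayMillis shl minOf(retry - 1, 30)
    return minOf(uncapped, maxDelayMillis)
}

// Waits before attempts 2..maxRetries + 1.
fun exponentialDelays(maxRetries: Int, baseDelayMillis: Long, maxDelayMillis: Long): List<Long> =
    (1..maxRetries).map { exponentialDelay(it, baseDelayMillis, maxDelayMillis) }
```

For example, `exponentialDelays(5, 1000, 30000)` returns `[1000, 2000, 4000, 8000, 16000]`, the waits before attempts 2 through 6.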

Jitter-Based Retry

Adds randomization to exponential backoff so that clients which fail at the same moment do not all retry at the same moment (the thundering-herd problem).

val policy = RetryPolicy.jitter(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000,
    jitterPercentage = 0.25 // ±25% randomization
)

Retry Schedule (example with ±25% jitter):
- Attempt 2: 750ms - 1250ms
- Attempt 3: 1500ms - 2500ms
- Attempt 4: 3000ms - 5000ms

Use when:
- Multiple clients connect simultaneously
- Preventing synchronized retry storms
- Load distribution across time
- Large-scale deployments
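The jittered ranges above follow from multiplying each backoff delay by a random factor between 1 - jitterPercentage and 1 + jitterPercentage. This sketch assumes exactly that (a uniform factor, then clamping to `maxDelayMillis`); `jitteredDelay` is a hypothetical helper, and Pulse MQTT may distribute or clamp the randomness differently. Passing a seeded `Random` makes the output reproducible in tests.

```kotlin
import kotlin.random.Random

// Hypothetical helper (not part of Pulse MQTT). Assumptions: the backoff doubles
// from baseDelayMillis and is capped like the exponential policy, then a uniform
// factor in [1 - jitterPercentage, 1 + jitterPercentage) is applied, and the
// result is clamped again so it never exceeds maxDelayMillis.
fun jitteredDelay(
    retry: Int,
    baseDelayMillis: Long,
    maxDelayMillis: Long,
    jitterPercentage: Double,
    random: Random = Random.Default
): Long {
    require(retry >= 1) { "retry is 1-based" }
    require(jitterPercentage in 0.0..1.0) { "jitterPercentage must be within 0.0..1.0" }
    val backoff = minOf(baseDelayMillis shl minOf(retry - 1, 30), maxDelayMillis)
    if (jitterPercentage == 0.0) return backoff // nextDouble needs a non-empty range
    val factor = 1.0 + random.nextDouble(-jitterPercentage, jitterPercentage)
    return minOf((backoff * factor).toLong(), maxDelayMillis)
}
```

Clamping after the jitter guarantees the configured cap is never exceeded; the trade-off is that, once delays reach the cap, roughly half of the retries land on exactly `maxDelayMillis`.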

Exception Filtering

Exclude specific error codes from retry attempts:

val policy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = hashSetOf(
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_CLIENT_ID_REJECTED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
    )
)

Common Non-Retryable Errors:
- Authentication failures (invalid credentials)
- Authorization errors (insufficient permissions)
- Invalid client ID
- Malformed connection parameters
- Protocol violations

Retryable Errors:
- Network timeouts
- Connection refused (temporary)
- Broker unavailable
- Server overload
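Put together, the retry decision for a failed operation checks the error code first and the remaining retry budget second. The sketch below is hypothetical: `ErrorCode` is a simplified stand-in for `MqttExceptionCode` (the `TIMEOUT` and `SERVER_UNAVAILABLE` values are invented for the example), and `shouldRetry` is not a Pulse MQTT function.

```kotlin
// Simplified stand-in for MqttExceptionCode; TIMEOUT and SERVER_UNAVAILABLE
// are invented values used only for illustration.
enum class ErrorCode {
    NOT_AUTHORIZED,
    CLIENT_ID_REJECTED,
    BAD_USERNAME_OR_PASSWORD,
    TIMEOUT,
    SERVER_UNAVAILABLE
}

// Hypothetical decision function: an excluded code fails fast regardless of
// the remaining budget; any other code is retried until maxRetries is used up.
fun shouldRetry(
    code: ErrorCode,
    retriesUsed: Int,
    maxRetries: Int,
    excludedCodes: Set<ErrorCode>
): Boolean = code !in excludedCodes && retriesUsed < maxRetries
```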

Usage Examples

Connection with Exponential Backoff

val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "mobile-app-${userId}",
        username = "mqtt_user",
        password = "secure_password"
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
            MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
        )
    )
)

pulseMqttKit.submitCommand(connectCommand)

Subscribe with Sequential Retry

val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to TopicTypeConfig(
            messageType = OrderUpdate::class.java,
            qosLevel = QOSLevel.QOS_1
        )
    ),
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 3,
        delayMillis = 2000
    )
)

Publish with Jitter

val publishCommand = PublishCommand(
    message = ZMqttMessage(
        topic = "orders/123/status",
        payload = gson.toJson(orderUpdate)
    ),
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.jitter(
        maxRetries = 3,
        baseDelayMillis = 500,
        maxDelayMillis = 5000,
        jitterPercentage = 0.3
    )
)

Monitoring Retries

Track retry attempts through MqttUpdatesListener:

class RetryMonitoringListener : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (result.attempt > 1) {
            logger.info("""
                Command succeeded after ${result.attempt} attempts
                Type: ${command.type}
                Total time: ${result.totalTimeMillis}ms
            """.trimIndent())

            // Record retry metrics
            metrics.recordRetrySuccess(
                commandType = command.type,
                attempts = result.attempt,
                totalDuration = result.totalTimeMillis
            )
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        logger.error("""
            Command failed after ${result.attempt} attempts
            Type: ${command.type}
            Error: ${result.error.message}
            Total time: ${result.totalTimeMillis}ms
        """.trimIndent(), result.error)

        // Alert on persistent failures
        if (result.attempt >= 3) {
            alerting.sendAlert("MQTT command exhausted retries: ${command.type}")
        }
    }
}
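The listener above delegates to a `metrics` object that this page does not define. One possible in-memory implementation is sketched here; `RetryMetrics` and its methods are hypothetical, and `commandType` is assumed to be a `String` (the actual type of `command.type` may differ). A production app would more likely forward these values to its existing metrics library.

```kotlin
// Hypothetical in-memory backing for the `metrics` object used by the listener.
// commandType is assumed to be a String; adapt it to the real type of command.type.
class RetryMetrics {
    private class Stats(var count: Int = 0, var totalAttempts: Int = 0, var totalDurationMillis: Long = 0)

    private val byType = mutableMapOf<String, Stats>()

    // Called when a command succeeds after more than one attempt.
    @Synchronized
    fun recordRetrySuccess(commandType: String, attempts: Int, totalDuration: Long) {
        val stats = byType.getOrPut(commandType) { Stats() }
        stats.count++
        stats.totalAttempts += attempts
        stats.totalDurationMillis += totalDuration
    }

    // Average attempts per retried success, or null if none were recorded.
    @Synchronized
    fun averageAttempts(commandType: String): Double? =
        byType[commandType]?.let { it.totalAttempts.toDouble() / it.count }

    // Average end-to-end duration per retried success, or null if none were recorded.
    @Synchronized
    fun averageDurationMillis(commandType: String): Double? =
        byType[commandType]?.let { it.totalDurationMillis.toDouble() / it.count }
}
```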

pulseMqttKit.addListener(RetryMonitoringListener())

Best Practices

1. Use Exponential Backoff for Production

// ✅ Good - Prevents retry storms, respects broker load
RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000
)

// ❌ Avoid - Can overload broker, no backoff
RetryPolicy.sequential(
    maxRetries = 10,
    delayMillis = 100
)

2. Always Exclude Auth Failures

// ✅ Good - Fails fast on invalid credentials
RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = hashSetOf(
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
    )
)

3. Set Reasonable Max Retries

// ✅ Good - Balances reliability and failure detection
maxRetries = 3  // For quick operations
maxRetries = 5  // For critical operations

// ❌ Avoid - Too few (gives up too quickly)
maxRetries = 1

// ❌ Avoid - Too many (delays failure detection)
maxRetries = 20
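One way to choose `maxRetries` is to compute how long a caller could wait before a failure is reported. The sketch below assumes the exponential schedule described earlier (the delay doubles from `baseDelayMillis` and is capped at `maxDelayMillis`) and counts only the waits between attempts, not the time each attempt takes; `worstCaseWaitMillis` is a hypothetical helper.

```kotlin
// Hypothetical helper (not part of Pulse MQTT): total time spent waiting between
// attempts when every attempt fails, under a capped doubling backoff.
fun worstCaseWaitMillis(maxRetries: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long =
    (1..maxRetries).sumOf { retry ->
        // Delay before retry `retry`: base * 2^(retry - 1), capped at maxDelayMillis.
        minOf(baseDelayMillis shl minOf(retry - 1, 30), maxDelayMillis)
    }
```

With `baseDelayMillis = 1000` and `maxDelayMillis = 30000`, five retries add at most 31,000ms of waiting, while twenty retries add 481,000ms (about 8 minutes), which is why very high values delay failure detection.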

4. Cap Maximum Delay

// ✅ Good - Prevents excessively long waits
RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000  // Cap at 30 seconds
)

5. Use Jitter in Multi-Client Scenarios

// ✅ Good - Distributes retry load
RetryPolicy.jitter(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000,
    jitterPercentage = 0.25  // ±25% randomization
)
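The benefit of jitter is easiest to see when many clients lose the broker at the same moment. This hypothetical simulation (the helper names are illustrative, not Pulse MQTT APIs) computes each client's first retry delay and counts how many clients fall into the busiest 100ms window, with and without ±25% jitter applied to a 1000ms base delay.

```kotlin
import kotlin.random.Random

// Hypothetical helper: first-retry delay for each of `clients` clients that
// failed at the same instant, with an optional ±jitterPercentage random factor.
fun firstRetryDelays(
    clients: Int,
    baseDelayMillis: Long,
    jitterPercentage: Double,
    random: Random
): List<Long> = List(clients) {
    val factor =
        if (jitterPercentage == 0.0) 1.0
        else 1.0 + random.nextDouble(-jitterPercentage, jitterPercentage)
    (baseDelayMillis * factor).toLong()
}

// Number of clients whose retry lands in the most crowded time window.
fun busiestWindow(delaysMillis: List<Long>, windowMillis: Long = 100): Int =
    delaysMillis.groupingBy { it / windowMillis }.eachCount().values.maxOrNull() ?: 0
```

Without jitter every client lands in the same window; with ±25% jitter the first retries are spread across roughly 500ms (750ms to 1250ms), so the busiest window holds only a fraction of the clients.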

Retry Policy Comparison

| Policy | Use Case | Pros | Cons |
|---|---|---|---|
| None | Fast-fail operations | Immediate feedback | No resilience |
| Sequential | Testing, debugging | Predictable timing | Can overload broker |
| Exponential | Production use | Respects broker load | Longer total time |
| Jitter | Multi-client systems | Prevents storms | Slightly unpredictable |

Advanced Configuration

Per-Operation Policies

Different operations may need different retry strategies:

// Critical connection - aggressive retries
val connectPolicy = RetryPolicy.exponential(
    maxRetries = 7,
    baseDelayMillis = 1000,
    maxDelayMillis = 60000
)

// Non-critical publish - fast fail
val publishPolicy = RetryPolicy.sequential(
    maxRetries = 2,
    delayMillis = 1000
)

// Subscription - moderate retries
val subscribePolicy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    maxDelayMillis = 15000
)

Dynamic Policy Adjustment

Adjust policies based on runtime conditions:

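// Assumed app-defined enum (not part of Pulse MQTT); the `when` below covers every value
enum class NetworkQuality { EXCELLENT, GOOD, POOR }

fun getRetryPolicy(networkQuality: NetworkQuality): RetryPolicy {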
    return when (networkQuality) {
        NetworkQuality.EXCELLENT -> RetryPolicy.sequential(
            maxRetries = 2,
            delayMillis = 500
        )

        NetworkQuality.GOOD -> RetryPolicy.exponential(
            maxRetries = 3,
            baseDelayMillis = 1000,
            maxDelayMillis = 10000
        )

        NetworkQuality.POOR -> RetryPolicy.exponential(
            maxRetries = 5,
            baseDelayMillis = 2000,
            maxDelayMillis = 30000
        )
    }
}

Custom Exception Handling

val policy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = getExcludedCodes()
)

fun getExcludedCodes(): HashSet<MqttExceptionCode> {
    return hashSetOf(
        // Authentication errors
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD,

        // Configuration errors
        MqttExceptionCode.REASON_CODE_CLIENT_ID_REJECTED,
        MqttExceptionCode.REASON_CODE_PROTOCOL_ERROR,

        // Resource errors
        MqttExceptionCode.REASON_CODE_QUOTA_EXCEEDED
    )
}

Troubleshooting

Problem: Commands never succeed despite retries

Solution: Check whether the errors are retryable:

override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
    logger.error("Failure details:", result.error)
    // Check if error is in excluded list
    // Verify network connectivity
    // Confirm broker availability
}

Problem: Retry delays are too long

Solution: Reduce maxDelayMillis or use a sequential policy:

// Reduce max delay
RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 500,
    maxDelayMillis = 5000  // Shorter max delay
)

Problem: Retry storms overloading broker

Solution: Use a jitter-based policy:

RetryPolicy.jitter(
    maxRetries = 3,
    baseDelayMillis = 2000,
    maxDelayMillis = 20000,
    jitterPercentage = 0.3  // ±30% randomization
)

Next Steps