Retry Policies API

Retry policies determine how Pulse MQTT handles failed command execution, including whether to retry, how long to wait between attempts, and which exceptions should bypass retry logic.

Overview

All retry policies extend the abstract RetryPolicy class and provide:

  • Retry Decision Logic - Determines if a command should be retried
  • Delay Calculation - Calculates wait time between retry attempts
  • Exception Filtering - Excludes specific errors from retry logic

Available Policies

NoRetryPolicy

Disables retries completely: each command is executed once and never retried.

Usage

val policy = RetryPolicy.none()

Characteristics

  • maxRetries = 0
  • No delay between attempts (since there are no retries)
  • Useful for fire-and-forget operations

When to Use

  • Operations that should be attempted only once
  • When the calling code manages retries itself and needs strict control over when a command is re-submitted

Example

val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_0,
    retryPolicy = RetryPolicy.none() 
)

SequentialRetryPolicy

Fixed-delay retry policy: every retry waits the same amount of time.

Factory Method

fun sequential(
    maxRetries: Int = 3,
    delayMillis: Long = 1000L,
    excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): SequentialRetryPolicy

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| maxRetries | Int | 3 | Maximum number of retry attempts |
| delayMillis | Long | 1000L | Fixed delay between retries in milliseconds |
| excludedExceptionCodes | Set<MqttExceptionCode> | emptySet() | Exception codes that bypass retry logic |

Retry Timeline

With delayMillis=1000ms and maxRetries=3 (one initial attempt plus up to three retries):

Attempt 1: Immediate (0ms)
Attempt 2: Wait 1000ms (1s)
Attempt 3: Wait 1000ms (1s)
Attempt 4: Wait 1000ms (1s)
Total wait time: at most 3000ms (3s), not counting the time each attempt itself takes
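
The timeline above can be reproduced with a few lines of Kotlin. This is a minimal sketch for illustration only: sequentialWaits is a hypothetical helper, not part of the Pulse MQTT API, and it assumes the policy waits exactly delayMillis before each retry.

```kotlin
// Hypothetical helper, not part of Pulse MQTT: lists the wait before each attempt
// for a fixed-delay policy. Attempt 1 is the initial try and has no wait.
fun sequentialWaits(maxRetries: Int, delayMillis: Long): List<Long> =
    listOf(0L) + List(maxRetries) { delayMillis }

fun main() {
    val waits = sequentialWaits(maxRetries = 3, delayMillis = 1000L)
    waits.forEachIndexed { index, wait ->
        println("Attempt ${index + 1}: wait ${wait}ms")
    }
    // Sum of all waits; the duration of the attempts themselves is not included.
    println("Total wait: ${waits.sum()}ms")
}
```

Because the delay never grows, the total wait is simply maxRetries * delayMillis.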

When to Use

  • Transient network issues with predictable recovery time
  • Simple retry scenarios without sophisticated backoff

Example

val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 5,
        delayMillis = 2000 // 2 seconds between each retry
    )
)

ExponentialRetryPolicy

Exponential backoff retry policy where the delay doubles with each retry, up to maxDelayMillis.

Factory Method

fun exponential(
    maxRetries: Int = 3,
    baseDelayMillis: Long = 1000L,
    maxDelayMillis: Long = 120000L,
    excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): ExponentialRetryPolicy

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| maxRetries | Int | 3 | Maximum number of retry attempts |
| baseDelayMillis | Long | 1000L | Initial delay in milliseconds; doubles with each retry |
| maxDelayMillis | Long | 120000L | Maximum delay cap in milliseconds (2 minutes) |
| excludedExceptionCodes | Set<MqttExceptionCode> | emptySet() | Exception codes that bypass retry logic |

Retry Timeline

With baseDelayMillis=1000ms and maxRetries=3 (one initial attempt plus up to three retries):

Attempt 1: Immediate (0ms)
Attempt 2: Wait 1000ms (1s)    [1000 * 2^0]
Attempt 3: Wait 2000ms (2s)    [1000 * 2^1]
Attempt 4: Wait 4000ms (4s)    [1000 * 2^2]
Total wait time: at most 7000ms (7s), not counting the time each attempt itself takes

With baseDelayMillis=1000ms, maxRetries=7, and maxDelayMillis=60000ms, the cap takes effect on the last retry:

Attempt 1: Immediate
Attempt 2: Wait 1s
Attempt 3: Wait 2s
Attempt 4: Wait 4s
Attempt 5: Wait 8s
Attempt 6: Wait 16s
Attempt 7: Wait 32s
Attempt 8: Wait 60s (64s uncapped, limited to maxDelayMillis=60000ms)
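
Both timelines follow from one formula: the wait before retry n is min(baseDelayMillis * 2^(n-1), maxDelayMillis). The sketch below illustrates that calculation. exponentialDelay is a hypothetical helper, not a Pulse MQTT function, and the formula is inferred from the timelines rather than taken from the library source.

```kotlin
// Hypothetical helper, not part of Pulse MQTT. Assumes the delay before retry n
// (1-based) is min(baseDelayMillis * 2^(n-1), maxDelayMillis).
fun exponentialDelay(retry: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long {
    require(retry >= 1) { "retry is 1-based" }
    // Clamp the exponent so the multiplication stays within Long range
    // for realistic base delays, even with very large retry counts.
    val shift = minOf(retry - 1, 30)
    val uncapped = baseDelayMillis * (1L shl shift)
    return minOf(uncapped, maxDelayMillis)
}

fun main() {
    // Retry n corresponds to attempt n + 1 in the timelines.
    for (retry in 1..7) {
        val delay = exponentialDelay(retry, baseDelayMillis = 1000L, maxDelayMillis = 60_000L)
        println("Attempt ${retry + 1}: wait ${delay}ms")
    }
}
```

With these inputs the seventh retry would wait 64000ms uncapped, so the cap reduces it to 60000ms.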

When to Use

  • Production environments (recommended)
  • Network issues that may take time to resolve
  • Preventing resource exhaustion on broker/server
  • High-load scenarios

Example

val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 2000,    // Start at 2 seconds
        maxDelayMillis = 60000,    // Cap at 60 seconds
        excludedExceptionCodes = setOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)

JitterRetryPolicy

Adds randomness to the delays of another retry policy to prevent the thundering herd problem, where many clients retry at the same moment.

Factory Method

fun jitter(
    jitterFactor: Double = 0.1,
    basePolicy: RetryPolicy = exponential(3, 1000, 120000),
    excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): JitterRetryPolicy

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| jitterFactor | Double | 0.1 | Randomness factor (0.0 to 1.0) |
| basePolicy | RetryPolicy | exponential(...) | Underlying retry policy to add jitter to |
| excludedExceptionCodes | Set<MqttExceptionCode> | emptySet() | Exception codes that bypass retry logic |

Retry Timeline

Base policy: ExponentialRetryPolicy(baseDelay=1000ms)
Jitter factor: 0.1 (10%)

Attempt 1: Immediate (0ms)
Attempt 2: Wait ~1000ms ± 10% (900ms to 1100ms)
Attempt 3: Wait ~2000ms ± 10% (1800ms to 2200ms)
Attempt 4: Wait ~4000ms ± 10% (3600ms to 4400ms)
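
The ranges above can be illustrated with a small calculation. This is a sketch under stated assumptions, not the library's implementation: withJitter is a hypothetical helper, and it assumes the jitter is symmetric and uniformly distributed around the base delay, which matches the ± ranges shown but is not confirmed by the source.

```kotlin
import kotlin.random.Random

// Hypothetical helper, not part of Pulse MQTT. Assumes symmetric jitter: the result
// is drawn uniformly from [delay * (1 - factor), delay * (1 + factor)).
fun withJitter(delayMillis: Long, jitterFactor: Double, random: Random = Random.Default): Long {
    require(jitterFactor in 0.0..1.0) { "jitterFactor must be between 0.0 and 1.0" }
    // nextDouble() is in [0, 1); rescale it to [-1, 1) before applying the factor.
    val offset = (random.nextDouble() * 2.0 - 1.0) * jitterFactor
    return (delayMillis * (1.0 + offset)).toLong()
}

fun main() {
    // Base delays of an exponential policy with baseDelayMillis = 1000.
    for (base in listOf(1000L, 2000L, 4000L)) {
        println("Base ${base}ms -> jittered ${withJitter(base, jitterFactor = 0.1)}ms")
    }
}
```

Because each client draws its own random offset, clients that failed at the same instant retry at slightly different times.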

When to Use

  • Multiple clients connecting simultaneously
  • Preventing synchronized retry storms
  • High-scale deployments
  • Spreading retry load over time

Exception Filtering

Certain errors should never be retried (e.g., authentication failures). Use excludedExceptionCodes to specify these:

Available Exception Codes

enum class MqttExceptionCode {
    REASON_CODE_NOT_AUTHORIZED,      // Code 5: Invalid credentials
    CONNECT_ALREADY_IN_PROGRESS,     // Code 32100: Duplicate connection
    UNKNOWN                           // Code -1: Unmapped errors
}

Usage Example

val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000,
        excludedExceptionCodes = setOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
            MqttExceptionCode.CONNECT_ALREADY_IN_PROGRESS
        )
    )
)

When these exceptions occur, the command fails immediately without retries.
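
The retry decision can be thought of as two checks: the failure code must not be excluded, and retries must remain. The sketch below shows that logic in isolation. SketchExceptionCode and shouldRetry are hypothetical names that stand in for the library types so the example compiles on its own. They are not the Pulse MQTT implementation.

```kotlin
// Stand-in for the library enum so the sketch compiles on its own.
enum class SketchExceptionCode { NOT_AUTHORIZED, CONNECT_ALREADY_IN_PROGRESS, UNKNOWN }

// Hypothetical decision function: retry only while retries remain
// and the failure code is not in the excluded set.
fun shouldRetry(
    retriesSoFar: Int,
    maxRetries: Int,
    code: SketchExceptionCode,
    excluded: Set<SketchExceptionCode>
): Boolean = code !in excluded && retriesSoFar < maxRetries

fun main() {
    val excluded = setOf(SketchExceptionCode.NOT_AUTHORIZED)
    // Excluded code: fails immediately, even on the first attempt.
    println(shouldRetry(0, 3, SketchExceptionCode.NOT_AUTHORIZED, excluded)) // false
    // Non-excluded code with retries remaining.
    println(shouldRetry(0, 3, SketchExceptionCode.UNKNOWN, excluded))        // true
    // Non-excluded code, but all three retries already used.
    println(shouldRetry(3, 3, SketchExceptionCode.UNKNOWN, excluded))        // false
}
```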


Retry Policy Comparison

| Policy | Best For | Pros | Cons |
| --- | --- | --- | --- |
| None | Fire-and-forget, testing | Fastest failure, simple | No resilience |
| Sequential | Simple scenarios, dev/test | Predictable timing | Can overwhelm resources |
| Exponential | Production environments | Resource-friendly, scales well | Longer recovery time |
| Jitter | High-scale deployments | Prevents thundering herd | Slightly more complex |

Advanced Usage

Custom Retry Logic per Command Type

// Aggressive retries for connections
val connectRetry = RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 2000
)

// Conservative retries for publishes
val publishRetry = RetryPolicy.sequential(
    maxRetries = 2,
    delayMillis = 500
)

// No retries for disconnects
val disconnectRetry = RetryPolicy.none()

pulseMqttKit.submitCommand(ConnectCommand(options, retryPolicy = connectRetry))
pulseMqttKit.submitCommand(PublishCommand(message, qos, retryPolicy = publishRetry))
pulseMqttKit.submitCommand(DisconnectCommand(retryPolicy = disconnectRetry))

Dynamic Retry Policy Selection

fun getRetryPolicy(networkType: NetworkType): RetryPolicy {
    return when (networkType) {
        NetworkType.WIFI -> RetryPolicy.exponential(
            maxRetries = 5,
            baseDelayMillis = 1000
        )
        NetworkType.CELLULAR -> RetryPolicy.exponential(
            maxRetries = 3,
            baseDelayMillis = 2000
        )
        NetworkType.NONE -> RetryPolicy.none()
    }
}

Monitoring Retry Attempts

pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        println("Command failed after ${result.attempt} attempts")
        println("Total time: ${result.totalTimeMillis}ms")

        if (result.attempt >= command.retryPolicy.maxRetries) {
            println("Max retries exhausted for ${command.type}")
        }
    }
})

See Also