Retry Policies API¶
Retry policies determine how Pulse MQTT handles failed command execution, including whether to retry, how long to wait between attempts, and which exceptions should bypass retry logic.
Overview¶
All retry policies extend the abstract RetryPolicy class and provide:
- Retry Decision Logic - Determines if a command should be retried
- Delay Calculation - Calculates wait time between retry attempts
- Exception Filtering - Excludes specific errors from retry logic
Available Policies¶
NoRetryPolicy¶
Disables retries completely — commands are executed once and never retried.
Usage¶
Characteristics¶
maxRetries = 0- No delay between attempts (since there are no retries)
- Useful for fire-and-forget operations
When to Use¶
- Operations that should only be tried once
- When you want strict control over retries from outside
- For manual retry management
Example¶
val publishCommand = PublishCommand(
message = message,
qos = QOSLevel.QOS_0,
retryPolicy = RetryPolicy.none()
)
SequentialRetryPolicy¶
Fixed delay retry policy where each retry waits the same amount of time.
Factory Method¶
fun sequential(
maxRetries: Int = 3,
delayMillis: Long = 1000L,
excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): SequentialRetryPolicy
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
maxRetries |
Int |
3 |
Maximum number of retry attempts |
delayMillis |
Long |
1000L |
Fixed delay between retries in milliseconds |
excludedExceptionCodes |
Set<MqttExceptionCode> |
emptySet() |
Exception codes that bypass retry logic |
Retry Timeline¶
With delayMillis=1000ms and maxRetries=3:
Attempt 1: Immediate (0ms)
Attempt 2: Wait 1000ms (1s)
Attempt 3: Wait 1000ms (1s)
Attempt 4: Wait 1000ms (1s)
Total max time: 3000ms (3s)
When to Use¶
- Transient network issues with predictable recovery time
- Simple retry scenarios without sophisticated backoff
Example¶
val connectCommand = ConnectCommand(
connectionOptions = options,
retryPolicy = RetryPolicy.sequential(
maxRetries = 5,
delayMillis = 2000 // 2 seconds between each retry
)
)
ExponentialRetryPolicy¶
Exponential backoff retry policy where delay doubles with each attempt.
Factory Method¶
fun exponential(
maxRetries: Int = 3,
baseDelayMillis: Long = 1000L,
maxDelayMillis: Long = 120000L,
excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): ExponentialRetryPolicy
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
maxRetries |
Int |
3 |
Maximum number of retry attempts |
baseDelayMillis |
Long |
1000L |
Initial delay, doubles each retry |
maxDelayMillis |
Long |
120000L |
Maximum delay cap (2 minutes) |
excludedExceptionCodes |
Set<MqttExceptionCode> |
emptySet() |
Exception codes that bypass retry logic |
Retry Timeline¶
With baseDelayMillis=1000ms and maxRetries=3:
Attempt 1: Immediate (0ms)
Attempt 2: Wait 1000ms (1s) [1000 * 2^0]
Attempt 3: Wait 2000ms (2s) [1000 * 2^1]
Attempt 4: Wait 4000ms (4s) [1000 * 2^2]
Total max time: 7000ms (7s)
With maxDelayMillis cap:
Attempt 1: Immediate
Attempt 2: Wait 1s
Attempt 3: Wait 2s
Attempt 4: Wait 4s
Attempt 5: Wait 8s
Attempt 6: Wait 16s
Attempt 7: Wait 32s
Attempt 8: Wait 60s (capped at maxDelayMillis=60000ms)
When to Use¶
- Production environments (recommended)
- Network issues that may take time to resolve
- Preventing resource exhaustion on broker/server
- High-load scenarios
Example¶
val connectCommand = ConnectCommand(
connectionOptions = options,
retryPolicy = RetryPolicy.exponential(
maxRetries = 5,
baseDelayMillis = 2000, // Start at 2 seconds
maxDelayMillis = 60000, // Cap at 60 seconds
excludedExceptionCodes = hashSetOf(
MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
)
)
)
JitterRetryPolicy¶
Adds randomness to another retry policy to prevent thundering herd problem.
Factory Method¶
fun jitter(
jitterFactor: Double = 0.1,
basePolicy: RetryPolicy = exponential(3, 1000, 120000),
excludedExceptionCodes: Set<MqttExceptionCode> = emptySet()
): JitterRetryPolicy
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
jitterFactor |
Double |
0.1 |
Randomness factor (0.0 to 1.0) |
basePolicy |
RetryPolicy |
exponential(...) |
Underlying retry policy to add jitter to |
excludedExceptionCodes |
Set<MqttExceptionCode> |
emptySet() |
Exception codes that bypass retry logic |
Retry Timeline¶
Base policy: ExponentialRetryPolicy(baseDelay=1000ms)
Jitter factor: 0.1 (10%)
Attempt 1: Immediate (0ms)
Attempt 2: Wait ~1000ms ± 10% (900ms to 1100ms)
Attempt 3: Wait ~2000ms ± 10% (1800ms to 2200ms)
Attempt 4: Wait ~4000ms ± 10% (3600ms to 4400ms)
When to Use¶
- Multiple clients connecting simultaneously
- Preventing synchronized retry storms
- High-scale deployments
- Load balancing across time
Exception Filtering¶
Certain errors should never be retried (e.g., authentication failures). Use excludedExceptionCodes to specify these:
Available Exception Codes¶
enum class MqttExceptionCode {
REASON_CODE_NOT_AUTHORIZED, // Code 5: Invalid credentials
CONNECT_ALREADY_IN_PROGRESS, // Code 32100: Duplicate connection
UNKNOWN // Code -1: Unmapped errors
}
Usage Example¶
val connectCommand = ConnectCommand(
connectionOptions = options,
retryPolicy = RetryPolicy.exponential(
maxRetries = 3,
baseDelayMillis = 2000,
excludedExceptionCodes = hashSetOf(
MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
MqttExceptionCode.CONNECT_ALREADY_IN_PROGRESS
)
)
)
When these exceptions occur, the command fails immediately without retries.
Retry Policy Comparison¶
| Policy | Best For | Pros | Cons |
|---|---|---|---|
| None | Fire-and-forget, testing | Fastest failure, simple | No resilience |
| Sequential | Simple scenarios, dev/test | Predictable timing | Can overwhelm resources |
| Exponential | Production environments | Resource-friendly, scales well | Longer recovery time |
| Jitter | High-scale deployments | Prevents thundering herd | Slightly complex |
Advanced Usage¶
Custom Retry Logic per Command Type¶
// Aggressive retries for connections
val connectRetry = RetryPolicy.exponential(
maxRetries = 5,
baseDelayMillis = 2000
)
// Conservative retries for publishes
val publishRetry = RetryPolicy.sequential(
maxRetries = 2,
delayMillis = 500
)
// No retries for disconnects
val disconnectRetry = RetryPolicy.none()
pulseMqttKit.submitCommand(ConnectCommand(options, retryPolicy = connectRetry))
pulseMqttKit.submitCommand(PublishCommand(message, qos, retryPolicy = publishRetry))
pulseMqttKit.submitCommand(DisconnectCommand(retryPolicy = disconnectRetry))
Dynamic Retry Policy Selection¶
fun getRetryPolicy(networkType: NetworkType): RetryPolicy {
return when (networkType) {
NetworkType.WIFI -> RetryPolicy.exponential(
maxRetries = 5,
baseDelayMillis = 1000
)
NetworkType.CELLULAR -> RetryPolicy.exponential(
maxRetries = 3,
baseDelayMillis = 2000
)
NetworkType.NONE -> RetryPolicy.none()
}
}
Monitoring Retry Attempts¶
pulseMqttKit.addListener(object : MqttUpdatesListener {
override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
println("Command failed after ${result.attempt} attempts")
println("Total time: ${result.totalTimeMillis}ms")
if (result.attempt >= command.retryPolicy.maxRetries) {
println("Max retries exhausted for ${command.type}")
}
}
})
See Also¶
- Commands API - Using retry policies with commands
- Connection Options - Connection configuration