Retry Policies
Pulse MQTT provides flexible retry mechanisms to handle transient network failures and connection issues. Retry policies automatically re-attempt failed operations with configurable delays and limits.
Overview
Network operations can fail for various reasons:
- Temporary network unavailability
- Broker overload or maintenance
- Timeouts caused by slow connections
- Transient authentication issues

Retry policies allow you to:
- Automatically retry failed operations
- Control retry behavior with different strategies
- Exclude specific errors from retries
- Limit retry attempts to prevent infinite loops
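The retry behavior described above can be modeled as a small loop. The sketch below is a hypothetical, library-independent illustration, not Pulse MQTT's implementation. The names executeWithRetry and OperationFailure, and the use of string error codes, are assumptions made for this example. The loop runs the operation once and then retries up to maxRetries times. Before each retry it sleeps for a delay supplied by the policy. Failures with an excluded code are rethrown immediately.

```kotlin
// Hypothetical failure type carrying an error code; not part of Pulse MQTT.
class OperationFailure(val code: String, message: String) : Exception(message)

/**
 * Hypothetical retry loop: one initial attempt plus up to [maxRetries] retries.
 * [delayForRetry] maps the retry number (1, 2, ...) to a delay in milliseconds.
 * Failures whose code is in [excludedCodes] are rethrown immediately; exceptions
 * other than OperationFailure are not caught and propagate at once.
 */
fun <T> executeWithRetry(
    maxRetries: Int,
    delayForRetry: (retry: Int) -> Long,
    excludedCodes: Set<String> = emptySet(),
    sleep: (Long) -> Unit = { Thread.sleep(it) },
    operation: (attempt: Int) -> T
): T {
    require(maxRetries >= 0) { "maxRetries must be non-negative" }
    val totalAttempts = maxRetries + 1
    for (attempt in 1..totalAttempts) {
        try {
            return operation(attempt)
        } catch (e: OperationFailure) {
            // Fail fast on excluded codes, and give up once every attempt is used.
            if (e.code in excludedCodes || attempt == totalAttempts) throw e
            // The retry that follows attempt N is retry number N.
            sleep(delayForRetry(attempt))
        }
    }
    error("unreachable: the loop either returns or throws")
}
```

The sleep function is passed in as a parameter so the loop can be tested without real waiting. A coroutine-based client would more likely suspend with kotlinx.coroutines' delay instead of blocking a thread.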
Policy Types
No Retry (None)
The default policy: failed operations are not retried.
```kotlin
val policy = RetryPolicy.none()
val command = ConnectCommand(
    connectionOptions = options,
    retryPolicy = policy
)
```
Use when:
- Operation failures should be reported immediately
- You prefer to handle retries manually
- Fast-fail behavior is desired
Sequential Retry
Retries with a fixed delay between attempts.
```kotlin
val policy = RetryPolicy.sequential(
    maxRetries = 3,
    delayMillis = 2000 // 2 seconds between retries
)
```
Retry schedule (up to four attempts in total: the initial attempt plus three retries):
- Attempt 1: Immediate
- Attempt 2: After 2000ms
- Attempt 3: After 2000ms
- Attempt 4: After 2000ms

Use when:
- Predictable retry intervals are needed
- Error conditions are expected to resolve quickly
- Testing and debugging (consistent timing)
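As a rough model of the schedule above, the hypothetical helpers below compute two things: the delay before each retry, and the time at which each attempt starts. They are not part of Pulse MQTT, and they ignore how long the attempts themselves take.

```kotlin
/** Hypothetical helper: the delay (ms) before each retry under a fixed-delay policy. */
fun sequentialDelays(maxRetries: Int, delayMillis: Long): List<Long> {
    require(maxRetries >= 0 && delayMillis >= 0)
    // Every retry waits the same fixed delay.
    return List(maxRetries) { delayMillis }
}

/** Start offset (ms) of each attempt, counting the immediate first attempt as 0. */
fun sequentialStartOffsets(maxRetries: Int, delayMillis: Long): List<Long> =
    sequentialDelays(maxRetries, delayMillis)
        .runningFold(0L) { elapsed, delay -> elapsed + delay }
```

For maxRetries = 3 and delayMillis = 2000, the start offsets are 0, 2000, 4000 and 6000ms. These match the four attempts listed in the schedule.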
Exponential Backoff
Retries with exponentially increasing delays. Each retry waits twice as long as the previous one, up to maxDelayMillis.
```kotlin
val policy = RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000
)
```
Retry schedule:
- Attempt 1: Immediate
- Attempt 2: After 1000ms (1s)
- Attempt 3: After 2000ms (2s)
- Attempt 4: After 4000ms (4s)
- Attempt 5: After 8000ms (8s)
- Attempt 6: After 16000ms (16s)

With maxRetries = 5, the 30000ms cap is never reached. The cap only applies once the doubled delay would exceed maxDelayMillis. For example, a sixth retry would wait 30000ms instead of 32000ms.

Use when:
- Recovering from broker overload
- Avoiding retry storms across multiple clients
- Handling extended network outages
- Production environments (recommended)
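The schedule above can be reproduced with a short calculation. This is a hypothetical model, not the library's code. It assumes what the documented schedule shows: retry n waits baseDelayMillis × 2^(n−1), and no retry waits longer than maxDelayMillis.

```kotlin
import kotlin.math.pow

/**
 * Hypothetical model of the exponential schedule (not Pulse MQTT's code):
 * retry n waits baseDelayMillis * 2^(n - 1), never more than maxDelayMillis.
 */
fun exponentialDelay(retry: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long {
    require(retry >= 1) { "retry numbers start at 1" }
    // Computing in Double avoids Long overflow for large retry numbers.
    val uncapped = baseDelayMillis * 2.0.pow(retry - 1)
    return minOf(uncapped, maxDelayMillis.toDouble()).toLong()
}

/** Delays (ms) for retries 1..maxRetries. */
fun exponentialSchedule(maxRetries: Int, baseDelayMillis: Long, maxDelayMillis: Long): List<Long> =
    (1..maxRetries).map { exponentialDelay(it, baseDelayMillis, maxDelayMillis) }
```

Under this model:
- exponentialSchedule(5, 1000, 30000) yields 1000, 2000, 4000, 8000 and 16000ms.
- exponentialDelay(6, 1000, 30000) returns 30000, because the uncapped 32000ms exceeds the cap.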
Jitter-Based Retry
Adds randomization to exponential backoff. Clients that failed at the same moment then retry at different times, which avoids the "thundering herd" problem.
```kotlin
val policy = RetryPolicy.jitter(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000,
    jitterPercentage = 0.25 // ±25% randomization
)
```
Retry schedule (example with ±25% jitter):
- Attempt 2: 750ms to 1250ms
- Attempt 3: 1500ms to 2500ms
- Attempt 4: 3000ms to 5000ms

Use when:
- Multiple clients connect simultaneously
- Preventing synchronized retry storms
- Spreading load over time
- Large-scale deployments
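A minimal sketch of how a jittered delay could be computed, under three assumptions:
- The jitter is a uniform multiplier in [1 − jitterPercentage, 1 + jitterPercentage).
- It is applied to the capped exponential delay.
- The result is clamped to maxDelayMillis.

Pulse MQTT's actual formula may differ, and jitteredDelay is a hypothetical name.

```kotlin
import kotlin.math.pow
import kotlin.random.Random

/**
 * Hypothetical jittered backoff: the capped exponential delay for [retry]
 * (base * 2^(retry - 1), at most maxDelayMillis), scaled by a random factor
 * in [1 - jitterPercentage, 1 + jitterPercentage) and clamped to maxDelayMillis.
 */
fun jitteredDelay(
    retry: Int,
    baseDelayMillis: Long,
    maxDelayMillis: Long,
    jitterPercentage: Double,
    random: Random = Random.Default
): Long {
    require(retry >= 1 && jitterPercentage in 0.0..1.0)
    val nominal = minOf(baseDelayMillis * 2.0.pow(retry - 1), maxDelayMillis.toDouble())
    // nextDouble(from, until) requires from < until, so handle zero jitter separately.
    val factor = if (jitterPercentage == 0.0) 1.0
        else 1.0 + random.nextDouble(-jitterPercentage, jitterPercentage)
    return (nominal * factor).toLong().coerceAtMost(maxDelayMillis)
}
```

Passing a seeded Random (for example Random(42)) makes the output reproducible in tests. The default source gives each client its own spread.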
Exception Filtering
Exclude specific error codes from retry attempts:
```kotlin
val policy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = hashSetOf(
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_CLIENT_ID_REJECTED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
    )
)
```
Common non-retryable errors (retrying cannot succeed until the credentials or configuration change):
- Authentication failures (invalid credentials)
- Authorization errors (insufficient permissions)
- Invalid client ID
- Malformed connection parameters
- Protocol violations

Retryable errors:
- Network timeouts
- Connection refused (temporary)
- Broker unavailable
- Server overload
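The decision made after each failure can be sketched as a single predicate. ErrorCode below is a hypothetical stand-in for MqttExceptionCode. Its first three names mirror the codes used above; CONNECTION_TIMEOUT and SERVER_UNAVAILABLE are invented for illustration. shouldRetry is likewise an assumed helper, not library API.

```kotlin
// Hypothetical stand-in for MqttExceptionCode. The first three names mirror the
// codes used above; CONNECTION_TIMEOUT and SERVER_UNAVAILABLE are invented.
enum class ErrorCode {
    REASON_CODE_NOT_AUTHORIZED,
    REASON_CODE_CLIENT_ID_REJECTED,
    REASON_CODE_BAD_USERNAME_OR_PASSWORD,
    CONNECTION_TIMEOUT,
    SERVER_UNAVAILABLE
}

/**
 * Hypothetical retry decision after a failure: excluded codes never retry,
 * and no code retries once [retriesSoFar] has reached [maxRetries].
 */
fun shouldRetry(
    code: ErrorCode,
    retriesSoFar: Int,
    maxRetries: Int,
    excluded: Set<ErrorCode>
): Boolean = code !in excluded && retriesSoFar < maxRetries
```

Checking the exclusion set first means an authentication failure stops immediately, even when retries remain in the budget.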
Usage Examples
Connection with Exponential Backoff
```kotlin
val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "mobile-app-${userId}",
        username = "mqtt_user",
        password = "secure_password"
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
            MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
        )
    )
)
pulseMqttKit.submitCommand(connectCommand)
```
Subscribe with Sequential Retry
```kotlin
val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to TopicTypeConfig(
            messageType = OrderUpdate::class.java,
            qosLevel = QOSLevel.QOS_1
        )
    ),
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 3,
        delayMillis = 2000
    )
)
```
Publish with Jitter
```kotlin
val publishCommand = PublishCommand(
    message = ZMqttMessage(
        topic = "orders/123/status",
        payload = gson.toJson(orderUpdate)
    ),
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.jitter(
        maxRetries = 3,
        baseDelayMillis = 500,
        maxDelayMillis = 5000,
        jitterPercentage = 0.3 // ±30% randomization
    )
)
```
Monitoring Retries
Track retry attempts through MqttUpdatesListener:
```kotlin
class RetryMonitoringListener : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        if (result.attempt > 1) {
            logger.info("""
                Command succeeded after ${result.attempt} attempts
                Type: ${command.type}
                Total time: ${result.totalTimeMillis}ms
            """.trimIndent())
            // Record retry metrics
            metrics.recordRetrySuccess(
                commandType = command.type,
                attempts = result.attempt,
                totalDuration = result.totalTimeMillis
            )
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        logger.error("""
            Command failed after ${result.attempt} attempts
            Type: ${command.type}
            Error: ${result.error.message}
            Total time: ${result.totalTimeMillis}ms
        """.trimIndent(), result.error)
        // Alert on persistent failures
        if (result.attempt >= 3) {
            alerting.sendAlert("MQTT command exhausted retries: ${command.type}")
        }
    }
}

pulseMqttKit.addListener(RetryMonitoringListener())
```
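The listener above calls a metrics object that this section does not define. As a hypothetical in-memory stand-in, useful for local testing, the class below accepts the same named arguments as the recordRetrySuccess call. It assumes command.type can be represented as a String and that totalTimeMillis is a Long. A production setup would forward these values to a real metrics backend instead.

```kotlin
/**
 * Hypothetical in-memory stand-in for the `metrics` object used by
 * RetryMonitoringListener; not part of Pulse MQTT.
 */
class InMemoryRetryMetrics {
    private data class Stats(
        var successes: Int = 0,
        var totalAttempts: Int = 0,
        var totalDurationMillis: Long = 0
    )

    private val byType = mutableMapOf<String, Stats>()

    // Synchronized because listener callbacks may arrive on different threads.
    @Synchronized
    fun recordRetrySuccess(commandType: String, attempts: Int, totalDuration: Long) {
        val stats = byType.getOrPut(commandType) { Stats() }
        stats.successes++
        stats.totalAttempts += attempts
        stats.totalDurationMillis += totalDuration
    }

    /** Average number of attempts for retried successes, or null if none were recorded. */
    @Synchronized
    fun averageAttempts(commandType: String): Double? =
        byType[commandType]?.let { it.totalAttempts.toDouble() / it.successes }
}
```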
Best Practices
1. Use Exponential Backoff for Production
```kotlin
// ✅ Good - Prevents retry storms, respects broker load
RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000
)

// ❌ Avoid - Can overload broker, no backoff
RetryPolicy.sequential(
    maxRetries = 10,
    delayMillis = 100
)
```
2. Always Exclude Auth Failures
```kotlin
// ✅ Good - Fails fast on invalid credentials
RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = hashSetOf(
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD
    )
)
```
3. Set Reasonable Max Retries
```kotlin
// ✅ Good - Balances reliability and failure detection
val quickOperationRetries = 3    // For quick operations
val criticalOperationRetries = 5 // For critical operations

// ❌ Avoid - Too few (gives up too quickly)
val tooFewRetries = 1

// ❌ Avoid - Too many (delays failure detection)
val tooManyRetries = 20
```
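Raising maxRetries lengthens the time before a failure is reported. The hypothetical helper below totals the backoff delays of an exponential policy; it is not part of Pulse MQTT. It assumes the doubling-with-cap schedule described under Exponential Backoff and ignores the time spent on the attempts themselves, such as connection timeouts.

```kotlin
import kotlin.math.pow

/**
 * Hypothetical helper: total time (ms) spent waiting between attempts when every
 * attempt fails, assuming retry n waits base * 2^(n - 1), capped at maxDelayMillis.
 */
fun worstCaseBackoffMillis(maxRetries: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long =
    (1..maxRetries).sumOf { retry ->
        // Delay before this retry, computed in Double to avoid overflow, then capped.
        minOf(baseDelayMillis * 2.0.pow(retry - 1), maxDelayMillis.toDouble()).toLong()
    }
```

Under these assumptions, with a 1000ms base and a 30000ms cap:
- 1 retry waits at most 1000ms in total.
- 5 retries wait at most 31000ms.
- 20 retries wait 481000ms (about 8 minutes) before the failure surfaces.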
4. Cap Maximum Delay
```kotlin
// ✅ Good - Prevents excessively long waits
RetryPolicy.exponential(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000 // Cap at 30 seconds
)
```
5. Use Jitter in Multi-Client Scenarios
```kotlin
// ✅ Good - Distributes retry load
RetryPolicy.jitter(
    maxRetries = 5,
    baseDelayMillis = 1000,
    maxDelayMillis = 30000,
    jitterPercentage = 0.25 // ±25% randomization
)
```
Retry Policy Comparison
| Policy | Use Case | Pros | Cons |
|---|---|---|---|
| None | Fast-fail operations | Immediate feedback | No resilience |
| Sequential | Testing, debugging | Predictable timing | Can overload broker |
| Exponential | Production use | Respects broker load | Longer total time |
| Jitter | Multi-client systems | Prevents storms | Slightly unpredictable |
Advanced Configuration
Per-Operation Policies
Different operations may need different retry strategies:
```kotlin
// Critical connection - aggressive retries
val connectPolicy = RetryPolicy.exponential(
    maxRetries = 7,
    baseDelayMillis = 1000,
    maxDelayMillis = 60000
)

// Non-critical publish - fast fail
val publishPolicy = RetryPolicy.sequential(
    maxRetries = 2,
    delayMillis = 1000
)

// Subscription - moderate retries
val subscribePolicy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    maxDelayMillis = 15000
)
```
Dynamic Policy Adjustment
Adjust policies based on runtime conditions:
```kotlin
fun getRetryPolicy(networkQuality: NetworkQuality): RetryPolicy {
    return when (networkQuality) {
        NetworkQuality.EXCELLENT -> RetryPolicy.sequential(
            maxRetries = 2,
            delayMillis = 500
        )
        NetworkQuality.GOOD -> RetryPolicy.exponential(
            maxRetries = 3,
            baseDelayMillis = 1000,
            maxDelayMillis = 10000
        )
        NetworkQuality.POOR -> RetryPolicy.exponential(
            maxRetries = 5,
            baseDelayMillis = 2000,
            maxDelayMillis = 30000
        )
    }
}
```
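The function above takes a NetworkQuality value that this section does not define. A minimal sketch follows. It assumes the application already measures round-trip time and packet loss by some means not covered here. The enum mirrors the three values used above, and the thresholds in classifyNetwork are illustrative guesses, not recommendations from Pulse MQTT.

```kotlin
// Hypothetical enum matching the values used by getRetryPolicy.
enum class NetworkQuality { EXCELLENT, GOOD, POOR }

/**
 * Hypothetical classifier. The thresholds are illustrative only and should be
 * tuned against measurements from the target network.
 */
fun classifyNetwork(roundTripMillis: Long, packetLossPercent: Double): NetworkQuality = when {
    roundTripMillis < 100 && packetLossPercent < 1.0 -> NetworkQuality.EXCELLENT
    roundTripMillis < 300 && packetLossPercent < 5.0 -> NetworkQuality.GOOD
    else -> NetworkQuality.POOR
}
```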
Custom Exception Handling
```kotlin
val policy = RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 2000,
    excludedExceptionCodes = getExcludedCodes()
)

fun getExcludedCodes(): HashSet<MqttExceptionCode> {
    return hashSetOf(
        // Authentication errors
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.REASON_CODE_BAD_USERNAME_OR_PASSWORD,
        // Configuration errors
        MqttExceptionCode.REASON_CODE_CLIENT_ID_REJECTED,
        MqttExceptionCode.REASON_CODE_PROTOCOL_ERROR,
        // Resource errors
        MqttExceptionCode.REASON_CODE_QUOTA_EXCEEDED
    )
}
```
Troubleshooting
Problem: Commands never succeed despite retries
Solution: Log the underlying error and check whether it is retryable:
```kotlin
override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
    logger.error("Failure details:", result.error)
    // Check whether the error code is in the policy's excludedExceptionCodes
    // (excluded errors are never retried)
    // Verify network connectivity
    // Confirm broker availability
}
```
Problem: Retry delays are too long
Solution: Lower baseDelayMillis and maxDelayMillis, or switch to a sequential policy:
```kotlin
// Shorter base and max delays
RetryPolicy.exponential(
    maxRetries = 3,
    baseDelayMillis = 500,
    maxDelayMillis = 5000 // Shorter max delay
)
```
Problem: Retry storms overloading broker
Solution: Use a jitter-based policy so that clients spread their retries over time:
```kotlin
RetryPolicy.jitter(
    maxRetries = 3,
    baseDelayMillis = 2000,
    maxDelayMillis = 20000,
    jitterPercentage = 0.3 // ±30% randomization
)
```
Next Steps
- Learn about Connection Management
- Explore Health Monitoring