Command-Based Architecture
Pulse MQTT uses a command-based architecture: every MQTT operation is modeled as a command object that encapsulates the parameters and behavior needed to execute it. This makes operations easier to test, retry, and track than direct calls to MQTT client methods.
Overview
Instead of directly calling MQTT methods, you submit commands to the library. Each command is:
- Self-contained: Includes all parameters and logic needed for execution
- Retryable: Can be automatically retried with configurable policies
- Trackable: Has lifecycle callbacks through listeners
- Composable: Can have dependencies on other commands
Command Types
Pulse MQTT provides five core command types:
ConnectCommand
Establishes a connection to the MQTT broker.
val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "my-app-${userId}",
        username = "mqtt_user",
        password = "secure_password",
        cleanSession = false,
        automaticReconnect = true
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000
    )
)

pulseMqttKit.submitCommand(connectCommand)
Features:
- Automatic duplicate connection prevention
- Connection-in-progress detection
- Auto-subscription support after successful connection
- Last Will and Testament (LWT) configuration
SubscribeCommand
Subscribes to one or more MQTT topics with type-safe deserialization.
data class OrderUpdate(val orderId: String, val status: String)

val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to TopicTypeConfig(
            messageType = OrderUpdate::class.java,
            qosLevel = QOSLevel.QOS_1
        )
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(subscribeCommand)
Features:
- Multiple topics in a single command
- Type-safe message deserialization
- Automatic tracking for auto-resubscription
- Flexible QoS configuration per topic
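The filter `orders/+/updates` in the example relies on MQTT's single-level wildcard `+`, which matches exactly one topic level; the multi-level wildcard `#` matches all remaining levels and must come last. The following is a minimal sketch of that matching rule as defined by the MQTT specification. `matchesFilter` is a hypothetical helper written for illustration, not part of Pulse MQTT, and it ignores the special handling of topics that start with `$`.

```kotlin
/**
 * Hypothetical helper (not part of Pulse MQTT): returns true when [topic]
 * matches the MQTT topic [filter].
 * `+` matches exactly one level; `#` matches all remaining levels and is
 * only valid as the last level of the filter.
 */
fun matchesFilter(filter: String, topic: String): Boolean {
    val filterLevels = filter.split("/")
    val topicLevels = topic.split("/")
    for ((index, level) in filterLevels.withIndex()) {
        // Multi-level wildcard: matches everything from this level on,
        // including the parent level itself ("a/#" matches "a").
        if (level == "#") return index == filterLevels.lastIndex
        // The topic has fewer levels than the filter requires.
        if (index >= topicLevels.size) return false
        // Anything other than `+` must match the topic level exactly.
        if (level != "+" && level != topicLevels[index]) return false
    }
    // Without a trailing `#`, both sides must have the same number of levels.
    return filterLevels.size == topicLevels.size
}
```

With this rule, `orders/+/updates` matches `orders/123/updates` but not `orders/123/status` or `orders/123/updates/extra`.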
PublishCommand
Publishes a message to an MQTT topic.
val message = ZMqttMessage(
    topic = "orders/123/status",
    payload = """{"orderId": "123", "status": "delivered"}""",
    retained = false
)

val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(publishCommand)
Features:
- Configurable QoS levels (0, 1, 2)
- Retained message support
- Fire-and-forget or acknowledged delivery
- Automatic retry on transient failures
DisconnectCommand
Gracefully disconnects from the MQTT broker.
val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L, // Wait up to 5 seconds for pending operations
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(disconnectCommand)
Features:
- Graceful disconnection with quiesce timeout
- Pending message completion
- Subscription preservation for reconnection
- Clean connection teardown
UnsubscribeCommand
Unsubscribes from topics.
// Unsubscribe from specific topics
val unsubscribeCommand = UnsubscribeCommand(
    topics = mutableListOf(
        "orders/+/updates",
        "rider/+/location"
    )
)

// Unsubscribe from all topics
val unsubscribeAllCommand = UnsubscribeCommand() // Empty list = all topics

pulseMqttKit.submitCommand(unsubscribeCommand)
Lifecycle Callbacks
Track command execution by registering an MqttUpdatesListener:
class MyMqttListener : MqttUpdatesListener {
    override fun onCommandSubmitted(command: MqttCommand) {
        logger.debug("Command submitted: ${command.type}")
    }

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        logger.info("Command succeeded: ${command.type} in ${result.totalTimeMillis}ms")
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        logger.error("Command failed: ${command.type}", result.error)
    }

    override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
        logger.warning("Command ignored: ${result.reason}")
    }
}

pulseMqttKit.addListener(MyMqttListener())
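The success, failure, and ignored callbacks above correspond to three distinct terminal outcomes. One common way to model this in Kotlin is a sealed result type that is routed to listeners with an exhaustive `when`. The sketch below illustrates that idea only: `SketchResult`, `SketchListener`, `dispatch`, and `RecordingListener` are hypothetical stand-ins, and Pulse MQTT's real `CommandResult` and listener types may be structured differently.

```kotlin
// Simplified stand-ins for illustration only; not the library's actual types.
sealed class SketchResult {
    data class Success(val totalTimeMillis: Long) : SketchResult()
    data class Failure(val error: Throwable) : SketchResult()
    data class Ignored(val reason: String) : SketchResult()
}

interface SketchListener {
    fun onSuccess(type: String, result: SketchResult.Success)
    fun onFailure(type: String, result: SketchResult.Failure)
    fun onIgnored(type: String, result: SketchResult.Ignored)
}

/** Routes a terminal result to the matching callback on every listener. */
fun dispatch(type: String, result: SketchResult, listeners: List<SketchListener>) {
    for (listener in listeners) {
        // Every subclass of the sealed type is handled here; recent Kotlin
        // versions reject this `when` if a new subclass is left out.
        when (result) {
            is SketchResult.Success -> listener.onSuccess(type, result)
            is SketchResult.Failure -> listener.onFailure(type, result)
            is SketchResult.Ignored -> listener.onIgnored(type, result)
        }
    }
}

/** Listener that records one line per callback, which is handy in tests. */
class RecordingListener : SketchListener {
    val events = mutableListOf<String>()

    override fun onSuccess(type: String, result: SketchResult.Success) {
        events += "$type succeeded in ${result.totalTimeMillis}ms"
    }

    override fun onFailure(type: String, result: SketchResult.Failure) {
        events += "$type failed: ${result.error.message}"
    }

    override fun onIgnored(type: String, result: SketchResult.Ignored) {
        events += "$type ignored: ${result.reason}"
    }
}
```

A recording listener like this can be registered in unit tests to assert on the sequence of outcomes without a real broker.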
Command Dependencies
Commands can depend on other commands, ensuring execution order:
val connectCommand = ConnectCommand(connectionOptions = options)

val subscribeCommand = SubscribeCommand(
    topicConfigs = subscriptions,
    dependencies = mutableListOf(connectCommand) // Wait for connection
)

// Submit both - subscribe will wait for connect
pulseMqttKit.submitCommand(connectCommand)
pulseMqttKit.submitCommand(subscribeCommand)
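Conceptually, dependencies define a partial order: a command may run only after everything it depends on has completed. This page does not describe how Pulse MQTT resolves that order internally, so the following is only a sketch of one standard approach, a depth-first topological sort. `SketchCommand` and `executionOrder` are hypothetical names introduced for illustration.

```kotlin
// Hypothetical stand-in for a command with dependencies; not the library's class.
class SketchCommand(
    val name: String,
    val dependencies: List<SketchCommand> = emptyList()
)

/**
 * Returns the commands in an order where every command appears after all of
 * its dependencies (depth-first topological sort). Throws on a cycle.
 */
fun executionOrder(commands: List<SketchCommand>): List<SketchCommand> {
    val ordered = mutableListOf<SketchCommand>()
    val done = mutableSetOf<SketchCommand>()
    val inProgress = mutableSetOf<SketchCommand>()

    fun visit(command: SketchCommand) {
        if (command in done) return
        // Seeing a command again while it is still being visited means a cycle.
        check(inProgress.add(command)) { "Dependency cycle at ${command.name}" }
        for (dependency in command.dependencies) visit(dependency)
        inProgress.remove(command)
        done.add(command)
        ordered.add(command)
    }

    for (command in commands) visit(command)
    return ordered
}
```

Under this sketch, submitting publish, subscribe, and connect in any order still yields connect, then subscribe, then publish.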
Benefits of Command Architecture
1. Separation of Concerns
Commands encapsulate operation logic separately from the execution infrastructure.
2. Retry Policies
Built-in retry support with pluggable strategies:
val command = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)
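To make the parameters concrete, here is a minimal sketch of how an exponential schedule with a cap and a set of non-retryable error codes is typically computed. It assumes a multiplier of 2 and no jitter, neither of which this page confirms for Pulse MQTT. `backoffDelayMillis` and `shouldRetry` are hypothetical helpers, and error codes are plain integers here rather than the library's `MqttExceptionCode`.

```kotlin
import kotlin.math.min

/**
 * Hypothetical helper: delay before retry number [attempt] (1-based),
 * doubling from [baseDelayMillis] and capped at [maxDelayMillis].
 * Assumes a multiplier of 2 and no jitter.
 */
fun backoffDelayMillis(attempt: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long {
    require(attempt >= 1) { "attempt is 1-based" }
    // Clamp the exponent so the factor stays small enough for realistic base delays.
    val factor = 1L shl min(attempt - 1, 30)
    return min(baseDelayMillis * factor, maxDelayMillis)
}

/**
 * Hypothetical helper: retry only while retries remain and the error code
 * is not in the excluded set (for example, an authorization failure).
 */
fun shouldRetry(retriesSoFar: Int, maxRetries: Int, errorCode: Int, excludedCodes: Set<Int>): Boolean =
    retriesSoFar < maxRetries && errorCode !in excludedCodes
```

Under these assumptions, `baseDelayMillis = 1000` and `maxDelayMillis = 30000` give delays of 1000, 2000, 4000, 8000, and 16000 ms for the five retries; a sixth would be capped at 30000 ms. Excluding authorization errors avoids repeating a request that cannot succeed until the credentials change.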
3. Composability
Commands can be chained and composed:
// Connect -> Subscribe -> Publish workflow
val connectCmd = ConnectCommand(options)
val subscribeCmd = SubscribeCommand(topics, dependencies = mutableListOf(connectCmd))
val publishCmd = PublishCommand(message, QOSLevel.QOS_1, dependencies = mutableListOf(subscribeCmd))
4. Monitoring and Debugging
Listener callbacks expose each stage of a command's lifecycle, which can be used to record metrics:
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    metrics.recordCommandSuccess(
        type = command.type,
        duration = result.totalTimeMillis,
        attempts = result.attempt
    )
}
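The `metrics` object in the callback above is not defined on this page. As a rough sketch of what such a recorder could look like, the hypothetical `CommandMetrics` class below keeps per-type counters in memory. It takes the command type as a `String` for simplicity, which is an assumption; the actual type of `command.type` may differ.

```kotlin
/** Hypothetical in-memory recorder; not part of Pulse MQTT. */
class CommandMetrics {
    private data class Stats(
        var count: Long = 0L,
        var totalMillis: Long = 0L,
        var totalAttempts: Long = 0L
    )

    private val statsByType = mutableMapOf<String, Stats>()

    /** Records one successful command; synchronized because callbacks may arrive on any thread. */
    @Synchronized
    fun recordCommandSuccess(type: String, duration: Long, attempts: Int) {
        val stats = statsByType.getOrPut(type) { Stats() }
        stats.count += 1
        stats.totalMillis += duration
        stats.totalAttempts += attempts
    }

    /** Mean duration for [type], or null if nothing has been recorded for it. */
    @Synchronized
    fun averageDurationMillis(type: String): Double? =
        statsByType[type]?.let { it.totalMillis.toDouble() / it.count }

    /** Mean number of attempts for [type], or null if nothing has been recorded for it. */
    @Synchronized
    fun averageAttempts(type: String): Double? =
        statsByType[type]?.let { it.totalAttempts.toDouble() / it.count }
}
```

In production, these counters would usually be forwarded to an existing metrics system rather than held in memory.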
Advanced Usage
Custom Command Context
Pass custom context through commands:
data class OrderContext(val orderId: String, val userId: String)

val context = OrderContext(orderId = "123", userId = "user-456")

val command = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

// Retrieve context in callback
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    val context = result.data?.userContext as? OrderContext
    logger.info("Order ${context?.orderId} published successfully")
}
Command Timeout
Configure per-command timeouts:
val command = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 30_000L // 30 seconds timeout
)
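This page does not say how the timeout is enforced. As a generic illustration of the idea (bound how long an operation may run, then give up), the sketch below uses only the JDK's `java.util.concurrent` classes. `runWithTimeout` is a hypothetical helper and says nothing about Pulse MQTT's internals.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

/**
 * Hypothetical helper: runs [operation] on a worker thread and returns its
 * result, or null if it does not finish within [timeoutMillis].
 * Exceptions thrown by [operation] surface as ExecutionException.
 */
fun <T> runWithTimeout(timeoutMillis: Long, operation: () -> T): T? {
    val executor = Executors.newSingleThreadExecutor()
    val future = executor.submit(Callable { operation() })
    return try {
        future.get(timeoutMillis, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        future.cancel(true) // interrupt the worker so it can stop early
        null
    } finally {
        executor.shutdownNow() // release the worker thread in every case
    }
}
```

For example, `runWithTimeout(1000) { 42 }` returns `42`, while an operation that sleeps longer than its timeout yields `null`.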
Bulk Operations
Submit multiple commands efficiently:
// Subscribe to multiple topic groups
val commands = listOf(
    SubscribeCommand(topicConfigs = orderTopics),
    SubscribeCommand(topicConfigs = riderTopics),
    SubscribeCommand(topicConfigs = notificationTopics)
)

commands.forEach { pulseMqttKit.submitCommand(it) }
Next Steps
- Learn about Retry Policies
- Explore Type-Safe Messages