Commands API¶
Commands are the core building blocks of Pulse MQTT's operation model. Each command represents a discrete MQTT operation that can be queued, retried, monitored, and made dependent on other commands.
Overview¶
All commands extend the abstract MqttCommand class and follow a consistent lifecycle:
- Creation - Command is instantiated with configuration
- Submission - Command is submitted via PulseMqttKit.submitCommand()
- Validation - Command parameters are validated
- Execution - Command is executed (potentially with retries)
- Result - Result is reported via MqttUpdatesListener
Command Types¶
ConnectCommand¶
Establishes a connection to an MQTT broker.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| connectionOptions | ConnectionOptions | Required | Configuration for the MQTT connection |
| retryPolicy | RetryPolicy | RetryPolicy.none() | Policy for retrying failed connection attempts |
| commandTimeoutMillis | Long | 120000L | Maximum time to wait for connection (120 seconds) |
| dependencies | List<MqttCommand> | emptyList() | Commands that must complete before this one |
| userContext | Any? | null | Optional context object passed back in callbacks |
Usage Example¶
val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "my-client-id",
        username = "user",
        password = "pass",
        cleanSession = false,
        automaticReconnect = true
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)

pulseMqttKit.submitCommand(connectCommand)
Connection Behavior¶
- If already connected: Command is ignored, and the reason is reported in CommandResult.Ignored
- If connection in progress: Command is ignored, and the reason is reported in CommandResult.Ignored
- Otherwise: Initiates a new connection attempt
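The three rules above amount to a small decision function. The sketch below models them with stand-in types; ConnectionState, ConnectDecision, and decideConnect are hypothetical names used for illustration only and are not part of the library's API.

```kotlin
// Hypothetical stand-ins; the library's real state and result types may differ.
enum class ConnectionState { DISCONNECTED, CONNECTING, CONNECTED }

sealed class ConnectDecision {
    data class Ignored(val reason: String) : ConnectDecision()
    object Initiate : ConnectDecision()
}

// Mirrors the three rules listed above: ignore when connected or connecting,
// otherwise start a new connection attempt.
fun decideConnect(state: ConnectionState): ConnectDecision = when (state) {
    ConnectionState.CONNECTED -> ConnectDecision.Ignored("Already connected")
    ConnectionState.CONNECTING -> ConnectDecision.Ignored("Connection in progress")
    ConnectionState.DISCONNECTED -> ConnectDecision.Initiate
}
```

The reason strings are placeholders; the actual text the library reports may differ.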
Auto-Subscription¶
If autoSubscriptionConfig.enabled is true in ConnectionOptions, the library will automatically resubscribe to configured topics after connection.
DisconnectCommand¶
Gracefully disconnects from the MQTT broker.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| retryPolicy | RetryPolicy | RetryPolicy.none() | Policy for retrying failed disconnect attempts |
| commandTimeoutMillis | Long | 60000L | Maximum time to wait for disconnection (60 seconds) |
| quiesceTimeout | Long | 0L | Time in milliseconds to wait for pending operations |
| userContext | Any? | null | Optional context object passed back in callbacks |
| dependencies | MutableList<MqttCommand> | mutableListOf() | Commands that must complete before this one |
Usage Example¶
val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L, // Allow 5 seconds for pending operations
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(disconnectCommand)
Disconnection Behavior¶
- If not connected: Command is ignored, and the reason is reported in CommandResult.Ignored
- If connected or connecting: Attempts graceful disconnection
- Waits up to quiesceTimeout milliseconds for pending operations
Quiesce Timeout¶
The quiesce timeout allows time for:
- In-flight messages to be sent
- Acknowledgements to be received
- Clean disconnection handshake
If quiesceTimeout is 0, disconnection happens immediately without waiting.
PublishCommand¶
Publishes a message to an MQTT topic.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| message | ZMqttMessage | Required | The message to publish, containing the topic and payload |
| qos | QOSLevel | Required | Quality of Service level for message delivery |
| userContext | Any? | null | Optional context object passed back in callbacks |
| commandTimeoutMillis | Long | 60000L | Maximum time to wait for publish completion |
| retryPolicy | RetryPolicy | RetryPolicy.none() | Policy for retrying failed publish attempts |
| dependencies | MutableList<MqttCommand> | mutableListOf() | Commands that must complete before this one |
Usage Example¶
// Create a message
val message = ZMqttMessage(
    topic = "orders/123/status",
    payload = """{"orderId": "123", "status": "delivered"}"""
)

// Publish with QoS 1 and retry policy
val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(publishCommand)
Message Payload¶
The payload should be a valid UTF-8 encoded string. For structured data, serialize to JSON before publishing:
import com.google.gson.Gson

data class OrderUpdate(val orderId: String, val status: String)

val gson = Gson() // Reuse a single Gson instance where possible
val orderUpdate = OrderUpdate(orderId = "123", status = "delivered")
val payload = gson.toJson(orderUpdate)
val message = ZMqttMessage(topic = "orders/123/status", payload = payload)
SubscribeCommand¶
Subscribes to one or more MQTT topics with type-safe message deserialization.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| topicConfigs | Map<String, TopicTypeConfig<*>> | Required | Map of topic filters to their type configurations |
| retryPolicy | RetryPolicy | RetryPolicy.none() | Policy for retrying failed subscription attempts |
| commandTimeoutMillis | Long | 60000L | Maximum time to wait for subscription |
| dependencies | MutableList<MqttCommand> | mutableListOf() | Commands that must complete before this one |
| userContext | Any? | null | Optional context object passed back in callbacks |
Usage Example¶
// Define message types
data class OrderUpdate(val orderId: String, val status: String)
data class RiderLocation(val lat: Double, val lon: Double)

val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/updates" to TopicTypeConfig(
            type = OrderUpdate::class.java,
            qos = QOSLevel.QOS_1
        ),
        "rider/location" to TopicTypeConfig(
            type = RiderLocation::class.java,
            qos = QOSLevel.QOS_0
        )
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(subscribeCommand)
Type-Safe Deserialization¶
Messages received on subscribed topics are automatically deserialized to the specified type using Gson. Messages that cannot be deserialized are logged as errors and are not delivered to listeners.
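Because topicConfigs is keyed by topic filter, a client has to decide which configured type applies to an incoming topic before deserializing. The sketch below shows standard MQTT filter matching (+ for one level, # for all remaining levels) and a lookup built on it. matchesFilter and typeFor are hypothetical helpers, and whether the library resolves types this way is an assumption.

```kotlin
// Returns true if an MQTT topic filter (with + and # wildcards) matches a concrete topic.
// Assumes the filter is well formed, i.e. # appears only as the last level.
fun matchesFilter(filter: String, topic: String): Boolean {
    val f = filter.split('/')
    val t = topic.split('/')
    for (i in f.indices) {
        if (f[i] == "#") return true                  // multi-level wildcard matches the rest
        if (i >= t.size) return false                 // filter has more levels than the topic
        if (f[i] != "+" && f[i] != t[i]) return false // literal level must match exactly
    }
    return f.size == t.size                           // no unmatched trailing topic levels
}

// Hypothetical lookup: picks the target type for an incoming topic from a filter-to-type map.
fun typeFor(topic: String, configs: Map<String, Class<*>>): Class<*>? =
    configs.entries.firstOrNull { matchesFilter(it.key, topic) }?.value
```

If several filters match the same topic, this sketch returns the first one in map order; a real implementation would need a defined precedence.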
Auto-Subscription¶
If auto-subscription is enabled in ConnectionOptions, subscriptions are automatically re-established after reconnection.
UnsubscribeCommand¶
Unsubscribes from MQTT topics.
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| topics | MutableList<String> | mutableListOf() | List of topic filters to unsubscribe from |
| retryPolicy | RetryPolicy | RetryPolicy.none() | Policy for retrying failed unsubscription attempts |
| commandTimeoutMillis | Long | 60000L | Maximum time to wait for unsubscription |
| dependencies | List<MqttCommand> | emptyList() | Commands that must complete before this one |
| userContext | Any? | null | Optional context object passed back in callbacks |
Usage Examples¶
Unsubscribe from all topics:
val unsubscribeCommand = UnsubscribeCommand(
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(unsubscribeCommand)
Unsubscribe from specific topics:
val unsubscribeCommand = UnsubscribeCommand(
    topics = mutableListOf(
        "orders/+/updates",
        "rider/+/location"
    ),
    retryPolicy = RetryPolicy.none()
)

pulseMqttKit.submitCommand(unsubscribeCommand)
Behavior¶
- If topics list is empty: Unsubscribes from ALL currently active subscriptions
- If topics list contains topics: Unsubscribes from specified topics only
- Topics not currently subscribed are ignored without error
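These rules can be expressed as a single resolution step. The following is a minimal sketch assuming the set of active subscriptions is known to the caller; topicsToUnsubscribe is a hypothetical helper, not a library function.

```kotlin
// Hypothetical helper: resolves which topic filters an unsubscribe request would affect.
fun topicsToUnsubscribe(requested: List<String>, active: Set<String>): List<String> =
    if (requested.isEmpty()) {
        active.toList()                   // empty request: every active subscription
    } else {
        requested.filter { it in active } // topics that are not subscribed are skipped silently
    }
```

Because an empty list means "everything", code that builds the topics list dynamically should guard against accidentally submitting an empty one.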
Common Command Features¶
Command Dependencies¶
Commands can depend on other commands, ensuring they execute in a specific order:
val connectCommand = ConnectCommand(connectionOptions = options)
val subscribeCommand = SubscribeCommand(
    topicConfigs = topicConfigs,
    dependencies = mutableListOf(connectCommand) // Wait for connection first
)

pulseMqttKit.submitCommand(connectCommand)
pulseMqttKit.submitCommand(subscribeCommand)
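One way to reason about dependency ordering is as a topological sort of the command graph. The sketch below uses a stand-in Cmd class and a depth-first traversal; both Cmd and executionOrder are hypothetical, and how the library actually schedules dependent commands is not specified here.

```kotlin
// Hypothetical stand-in for a command that only carries a name and its dependencies.
class Cmd(val name: String, val dependencies: List<Cmd> = emptyList())

// Depth-first topological sort: each command appears after all of its dependencies.
fun executionOrder(commands: List<Cmd>): List<Cmd> {
    val ordered = mutableListOf<Cmd>()
    val done = HashSet<Cmd>()
    val inProgress = HashSet<Cmd>()

    fun visit(cmd: Cmd) {
        if (cmd in done) return
        // Revisiting a command that is still being processed means the graph has a cycle.
        check(inProgress.add(cmd)) { "Dependency cycle at ${cmd.name}" }
        for (dep in cmd.dependencies) visit(dep)
        inProgress.remove(cmd)
        done.add(cmd)
        ordered.add(cmd)
    }

    for (cmd in commands) visit(cmd)
    return ordered
}
```

In this model the order of submission does not matter: a subscribe command that depends on a connect command is always placed after it.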
Command Timeout¶
Each command has a timeout to prevent indefinite hanging:
val connectCommand = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 30000L // 30 second timeout
)
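To illustrate what a command timeout guards against, the sketch below bounds a blocking task with a deadline. It uses java.util.concurrent.CompletableFuture rather than the library's coroutine-based execution, so it is a stand-in technique, and runWithTimeout is a hypothetical helper.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: runs a task on a background thread and gives up after timeoutMillis.
// Returns null on timeout; exceptions thrown by the task itself are not handled here.
fun <T> runWithTimeout(timeoutMillis: Long, task: () -> T): T? {
    val future = CompletableFuture.supplyAsync { task() }
    return try {
        future.get(timeoutMillis, TimeUnit.MILLISECONDS)
    } catch (e: TimeoutException) {
        future.cancel(true) // marks the future as cancelled; the running task is not interrupted
        null
    }
}
```

Without such a bound, a caller waiting on an operation that never completes would block indefinitely.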
User Context¶
Pass custom context objects through the command lifecycle:
data class OrderContext(val orderId: String, val userId: String)

val context = OrderContext(orderId = "ORDER-123", userId = "USER-456")
val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

// Retrieve in listener
pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        val resultData = result.data as? CommandResultData
        val orderContext = resultData?.userContext as? OrderContext
        println("Order ${orderContext?.orderId} published successfully")
    }
})
Validation¶
Commands are automatically validated before execution:
// This will fail validation (empty topic)
val invalidPublish = PublishCommand(
    message = ZMqttMessage(topic = "", payload = "test"),
    qos = QOSLevel.QOS_0
)
// Validation will fail, command will not execute
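As an illustration of the kind of check that validation might perform on a publish topic, the sketch below rejects empty topics and topics containing wildcards (the MQTT specification does not allow + or # in topic names used for publishing). validatePublishTopic is a hypothetical function; the library's actual rules and error messages may differ.

```kotlin
// Hypothetical validator: returns an error message, or null if the topic is acceptable.
fun validatePublishTopic(topic: String): String? = when {
    topic.isEmpty() -> "Topic must not be empty"
    // Wildcards are only valid in subscription filters, never in publish topic names.
    topic.contains('+') || topic.contains('#') -> "Topic must not contain wildcards"
    else -> null
}
```

Running the same check in application code before constructing a command can surface mistakes earlier than waiting for the command to be rejected.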
Command Results¶
All commands produce a CommandResult which can be one of:
- Success - Command executed successfully
- Failure - Command failed after retries
- Ignored - Command skipped due to current state
- None - No meaningful result (e.g., null client)
Results are delivered via MqttUpdatesListener:
pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        when (command) {
            is ConnectCommand -> println("Connected successfully")
            is SubscribeCommand -> println("Subscribed to topics")
            is PublishCommand -> println("Message published")
            else -> Unit // Other command types need no handling here
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        println("Command ${command.type} failed: ${result.error.message}")
        println("After ${result.attempt} attempts")
    }

    override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
        println("Command ${command.type} ignored: ${result.reason}")
    }
})
Best Practices¶
1. Always Use Retry Policies for Connection Commands¶
// ✅ Good - With retry policy
val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(maxRetries = 3)
)

// ❌ Bad - No retry policy
val connectCommand = ConnectCommand(connectionOptions = options)
2. Exclude Non-Retryable Errors¶
val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)
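To see why excluding non-retryable errors matters, consider how an exponential policy might schedule its attempts. The sketch below assumes the delay doubles on each retry starting from the base delay; that formula, the integer error codes, and both function names are assumptions for illustration, not the library's documented behavior.

```kotlin
// Assumed formula: the delay doubles on each retry, starting at baseDelayMillis.
fun backoffDelays(maxRetries: Int, baseDelayMillis: Long): List<Long> =
    (0 until maxRetries).map { attempt -> baseDelayMillis shl attempt }

// Hypothetical check: retry only while attempts remain and the error code is not excluded.
// Error codes are modelled as plain integers here; the values are placeholders.
fun shouldRetry(attempt: Int, maxRetries: Int, errorCode: Int, excludedCodes: Set<Int>): Boolean =
    attempt < maxRetries && errorCode !in excludedCodes
```

Under these assumptions, three retries with a 2000 ms base delay would wait 2000, 4000, and 8000 ms. An authorization failure would spend that entire time retrying with no chance of success unless its code is excluded.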
3. Use Dependencies for Sequential Operations¶
val connectCmd = ConnectCommand(connectionOptions = options)
val subscribeCmd = SubscribeCommand(
    topicConfigs = configs,
    dependencies = mutableListOf(connectCmd)
)
4. Set Appropriate Timeouts¶
// For slow networks
val connectCommand = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 60000L // 60 seconds
)
Thread Safety¶
All commands are thread-safe and can be submitted from any thread. Command execution happens asynchronously on the coroutine scope provided by PulseMqttKitBridge.
See Also¶
- Connection Options - Configuration for connections
- Retry Policies - Retry strategy configuration
- Data Classes - Result and configuration classes