Skip to content

Commands API

Commands are the core building blocks of Pulse MQTT's operation model. Each command represents a discrete MQTT operation that can be queued, retried, monitored, and have dependencies.

Overview

All commands extend the abstract MqttCommand class and follow a consistent lifecycle:

  1. Creation - Command is instantiated with configuration
  2. Submission - Command is submitted via PulseMqttKit.submitCommand()
  3. Validation - Command parameters are validated
  4. Execution - Command is executed (potentially with retries)
  5. Result - Result is reported via MqttUpdatesListener

Command Types

ConnectCommand

Establishes a connection to an MQTT broker.

Constructor Parameters

Parameter Type Default Description
connectionOptions ConnectionOptions Required Configuration for the MQTT connection
retryPolicy RetryPolicy RetryPolicy.none() Policy for retrying failed connection attempts
commandTimeoutMillis Long 120000L Maximum time to wait for connection (120 seconds)
dependencies List<MqttCommand> emptyList() Commands that must complete before this one
userContext Any? null Optional context object passed back in callbacks

Usage Example

val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "my-client-id",
        username = "user",
        password = "pass",
        cleanSession = false,
        automaticReconnect = true
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)

pulseMqttKit.submitCommand(connectCommand)

Connection Behavior

  • If already connected: Command is ignored with reason
  • If connection in progress: Command is ignored with reason
  • Otherwise: Initiates new connection attempt

Auto-Subscription

If autoSubscriptionConfig.enabled is true in ConnectionOptions, the library will automatically resubscribe to configured topics after connection.


DisconnectCommand

Gracefully disconnects from the MQTT broker.

Constructor Parameters

Parameter Type Default Description
retryPolicy RetryPolicy RetryPolicy.none() Policy for retrying failed disconnect attempts
commandTimeoutMillis Long 60000L Maximum time to wait for disconnection (60 seconds)
quiesceTimeout Long 0L Time in milliseconds to wait for pending operations
userContext Any? null Optional context object passed back in callbacks
dependencies MutableList<MqttCommand> mutableListOf() Commands that must complete before this one

Usage Example

val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L, // Allow 5 seconds for pending operations
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(disconnectCommand)

Disconnection Behavior

  • If not connected: Command is ignored with reason
  • If connected or connecting: Attempts graceful disconnection
  • Waits up to quiesceTimeout milliseconds for pending operations

Quiesce Timeout

The quiesce timeout allows time for: - In-flight messages to be sent - Acknowledgements to be received - Clean disconnection handshake

If timeout is 0, disconnection happens immediately without waiting.


PublishCommand

Publishes a message to an MQTT topic.

Constructor Parameters

Parameter Type Default Description
message ZMqttMessage Required The message to publish containing topic, payload
qos QOSLevel Required Quality of Service level for message delivery
userContext Any? null Optional context object passed back in callbacks
commandTimeoutMillis Long 60000L Maximum time to wait for publish completion
retryPolicy RetryPolicy RetryPolicy.none() Policy for retrying failed publish attempts
dependencies MutableList<MqttCommand> mutableListOf() Commands that must complete before this one

Usage Example

// Create a message
val message = ZMqttMessage(
    topic = "orders/123/status",
    payload = """{"orderId": "123", "status": "delivered"}"""
)

// Publish with QoS 1 and retry policy
val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(publishCommand)

Message Payload

The payload should be a valid UTF-8 encoded string. For structured data, serialize to JSON before publishing:

data class OrderUpdate(val orderId: String, val status: String)

val orderUpdate = OrderUpdate(orderId = "123", status = "delivered")
val payload = gson.toJson(orderUpdate)
val message = ZMqttMessage(topic = "orders/123/status", payload = payload)

SubscribeCommand

Subscribes to one or more MQTT topics with type-safe message deserialization.

Constructor Parameters

Parameter Type Default Description
topicConfigs Map<String, TopicTypeConfig<*>> Required Map of topic filters to their type configurations
retryPolicy RetryPolicy RetryPolicy.none() Policy for retrying failed subscription attempts
commandTimeoutMillis Long 60000L Maximum time to wait for subscription
dependencies MutableList<MqttCommand> mutableListOf() Commands that must complete before this one
userContext Any? null Optional context object passed back in callbacks

Usage Example

// Define message types
data class OrderUpdate(val orderId: String, val status: String)
data class RiderLocation(val lat: Double, val lon: Double)

val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/updates" to TopicTypeConfig(
            type = OrderUpdate::class.java,
            qos = QOSLevel.QOS_1
        ),
        "rider/location" to TopicTypeConfig(
            type = RiderLocation::class.java,
            qos = QOSLevel.QOS_0
        )
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(subscribeCommand)

Type-Safe Deserialization

Messages received on subscribed topics are automatically deserialized to the specified type using Gson. Invalid messages that can't be deserialized are logged as errors and not delivered to listeners.

Auto-Subscription

If auto-subscription is enabled in ConnectionOptions, subscriptions are automatically re-established after reconnection.


UnsubscribeCommand

Unsubscribes from MQTT topics.

Constructor Parameters

Parameter Type Default Description
topics MutableList<String> mutableListOf() List of topic filters to unsubscribe from
retryPolicy RetryPolicy RetryPolicy.none() Policy for retrying failed unsubscription attempts
commandTimeoutMillis Long 60000L Maximum time to wait for unsubscription
dependencies List<MqttCommand> emptyList() Commands that must complete before this one
userContext Any? null Optional context object passed back in callbacks

Usage Examples

Unsubscribe from all topics:

val unsubscribeCommand = UnsubscribeCommand(
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(unsubscribeCommand)

Unsubscribe from specific topics:

val unsubscribeCommand = UnsubscribeCommand(
    topics = mutableListOf(
        "orders/+/updates",
        "rider/+/location"
    ),
    retryPolicy = RetryPolicy.none()
)

pulseMqttKit.submitCommand(unsubscribeCommand)

Behavior

  • If topics list is empty: Unsubscribes from ALL currently active subscriptions
  • If topics list contains topics: Unsubscribes from specified topics only
  • Topics not currently subscribed are ignored without error

Common Command Features

Command Dependencies

Commands can depend on other commands, ensuring they execute in a specific order:

val connectCommand = ConnectCommand(connectionOptions = options)

val subscribeCommand = SubscribeCommand(
    topicConfigs = topicConfigs,
    dependencies = mutableListOf(connectCommand) // Wait for connection first
)

pulseMqttKit.submitCommand(connectCommand)
pulseMqttKit.submitCommand(subscribeCommand)

Command Timeout

Each command has a timeout to prevent indefinite hanging:

val connectCommand = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 30000L // 30 second timeout
)

User Context

Pass custom context objects through the command lifecycle:

data class OrderContext(val orderId: String, val userId: String)

val context = OrderContext(orderId = "ORDER-123", userId = "USER-456")

val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

// Retrieve in listener
pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        val resultData = result.data as? CommandResultData
        val orderContext = resultData?.userContext as? OrderContext
        println("Order ${orderContext?.orderId} published successfully")
    }
})

Validation

Commands are automatically validated before execution:

// This will fail validation (empty topic)
val invalidPublish = PublishCommand(
    message = ZMqttMessage(topic = "", payload = "test"),
    qos = QOSLevel.QOS_0
)
// Validation will fail, command will not execute

Command Results

All commands produce a CommandResult which can be one of:

  • Success - Command executed successfully
  • Failure - Command failed after retries
  • Ignored - Command skipped due to current state
  • None - No meaningful result (e.g., null client)

Results are delivered via MqttUpdatesListener:

pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        when (command) {
            is ConnectCommand -> println("Connected successfully")
            is SubscribeCommand -> println("Subscribed to topics")
            is PublishCommand -> println("Message published")
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        println("Command ${command.type} failed: ${result.error.message}")
        println("After ${result.attempt} attempts")
    }

    override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
        println("Command ${command.type} ignored: ${result.reason}")
    }
})

Best Practices

1. Always Use Retry Policies for Connection Commands

// ✅ Good - With retry policy
val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(maxRetries = 3)
)

// ❌ Bad - No retry policy
val connectCommand = ConnectCommand(connectionOptions = options)

2. Exclude Non-Retryable Errors

val connectCommand = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)

3. Use Dependencies for Sequential Operations

val connectCmd = ConnectCommand(connectionOptions = options)
val subscribeCmd = SubscribeCommand(
    topicConfigs = configs,
    dependencies = mutableListOf(connectCmd)
)

4. Set Appropriate Timeouts

// For slow networks
val connectCommand = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 60000L // 60 seconds
)

Thread Safety

All commands are thread-safe and can be submitted from any thread. Command execution happens asynchronously on the coroutine scope provided by PulseMqttKitBridge.


See Also