Command-Based Architecture

Pulse MQTT uses a command-based architecture that provides a clean, testable, and reliable way to execute MQTT operations. This design pattern treats every MQTT operation as a command object that encapsulates all necessary information and behavior.

Overview

Instead of directly calling MQTT methods, you submit commands to the library. Each command is:

  • Self-contained: Includes all parameters and logic needed for execution
  • Retryable: Can be automatically retried with configurable policies
  • Trackable: Has lifecycle callbacks through listeners
  • Composable: Can have dependencies on other commands
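The properties above can be illustrated with a small, library-independent sketch. The names below (`SketchCommand`, `SketchExecutor`, `Outcome`) are hypothetical and are not Pulse MQTT classes; they only show how an operation can be packaged as an object that an executor retries and reports on.

```kotlin
// Hypothetical sketch of the command pattern; none of these types are Pulse MQTT APIs.
sealed interface Outcome {
    data class Success(val attempts: Int) : Outcome
    data class Failure(val attempts: Int, val error: Exception) : Outcome
}

// A command carries everything needed to run it: a name, a retry budget, and the action.
class SketchCommand(
    val name: String,
    val maxRetries: Int = 0,
    val action: () -> Unit
)

// The executor owns retrying and reporting, so commands stay free of infrastructure code.
class SketchExecutor(private val onResult: (SketchCommand, Outcome) -> Unit = { _, _ -> }) {
    fun submit(command: SketchCommand): Outcome {
        var attempt = 0
        while (true) {
            attempt++
            try {
                command.action()
                return Outcome.Success(attempt).also { onResult(command, it) }
            } catch (e: Exception) {
                // Give up once the initial attempt plus all retries are used.
                if (attempt > command.maxRetries) {
                    return Outcome.Failure(attempt, e).also { onResult(command, it) }
                }
            }
        }
    }
}
```

Because the retry loop lives in the executor, the same command object can be tested on its own by invoking `action` directly.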

Command Types

Pulse MQTT provides five core command types:

ConnectCommand

Establishes a connection to the MQTT broker.

val connectCommand = ConnectCommand(
    connectionOptions = ConnectionOptions(
        serverUri = "tcp://broker.example.com:1883",
        clientId = "my-app-${userId}",
        username = "mqtt_user",
        password = "secure_password",
        cleanSession = false,
        automaticReconnect = true
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 2000
    )
)

pulseMqttKit.submitCommand(connectCommand)

Features:

  • Automatic duplicate connection prevention
  • Connection-in-progress detection
  • Auto-subscription support after successful connection
  • Last Will and Testament (LWT) configuration

SubscribeCommand

Subscribes to one or more MQTT topics with type-safe deserialization.

data class OrderUpdate(val orderId: String, val status: String)

val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to TopicTypeConfig(
            messageType = OrderUpdate::class.java,
            qosLevel = QOSLevel.QOS_1
        )
    ),
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(subscribeCommand)

Features:

  • Multiple topics in a single command
  • Type-safe message deserialization
  • Automatic tracking for auto-resubscription
  • Flexible QoS configuration per topic
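The filter `orders/+/updates` in the example above uses the MQTT single-level wildcard. As a rough illustration of how such filters select topics, here is a sketch of the standard MQTT matching rules (`+` matches exactly one level, `#` matches the parent level and everything below it). `topicMatches` is a hypothetical helper, not part of Pulse MQTT, and the special handling of topics beginning with `$` is omitted.

```kotlin
// Sketch of MQTT topic-filter matching; a hypothetical helper, not a Pulse MQTT API.
fun topicMatches(filter: String, topic: String): Boolean {
    val filterLevels = filter.split("/")
    val topicLevels = topic.split("/")
    for ((index, level) in filterLevels.withIndex()) {
        // '#' matches the parent level and every level below it; it must be the last level.
        if (level == "#") return index == filterLevels.lastIndex
        if (index >= topicLevels.size) return false
        // '+' matches exactly one level; anything else must match literally.
        if (level != "+" && level != topicLevels[index]) return false
    }
    return filterLevels.size == topicLevels.size
}
```

Under these rules, `orders/+/updates` matches `orders/123/updates` but not `orders/123/items/updates`, since `+` covers only a single level.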

PublishCommand

Publishes a message to an MQTT topic.

val message = ZMqttMessage(
    topic = "orders/123/status",
    payload = """{"orderId": "123", "status": "delivered"}""",
    retained = false
)

val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 3,
        baseDelayMillis = 1000
    )
)

pulseMqttKit.submitCommand(publishCommand)

Features:

  • Configurable QoS levels (0, 1, 2)
  • Retained message support
  • Fire-and-forget or acknowledged delivery
  • Automatic retry on transient failures

DisconnectCommand

Gracefully disconnects from the MQTT broker.

val disconnectCommand = DisconnectCommand(
    quiesceTimeout = 5000L, // Wait 5 seconds for pending operations
    retryPolicy = RetryPolicy.sequential(
        maxRetries = 2,
        delayMillis = 1000
    )
)

pulseMqttKit.submitCommand(disconnectCommand)

Features:

  • Graceful disconnection with quiesce timeout
  • Pending message completion
  • Subscription preservation for reconnection
  • Clean connection teardown

UnsubscribeCommand

Unsubscribes from specific topics, or from all topics when no topic list is given.

// Unsubscribe from specific topics
val unsubscribeCommand = UnsubscribeCommand(
    topics = mutableListOf(
        "orders/+/updates",
        "rider/+/location"
    )
)

// Unsubscribe from all topics
val unsubscribeAllCommand = UnsubscribeCommand() // Empty list = all topics

pulseMqttKit.submitCommand(unsubscribeCommand)

Lifecycle Callbacks

Track command execution by registering an MqttUpdatesListener:

class MyMqttListener : MqttUpdatesListener {
    override fun onCommandSubmitted(command: MqttCommand) {
        logger.debug("Command submitted: ${command.type}")
    }

    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        logger.info("Command succeeded: ${command.type} in ${result.totalTimeMillis}ms")
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        logger.error("Command failed: ${command.type}", result.error)
    }

    override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
        logger.warning("Command ignored: ${result.reason}")
    }
}

pulseMqttKit.addListener(MyMqttListener())

Command Dependencies

Commands can depend on other commands, ensuring execution order:

val connectCommand = ConnectCommand(connectionOptions = options)

val subscribeCommand = SubscribeCommand(
    topicConfigs = subscriptions,
    dependencies = mutableListOf(connectCommand) // Wait for connection
)

// Submit both - subscribe will wait for connect
pulseMqttKit.submitCommand(connectCommand)
pulseMqttKit.submitCommand(subscribeCommand)
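One way to picture dependency handling is as an ordering problem: a command runs only after everything it depends on has run. The sketch below shows that idea with a depth-first ordering. `Node` and `executionOrder` are hypothetical names for illustration; how Pulse MQTT actually schedules dependent commands is not shown here.

```kotlin
// Hypothetical sketch: order commands so each one runs after its dependencies.
class Node(val name: String, val dependencies: List<Node> = emptyList())

fun executionOrder(submitted: List<Node>): List<Node> {
    // LinkedHashSet keeps insertion order and ignores commands already placed.
    val ordered = LinkedHashSet<Node>()

    fun visit(node: Node) {
        if (node in ordered) return
        // Place every dependency first, then the node itself.
        node.dependencies.forEach { visit(it) }
        ordered.add(node)
    }

    submitted.forEach { visit(it) }
    return ordered.toList()
}
```

With this ordering, the submission order no longer matters: a subscribe node that lists a connect node as a dependency is always placed after it.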

Benefits of Command Architecture

1. Separation of Concerns

Commands keep the logic of each operation separate from the infrastructure that executes it.

2. Retry Policies

Built-in retry support with pluggable strategies:

val command = ConnectCommand(
    connectionOptions = options,
    retryPolicy = RetryPolicy.exponential(
        maxRetries = 5,
        baseDelayMillis = 1000,
        maxDelayMillis = 30000,
        excludedExceptionCodes = hashSetOf(
            MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED
        )
    )
)
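To make the parameters above concrete, here is a minimal sketch of how an exponential policy is commonly computed: the delay doubles after each failed attempt and is capped at a maximum. Whether Pulse MQTT uses exactly this formula (a doubling factor, no jitter) is an assumption, and `backoffDelayMillis` is a hypothetical helper rather than a library function.

```kotlin
// Hypothetical helper: delay before retry number `retry` (1-based), doubling each time.
fun backoffDelayMillis(retry: Int, baseDelayMillis: Long, maxDelayMillis: Long): Long {
    require(retry >= 1) { "retry is 1-based" }
    var delay = baseDelayMillis
    repeat(retry - 1) {
        // Stop doubling once the cap is reached.
        if (delay >= maxDelayMillis) return maxDelayMillis
        delay *= 2
    }
    return minOf(delay, maxDelayMillis)
}
```

Under this formula, a base delay of 1000 ms with a 30000 ms cap would space five retries at 1000, 2000, 4000, 8000, and 16000 ms; a sixth retry would be capped at 30000 ms.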

3. Composability

Commands can be chained and composed:

// Connect -> Subscribe -> Publish workflow
val connectCmd = ConnectCommand(options)
val subscribeCmd = SubscribeCommand(topics, dependencies = mutableListOf(connectCmd))
val publishCmd = PublishCommand(message, QOSLevel.QOS_1, dependencies = mutableListOf(subscribeCmd))

4. Monitoring and Debugging

Complete visibility into operation lifecycle:

override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    metrics.recordCommandSuccess(
        type = command.type,
        duration = result.totalTimeMillis,
        attempts = result.attempt
    )
}

Advanced Usage

Custom Command Context

Pass custom context through commands:

data class OrderContext(val orderId: String, val userId: String)

val context = OrderContext(orderId = "123", userId = "user-456")

val command = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

// Retrieve context in callback
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    val context = result.data?.userContext as? OrderContext
    logger.info("Order ${context?.orderId} published successfully")
}

Command Timeout

Configure per-command timeouts:

val command = ConnectCommand(
    connectionOptions = options,
    commandTimeoutMillis = 30_000L // 30 seconds timeout
)

Bulk Operations

Submit several commands by iterating over a list:

// Subscribe to multiple topic groups
val commands = listOf(
    SubscribeCommand(topicConfigs = orderTopics),
    SubscribeCommand(topicConfigs = riderTopics),
    SubscribeCommand(topicConfigs = notificationTopics)
)

commands.forEach { pulseMqttKit.submitCommand(it) }

Next Steps