Skip to content

Data Classes

Pulse MQTT uses several data classes to represent MQTT messages, command results, configuration, and error codes. These classes provide type-safe, structured data throughout the library.

Overview

Class Purpose
CommandResult Represents command execution outcomes
CommandResultData Carries contextual information with results
TopicTypeConfig Configures type-safe topic subscriptions
QOSLevel MQTT Quality of Service levels
ZMqttMessage MQTT message for publishing
MqttExceptionCode MQTT protocol exception codes

CommandResult

Sealed class representing the outcome of an MQTT command execution.

Structure

sealed class CommandResult {
    data class Success(...)
    data class Failure(...)
    data class Ignored(...)
    data class None(...)
}

Result Types

Success

Command executed successfully.

data class Success(
    val data: Any? = null,
    val totalTimeMillis: Long,
    val metadata: Map<String, Any> = emptyMap(),
    val attempt: Int = 1
)

Properties:

  • data - Optional result data, typically CommandResultData
  • totalTimeMillis - Total execution time from submission to completion
  • metadata - Additional contextual information about the execution
  • attempt - Attempt number (1 for first try, increments with retries)

Example:

override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
    println("Command succeeded in ${result.totalTimeMillis}ms")
    println("Attempt: ${result.attempt}")

    val resultData = result.data as? CommandResultData
    val userContext = resultData?.userContext
}

Failure

Command execution failed after exhausting retries.

data class Failure(
    val data: Any? = null,
    val error: Throwable,
    val totalTimeMillis: Long,
    val attempt: Int = 1,
    val metadata: Map<String, Any> = emptyMap()
)

Properties:

  • data - Optional context data, typically CommandResultData
  • error - The throwable that caused the failure
  • totalTimeMillis - Total time from submission to failure
  • attempt - Attempt number when failure occurred
  • metadata - Additional debugging information

Example:

override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
    println("Command failed: ${result.error.message}")
    println("After ${result.attempt} attempts")
    println("Total time: ${result.totalTimeMillis}ms")

    // Log for analytics
    analytics.logError(command.type, result.error)
}

Ignored

Command was not executed due to current state.

data class Ignored(
    val data: Any? = null,
    val reason: String,
    val totalTimeMillis: Long,
    val metadata: Map<String, Any> = emptyMap(),
    val attempt: Int = 1
)

Properties:

  • data - Optional context data, typically CommandResultData
  • reason - Human-readable explanation for why command was ignored
  • totalTimeMillis - Time taken to determine command should be ignored
  • metadata - Additional contextual information
  • attempt - Attempt number when command was ignored

Common Reasons:

  • Attempting to connect when already connected
  • Attempting to disconnect when already disconnected
  • Attempting operations on null/invalid client

Example:

override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
    println("Command ignored: ${result.reason}")
    // Don't treat as error, just log
    logger.debug("${command.type} ignored: ${result.reason}")
}

None

Placeholder result when no meaningful result is available.

data class None(
    val totalTimeMillis: Long
)

Properties:

  • totalTimeMillis - Time taken before returning None result

Typically returned when MQTT client is null or command cannot be executed.

Example:

override fun onCommandNone(command: MqttCommand, result: CommandResult.None) {
    println("No result available for ${command.type}")
    // May indicate initialization issue
}

Usage with Listeners

pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        when (command) {
            is ConnectCommand -> handleConnectSuccess(result)
            is SubscribeCommand -> handleSubscribeSuccess(result)
            is PublishCommand -> handlePublishSuccess(result)
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        // Retry manually if needed
        if (result.attempt >= command.retryPolicy.maxRetries) {
            alertUser("Connection failed after ${result.attempt} attempts")
        }
    }

    override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
        // Handle gracefully
        logger.info("Command ${command.type} ignored: ${result.reason}")
    }
})

CommandResultData

Data container for passing contextual information with command results.

Structure

class CommandResultData(
    var userContext: Any? = null,
    var commandId: String? = null
)

Properties

userContext

Type: Any?

Application-specific context object that flows through the command pipeline.

data class OrderContext(val orderId: String, val userId: String)

val context = OrderContext(orderId = "ORDER-123", userId = "USER-456")
val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

commandId

Type: String?

Unique identifier for command tracking and correlation.

val commandId = UUID.randomUUID().toString()
val connectCommand = ConnectCommand(
    connectionOptions = options,
    userContext = commandId
)

Usage Example

// Store context when submitting command
data class MessageContext(
    val messageId: String,
    val priority: Int,
    val timestamp: Long
)

val context = MessageContext(
    messageId = "MSG-${UUID.randomUUID()}",
    priority = 1,
    timestamp = System.currentTimeMillis()
)

val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1,
    userContext = context
)

pulseMqttKit.submitCommand(publishCommand)

// Retrieve context in listener
pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
        val resultData = result.data as? CommandResultData
        val messageContext = resultData?.userContext as? MessageContext

        messageContext?.let {
            val latency = System.currentTimeMillis() - it.timestamp
            println("Message ${it.messageId} published in ${latency}ms")
        }
    }

    override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
        val resultData = result.data as? CommandResultData
        val messageContext = resultData?.userContext as? MessageContext

        messageContext?.let {
            println("Failed to publish message ${it.messageId}")
            retryHighPriorityMessage(it)
        }
    }
})

TopicTypeConfig

Configuration for type-safe MQTT topic subscription.

Structure

class TopicTypeConfig<T>(
    val messageType: Class<T>?,
    val qosLevel: QOSLevel,
    val expiryTimeStamp: Long? = null
)

Properties

messageType

Type: Class<T>?

The Java class of the expected message type for automatic deserialization.

data class OrderUpdate(val orderId: String, val status: String)

val config = TopicTypeConfig(
    messageType = OrderUpdate::class.java,
    qosLevel = QOSLevel.QOS_1
)

qosLevel

Type: QOSLevel

The Quality of Service level for subscription.

qosLevel = QOSLevel.QOS_1  // At least once delivery

expiryTimeStamp

Type: Long? (Optional)

Unix timestamp when subscription should expire and be automatically unsubscribed.

// Expire after 1 hour
expiryTimeStamp = System.currentTimeMillis() + 3600000

Usage Example

// Define message types
data class OrderUpdate(
    val orderId: String,
    val status: String,
    val timestamp: Long
)

data class RiderLocation(
    val riderId: String,
    val latitude: Double,
    val longitude: Double,
    val accuracy: Float
)

// Create topic configurations
val orderConfig = TopicTypeConfig(
    messageType = OrderUpdate::class.java,
    qosLevel = QOSLevel.QOS_1,
    expiryTimeStamp = System.currentTimeMillis() + 3600000  // 1 hour
)

val locationConfig = TopicTypeConfig(
    messageType = RiderLocation::class.java,
    qosLevel = QOSLevel.QOS_0  // Real-time, no need for guaranteed delivery
)

// Subscribe with type-safe configs
val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to orderConfig,
        "rider/+/location" to locationConfig
    )
)

pulseMqttKit.submitCommand(subscribeCommand)

// Receive type-safe messages
pulseMqttKit.addListener(object : MqttUpdatesListener {
    override fun onMqttMessageReceived(
        topic: String?,
        payload: String?,
        topicMessage: TopicMessage<*>?
    ) {
        when (topicMessage) {
            is TopicMessage.Deserialized<*> -> {
                when (val data = topicMessage.data) {
                    is OrderUpdate -> handleOrderUpdate(data)
                    is RiderLocation -> handleLocationUpdate(data)
                }
            }
            is TopicMessage.Plain -> {
                println("Plain message: $payload")
            }
            is TopicMessage.Error -> {
                println("Deserialization error: ${topicMessage.message}")
            }
        }
    }
})

QOSLevel

MQTT Quality of Service (QoS) levels for message delivery.

Enum Values

enum class QOSLevel(val value: Int) {
    QOS_0(0),  // At most once delivery
    QOS_1(1),  // At least once delivery
    QOS_2(2)   // Exactly once delivery
}

QoS Level Comparison

Level Name Guarantee Overhead
QOS_0 At most once Fire and forget Lowest
QOS_1 At least once Acknowledged Moderate
QOS_2 Exactly once 4-way handshake Highest

QOS_0 - At Most Once

Delivery: Fire and forget (fastest, no guarantee)

val publishCommand = PublishCommand(
    message = temperatureMessage,
    qos = QOSLevel.QOS_0  // Best for high-frequency data
)

QOS_1 - At Least Once

Delivery: Acknowledged (recommended for most use cases)

val publishCommand = PublishCommand(
    message = orderUpdateMessage,
    qos = QOSLevel.QOS_1  // Recommended default
)

QOS_2 - Exactly Once

Delivery: 4-way handshake (slowest, guaranteed unique)

val publishCommand = PublishCommand(
    message = paymentMessage,
    qos = QOSLevel.QOS_2  // For critical operations
)

ZMqttMessage

Represents an MQTT message for publishing.

Structure

class ZMqttMessage(
    var topic: String,
    var payload: String
)

Properties

topic

Type: String (Required, non-empty)

The MQTT topic to publish to.

topic = "devices/device-123/status"

payload

Type: String (Required, non-empty)

The message payload as UTF-8 string.

payload = """{"status": "online", "timestamp": ${System.currentTimeMillis()}}"""

Usage Examples

Simple Text Message

val statusMessage = ZMqttMessage(
    topic = "device/123/status",
    payload = "online"
)

Structured JSON Message

data class OrderUpdate(val orderId: String, val status: String)

val orderUpdate = OrderUpdate("ORDER-123", "delivered")
val orderMessage = ZMqttMessage(
    topic = "orders/123/updates",
    payload = gson.toJson(orderUpdate)
)

Publishing Messages

// Create message
val message = ZMqttMessage(
    topic = "sensors/temperature",
    payload = """{"value": 23.5, "unit": "celsius"}"""
)

// Publish with QoS
val publishCommand = PublishCommand(
    message = message,
    qos = QOSLevel.QOS_1
)

pulseMqttKit.submitCommand(publishCommand)

See Also