Data Classes¶
Pulse MQTT uses several data classes to represent MQTT messages, command results, configuration, and error codes. These classes provide type-safe, structured data throughout the library.
Overview¶
| Class | Purpose |
|---|---|
CommandResult |
Represents command execution outcomes |
CommandResultData |
Carries contextual information with results |
TopicTypeConfig |
Configures type-safe topic subscriptions |
QOSLevel |
MQTT Quality of Service levels |
ZMqttMessage |
MQTT message for publishing |
MqttExceptionCode |
MQTT protocol exception codes |
CommandResult¶
Sealed class representing the outcome of an MQTT command execution.
Structure¶
sealed class CommandResult {
data class Success(...)
data class Failure(...)
data class Ignored(...)
data class None(...)
}
Result Types¶
Success¶
Command executed successfully.
data class Success(
val data: Any? = null,
val totalTimeMillis: Long,
val metadata: Map<String, Any> = emptyMap(),
val attempt: Int = 1
)
Properties:
data- Optional result data, typicallyCommandResultDatatotalTimeMillis- Total execution time from submission to completionmetadata- Additional contextual information about the executionattempt- Attempt number (1 for first try, increments with retries)
Example:
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
println("Command succeeded in ${result.totalTimeMillis}ms")
println("Attempt: ${result.attempt}")
val resultData = result.data as? CommandResultData
val userContext = resultData?.userContext
}
Failure¶
Command execution failed after exhausting retries.
data class Failure(
val data: Any? = null,
val error: Throwable,
val totalTimeMillis: Long,
val attempt: Int = 1,
val metadata: Map<String, Any> = emptyMap()
)
Properties:
data- Optional context data, typicallyCommandResultDataerror- The throwable that caused the failuretotalTimeMillis- Total time from submission to failureattempt- Attempt number when failure occurredmetadata- Additional debugging information
Example:
override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
println("Command failed: ${result.error.message}")
println("After ${result.attempt} attempts")
println("Total time: ${result.totalTimeMillis}ms")
// Log for analytics
analytics.logError(command.type, result.error)
}
Ignored¶
Command was not executed due to current state.
data class Ignored(
val data: Any? = null,
val reason: String,
val totalTimeMillis: Long,
val metadata: Map<String, Any> = emptyMap(),
val attempt: Int = 1
)
Properties:
data- Optional context data, typicallyCommandResultDatareason- Human-readable explanation for why command was ignoredtotalTimeMillis- Time taken to determine command should be ignoredmetadata- Additional contextual informationattempt- Attempt number when command was ignored
Common Reasons:
- Attempting to connect when already connected
- Attempting to disconnect when already disconnected
- Attempting operations on null/invalid client
Example:
override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
println("Command ignored: ${result.reason}")
// Don't treat as error, just log
logger.debug("${command.type} ignored: ${result.reason}")
}
None¶
Placeholder result when no meaningful result is available.
Properties:
totalTimeMillis- Time taken before returning None result
Typically returned when MQTT client is null or command cannot be executed.
Example:
override fun onCommandNone(command: MqttCommand, result: CommandResult.None) {
println("No result available for ${command.type}")
// May indicate initialization issue
}
Usage with Listeners¶
pulseMqttKit.addListener(object : MqttUpdatesListener {
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
when (command) {
is ConnectCommand -> handleConnectSuccess(result)
is SubscribeCommand -> handleSubscribeSuccess(result)
is PublishCommand -> handlePublishSuccess(result)
}
}
override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
// Retry manually if needed
if (result.attempt >= command.retryPolicy.maxRetries) {
alertUser("Connection failed after ${result.attempt} attempts")
}
}
override fun onCommandIgnored(command: MqttCommand, result: CommandResult.Ignored) {
// Handle gracefully
logger.info("Command ${command.type} ignored: ${result.reason}")
}
})
CommandResultData¶
Data container for passing contextual information with command results.
Structure¶
Properties¶
userContext¶
Type: Any?
Application-specific context object that flows through the command pipeline.
data class OrderContext(val orderId: String, val userId: String)
val context = OrderContext(orderId = "ORDER-123", userId = "USER-456")
val publishCommand = PublishCommand(
message = message,
qos = QOSLevel.QOS_1,
userContext = context
)
commandId¶
Type: String?
Unique identifier for command tracking and correlation.
val commandId = UUID.randomUUID().toString()
val connectCommand = ConnectCommand(
connectionOptions = options,
userContext = commandId
)
Usage Example¶
// Store context when submitting command
data class MessageContext(
val messageId: String,
val priority: Int,
val timestamp: Long
)
val context = MessageContext(
messageId = "MSG-${UUID.randomUUID()}",
priority = 1,
timestamp = System.currentTimeMillis()
)
val publishCommand = PublishCommand(
message = message,
qos = QOSLevel.QOS_1,
userContext = context
)
pulseMqttKit.submitCommand(publishCommand)
// Retrieve context in listener
pulseMqttKit.addListener(object : MqttUpdatesListener {
override fun onCommandSuccess(command: MqttCommand, result: CommandResult.Success) {
val resultData = result.data as? CommandResultData
val messageContext = resultData?.userContext as? MessageContext
messageContext?.let {
val latency = System.currentTimeMillis() - it.timestamp
println("Message ${it.messageId} published in ${latency}ms")
}
}
override fun onCommandFailure(command: MqttCommand, result: CommandResult.Failure) {
val resultData = result.data as? CommandResultData
val messageContext = resultData?.userContext as? MessageContext
messageContext?.let {
println("Failed to publish message ${it.messageId}")
retryHighPriorityMessage(it)
}
}
})
TopicTypeConfig¶
Configuration for type-safe MQTT topic subscription.
Structure¶
class TopicTypeConfig<T>(
val messageType: Class<T>?,
val qosLevel: QOSLevel,
val expiryTimeStamp: Long? = null
)
Properties¶
messageType¶
Type: Class<T>?
The Java class of the expected message type for automatic deserialization.
data class OrderUpdate(val orderId: String, val status: String)
val config = TopicTypeConfig(
messageType = OrderUpdate::class.java,
qosLevel = QOSLevel.QOS_1
)
qosLevel¶
Type: QOSLevel
The Quality of Service level for subscription.
expiryTimeStamp¶
Type: Long? (Optional)
Unix timestamp when subscription should expire and be automatically unsubscribed.
Usage Example¶
// Define message types
data class OrderUpdate(
val orderId: String,
val status: String,
val timestamp: Long
)
data class RiderLocation(
val riderId: String,
val latitude: Double,
val longitude: Double,
val accuracy: Float
)
// Create topic configurations
val orderConfig = TopicTypeConfig(
messageType = OrderUpdate::class.java,
qosLevel = QOSLevel.QOS_1,
expiryTimeStamp = System.currentTimeMillis() + 3600000 // 1 hour
)
val locationConfig = TopicTypeConfig(
messageType = RiderLocation::class.java,
qosLevel = QOSLevel.QOS_0 // Real-time, no need for guaranteed delivery
)
// Subscribe with type-safe configs
val subscribeCommand = SubscribeCommand(
topicConfigs = mapOf(
"orders/+/updates" to orderConfig,
"rider/+/location" to locationConfig
)
)
pulseMqttKit.submitCommand(subscribeCommand)
// Receive type-safe messages
pulseMqttKit.addListener(object : MqttUpdatesListener {
override fun onMqttMessageReceived(
topic: String?,
payload: String?,
topicMessage: TopicMessage<*>?
) {
when (topicMessage) {
is TopicMessage.Deserialized<*> -> {
when (val data = topicMessage.data) {
is OrderUpdate -> handleOrderUpdate(data)
is RiderLocation -> handleLocationUpdate(data)
}
}
is TopicMessage.Plain -> {
println("Plain message: $payload")
}
is TopicMessage.Error -> {
println("Deserialization error: ${topicMessage.message}")
}
}
}
})
QOSLevel¶
MQTT Quality of Service (QoS) levels for message delivery.
Enum Values¶
enum class QOSLevel(val value: Int) {
QOS_0(0), // At most once delivery
QOS_1(1), // At least once delivery
QOS_2(2) // Exactly once delivery
}
QoS Level Comparison¶
| Level | Name | Guarantee | Overhead |
|---|---|---|---|
| QOS_0 | At most once | Fire and forget | Lowest |
| QOS_1 | At least once | Acknowledged | Moderate |
| QOS_2 | Exactly once | 4-way handshake | Highest |
QOS_0 - At Most Once¶
Delivery: Fire and forget (fastest, no guarantee)
val publishCommand = PublishCommand(
message = temperatureMessage,
qos = QOSLevel.QOS_0 // Best for high-frequency data
)
QOS_1 - At Least Once¶
Delivery: Acknowledged (recommended for most use cases)
val publishCommand = PublishCommand(
message = orderUpdateMessage,
qos = QOSLevel.QOS_1 // Recommended default
)
QOS_2 - Exactly Once¶
Delivery: 4-way handshake (slowest, guaranteed unique)
val publishCommand = PublishCommand(
message = paymentMessage,
qos = QOSLevel.QOS_2 // For critical operations
)
ZMqttMessage¶
Represents an MQTT message for publishing.
Structure¶
Properties¶
topic¶
Type: String (Required, non-empty)
The MQTT topic to publish to.
payload¶
Type: String (Required, non-empty)
The message payload as UTF-8 string.
Usage Examples¶
Simple Text Message¶
Structured JSON Message¶
data class OrderUpdate(val orderId: String, val status: String)
val orderUpdate = OrderUpdate("ORDER-123", "delivered")
val orderMessage = ZMqttMessage(
topic = "orders/123/updates",
payload = gson.toJson(orderUpdate)
)
Publishing Messages¶
// Create message
val message = ZMqttMessage(
topic = "sensors/temperature",
payload = """{"value": 23.5, "unit": "celsius"}"""
)
// Publish with QoS
val publishCommand = PublishCommand(
message = message,
qos = QOSLevel.QOS_1
)
pulseMqttKit.submitCommand(publishCommand)
See Also¶
- Commands API - Using data classes with commands
- Retry Policies - Exception filtering
- Type-Safe Messages - Message deserialization