Skip to content

Type-Safe Messages

Pulse MQTT provides automatic JSON deserialization of MQTT messages into strongly-typed Kotlin data classes. This eliminates manual parsing, reduces errors, and enables compile-time type checking.

Overview

Traditional MQTT message handling:

// ❌ Traditional approach - manual parsing, error-prone
// Every field is pulled out of the JSON tree by its string key; any exception raised
// while parsing or reading a key ends up in the single catch block below.
fun onMessageReceived(topic: String, payload: String) {
    try {
        val root = JSONObject(payload)
        val orderId = root.getString("orderId")
        val status = root.getString("status")
        // Handle message...
    } catch (e: Exception) {
        // Handle parsing error
    }
}

Pulse MQTT type-safe approach:

// ✅ Type-safe approach - automatic deserialization
/**
 * Typed representation of an order-update message.
 *
 * @property orderId value of the message's `orderId` field
 * @property status value of the message's `status` field
 * @property timestamp value of the message's `timestamp` field
 */
data class OrderUpdate(
    val orderId: String,
    val status: String,
    val timestamp: Long,
)

/**
 * Dispatches an incoming message according to the type it was deserialized into.
 *
 * Only [TopicMessage.Deserialized] messages carrying an [OrderUpdate] are acted on;
 * everything else (other payload types, other [TopicMessage] variants, `null`) is ignored.
 *
 * @param topic topic the message arrived on, if known
 * @param payload raw message payload, if available
 * @param topicMessage deserialization result for the message, or `null`
 */
fun onMqttMessageReceived(topic: String?, payload: String?, topicMessage: TopicMessage<*>?) {
    when (topicMessage) {
        is TopicMessage.Deserialized<*> -> {
            when (val data = topicMessage.data) {
                is OrderUpdate -> handleOrderUpdate(data)
            }
        }

        // Required: a `when` statement over a sealed (and here nullable) subject must be
        // exhaustive since Kotlin 1.7. This minimal example deliberately ignores the rest.
        else -> Unit
    }
}

Configuration

1. Define Message Types

Create data classes for your message structures:

/**
 * Order-update message whose JSON keys are mapped explicitly with [SerializedName].
 *
 * @property orderId read from the `order_id` key
 * @property status read from the `status` key
 * @property timestamp read from the `timestamp` key
 * @property riderInfo read from the `rider_info` key; nullable, defaults to `null`
 */
data class OrderUpdate(
    @SerializedName("order_id") val orderId: String,
    @SerializedName("status") val status: OrderStatus,
    @SerializedName("timestamp") val timestamp: Long,
    @SerializedName("rider_info") val riderInfo: RiderInfo? = null,
)

/**
 * Rider details nested inside an order update.
 *
 * @property riderId read from the `rider_id` key
 * @property name read from the `name` key
 * @property phone read from the `phone` key
 */
data class RiderInfo(
    @SerializedName("rider_id") val riderId: String,
    @SerializedName("name") val name: String,
    @SerializedName("phone") val phone: String,
)

/**
 * Order states; each constant is mapped to its lower-case JSON value via [SerializedName].
 */
enum class OrderStatus {
    @SerializedName("pending")
    PENDING,

    @SerializedName("confirmed")
    CONFIRMED,

    @SerializedName("preparing")
    PREPARING,

    @SerializedName("ready")
    READY,

    @SerializedName("picked_up")
    PICKED_UP,

    @SerializedName("delivered")
    DELIVERED,
}

2. Configure Topic Mappings

Associate topics with their message types:

// One entry per topic filter: the class its messages are deserialized into, plus the QoS
// level to subscribe with.
val subscribeCommand = SubscribeCommand(
    topicConfigs = mapOf(
        "orders/+/updates" to TopicTypeConfig(messageType = OrderUpdate::class.java, qosLevel = QOSLevel.QOS_1),
        "rider/+/location" to TopicTypeConfig(messageType = RiderLocation::class.java, qosLevel = QOSLevel.QOS_0),
        "notifications/+" to TopicTypeConfig(messageType = Notification::class.java, qosLevel = QOSLevel.QOS_1),
    ),
)

// Hand the subscription over to the kit.
pulseMqttKit.submitCommand(subscribeCommand)

3. Handle Deserialized Messages

Receive strongly-typed messages in your listener:

/**
 * Example listener that handles each [TopicMessage] variant explicitly.
 *
 * Deserialized payloads are routed to a handler by their runtime type, plain messages are
 * logged at debug level, and deserialization failures are logged as errors.
 */
class MyMqttListener : MqttUpdatesListener {
    /**
     * Routes an incoming message according to its deserialization result.
     *
     * @param topic topic the message arrived on, if known
     * @param payload raw message payload, if available
     * @param topicMessage deserialization result for the message, or `null`
     */
    override fun onMqttMessageReceived(
        topic: String?,
        payload: String?,
        topicMessage: TopicMessage<*>?
    ) {
        when (topicMessage) {
            is TopicMessage.Deserialized<*> -> {
                // Successfully deserialized - type-safe handling
                when (val data = topicMessage.data) {
                    is OrderUpdate -> handleOrderUpdate(topic, data)
                    is RiderLocation -> handleRiderLocation(topic, data)
                    is Notification -> handleNotification(topic, data)
                }
            }

            is TopicMessage.Plain -> {
                // No type mapping - received as plain string
                logger.debug("Plain message on $topic: $payload")
            }

            is TopicMessage.Error -> {
                // Deserialization failed
                logger.error(
                    "Failed to deserialize message on $topic: ${topicMessage.message}",
                    topicMessage.error
                )
            }

            // The parameter is nullable, so the `when` statement needs this branch to be
            // exhaustive (mandatory for sealed subjects since Kotlin 1.7). Nothing to do.
            null -> Unit
        }
    }

    /**
     * Reacts to an order update based on its [OrderStatus].
     *
     * @param topic topic the update arrived on (unused by this handler)
     * @param order the deserialized update
     */
    private fun handleOrderUpdate(topic: String?, order: OrderUpdate) {
        logger.info("Order ${order.orderId} status: ${order.status}")

        // Type-safe access to all fields
        when (order.status) {
            OrderStatus.DELIVERED -> {
                showDeliveryNotification(order)
                updateOrderHistory(order)
            }
            OrderStatus.PICKED_UP -> {
                // Rider tracking is only shown when the update actually carries rider info.
                order.riderInfo?.let { rider ->
                    showRiderTrackingUI(rider.riderId, rider.name)
                }
            }
            else -> updateOrderStatus(order)
        }
    }
}

TopicMessage Types

The TopicMessage sealed class represents the deserialization result:

Deserialized

Successfully parsed message with typed data:

// Branch for a message that was deserialized into its mapped type.
is TopicMessage.Deserialized<*> -> {
    val data = topicMessage.data
    // Access type-safe properties
    // (the star projection hides the concrete type, so narrow `data` with `is` checks first)
}

Plain

Message received on a topic that has no type mapping:

// Branch for a plain message; its text is read from the `message` property.
is TopicMessage.Plain -> {
    val plainText = topicMessage.message
    // Handle as string
}

Error

Deserialization failed with error details:

// Branch for a failed deserialization: `message` is included in the log line and `error`
// is passed to the logger as the second argument.
is TopicMessage.Error -> {
    logger.error("Deserialization failed: ${topicMessage.message}", 
                topicMessage.error)
    // Handle error (malformed JSON, type mismatch, etc.)
}

Advanced Features

Nested Objects

Complex nested structures are fully supported:

/**
 * Top-level delivery message composed of nested objects.
 *
 * @property orderId value of the `orderId` field
 * @property delivery nested [DeliveryInfo] object
 * @property customer nested customer object
 * @property restaurant nested restaurant object
 */
data class DeliveryUpdate(
    val orderId: String,
    val delivery: DeliveryInfo,
    val customer: CustomerInfo,
    val restaurant: RestaurantInfo,
)

/**
 * Delivery details nested inside a delivery update.
 *
 * @property estimatedTime value of the `estimatedTime` field
 * @property currentLocation nested [Location] object
 * @property route list of route points
 */
data class DeliveryInfo(
    val estimatedTime: Long,
    val currentLocation: Location,
    val route: List<LatLng>,
)

/**
 * A single position reading.
 *
 * @property latitude value of the `latitude` field
 * @property longitude value of the `longitude` field
 * @property accuracy value of the `accuracy` field
 * @property timestamp value of the `timestamp` field
 */
data class Location(
    val latitude: Double,
    val longitude: Double,
    val accuracy: Float,
    val timestamp: Long,
)

Collections

Lists, sets, and maps are automatically deserialized:

/**
 * Message carrying list, map and set fields.
 *
 * @property batchId value of the `batchId` field
 * @property orders list of [OrderSummary] entries
 * @property metadata string-to-string map
 * @property tags set of strings
 */
data class OrderBatch(
    val batchId: String,
    val orders: List<OrderSummary>,
    val metadata: Map<String, String>,
    val tags: Set<String>,
)

/**
 * One entry of an order batch.
 *
 * @property orderId value of the `orderId` field
 * @property amount value of the `amount` field
 * @property items value of the `items` field
 */
data class OrderSummary(
    val orderId: String,
    val amount: Double,
    val items: Int,
)

Optional Fields

Use nullable types for optional fields:

/**
 * Order update with optional fields modelled as nullable properties defaulting to `null`.
 *
 * @property orderId required
 * @property status required
 * @property estimatedTime optional; `null` when not provided
 * @property cancellationReason optional; `null` when not provided
 * @property specialInstructions optional; `null` when not provided
 */
data class OrderUpdate(
    val orderId: String,
    val status: String,

    // Everything below may be omitted
    val estimatedTime: Long? = null,
    val cancellationReason: String? = null,
    val specialInstructions: String? = null,
)

Custom Gson Configuration

Provide a custom Gson instance through the bridge:

/**
 * Bridge implementation that supplies the application's customised [Gson] instance.
 */
class MyBridge : PulseMqttKitBridge {
    // Built once on first use and then reused: Gson instances are thread-safe and cache
    // their type adapters, so rebuilding one on every call would throw that work away.
    private val cachedGson: Gson by lazy {
        GsonBuilder()
            .setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
            .registerTypeAdapter(LocalDateTime::class.java, LocalDateTimeAdapter())
            .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
            .create()
    }

    /** Returns the shared, lazily created [Gson] configured above. */
    override fun getAppGson(): Gson = cachedGson
}

Best Practices

1. Use Data Classes

// ✅ Good - Immutable, concise, auto-generated methods
/**
 * Read-only message model; `equals`, `hashCode`, `toString` and `copy` come from `data`.
 */
data class OrderUpdate(
    val orderId: String,
    val status: String,
)

// ❌ Avoid - Mutable, verbose
// A plain class with reassignable `var` properties: either field can be changed after
// construction, and no data-class members (equals/hashCode/toString/copy) are generated.
class OrderUpdate {
    var orderId: String = ""
    var status: String = ""
}

2. Provide Default Values

// ✅ Good - Graceful handling of missing fields
/**
 * Order update whose properties all carry defaults.
 *
 * Every parameter has a default on purpose: only then does Kotlin generate the
 * parameterless constructor that Gson calls before populating fields. Without it, Gson
 * allocates the object without running any constructor and the defaults below are never
 * applied (a missing `notes` would be `null` despite its non-null type).
 *
 * @property orderId empty when absent from the message; treat blank as "missing"
 * @property status empty when absent from the message; treat blank as "missing"
 * @property priority `0` when absent
 * @property notes empty when absent
 */
data class OrderUpdate(
    val orderId: String = "",
    val status: String = "",
    val priority: Int = 0,
    val notes: String = "",
)

3. Use SerializedName Annotations

// ✅ Good - Maps snake_case JSON to camelCase Kotlin
/**
 * @property orderId read from the `order_id` key
 * @property createdAt read from the `created_at` key
 */
data class OrderUpdate(
    @SerializedName("order_id") val orderId: String,
    @SerializedName("created_at") val createdAt: Long,
)

4. Validate Critical Fields

/**
 * Order update that rejects blank critical fields.
 *
 * The checks live in [validate] rather than only in `init`, because Gson creates instances
 * of classes without a parameterless constructor without running any constructor or `init`
 * block. Call [validate] on every instance obtained from deserialization.
 *
 * @property orderId must not be blank
 * @property status must not be blank
 */
data class OrderUpdate(
    val orderId: String,
    val status: String
) {
    init {
        // Covers instances created through the constructor in application code.
        validate()
    }

    /**
     * Verifies the critical fields and returns this instance for chaining.
     *
     * @throws IllegalArgumentException if [orderId] or [status] is blank
     */
    fun validate(): OrderUpdate = apply {
        require(orderId.isNotBlank()) { "Order ID cannot be blank" }
        require(status.isNotBlank()) { "Status cannot be blank" }
    }
}

5. Handle Deserialization Errors

/**
 * Logs and reports deserialization failures, then falls back to plain-text handling;
 * every other message (including `null`) is forwarded to the typed handler.
 */
override fun onMqttMessageReceived(
    topic: String?,
    payload: String?,
    topicMessage: TopicMessage<*>?
) {
    when (topicMessage) {
        is TopicMessage.Error -> {
            val failure = topicMessage.error

            logger.error("""
                Deserialization error on topic: $topic
                Payload: $payload
                Error: ${failure.message}
            """.trimIndent(), failure)

            // Report the failure to monitoring
            crashlytics.recordException(failure)

            // The raw payload is still available, so process it as plain text
            handlePlainMessage(topic, payload)
        }

        else -> handleTypedMessage(topicMessage)
    }
}

Common Patterns

Pattern 1: Message Polymorphism

Handle different message types on the same topic:

/**
 * Closed set of notification shapes that can arrive on one topic.
 */
sealed class NotificationMessage {
    /**
     * Notification that carries an order id.
     *
     * @property orderId value of the `orderId` field
     * @property message value of the `message` field
     */
    data class OrderNotification(val orderId: String, val message: String) : NotificationMessage()

    /**
     * Notification that carries a level.
     *
     * @property level value of the `level` field
     * @property message value of the `message` field
     */
    data class SystemNotification(val level: String, val message: String) : NotificationMessage()
}

// Custom deserializer
/**
 * Picks the concrete [NotificationMessage] subclass from the JSON `type` discriminator.
 */
class NotificationAdapter : JsonDeserializer<NotificationMessage> {
    /**
     * @throws JsonParseException if [json] is not an object, has no string `type`
     *   member, or names a type other than `order` / `system`
     */
    override fun deserialize(
        json: JsonElement,
        typeOfT: Type,
        context: JsonDeserializationContext
    ): NotificationMessage {
        if (!json.isJsonObject) {
            throw JsonParseException("Notification must be a JSON object")
        }

        // `get` returns null for an absent member, so resolve the discriminator null-safely
        // instead of failing with a NullPointerException.
        val type = json.asJsonObject.get("type")
            ?.takeIf { it.isJsonPrimitive }
            ?.asString
            ?: throw JsonParseException("Notification is missing a \"type\" discriminator")

        // The subclasses are nested in NotificationMessage, hence the qualified names.
        return when (type) {
            "order" -> context.deserialize(json, NotificationMessage.OrderNotification::class.java)
            "system" -> context.deserialize(json, NotificationMessage.SystemNotification::class.java)
            else -> throw JsonParseException("Unknown notification type: $type")
        }
    }
}

Pattern 2: Versioned Messages

Handle multiple message versions:

/**
 * Versioned order-update message; every version exposes [orderId] and [status].
 */
sealed class OrderUpdateMessage {
    abstract val orderId: String
    abstract val status: String

    /** First version: only the shared fields. */
    data class V1(
        override val orderId: String,
        override val status: String,
    ) : OrderUpdateMessage()

    /**
     * Second version: the shared fields plus [timestamp] and [metadata].
     */
    data class V2(
        override val orderId: String,
        override val status: String,
        val timestamp: Long,
        val metadata: Map<String, String>,
    ) : OrderUpdateMessage()
}

// Version detection and deserialization
/**
 * Parses [json] and deserializes it into the [OrderUpdateMessage] variant selected by its
 * `version` member. A missing or `null` version is treated as version 1.
 *
 * @param json JSON object text of the message
 * @return the deserialized [OrderUpdateMessage.V1] or [OrderUpdateMessage.V2]
 * @throws IllegalArgumentException if the version is neither 1 nor 2
 */
fun deserializeOrderUpdate(json: String): OrderUpdateMessage {
    val jsonObject = JsonParser.parseString(json).asJsonObject

    // An explicit `"version": null` comes back as a JsonNull element (not Kotlin null), and
    // calling `asInt` on it throws; treat it like an absent member.
    val version = jsonObject.get("version")?.takeUnless { it.isJsonNull }?.asInt ?: 1

    // Deserialize from the tree that was already parsed instead of parsing the text again.
    return when (version) {
        1 -> gson.fromJson(jsonObject, OrderUpdateMessage.V1::class.java)
        2 -> gson.fromJson(jsonObject, OrderUpdateMessage.V2::class.java)
        else -> throw IllegalArgumentException("Unsupported version: $version")
    }
}

Pattern 3: Type-Safe Topic Routing

Route messages based on type:

/**
 * Listener that forwards deserialized payloads to handlers registered per payload class.
 *
 * Lookup uses the payload's exact runtime class (`data::class`), so a handler registered
 * for a supertype is not invoked for instances of a subtype.
 */
class TypeSafeRouter : MqttUpdatesListener {
    private val handlers = mutableMapOf<KClass<*>, (Any) -> Unit>()

    /**
     * Registers [handler] for payloads of class [type], replacing any handler previously
     * registered for the same class.
     */
    fun <T : Any> registerHandler(type: KClass<T>, handler: (T) -> Unit) {
        // The stored lambda is only ever looked up under `type`, the key it is filed under.
        handlers[type] = { message -> handler(message as T) }
    }

    /**
     * Invokes the handler registered for the payload's class, if any. Messages that are not
     * [TopicMessage.Deserialized], or whose data is `null`, are ignored.
     */
    override fun onMqttMessageReceived(
        topic: String?,
        payload: String?,
        topicMessage: TopicMessage<*>?
    ) {
        val deserialized = topicMessage as? TopicMessage.Deserialized<*> ?: return
        val data = deserialized.data ?: return
        handlers[data::class]?.invoke(data)
    }
}

// Usage: create the router and register one handler per payload class
val router = TypeSafeRouter().apply {
    registerHandler(OrderUpdate::class) { order -> handleOrder(order) }
    registerHandler(RiderLocation::class) { location -> updateMap(location) }
}

Next Steps