Type-Safe Messages¶
Pulse MQTT provides automatic JSON deserialization of MQTT messages into strongly-typed Kotlin data classes. This eliminates manual parsing, reduces errors, and enables compile-time type checking.
Overview¶
Traditional MQTT message handling:
// ❌ Traditional approach - manual parsing, error-prone
fun onMessageReceived(topic: String, payload: String) {
try {
val json = JSONObject(payload)
val orderId = json.getString("orderId")
val status = json.getString("status")
// Handle message...
} catch (e: Exception) {
// Handle parsing error
}
}
Pulse MQTT type-safe approach:
// ✅ Type-safe approach - automatic deserialization
data class OrderUpdate(
val orderId: String,
val status: String,
val timestamp: Long
)
fun onMqttMessageReceived(topic: String?, payload: String?, topicMessage: TopicMessage<*>?) {
when (topicMessage) {
is TopicMessage.Deserialized<*> -> {
when (val data = topicMessage.data) {
is OrderUpdate -> handleOrderUpdate(data)
}
}
}
}
Configuration¶
1. Define Message Types¶
Create data classes for your message structures:
data class OrderUpdate(
@SerializedName("order_id")
val orderId: String,
@SerializedName("status")
val status: OrderStatus,
@SerializedName("timestamp")
val timestamp: Long,
@SerializedName("rider_info")
val riderInfo: RiderInfo? = null
)
data class RiderInfo(
@SerializedName("rider_id")
val riderId: String,
@SerializedName("name")
val name: String,
@SerializedName("phone")
val phone: String
)
enum class OrderStatus {
@SerializedName("pending") PENDING,
@SerializedName("confirmed") CONFIRMED,
@SerializedName("preparing") PREPARING,
@SerializedName("ready") READY,
@SerializedName("picked_up") PICKED_UP,
@SerializedName("delivered") DELIVERED
}
2. Configure Topic Mappings¶
Associate topics with their message types:
val subscribeCommand = SubscribeCommand(
topicConfigs = mapOf(
"orders/+/updates" to TopicTypeConfig(
messageType = OrderUpdate::class.java,
qosLevel = QOSLevel.QOS_1
),
"rider/+/location" to TopicTypeConfig(
messageType = RiderLocation::class.java,
qosLevel = QOSLevel.QOS_0
),
"notifications/+" to TopicTypeConfig(
messageType = Notification::class.java,
qosLevel = QOSLevel.QOS_1
)
)
)
pulseMqttKit.submitCommand(subscribeCommand)
3. Handle Deserialized Messages¶
Receive strongly-typed messages in your listener:
class MyMqttListener : MqttUpdatesListener {
override fun onMqttMessageReceived(
topic: String?,
payload: String?,
topicMessage: TopicMessage<*>?
) {
when (topicMessage) {
is TopicMessage.Deserialized<*> -> {
// Successfully deserialized - type-safe handling
when (val data = topicMessage.data) {
is OrderUpdate -> handleOrderUpdate(topic, data)
is RiderLocation -> handleRiderLocation(topic, data)
is Notification -> handleNotification(topic, data)
}
}
is TopicMessage.Plain -> {
// No type mapping - received as plain string
logger.debug("Plain message on $topic: $payload")
}
is TopicMessage.Error -> {
// Deserialization failed
logger.error("Failed to deserialize message on $topic: ${topicMessage.message}",
topicMessage.error)
}
}
}
private fun handleOrderUpdate(topic: String?, order: OrderUpdate) {
logger.info("Order ${order.orderId} status: ${order.status}")
// Type-safe access to all fields
when (order.status) {
OrderStatus.DELIVERED -> {
showDeliveryNotification(order)
updateOrderHistory(order)
}
OrderStatus.PICKED_UP -> {
order.riderInfo?.let { rider ->
showRiderTrackingUI(rider.riderId, rider.name)
}
}
else -> updateOrderStatus(order)
}
}
}
TopicMessage Types¶
The TopicMessage sealed class represents the deserialization result:
Deserialized¶
Successfully parsed message with typed data:
Plain¶
Message received on topic without type mapping:
Error¶
Deserialization failed with error details:
is TopicMessage.Error -> {
logger.error("Deserialization failed: ${topicMessage.message}",
topicMessage.error)
// Handle error (malformed JSON, type mismatch, etc.)
}
Advanced Features¶
Nested Objects¶
Complex nested structures are fully supported:
data class DeliveryUpdate(
val orderId: String,
val delivery: DeliveryInfo,
val customer: CustomerInfo,
val restaurant: RestaurantInfo
)
data class DeliveryInfo(
val estimatedTime: Long,
val currentLocation: Location,
val route: List<LatLng>
)
data class Location(
val latitude: Double,
val longitude: Double,
val accuracy: Float,
val timestamp: Long
)
Collections¶
Lists, sets, and maps are automatically deserialized:
data class OrderBatch(
val batchId: String,
val orders: List<OrderSummary>,
val metadata: Map<String, String>,
val tags: Set<String>
)
data class OrderSummary(
val orderId: String,
val amount: Double,
val items: Int
)
Optional Fields¶
Use nullable types for optional fields:
data class OrderUpdate(
val orderId: String,
val status: String,
// Optional fields
val estimatedTime: Long? = null,
val cancellationReason: String? = null,
val specialInstructions: String? = null
)
Custom Gson Configuration¶
Provide a custom Gson instance through the bridge:
class MyBridge : PulseMqttKitBridge {
override fun getAppGson(): Gson {
return GsonBuilder()
.setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.registerTypeAdapter(LocalDateTime::class.java, LocalDateTimeAdapter())
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create()
}
}
Best Practices¶
1. Use Data Classes¶
// ✅ Good - Immutable, concise, auto-generated methods
data class OrderUpdate(
val orderId: String,
val status: String
)
// ❌ Avoid - Mutable, verbose
class OrderUpdate {
var orderId: String = ""
var status: String = ""
}
2. Provide Default Values¶
// ✅ Good - Graceful handling of missing fields
data class OrderUpdate(
val orderId: String,
val status: String,
val priority: Int = 0,
val notes: String = ""
)
3. Use SerializedName Annotations¶
// ✅ Good - Maps snake_case JSON to camelCase Kotlin
data class OrderUpdate(
@SerializedName("order_id")
val orderId: String,
@SerializedName("created_at")
val createdAt: Long
)
4. Validate Critical Fields¶
data class OrderUpdate(
val orderId: String,
val status: String
) {
init {
require(orderId.isNotBlank()) { "Order ID cannot be blank" }
require(status.isNotBlank()) { "Status cannot be blank" }
}
}
5. Handle Deserialization Errors¶
override fun onMqttMessageReceived(
topic: String?,
payload: String?,
topicMessage: TopicMessage<*>?
) {
when (topicMessage) {
is TopicMessage.Error -> {
logger.error("""
Deserialization error on topic: $topic
Payload: $payload
Error: ${topicMessage.error.message}
""".trimIndent(), topicMessage.error)
// Send error to monitoring
crashlytics.recordException(topicMessage.error)
// Fall back to plain text processing if needed
handlePlainMessage(topic, payload)
}
else -> handleTypedMessage(topicMessage)
}
}
Common Patterns¶
Pattern 1: Message Polymorphism¶
Handle different message types on the same topic:
sealed class NotificationMessage {
data class OrderNotification(
val orderId: String,
val message: String
) : NotificationMessage()
data class SystemNotification(
val level: String,
val message: String
) : NotificationMessage()
}
// Custom deserializer
class NotificationAdapter : JsonDeserializer<NotificationMessage> {
override fun deserialize(
json: JsonElement,
typeOfT: Type,
context: JsonDeserializationContext
): NotificationMessage {
val jsonObject = json.asJsonObject
return when (jsonObject.get("type").asString) {
"order" -> context.deserialize(json, OrderNotification::class.java)
"system" -> context.deserialize(json, SystemNotification::class.java)
else -> throw JsonParseException("Unknown notification type")
}
}
}
Pattern 2: Versioned Messages¶
Handle multiple message versions:
sealed class OrderUpdateMessage {
abstract val orderId: String
abstract val status: String
data class V1(
override val orderId: String,
override val status: String
) : OrderUpdateMessage()
data class V2(
override val orderId: String,
override val status: String,
val timestamp: Long,
val metadata: Map<String, String>
) : OrderUpdateMessage()
}
// Version detection and deserialization
fun deserializeOrderUpdate(json: String): OrderUpdateMessage {
val jsonObject = JsonParser.parseString(json).asJsonObject
val version = jsonObject.get("version")?.asInt ?: 1
return when (version) {
1 -> gson.fromJson(json, OrderUpdateMessage.V1::class.java)
2 -> gson.fromJson(json, OrderUpdateMessage.V2::class.java)
else -> throw IllegalArgumentException("Unsupported version: $version")
}
}
Pattern 3: Type-Safe Topic Routing¶
Route messages based on type:
class TypeSafeRouter : MqttUpdatesListener {
private val handlers = mutableMapOf<KClass<*>, (Any) -> Unit>()
fun <T : Any> registerHandler(type: KClass<T>, handler: (T) -> Unit) {
handlers[type] = { handler(it as T) }
}
override fun onMqttMessageReceived(
topic: String?,
payload: String?,
topicMessage: TopicMessage<*>?
) {
if (topicMessage is TopicMessage.Deserialized<*>) {
val data = topicMessage.data ?: return
handlers[data::class]?.invoke(data)
}
}
}
// Usage
val router = TypeSafeRouter()
router.registerHandler(OrderUpdate::class) { order ->
handleOrder(order)
}
router.registerHandler(RiderLocation::class) { location ->
updateMap(location)
}
Next Steps¶
- Learn about Connection Management
- Explore QoS Levels