Source code for filip.clients.mqtt.encoder.ulralight

from pydantic import validate_arguments
from typing import Any, Dict, Tuple, Union
from filip.clients.mqtt.encoder import BaseEncoder
from filip.models.mqtt import IoTAMQTTMessageType

[docs]class Ultralight(BaseEncoder): prefix = '/ul' def __init__(self): super().__init__() @staticmethod @validate_arguments def __eval_value(value: Union[bool, float, str]): return value
[docs] def decode_message(self, msg, decoder='utf-8') -> Tuple[str, str, Dict]: apikey, device_id, payload = super().decode_message(msg=msg, decoder=decoder) payload = payload.split('@') if not device_id == payload[0]: self.logger.warning("Received invalid command") payload = payload[1].split('|') payload = {payload[i]: self.__eval_value(payload[i + 1]) for i in range(0, len(payload), 2)} return apikey, device_id, payload
[docs] def encode_msg(self, device_id: str, payload: Any, msg_type: IoTAMQTTMessageType) -> str: if msg_type == IoTAMQTTMessageType.SINGLE: return payload elif msg_type == IoTAMQTTMessageType.MULTI: payload = super()._parse_timestamp(payload=payload) timestamp = str(payload.pop('timeInstant', '')) data = '|'.join([f"{key}|{value}" for key, value in payload.items()]) data = '|'.join([timestamp, data]).strip('|') return data elif msg_type == IoTAMQTTMessageType.CMDEXE: for key, value in payload.items(): if isinstance(value, bool): value = str(value).lower() elif isinstance(value, (float, int)): value = str(value) elif isinstance(value, str): pass else: raise ValueError("Cannot parse command acknowledgement!") return f"{device_id}@{key}|{value}" super()._raise_encoding_error(payload=payload, msg_type=msg_type)