from pydantic import validate_arguments
from typing import Any, Dict, Tuple, Union
from filip.clients.mqtt.encoder import BaseEncoder
from filip.models.mqtt import IoTAMQTTMessageType
class Ultralight(BaseEncoder):
    """
    Encoder/decoder for Ultralight-style text payloads.

    Incoming command payloads of the form
    ``<device_id>@<key>|<value>[|<key>|<value>...]`` are decoded into
    dictionaries; outgoing measurements and command acknowledgements are
    encoded into ``|``-separated strings.
    """
    # NOTE(review): presumably the protocol/topic prefix consumed by
    # BaseEncoder -- confirm against the base class, which is not visible here.
    prefix = '/ul'

    def __init__(self):
        """Initialize the encoder by delegating to ``BaseEncoder``."""
        super().__init__()

    @staticmethod
    @validate_arguments
    def __eval_value(value: Union[bool, float, str]):
        """
        Pass a raw payload token through pydantic's argument validation.

        The token is validated against ``Union[bool, float, str]`` by the
        ``validate_arguments`` decorator and then returned.
        NOTE(review): the exact coercion (e.g. whether ``"1"`` becomes
        ``True``) depends on the installed pydantic version -- confirm.

        Args:
            value: Raw value token taken from a decoded payload.

        Returns:
            The value as produced by pydantic's validation.
        """
        return value

    def decode_message(self, msg, decoder='utf-8') -> Tuple[str, str, Dict]:
        """
        Decode an incoming command message into its components.

        The payload returned by ``BaseEncoder.decode_message`` is expected to
        look like ``<device_id>@<key>|<value>[|<key>|<value>...]``.

        Args:
            msg: Message object handed over to ``BaseEncoder.decode_message``.
            decoder: Name of the codec forwarded to the base implementation.

        Returns:
            Tuple ``(apikey, device_id, payload)`` where ``payload`` maps each
            key of the message body to its validated value.

        Raises:
            IndexError: If the payload has no body after ``@`` or the body
                does not consist of complete ``key|value`` pairs.
        """
        apikey, device_id, payload = super().decode_message(msg=msg,
                                                            decoder=decoder)
        # Split on the FIRST '@' only, so that values which themselves
        # contain '@' are not truncated.
        target, _, body = payload.partition('@')
        if target != device_id:
            self.logger.warning("Received invalid command")

        tokens = body.split('|')
        if len(tokens) % 2:
            # IndexError (rather than ValueError) is kept on purpose: it is
            # the exception type malformed payloads have always produced.
            raise IndexError(
                f"Malformed payload, expected '<device_id>@<key>|<value>' "
                f"pairs but received: {payload!r}")

        decoded = {tokens[i]: self.__eval_value(tokens[i + 1])
                   for i in range(0, len(tokens), 2)}
        return apikey, device_id, decoded

    def encode_msg(self,
                   device_id: str,
                   payload: Any,
                   msg_type: IoTAMQTTMessageType) -> str:
        """
        Encode an outgoing message according to its message type.

        Args:
            device_id: Id of the device; only used for command
                acknowledgements (``CMDEXE``).
            payload: Data to encode. Returned unchanged for ``SINGLE``;
                a mapping of keys to values for ``MULTI`` and ``CMDEXE``.
            msg_type: Kind of message to create.

        Returns:
            The encoded payload.

        Raises:
            ValueError: If a command acknowledgement value is neither a
                bool, a number nor a string.

        Any other case (unknown message type, empty ``CMDEXE`` payload) is
        delegated to ``BaseEncoder._raise_encoding_error``.
        """
        if msg_type == IoTAMQTTMessageType.SINGLE:
            return payload

        if msg_type == IoTAMQTTMessageType.MULTI:
            payload = super()._parse_timestamp(payload=payload)
            timestamp = str(payload.pop('timeInstant', ''))
            data = '|'.join(f"{key}|{value}"
                            for key, value in payload.items())
            # Join only the non-empty parts instead of stripping '|' from the
            # result: stripping would also remove a trailing separator that
            # belongs to an empty last value (e.g. "t|" must stay "t|").
            return '|'.join(part for part in (timestamp, data) if part)

        if msg_type == IoTAMQTTMessageType.CMDEXE:
            # The acknowledgement format holds a single key/value pair, so
            # the first entry of the payload is encoded and returned.
            for key, value in payload.items():
                if isinstance(value, bool):
                    # bool is checked before the numeric types because bool
                    # is a subclass of int; it is rendered lower-case.
                    value = str(value).lower()
                elif isinstance(value, (float, int)):
                    value = str(value)
                elif not isinstance(value, str):
                    raise ValueError("Cannot parse command acknowledgement!")
                return f"{device_id}@{key}|{value}"

        super()._raise_encoding_error(payload=payload, msg_type=msg_type)