Coverage for filip/clients/mqtt/encoder/ulralight.py: 82%
40 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1from pydantic import validate_arguments
2from typing import Any, Dict, Tuple, Union
3from filip.clients.mqtt.encoder import BaseEncoder
4from filip.models.mqtt import IoTAMQTTMessageType
class Ultralight(BaseEncoder):
    """
    Encoder/decoder for MQTT payloads in the Ultralight text format.

    In the payloads handled here, ``|`` separates keys from values and
    ``@`` separates the device id from the command part of a message.
    """
    prefix = '/ul'

    def __init__(self):
        super().__init__()

    @staticmethod
    @validate_arguments
    def __eval_value(value: Union[bool, float, str]):
        """
        Return ``value`` after it has passed the ``validate_arguments``
        decorator.

        The function body itself is an identity; any validation or coercion
        against ``Union[bool, float, str]`` is done by the decorator.
        """
        return value

    def decode_message(self, msg, decoder='utf-8') -> Tuple[str, str, Dict]:
        """
        Decode an incoming command message into its components.

        The base class decoding is applied first; the resulting payload is
        then expected to look like ``<device_id>@<key>|<value>[|<key>|<value>...]``.

        Args:
            msg: Message object, forwarded unchanged to the base class.
            decoder: Name of the codec, forwarded unchanged to the base
                class.

        Returns:
            Tuple of ``(apikey, device_id, payload)`` where ``payload`` maps
            each key of the command part to its evaluated value.

        Raises:
            IndexError: If the payload contains no ``@`` or the command part
                holds an odd number of ``|``-separated tokens.
        """
        apikey, device_id, payload = super().decode_message(msg=msg,
                                                            decoder=decoder)

        # Split on the first '@' only: a command value may itself contain
        # '@' and must not be cut off.
        parts = payload.split('@', 1)
        if device_id != parts[0]:
            # A mismatch is only reported; parsing continues regardless.
            self.logger.warning("Received invalid command")

        # Tokens alternate key, value, key, value, ...
        tokens = parts[1].split('|')
        payload = {tokens[i]: self.__eval_value(tokens[i + 1])
                   for i in range(0, len(tokens), 2)}

        return apikey, device_id, payload

    def encode_msg(self,
                   device_id: str,
                   payload: Any,
                   msg_type: IoTAMQTTMessageType) -> str:
        """
        Encode an outgoing payload according to the message type.

        Args:
            device_id: Id of the device; only used for command
                acknowledgements (``CMDEXE``).
            payload: For ``SINGLE`` the value is returned unchanged. For
                ``MULTI`` and ``CMDEXE`` a mapping of keys to values is
                expected.
            msg_type: Kind of message to encode.

        Returns:
            The encoded message.

        Raises:
            ValueError: If the value of a command acknowledgement is neither
                ``bool``, ``int``, ``float`` nor ``str``.

        For any other message type (or an empty ``CMDEXE`` payload) the
        call is delegated to ``_raise_encoding_error`` of the base class.
        """
        if msg_type == IoTAMQTTMessageType.SINGLE:
            return payload

        if msg_type == IoTAMQTTMessageType.MULTI:
            # Hand over a shallow copy so that neither the helper nor the
            # pop() below alters the dict owned by the caller.
            measurements = super()._parse_timestamp(payload=dict(payload))
            timestamp = str(measurements.pop('timeInstant', ''))
            data = '|'.join(f"{key}|{value}"
                            for key, value in measurements.items())
            # Without a timestamp the join yields a leading '|', which is
            # removed here.
            return '|'.join([timestamp, data]).strip('|')

        if msg_type == IoTAMQTTMessageType.CMDEXE:
            # Only the first entry is encoded: the loop returns (or raises)
            # during its first iteration.
            for key, value in payload.items():
                # bool is tested before int because bool is a subclass of
                # int and needs the lowercase rendering.
                if isinstance(value, bool):
                    value = str(value).lower()
                elif isinstance(value, (float, int)):
                    value = str(value)
                elif not isinstance(value, str):
                    raise ValueError("Cannot parse command acknowledgement!")
                return f"{device_id}@{key}|{value}"

        super()._raise_encoding_error(payload=payload, msg_type=msg_type)