Coverage for filip/clients/mqtt/encoder/ulralight.py: 82%

40 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1from pydantic import validate_arguments 

2from typing import Any, Dict, Tuple, Union 

3from filip.clients.mqtt.encoder import BaseEncoder 

4from filip.models.mqtt import IoTAMQTTMessageType 

5 

6 

7class Ultralight(BaseEncoder): 

8 prefix = "/ul" 

9 

10 def __init__(self): 

11 super().__init__() 

12 

13 @staticmethod 

14 @validate_arguments 

15 def __eval_value(value: Union[bool, float, str]): 

16 return value 

17 

18 def decode_message(self, msg, decoder="utf-8") -> Tuple[str, str, Dict]: 

19 apikey, device_id, payload = super().decode_message(msg=msg, decoder=decoder) 

20 payload = payload.split("@") 

21 if not device_id == payload[0]: 

22 self.logger.warning("Received invalid command") 

23 

24 payload = payload[1].split("|") 

25 payload = { 

26 payload[i]: self.__eval_value(payload[i + 1]) 

27 for i in range(0, len(payload), 2) 

28 } 

29 

30 return apikey, device_id, payload 

31 

32 def encode_msg( 

33 self, device_id: str, payload: Any, msg_type: IoTAMQTTMessageType 

34 ) -> str: 

35 if msg_type == IoTAMQTTMessageType.SINGLE: 

36 return payload 

37 elif msg_type == IoTAMQTTMessageType.MULTI: 

38 payload = super()._parse_timestamp(payload=payload) 

39 timestamp = str(payload.pop("timeInstant", "")) 

40 data = "|".join([f"{key}|{value}" for key, value in payload.items()]) 

41 data = "|".join([timestamp, data]).strip("|") 

42 return data 

43 elif msg_type == IoTAMQTTMessageType.CMDEXE: 

44 for key, value in payload.items(): 

45 if isinstance(value, bool): 

46 value = str(value).lower() 

47 elif isinstance(value, (float, int)): 

48 value = str(value) 

49 elif isinstance(value, str): 

50 pass 

51 else: 

52 raise ValueError("Cannot parse command acknowledgement!") 

53 return f"{device_id}@{key}|{value}" 

54 super()._raise_encoding_error(payload=payload, msg_type=msg_type)