Coverage for filip/clients/mqtt/encoder/ulralight.py: 82%

40 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1from pydantic import validate_arguments 

2from typing import Any, Dict, Tuple, Union 

3from filip.clients.mqtt.encoder import BaseEncoder 

4from filip.models.mqtt import IoTAMQTTMessageType 

5 

class Ultralight(BaseEncoder):
    """
    Encoder/decoder for pipe-separated Ultralight MQTT payloads.

    Incoming commands have the form
    ``<device_id>@<key>|<value>[|<key>|<value>...]``; outgoing multi-measure
    payloads have the form ``[<timestamp>|]<key>|<value>[|<key>|<value>...]``.
    Raw message decoding, timestamp parsing and the generic encoding error
    are delegated to ``BaseEncoder``.
    """
    # Protocol prefix of this encoder. It is not referenced inside this class
    # (presumably consumed by BaseEncoder or the MQTT client - verify).
    prefix = '/ul'

    def __init__(self):
        """Initialise the encoder through ``BaseEncoder.__init__``."""
        super().__init__()

    @staticmethod
    @validate_arguments
    def __eval_value(value: Union[bool, float, str]):
        """
        Pass a decoded command value through pydantic argument validation.

        Args:
            value: Raw value taken from the command payload.

        Returns:
            The value as handed over by ``validate_arguments``. Whether a
            string is coerced to ``bool``/``float`` depends on the installed
            pydantic version (NOTE(review): confirm the intended coercion).
        """
        return value

    def decode_message(self, msg, decoder='utf-8') -> Tuple[str, str, Dict]:
        """
        Decode an incoming command message.

        Args:
            msg: Message object, forwarded to ``BaseEncoder.decode_message``.
            decoder: Forwarded unchanged to ``BaseEncoder.decode_message``
                (presumably the codec name used to decode the raw payload).

        Returns:
            Tuple ``(apikey, device_id, payload)`` where ``payload`` maps
            every command key to its value.

        Raises:
            IndexError: If the payload contains no ``@`` separator or a key
                without a matching value.
        """
        apikey, device_id, payload = super().decode_message(msg=msg,
                                                            decoder=decoder)
        # Split on the first '@' only: a command value may itself contain
        # '@' and must not be truncated.
        payload = payload.split('@', 1)
        if not device_id == payload[0]:
            # Best effort: a mismatch is only reported, decoding continues.
            self.logger.warning("Received invalid command")

        # The remainder is a flat key|value|key|value... sequence.
        fields = payload[1].split('|')
        payload = {fields[i]: self.__eval_value(fields[i + 1])
                   for i in range(0, len(fields), 2)}

        return apikey, device_id, payload

    def encode_msg(self,
                   device_id: str,
                   payload: Any,
                   msg_type: IoTAMQTTMessageType) -> str:
        """
        Encode an outgoing payload according to the message type.

        Args:
            device_id: Id of the device; only used for command
                acknowledgements.
            payload: Data to encode. For ``MULTI`` and ``CMDEXE`` it must be
                a mapping; for ``SINGLE`` it is returned unchanged.
            msg_type: Kind of message to produce.

        Returns:
            ``SINGLE``: the payload as given.
            ``MULTI``: ``[<timestamp>|]<key>|<value>|...`` where the
            timestamp is the ``timeInstant`` entry, if present.
            ``CMDEXE``: ``<device_id>@<key>|<value>`` for the first entry of
            the payload.

        Raises:
            ValueError: If a command acknowledgement value is neither
                ``bool``, ``int``, ``float`` nor ``str``.

        Any other message type, or an empty ``CMDEXE`` payload, is handed to
        ``BaseEncoder._raise_encoding_error``.
        """
        if msg_type == IoTAMQTTMessageType.SINGLE:
            return payload

        if msg_type == IoTAMQTTMessageType.MULTI:
            # Work on a shallow copy so that neither the timestamp parsing
            # nor the pop below alters the caller's mapping.
            measures = super()._parse_timestamp(payload=dict(payload))
            timestamp = str(measures.pop('timeInstant', ''))
            # Only emit the timestamp field when there is one. Joining the
            # fields directly (instead of stripping '|' afterwards) keeps the
            # separator of an empty trailing value intact.
            fields = [timestamp] if timestamp else []
            fields.extend(f"{key}|{value}" for key, value in measures.items())
            return '|'.join(fields)

        if msg_type == IoTAMQTTMessageType.CMDEXE:
            # Only the first entry is encoded; an empty payload falls through
            # to the generic encoding error below.
            first_entry = next(iter(payload.items()), None)
            if first_entry is not None:
                key, value = first_entry
                if isinstance(value, bool):
                    # bool is checked before int because bool subclasses int.
                    value = str(value).lower()
                elif isinstance(value, (float, int)):
                    value = str(value)
                elif not isinstance(value, str):
                    raise ValueError("Cannot parse command acknowledgement!")
                return f"{device_id}@{key}|{value}"

        super()._raise_encoding_error(payload=payload, msg_type=msg_type)