Coverage for filip/clients/mqtt/encoder/base_encoder.py: 79%

38 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2Abstract class for all IoTA MQTT message encoders 

3""" 

4 

5import logging 

6from abc import ABC 

7from datetime import datetime 

8from typing import Dict, Tuple 

9from paho.mqtt.client import MQTTMessage 

10from filip.models.mqtt import IoTAMQTTMessageType 

11from filip.utils import convert_datetime_to_iso_8601_with_z_suffix 

12 

13 

14class BaseEncoder(ABC): 

15 """ 

16 Abstract class for all IoTA MQTT message encoders 

17 """ 

18 

19 prefix: str = "" 

20 

21 def __init__(self): 

22 # setup logging functionality 

23 self.logger = logging.getLogger( 

24 name=f"{self.__class__.__module__}." f"{self.__class__.__name__}" 

25 ) 

26 self.logger.addHandler(logging.NullHandler()) 

27 

28 def decode_message( 

29 self, msg: MQTTMessage, decoder: str = "utf-8" 

30 ) -> Tuple[str, str, str]: 

31 """ 

32 Decode message for ingoing traffic 

33 Args: 

34 msg: Message class 

35 decoder: encoding identifier 

36 

37 Returns: 

38 apikey 

39 device_id 

40 payload 

41 """ 

42 topic = msg.topic.strip("/") 

43 topic = topic.split("/") 

44 apikey = None 

45 device_id = None 

46 payload = msg.payload.decode(decoder) 

47 if topic[-1] == "cmd": 

48 apikey = topic[0] 

49 device_id = topic[1] 

50 

51 if any((apikey, device_id, payload)) is None: 

52 raise ValueError 

53 

54 return apikey, device_id, payload 

55 

56 def encode_msg( 

57 self, device_id: str, payload: Dict, msg_type: IoTAMQTTMessageType 

58 ) -> str: 

59 """ 

60 Encode message for outgoing traffic 

61 

62 Args: 

63 device_id: id of the iot device 

64 payload: payload to send 

65 msg_type: kind of message to send 

66 

67 """ 

68 raise NotImplementedError 

69 

70 @classmethod 

71 def _parse_timestamp(cls, payload: Dict) -> Dict: 

72 """ 

73 Helper function to parse timestamps 

74 

75 Args: 

76 payload: payload to reformat 

77 

78 Returns: 

79 Dictionary containing the formatted payload 

80 """ 

81 if payload.get("timeInstant", None): 

82 timestamp = payload["timeInstant"] 

83 if isinstance(timestamp, str): 

84 timestamp = datetime.fromisoformat(payload["timeInstant"]) 

85 if isinstance(timestamp, datetime): 

86 payload["timeInstant"] = convert_datetime_to_iso_8601_with_z_suffix( 

87 payload["timeInstant"] 

88 ) 

89 else: 

90 raise ValueError("Not able to parse datetime") 

91 return payload 

92 

93 @classmethod 

94 def _raise_encoding_error(cls, payload: Dict, msg_type: IoTAMQTTMessageType): 

95 """ 

96 Helper function to provide consistent error messages 

97 Args: 

98 payload: Invalid message content 

99 msg_type: Invalid message type 

100 

101 Returns: 

102 None 

103 

104 Raises: 

105 ValueError 

106 """ 

107 ValueError( 

108 f"Message format not supported! \n " 

109 f"Message Type: {msg_type} \n " 

110 f"Payload: {payload}" 

111 )