Coverage for filip/clients/mqtt/encoder/base_encoder.py: 79%

38 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Abstract class for all IoTA MQTT message encoders 

3""" 

4import logging 

5from abc import ABC 

6from datetime import datetime 

7from typing import Dict, Tuple 

8from paho.mqtt.client import MQTTMessage 

9from filip.models.mqtt import IoTAMQTTMessageType 

10from filip.utils import convert_datetime_to_iso_8601_with_z_suffix 

11 

12 

13class BaseEncoder(ABC): 

14 """ 

15 Abstract class for all IoTA MQTT message encoders 

16 """ 

17 prefix: str = '' 

18 

19 def __init__(self): 

20 # setup logging functionality 

21 self.logger = logging.getLogger( 

22 name=f"{self.__class__.__module__}." 

23 f"{self.__class__.__name__}") 

24 self.logger.addHandler(logging.NullHandler()) 

25 

26 def decode_message(self, 

27 msg: MQTTMessage, 

28 decoder: str = 'utf-8') -> Tuple[str, str, str]: 

29 """ 

30 Decode message for ingoing traffic 

31 Args: 

32 msg: Message class 

33 decoder: encoding identifier 

34 

35 Returns: 

36 apikey 

37 device_id 

38 payload 

39 """ 

40 topic = msg.topic.strip('/') 

41 topic = topic.split('/') 

42 apikey = None 

43 device_id = None 

44 payload = msg.payload.decode(decoder) 

45 if topic[-1] == 'cmd': 

46 apikey = topic[0] 

47 device_id = topic[1] 

48 

49 if any((apikey, device_id, payload)) is None: 

50 raise ValueError 

51 

52 return apikey, device_id, payload 

53 

54 def encode_msg(self, 

55 device_id: str, 

56 payload: Dict, 

57 msg_type: IoTAMQTTMessageType) -> str: 

58 """ 

59 Encode message for outgoing traffic 

60 

61 Args: 

62 device_id: id of the iot device 

63 payload: payload to send 

64 msg_type: kind of message to send 

65 

66 """ 

67 raise NotImplementedError 

68 

69 @classmethod 

70 def _parse_timestamp(cls, payload: Dict) -> Dict: 

71 """ 

72 Helper function to parse timestamps 

73 

74 Args: 

75 payload: payload to reformat 

76 

77 Returns: 

78 Dictionary containing the formatted payload 

79 """ 

80 if payload.get('timeInstant', None): 

81 timestamp = payload['timeInstant'] 

82 if isinstance(timestamp, str): 

83 timestamp = datetime.fromisoformat(payload["timeInstant"]) 

84 if isinstance(timestamp, datetime): 

85 payload['timeInstant'] = \ 

86 convert_datetime_to_iso_8601_with_z_suffix( 

87 payload['timeInstant']) 

88 else: 

89 raise ValueError('Not able to parse datetime') 

90 return payload 

91 

92 @classmethod 

93 def _raise_encoding_error(cls, 

94 payload: Dict, 

95 msg_type: IoTAMQTTMessageType): 

96 """ 

97 Helper function to provide consistent error messages 

98 Args: 

99 payload: Invalid message content 

100 msg_type: Invalid message type 

101 

102 Returns: 

103 None 

104 

105 Raises: 

106 ValueError 

107 """ 

108 ValueError(f"Message format not supported! \n " 

109 f"Message Type: {msg_type} \n " 

110 f"Payload: {payload}")