Coverage for filip/clients/mqtt/encoder/base_encoder.py: 79%
38 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Abstract class for all IoTA MQTT message encoders
3"""
4import logging
5from abc import ABC
6from datetime import datetime
7from typing import Dict, Tuple
8from paho.mqtt.client import MQTTMessage
9from filip.models.mqtt import IoTAMQTTMessageType
10from filip.utils import convert_datetime_to_iso_8601_with_z_suffix
class BaseEncoder(ABC):
    """
    Abstract class for all IoTA MQTT message encoders

    Provides the shared decoding of incoming messages plus helper methods
    for timestamp formatting and consistent error reporting. Encoding of
    outgoing messages is left to subclasses via ``encode_msg``.

    Attributes:
        prefix: class-level string, empty in this base class
            (presumably overridden by concrete encoders -- confirm)
    """
    prefix: str = ''

    def __init__(self):
        # setup logging functionality; the logger is named after the concrete
        # (sub)class and gets a NullHandler attached
        self.logger = logging.getLogger(
            name=f"{self.__class__.__module__}."
                 f"{self.__class__.__name__}")
        self.logger.addHandler(logging.NullHandler())

    def decode_message(self,
                       msg: MQTTMessage,
                       decoder: str = 'utf-8') -> Tuple[str, str, str]:
        """
        Decode message for ingoing traffic

        Only topics whose last level is ``cmd`` are understood. For those,
        the first topic level is read as the apikey and the second one as
        the device id (leading and trailing slashes are ignored).

        Args:
            msg: Message class
            decoder: encoding identifier

        Returns:
            apikey
            device_id
            payload

        Raises:
            ValueError: if apikey and device id cannot be derived from the
                topic of the message
        """
        topic_levels = msg.topic.strip('/').split('/')
        payload = msg.payload.decode(decoder)

        apikey = None
        device_id = None
        # A command topic needs at least three levels
        # (<apikey>/<device_id>/cmd); anything shorter cannot carry both ids.
        if len(topic_levels) >= 3 and topic_levels[-1] == 'cmd':
            apikey = topic_levels[0]
            device_id = topic_levels[1]

        # Check every element individually. Comparing the result of ``any()``
        # with ``None`` is always False and would never raise.
        if any(item is None for item in (apikey, device_id, payload)):
            raise ValueError(
                f"Unable to extract apikey and device id from topic "
                f"'{msg.topic}'")

        return apikey, device_id, payload

    def encode_msg(self,
                   device_id: str,
                   payload: Dict,
                   msg_type: IoTAMQTTMessageType) -> str:
        """
        Encode message for outgoing traffic

        Must be implemented by subclasses.

        Args:
            device_id: id of the iot device
            payload: payload to send
            msg_type: kind of message to send

        Raises:
            NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    @classmethod
    def _parse_timestamp(cls, payload: Dict) -> Dict:
        """
        Helper function to parse timestamps

        If the payload holds a truthy ``timeInstant`` entry, it is replaced
        in place by the result of
        ``convert_datetime_to_iso_8601_with_z_suffix``. String values are
        parsed with ``datetime.fromisoformat`` first.

        Args:
            payload: payload to reformat

        Returns:
            Dictionary containing the formatted payload (same object as the
            one passed in)

        Raises:
            ValueError: if ``timeInstant`` is neither a datetime object nor
                a string that ``datetime.fromisoformat`` can parse
        """
        if payload.get('timeInstant', None):
            timestamp = payload['timeInstant']
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp)
            if isinstance(timestamp, datetime):
                # Hand over the parsed datetime object, not the raw payload
                # value, which may still be a string at this point.
                payload['timeInstant'] = \
                    convert_datetime_to_iso_8601_with_z_suffix(timestamp)
            else:
                raise ValueError('Not able to parse datetime')
        return payload

    @classmethod
    def _raise_encoding_error(cls,
                              payload: Dict,
                              msg_type: IoTAMQTTMessageType):
        """
        Helper function to provide consistent error messages

        Args:
            payload: Invalid message content
            msg_type: Invalid message type

        Returns:
            None (never returns normally)

        Raises:
            ValueError: always, describing message type and payload
        """
        # The exception has to be raised explicitly; merely constructing it
        # would turn this helper into a silent no-op.
        raise ValueError(f"Message format not supported! \n "
                         f"Message Type: {msg_type} \n "
                         f"Payload: {payload}")