Coverage for filip/clients/mqtt/encoder/base_encoder.py: 79%
38 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2Abstract class for all IoTA MQTT message encoders
3"""
5import logging
6from abc import ABC
7from datetime import datetime
8from typing import Dict, Tuple
9from paho.mqtt.client import MQTTMessage
10from filip.models.mqtt import IoTAMQTTMessageType
11from filip.utils import convert_datetime_to_iso_8601_with_z_suffix
14class BaseEncoder(ABC):
15 """
16 Abstract class for all IoTA MQTT message encoders
17 """
19 prefix: str = ""
21 def __init__(self):
22 # setup logging functionality
23 self.logger = logging.getLogger(
24 name=f"{self.__class__.__module__}." f"{self.__class__.__name__}"
25 )
26 self.logger.addHandler(logging.NullHandler())
28 def decode_message(
29 self, msg: MQTTMessage, decoder: str = "utf-8"
30 ) -> Tuple[str, str, str]:
31 """
32 Decode message for ingoing traffic
33 Args:
34 msg: Message class
35 decoder: encoding identifier
37 Returns:
38 apikey
39 device_id
40 payload
41 """
42 topic = msg.topic.strip("/")
43 topic = topic.split("/")
44 apikey = None
45 device_id = None
46 payload = msg.payload.decode(decoder)
47 if topic[-1] == "cmd":
48 apikey = topic[0]
49 device_id = topic[1]
51 if any((apikey, device_id, payload)) is None:
52 raise ValueError
54 return apikey, device_id, payload
56 def encode_msg(
57 self, device_id: str, payload: Dict, msg_type: IoTAMQTTMessageType
58 ) -> str:
59 """
60 Encode message for outgoing traffic
62 Args:
63 device_id: id of the iot device
64 payload: payload to send
65 msg_type: kind of message to send
67 """
68 raise NotImplementedError
70 @classmethod
71 def _parse_timestamp(cls, payload: Dict) -> Dict:
72 """
73 Helper function to parse timestamps
75 Args:
76 payload: payload to reformat
78 Returns:
79 Dictionary containing the formatted payload
80 """
81 if payload.get("timeInstant", None):
82 timestamp = payload["timeInstant"]
83 if isinstance(timestamp, str):
84 timestamp = datetime.fromisoformat(payload["timeInstant"])
85 if isinstance(timestamp, datetime):
86 payload["timeInstant"] = convert_datetime_to_iso_8601_with_z_suffix(
87 payload["timeInstant"]
88 )
89 else:
90 raise ValueError("Not able to parse datetime")
91 return payload
93 @classmethod
94 def _raise_encoding_error(cls, payload: Dict, msg_type: IoTAMQTTMessageType):
95 """
96 Helper function to provide consistent error messages
97 Args:
98 payload: Invalid message content
99 msg_type: Invalid message type
101 Returns:
102 None
104 Raises:
105 ValueError
106 """
107 ValueError(
108 f"Message format not supported! \n "
109 f"Message Type: {msg_type} \n "
110 f"Payload: {payload}"
111 )