Source code for filip.clients.mqtt.encoder.base_encoder

"""
Abstract class for all IoTA MQTT message encoders
"""
import logging
from abc import ABC
from datetime import datetime
from typing import Dict, Tuple
from paho.mqtt.client import MQTTMessage
from filip.models.mqtt import IoTAMQTTMessageType
from filip.utils import convert_datetime_to_iso_8601_with_z_suffix


[docs]class BaseEncoder(ABC): """ Abstract class for all IoTA MQTT message encoders """ prefix: str = '' def __init__(self): # setup logging functionality self.logger = logging.getLogger( name=f"{self.__class__.__module__}." f"{self.__class__.__name__}") self.logger.addHandler(logging.NullHandler())
[docs] def decode_message(self, msg: MQTTMessage, decoder: str = 'utf-8') -> Tuple[str, str, str]: """ Decode message for ingoing traffic Args: msg: Message class decoder: encoding identifier Returns: apikey device_id payload """ topic = msg.topic.strip('/') topic = topic.split('/') apikey = None device_id = None payload = msg.payload.decode(decoder) if topic[-1] == 'cmd': apikey = topic[0] device_id = topic[1] if any((apikey, device_id, payload)) is None: raise ValueError return apikey, device_id, payload
[docs] def encode_msg(self, device_id: str, payload: Dict, msg_type: IoTAMQTTMessageType) -> str: """ Encode message for outgoing traffic Args: device_id: id of the iot device payload: payload to send msg_type: kind of message to send """ raise NotImplementedError
@classmethod def _parse_timestamp(cls, payload: Dict) -> Dict: """ Helper function to parse timestamps Args: payload: payload to reformat Returns: Dictionary containing the formatted payload """ if payload.get('timeInstant', None): timestamp = payload['timeInstant'] if isinstance(timestamp, str): timestamp = datetime.fromisoformat(payload["timeInstant"]) if isinstance(timestamp, datetime): payload['timeInstant'] = \ convert_datetime_to_iso_8601_with_z_suffix( payload['timeInstant']) else: raise ValueError('Not able to parse datetime') return payload @classmethod def _raise_encoding_error(cls, payload: Dict, msg_type: IoTAMQTTMessageType): """ Helper function to provide consistent error messages Args: payload: Invalid message content msg_type: Invalid message type Returns: None Raises: ValueError """ ValueError(f"Message format not supported! \n " f"Message Type: {msg_type} \n " f"Payload: {payload}")