Source code for filip.models.ngsi_v2.iot

"""
Module contains models for accessing and interaction with FIWARE's IoT-Agents.
"""

from __future__ import annotations
import logging
import itertools
import warnings
from enum import Enum
from typing import Any, Dict, Optional, List, Union
import pytz
from pydantic import (
    field_validator,
    model_validator,
    ConfigDict,
    BaseModel,
    Field,
    AnyHttpUrl,
)
from filip.models.base import NgsiVersion, DataType
from filip.models.ngsi_v2.base import (
    BaseAttribute,
    BaseValueAttribute,
    BaseNameAttribute,
)
from filip.utils.validators import (
    validate_fiware_datatype_string_protect,
    validate_fiware_datatype_standard,
    validate_jexl_expression,
    validate_expression_language,
    validate_fiware_attribute_name_regex,
)

logger = logging.getLogger()


[docs]class ExpressionLanguage(str, Enum): """ Options for expression language """ LEGACY = "legacy" JEXL = "jexl"
[docs]class PayloadProtocol(str, Enum): """ Options for payload protocols """ IOTA_JSON = "IoTA-JSON" IOTA_UL = "PDI-IoTA-UltraLight" LORAWAN = "LoRaWAN"
[docs]class TransportProtocol(str, Enum): """ Options for transport protocols """ MQTT = "MQTT" AMQP = "AMQP" HTTP = "HTTP"
[docs]class IoTABaseAttribute(BaseAttribute, BaseNameAttribute): """ Base model for device attributes """ expression: Optional[str] = Field( default=None, description="indicates that the value of the target attribute will " "not be the plain value or the measurement, but an " "expression based on a combination of the reported values. " "See the Expression Language definition for details " "(https://iotagent-node-lib.readthedocs.io/en/latest/" "api.html#expression-language-support)", ) entity_name: Optional[str] = Field( default=None, description="entity_name: the presence of this attribute indicates " "that the value will not be stored in the original device " "entity but in a new entity with an ID given by this " "attribute. The type of this additional entity can be " "configured with the entity_type attribute. If no type is " "configured, the device entity type is used instead. " "Entity names can be defined as expressions, using the " "Expression Language definition " "(https://iotagent-node-lib.readthedocs.io/en/latest/" "api.html#expression-language-support). Allowed characters are" " the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_entity_name = field_validator("entity_name")( validate_fiware_datatype_standard ) entity_type: Optional[str] = Field( default=None, description="configures the type of an alternative entity. " "Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_entity_type = field_validator("entity_type")( validate_fiware_datatype_standard ) reverse: Optional[str] = Field( default=None, description="add bidirectionality expressions to the attribute. See " "the bidirectionality transformation plugin in the " "Data Mapping Plugins section for details. " "(https://iotagent-node-lib.readthedocs.io/en/latest/api/" "index.html#data-mapping-plugins)", ) def __eq__(self, other): if isinstance(other, BaseAttribute): return self.name == other.name else: return self.model_dump() == other
[docs]class DeviceAttribute(IoTABaseAttribute): """ Model for active device attributes """ object_id: Optional[str] = Field( default=None, description="name of the attribute as coming from the device." )
[docs]class LazyDeviceAttribute(BaseNameAttribute): """ Model for lazy device attributes """ type: Union[DataType, str] = Field( default=DataType.TEXT, description="The attribute type represents the NGSI value type of the " "attribute value. Note that FIWARE NGSI has its own type " "system for attribute values, so NGSI value types are not " "the same as JSON types. Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_type = field_validator("type")(validate_fiware_datatype_string_protect)
[docs]class DeviceCommand(BaseModel): """ Model for commands """ name: str = Field( description="ID of the attribute in the target entity in the " "Context Broker. Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_name = field_validator("name")(validate_fiware_attribute_name_regex) type: Union[DataType, str] = Field( description="name of the type of the attribute in the target entity. ", default=DataType.COMMAND, )
[docs]class StaticDeviceAttribute(IoTABaseAttribute, BaseValueAttribute): """ Model for static device attributes """ pass
[docs]class ServiceGroup(BaseModel): """ Model for device service group. https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#service-group-api """ service: Optional[str] = Field( default=None, description="ServiceGroup of the devices of this type" ) subservice: Optional[str] = Field( default=None, description="Subservice of the devices of this type.", pattern="^/", ) resource: str = Field( description="string representing the Southbound resource that will be " "used to assign a type to a device (e.g.: pathname in the " "southbound port)." ) apikey: str = Field( description="API Key string. It is a key used for devices belonging " "to this service_group. If " ", service_group does not use " "apikey, but it must be specified." ) timestamp: Optional[bool] = Field( default=None, description="Optional flag about whether or not to add the TimeInstant " "attribute to the device entity created, as well as a " "TimeInstant metadata to each attribute, with the current " "timestamp. With NGSI-LD, the Standard observedAt " "property-of-a-property is created instead.", ) entity_type: Optional[str] = Field( default=None, description="name of the Entity type to assign to the group. " "Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_entity_type = field_validator("entity_type")( validate_fiware_datatype_standard ) trust: Optional[str] = Field( default=None, description="trust token to use for secured access to the " "Context Broker for this type of devices (optional; only " "needed for secured scenarios).", ) cbHost: Optional[AnyHttpUrl] = Field( default=None, description="Context Broker connection information. This options can " "be used to override the global ones for specific types of " "devices.", )
[docs] @field_validator("cbHost") @classmethod def validate_cbHost(cls, value): """ convert cbHost to str Returns: timezone """ return str(value) if value else value
lazy: Optional[List[LazyDeviceAttribute]] = Field( default=[], description="list of common lazy attributes of the device. For each " "attribute, its name and type must be provided.", ) commands: Optional[List[DeviceCommand]] = Field( default=[], description="list of common commands attributes of the device. For each " "attribute, its name and type must be provided, additional " "metadata is optional", ) attributes: Optional[List[DeviceAttribute]] = Field( default=[], description="list of common commands attributes of the device. For " "each attribute, its name and type must be provided, " "additional metadata is optional.", ) static_attributes: Optional[List[StaticDeviceAttribute]] = Field( default=[], description="this attributes will be added to all the entities of this " "group 'as is', additional metadata is optional.", ) internal_attributes: Optional[List[Dict[str, Any]]] = Field( default=[], description="optional section with free format, to allow specific " "IoT Agents to store information along with the devices " "in the Device Registry.", ) expressionLanguage: Optional[ExpressionLanguage] = Field( default=ExpressionLanguage.JEXL, description="optional boolean value, to set expression language used " "to compute expressions, possible values are: " "legacy or jexl, but legacy is deprecated. If it is set None, jexl is used.", ) valid_expressionLanguage = field_validator("expressionLanguage")( validate_expression_language ) explicitAttrs: Optional[bool] = Field( default=False, description="optional boolean value, to support selective ignore " "of measures so that IOTA does not progress. If not " "specified default is false.", ) autoprovision: Optional[bool] = Field( default=True, description="optional boolean: If false, autoprovisioned devices " "(i.e. devices that are not created with an explicit " "provision operation but when the first measure arrives) " "are not allowed in this group. " "Default (in the case of omitting the field) is true.", ) ngsiVersion: Optional[NgsiVersion] = Field( default="v2", description="optional string value used in mixed mode to switch between" " NGSI-v2 and NGSI-LD payloads. Possible values are: " "v2 or ld. The default is v2. When not running in mixed " "mode, this field is ignored.", ) defaultEntityNameConjunction: Optional[str] = Field( default=None, description="optional string value to set default conjunction string " "used to compose a default entity_name when is not " "provided at device provisioning time.", )
[docs]class DeviceSettings(BaseModel): """ Model for iot device settings """ model_config = ConfigDict(validate_assignment=True) timezone: Optional[str] = Field( default="Europe/London", description="Time zone of the sensor if it has any" ) timestamp: Optional[bool] = Field( default=None, description="Optional flag about whether or not to add the TimeInstant " "attribute to the device entity created, as well as a " "TimeInstant metadata to each attribute, with the current " "timestamp. With NGSI-LD, the Standard observedAt " "property-of-a-property is created instead.", ) apikey: Optional[str] = Field( default=None, description="Optional Apikey key string to use instead of group apikey", ) endpoint: Optional[AnyHttpUrl] = Field( default=None, description="Endpoint where the device is going to receive commands, " "if any.", ) protocol: Optional[Union[PayloadProtocol, str]] = Field( default=None, description="Name of the device protocol, for its use with an " "IoT Manager.", ) transport: Optional[Union[TransportProtocol, str]] = Field( default=None, description="Name of the device transport protocol, for the IoT Agents " "with multiple transport protocols.", ) expressionLanguage: Optional[ExpressionLanguage] = Field( default=ExpressionLanguage.JEXL, description="optional boolean value, to set expression language used " "to compute expressions, possible values are: " "legacy or jexl, but legacy is deprecated. If it is set None, jexl is used.", ) valid_expressionLanguage = field_validator("expressionLanguage")( validate_expression_language ) explicitAttrs: Optional[bool] = Field( default=False, description="optional boolean value, to support selective ignore " "of measures so that IOTA does not progress. If not " "specified default is false.", )
[docs]class Device(DeviceSettings): """ Model for iot devices. https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#device-api """ model_config = ConfigDict(validate_default=True, validate_assignment=True) device_id: str = Field( description="Device ID that will be used to identify the device" ) service: Optional[str] = Field( default=None, description="Name of the service the device belongs to " "(will be used in the fiware-service header).", max_length=50, ) service_path: Optional[str] = Field( default="/", description="Name of the subservice the device belongs to " "(used in the fiware-servicepath header).", max_length=51, pattern="^/", ) entity_name: str = Field( description="Name of the entity representing the device in " "the Context Broker Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_entity_name = field_validator("entity_name")( validate_fiware_datatype_standard ) entity_type: str = Field( description="Type of the entity in the Context Broker. " "Allowed characters " "are the ones in the plain ASCII set, except the following " "ones: control characters, whitespace, &, ?, / and #.", max_length=256, min_length=1, ) valid_entity_type = field_validator("entity_type")( validate_fiware_datatype_standard ) lazy: List[LazyDeviceAttribute] = Field( default=[], description="List of lazy attributes of the device" ) commands: List[DeviceCommand] = Field( default=[], description="List of commands of the device" ) attributes: List[DeviceAttribute] = Field( default=[], description="List of active attributes of the device" ) static_attributes: Optional[List[StaticDeviceAttribute]] = Field( default=[], description="List of static attributes to append to the entity. All the" " updateContext requests to the CB will have this set of " "attributes appended.", ) internal_attributes: Optional[List[Dict[str, Any]]] = Field( default=[], description="List of internal attributes with free format for specific " "IoT Agent configuration", ) ngsiVersion: NgsiVersion = Field( default=NgsiVersion.v2, description="optional string value used in mixed mode to switch between" " NGSI-v2 and NGSI-LD payloads. Possible values are: " "v2 or ld. The default is v2. When not running in " "mixed mode, this field is ignored.", )
[docs] @field_validator("timezone") @classmethod def validate_timezone(cls, value): """ validate timezone Returns: timezone """ assert value in pytz.all_timezones return value
[docs] @model_validator(mode="after") def validate_device_attributes_expression(self): """ Validates device attributes expressions based on the expression language (JEXL or Legacy, where Legacy is deprecated). Args: self: The Device instance. Returns: The Device instance after validation. """ if self.expressionLanguage == ExpressionLanguage.JEXL: for attribute in self.attributes: if attribute.expression: validate_jexl_expression( attribute.expression, attribute.name, self.device_id ) elif self.expressionLanguage == ExpressionLanguage.LEGACY: warnings.warn( f"No validation for legacy expression language of Device {self.device_id}." ) return self
[docs] @model_validator(mode="after") def validate_duplicated_device_attributes(self): """ Check whether device has identical attributes Args: self: dict of Device instance. Returns: The dict of Device instance after validation. """ for i, attr in enumerate(self.attributes): for other_attr in self.attributes[:i] + self.attributes[i + 1 :]: if attr.model_dump() == other_attr.model_dump(): raise ValueError(f"Duplicated attributes found: {attr.name}") return self
[docs] @model_validator(mode="after") def validate_device_attributes_name_object_id(self): """ Validate the device regarding the behavior with devices attributes. According to https://iotagent-node-lib.readthedocs.io/en/latest/api.html and based on our best practice, following rules are checked - name is required, but not necessarily unique - object_id is not required, if given must be unique, i.e. not equal to any existing object_id and name Args: self: dict of Device instance. Returns: The dict of Device instance after validation. """ for i, attr in enumerate(self.attributes): for other_attr in self.attributes[:i] + self.attributes[i + 1 :]: if ( attr.object_id and other_attr.object_id and attr.object_id == other_attr.object_id ): raise ValueError(f"object_id {attr.object_id} is not unique") if attr.object_id and attr.object_id == other_attr.name: raise ValueError(f"object_id {attr.object_id} is not unique") return self
[docs] def get_attribute( self, attribute_name: str ) -> Union[ DeviceAttribute, LazyDeviceAttribute, StaticDeviceAttribute, DeviceCommand ]: """ Args: attribute_name: Returns: """ for attribute in itertools.chain( self.attributes, self.lazy, self.static_attributes, self.internal_attributes, self.commands, ): if attribute.name == attribute_name: return attribute msg = ( f"Device: {self.device_id}: Could not " f"find attribute with name {attribute_name}" ) logger.error(msg) raise KeyError(msg)
[docs] def add_attribute( self, attribute: Union[ DeviceAttribute, LazyDeviceAttribute, StaticDeviceAttribute, DeviceCommand ], update: bool = False, ) -> None: """ Args: attribute: update (bool): If 'True' and attribute does already exists tries to update the attribute if not Returns: None """ try: if type(attribute) == DeviceAttribute: if attribute.model_dump(exclude_none=True) in [ attr.model_dump(exclude_none=True) for attr in self.attributes ]: raise ValueError self.attributes.append(attribute) self.__setattr__(name="attributes", value=self.attributes) elif type(attribute) == LazyDeviceAttribute: if attribute in self.lazy: raise ValueError self.lazy.append(attribute) self.__setattr__(name="lazy", value=self.lazy) elif type(attribute) == StaticDeviceAttribute: if attribute in self.static_attributes: raise ValueError self.static_attributes.append(attribute) self.__setattr__(name="static_attributes", value=self.static_attributes) elif type(attribute) == DeviceCommand: if attribute in self.commands: raise ValueError self.commands.append(attribute) self.__setattr__(name="commands", value=self.commands) else: raise ValueError except ValueError: if update: self.update_attribute(attribute, append=False) logger.warning( "Device: %s: Attribute already " "exists. Will update: \n %s", self.device_id, attribute.model_dump_json(indent=2), ) else: logger.error( "Device: %s: Attribute already " "exists: \n %s", self.device_id, attribute.model_dump_json(indent=2), ) raise
[docs] def update_attribute( self, attribute: Union[ DeviceAttribute, LazyDeviceAttribute, StaticDeviceAttribute, DeviceCommand ], append: bool = False, ) -> None: """ Updates existing device attribute Args: attribute: Attribute to add to device configuration append (bool): Adds attribute if not exist Returns: None """ try: if type(attribute) == DeviceAttribute: idx = self.attributes.index(attribute) self.attributes[idx].model_dump().update(attribute.model_dump()) elif type(attribute) == LazyDeviceAttribute: idx = self.lazy.index(attribute) self.lazy[idx].model_dump().update(attribute.model_dump()) elif type(attribute) == StaticDeviceAttribute: idx = self.static_attributes.index(attribute) self.static_attributes[idx].model_dump().update(attribute.model_dump()) elif type(attribute) == DeviceCommand: idx = self.commands.index(attribute) self.commands[idx].model_dump().update(attribute.model_dump()) except ValueError: if append: logger.warning( "Device: %s: Could not find " "attribute: \n %s", self.device_id, attribute.model_dump_json(indent=2), ) self.add_attribute(attribute=attribute) else: msg = ( f"Device: {self.device_id}: Could not find " f"attribute: \n {attribute.model_dump_json(indent=2)}" ) raise KeyError(msg)
[docs] def delete_attribute( self, attribute: Union[ DeviceAttribute, LazyDeviceAttribute, StaticDeviceAttribute, DeviceCommand ], ): """ Deletes attribute from device Args: attribute: () Returns: """ try: if type(attribute) == DeviceAttribute: self.attributes.remove(attribute) elif type(attribute) == LazyDeviceAttribute: self.lazy.remove(attribute) elif type(attribute) == StaticDeviceAttribute: self.static_attributes.remove(attribute) elif type(attribute) == DeviceCommand: self.commands.remove(attribute) else: raise ValueError except ValueError: logger.warning( "Device: %s: Could not delete " "attribute: \n %s", self.device_id, attribute.model_dump_json(indent=2), ) raise logger.info( "Device: %s: Attribute deleted! \n %s", self.device_id, attribute.model_dump_json(indent=2), )
[docs] def get_command(self, command_name: str): """ Short for self.get_attributes Args: command_name (str): Returns: """ return self.get_attribute(attribute_name=command_name)
[docs] def add_command(self, command: DeviceCommand, update: bool = False): """ Short for self.add_attribute Args: command (DeviceCommand): update (bool): Update command if it already exists Returns: """ self.add_attribute(attribute=command, update=update)
[docs] def update_command(self, command: DeviceCommand, append: bool = False): """ Short for self.update_attribute Args: command: append: Returns: """ self.update_attribute(attribute=command, append=append)
[docs] def delete_command(self, command: DeviceCommand): """ Short for self.delete_attribute Args: command: Returns: None """ self.delete_attribute(attribute=command)