"""
Module contains models for accessing and interaction with FIWARE's IoT-Agents.
"""
from __future__ import annotations
import logging
import itertools
import warnings
from enum import Enum
from typing import Any, Dict, Optional, List, Union
import pytz
from pydantic import field_validator, model_validator, ConfigDict, BaseModel, Field, AnyHttpUrl
from filip.models.base import NgsiVersion, DataType
from filip.models.ngsi_v2.base import \
BaseAttribute, \
BaseValueAttribute, \
BaseNameAttribute
from filip.utils.validators import (validate_fiware_datatype_string_protect,
validate_fiware_datatype_standard,
validate_jexl_expression,
validate_expression_language)
logger = logging.getLogger()
[docs]class ExpressionLanguage(str, Enum):
"""
Options for expression language
"""
LEGACY = "legacy"
JEXL = "jexl"
[docs]class PayloadProtocol(str, Enum):
"""
Options for payload protocols
"""
IOTA_JSON = "IoTA-JSON"
IOTA_UL = "PDI-IoTA-UltraLight"
LORAWAN = "LoRaWAN"
[docs]class TransportProtocol(str, Enum):
"""
Options for transport protocols
"""
MQTT = "MQTT"
AMQP = "AMQP"
HTTP = "HTTP"
[docs]class IoTABaseAttribute(BaseAttribute, BaseNameAttribute):
"""
Base model for device attributes
"""
expression: Optional[str] = Field(
default=None,
description="indicates that the value of the target attribute will "
"not be the plain value or the measurement, but an "
"expression based on a combination of the reported values. "
"See the Expression Language definition for details "
"(https://iotagent-node-lib.readthedocs.io/en/latest/"
"api.html#expression-language-support)"
)
entity_name: Optional[str] = Field(
default=None,
description="entity_name: the presence of this attribute indicates "
"that the value will not be stored in the original device "
"entity but in a new entity with an ID given by this "
"attribute. The type of this additional entity can be "
"configured with the entity_type attribute. If no type is "
"configured, the device entity type is used instead. "
"Entity names can be defined as expressions, using the "
"Expression Language definition "
"(https://iotagent-node-lib.readthedocs.io/en/latest/"
"api.html#expression-language-support). Allowed characters are"
" the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_entity_name = field_validator("entity_name")(validate_fiware_datatype_standard)
entity_type: Optional[str] = Field(
default=None,
description="configures the type of an alternative entity. "
"Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)
reverse: Optional[str] = Field(
default=None,
description="add bidirectionality expressions to the attribute. See "
"the bidirectionality transformation plugin in the "
"Data Mapping Plugins section for details. "
"(https://iotagent-node-lib.readthedocs.io/en/latest/api/"
"index.html#data-mapping-plugins)"
)
def __eq__(self, other):
if isinstance(other, BaseAttribute):
return self.name == other.name
else:
return self.model_dump() == other
[docs]class DeviceAttribute(IoTABaseAttribute):
"""
Model for active device attributes
"""
object_id: Optional[str] = Field(
default=None,
description="name of the attribute as coming from the device."
)
[docs]class LazyDeviceAttribute(BaseNameAttribute):
"""
Model for lazy device attributes
"""
type: Union[DataType, str] = Field(
default=DataType.TEXT,
description="The attribute type represents the NGSI value type of the "
"attribute value. Note that FIWARE NGSI has its own type "
"system for attribute values, so NGSI value types are not "
"the same as JSON types. Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_type = field_validator("type")(validate_fiware_datatype_string_protect)
[docs]class DeviceCommand(BaseModel):
"""
Model for commands
"""
name: str = Field(
description="ID of the attribute in the target entity in the "
"Context Broker. Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_name = field_validator("name")(validate_fiware_datatype_string_protect)
type: Union[DataType, str] = Field(
description="name of the type of the attribute in the target entity. ",
default=DataType.COMMAND
)
[docs]class StaticDeviceAttribute(IoTABaseAttribute, BaseValueAttribute):
"""
Model for static device attributes
"""
pass
[docs]class ServiceGroup(BaseModel):
"""
Model for device service group.
https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#service-group-api
"""
service: Optional[str] = Field(
default=None,
description="ServiceGroup of the devices of this type"
)
subservice: Optional[str] = Field(
default=None,
description="Subservice of the devices of this type.",
pattern="^/"
)
resource: str = Field(
description="string representing the Southbound resource that will be "
"used to assign a type to a device (e.g.: pathname in the "
"southbound port)."
)
apikey: str = Field(
description="API Key string. It is a key used for devices belonging "
"to this service_group. If "", service_group does not use "
"apikey, but it must be specified."
)
timestamp: Optional[bool] = Field(
default=None,
description="Optional flag about whether or not to add the TimeInstant "
"attribute to the device entity created, as well as a "
"TimeInstant metadata to each attribute, with the current "
"timestamp. With NGSI-LD, the Standard observedAt "
"property-of-a-property is created instead."
)
entity_type: Optional[str] = Field(
default=None,
description="name of the Entity type to assign to the group. "
"Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)
trust: Optional[str] = Field(
default=None,
description="trust token to use for secured access to the "
"Context Broker for this type of devices (optional; only "
"needed for secured scenarios)."
)
cbHost: Optional[AnyHttpUrl] = Field(
default=None,
description="Context Broker connection information. This options can "
"be used to override the global ones for specific types of "
"devices."
)
[docs] @field_validator('cbHost')
@classmethod
def validate_cbHost(cls, value):
"""
convert cbHost to str
Returns:
timezone
"""
return str(value) if value else value
lazy: Optional[List[LazyDeviceAttribute]] = Field(
default=[],
desription="list of common lazy attributes of the device. For each "
"attribute, its name and type must be provided."
)
commands: Optional[List[DeviceCommand]] = Field(
default=[],
desription="list of common commands attributes of the device. For each "
"attribute, its name and type must be provided, additional "
"metadata is optional"
)
attributes: Optional[List[DeviceAttribute]] = Field(
default=[],
description="list of common commands attributes of the device. For "
"each attribute, its name and type must be provided, "
"additional metadata is optional."
)
static_attributes: Optional[List[StaticDeviceAttribute]] = Field(
default=[],
description="this attributes will be added to all the entities of this "
"group 'as is', additional metadata is optional."
)
internal_attributes: Optional[List[Dict[str, Any]]] = Field(
default=[],
description="optional section with free format, to allow specific "
"IoT Agents to store information along with the devices "
"in the Device Registry."
)
expressionLanguage: Optional[ExpressionLanguage] = Field(
default=ExpressionLanguage.JEXL,
description="optional boolean value, to set expression language used "
"to compute expressions, possible values are: "
"legacy or jexl, but legacy is deprecated. If it is set None, jexl is used."
)
valid_expressionLanguage = field_validator("expressionLanguage")(validate_expression_language)
explicitAttrs: Optional[bool] = Field(
default=False,
description="optional boolean value, to support selective ignore "
"of measures so that IOTA does not progress. If not "
"specified default is false."
)
autoprovision: Optional[bool] = Field(
default=True,
description="optional boolean: If false, autoprovisioned devices "
"(i.e. devices that are not created with an explicit "
"provision operation but when the first measure arrives) "
"are not allowed in this group. "
"Default (in the case of omitting the field) is true."
)
ngsiVersion: Optional[NgsiVersion] = Field(
default="v2",
description="optional string value used in mixed mode to switch between"
" NGSI-v2 and NGSI-LD payloads. Possible values are: "
"v2 or ld. The default is v2. When not running in mixed "
"mode, this field is ignored.")
defaultEntityNameConjunction: Optional[str] = Field(
default=None,
description="optional string value to set default conjunction string "
"used to compose a default entity_name when is not "
"provided at device provisioning time."
)
[docs]class DeviceSettings(BaseModel):
"""
Model for iot device settings
"""
model_config = ConfigDict(validate_assignment=True)
timezone: Optional[str] = Field(
default='Europe/London',
description="Time zone of the sensor if it has any"
)
timestamp: Optional[bool] = Field(
default=None,
description="Optional flag about whether or not to add the TimeInstant "
"attribute to the device entity created, as well as a "
"TimeInstant metadata to each attribute, with the current "
"timestamp. With NGSI-LD, the Standard observedAt "
"property-of-a-property is created instead."
)
apikey: Optional[str] = Field(
default=None,
description="Optional Apikey key string to use instead of group apikey"
)
endpoint: Optional[AnyHttpUrl] = Field(
default=None,
description="Endpoint where the device is going to receive commands, "
"if any."
)
protocol: Optional[Union[PayloadProtocol, str]] = Field(
default=None,
description="Name of the device protocol, for its use with an "
"IoT Manager."
)
transport: Optional[Union[TransportProtocol, str]] = Field(
default=None,
description="Name of the device transport protocol, for the IoT Agents "
"with multiple transport protocols."
)
expressionLanguage: Optional[ExpressionLanguage] = Field(
default=ExpressionLanguage.JEXL,
description="optional boolean value, to set expression language used "
"to compute expressions, possible values are: "
"legacy or jexl, but legacy is deprecated. If it is set None, jexl is used."
)
valid_expressionLanguage = field_validator("expressionLanguage")(validate_expression_language)
explicitAttrs: Optional[bool] = Field(
default=False,
description="optional boolean value, to support selective ignore "
"of measures so that IOTA does not progress. If not "
"specified default is false."
)
[docs]class Device(DeviceSettings):
"""
Model for iot devices.
https://iotagent-node-lib.readthedocs.io/en/latest/api/index.html#device-api
"""
model_config = ConfigDict(validate_default=True, validate_assignment=True)
device_id: str = Field(
description="Device ID that will be used to identify the device"
)
service: Optional[str] = Field(
default=None,
description="Name of the service the device belongs to "
"(will be used in the fiware-service header).",
max_length=50
)
service_path: Optional[str] = Field(
default="/",
description="Name of the subservice the device belongs to "
"(used in the fiware-servicepath header).",
max_length=51,
pattern="^/"
)
entity_name: str = Field(
description="Name of the entity representing the device in "
"the Context Broker Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_entity_name = field_validator("entity_name")(validate_fiware_datatype_standard)
entity_type: str = Field(
description="Type of the entity in the Context Broker. "
"Allowed characters "
"are the ones in the plain ASCII set, except the following "
"ones: control characters, whitespace, &, ?, / and #.",
max_length=256,
min_length=1,
)
valid_entity_type = field_validator("entity_type")(validate_fiware_datatype_standard)
lazy: List[LazyDeviceAttribute] = Field(
default=[],
description="List of lazy attributes of the device"
)
commands: List[DeviceCommand] = Field(
default=[],
desription="List of commands of the device"
)
attributes: List[DeviceAttribute] = Field(
default=[],
description="List of active attributes of the device"
)
static_attributes: Optional[List[StaticDeviceAttribute]] = Field(
default=[],
description="List of static attributes to append to the entity. All the"
" updateContext requests to the CB will have this set of "
"attributes appended."
)
internal_attributes: Optional[List[Dict[str, Any]]] = Field(
default=[],
description="List of internal attributes with free format for specific "
"IoT Agent configuration"
)
ngsiVersion: NgsiVersion = Field(
default=NgsiVersion.v2,
description="optional string value used in mixed mode to switch between"
" NGSI-v2 and NGSI-LD payloads. Possible values are: "
"v2 or ld. The default is v2. When not running in "
"mixed mode, this field is ignored.")
[docs] @field_validator('timezone')
@classmethod
def validate_timezone(cls, value):
"""
validate timezone
Returns:
timezone
"""
assert value in pytz.all_timezones
return value
[docs] @model_validator(mode='after')
def validate_device_attributes_expression(self):
"""
Validates device attributes expressions based on the expression language (JEXL or Legacy, where Legacy is
deprecated).
Args:
self: The Device instance.
Returns:
The Device instance after validation.
"""
if self.expressionLanguage == ExpressionLanguage.JEXL:
for attribute in self.attributes:
if attribute.expression:
validate_jexl_expression(attribute.expression, attribute.name, self.device_id)
elif self.expressionLanguage == ExpressionLanguage.LEGACY:
warnings.warn(f"No validation for legacy expression language of Device {self.device_id}.")
return self
[docs] @model_validator(mode='after')
def validate_duplicated_device_attributes(self):
"""
Check whether device has identical attributes
Args:
self: dict of Device instance.
Returns:
The dict of Device instance after validation.
"""
for i, attr in enumerate(self.attributes):
for other_attr in self.attributes[:i] + self.attributes[i + 1:]:
if attr.model_dump() == other_attr.model_dump():
raise ValueError(f"Duplicated attributes found: {attr.name}")
return self
[docs] @model_validator(mode='after')
def validate_device_attributes_name_object_id(self):
"""
Validate the device regarding the behavior with devices attributes.
According to https://iotagent-node-lib.readthedocs.io/en/latest/api.html and
based on our best practice, following rules are checked
- name is required, but not necessarily unique
- object_id is not required, if given must be unique, i.e. not equal to any
existing object_id and name
Args:
self: dict of Device instance.
Returns:
The dict of Device instance after validation.
"""
for i, attr in enumerate(self.attributes):
for other_attr in self.attributes[:i] + self.attributes[i + 1:]:
if attr.object_id and other_attr.object_id and \
attr.object_id == other_attr.object_id:
raise ValueError(f"object_id {attr.object_id} is not unique")
if attr.object_id and attr.object_id == other_attr.name:
raise ValueError(f"object_id {attr.object_id} is not unique")
return self
[docs] def get_attribute(self, attribute_name: str) -> Union[DeviceAttribute,
LazyDeviceAttribute,
StaticDeviceAttribute,
DeviceCommand]:
"""
Args:
attribute_name:
Returns:
"""
for attribute in itertools.chain(self.attributes,
self.lazy,
self.static_attributes,
self.internal_attributes,
self.commands):
if attribute.name == attribute_name:
return attribute
msg = f"Device: {self.device_id}: Could not " \
f"find attribute with name {attribute_name}"
logger.error(msg)
raise KeyError(msg)
[docs] def add_attribute(self,
attribute: Union[DeviceAttribute,
LazyDeviceAttribute,
StaticDeviceAttribute,
DeviceCommand],
update: bool = False) -> None:
"""
Args:
attribute:
update (bool): If 'True' and attribute does already exists tries
to update the attribute if not
Returns:
None
"""
try:
if type(attribute) == DeviceAttribute:
if attribute.model_dump(exclude_none=True) in \
[attr.model_dump(exclude_none=True) for attr in self.attributes]:
raise ValueError
self.attributes.append(attribute)
self.__setattr__(name='attributes',
value=self.attributes)
elif type(attribute) == LazyDeviceAttribute:
if attribute in self.lazy:
raise ValueError
self.lazy.append(attribute)
self.__setattr__(name='lazy',
value=self.lazy)
elif type(attribute) == StaticDeviceAttribute:
if attribute in self.static_attributes:
raise ValueError
self.static_attributes.append(attribute)
self.__setattr__(name='static_attributes',
value=self.static_attributes)
elif type(attribute) == DeviceCommand:
if attribute in self.commands:
raise ValueError
self.commands.append(attribute)
self.__setattr__(name='commands',
value=self.commands)
else:
raise ValueError
except ValueError:
if update:
self.update_attribute(attribute, append=False)
logger.warning("Device: %s: Attribute already "
"exists. Will update: \n %s",
self.device_id, attribute.model_dump_json(indent=2))
else:
logger.error("Device: %s: Attribute already "
"exists: \n %s", self.device_id,
attribute.model_dump_json(indent=2))
raise
[docs] def update_attribute(self,
attribute: Union[DeviceAttribute,
LazyDeviceAttribute,
StaticDeviceAttribute,
DeviceCommand],
append: bool = False) -> None:
"""
Updates existing device attribute
Args:
attribute: Attribute to add to device configuration
append (bool): Adds attribute if not exist
Returns:
None
"""
try:
if type(attribute) == DeviceAttribute:
idx = self.attributes.index(attribute)
self.attributes[idx].model_dump().update(attribute.model_dump())
elif type(attribute) == LazyDeviceAttribute:
idx = self.lazy.index(attribute)
self.lazy[idx].model_dump().update(attribute.model_dump())
elif type(attribute) == StaticDeviceAttribute:
idx = self.static_attributes.index(attribute)
self.static_attributes[idx].model_dump().update(attribute.model_dump())
elif type(attribute) == DeviceCommand:
idx = self.commands.index(attribute)
self.commands[idx].model_dump().update(attribute.model_dump())
except ValueError:
if append:
logger.warning("Device: %s: Could not find "
"attribute: \n %s",
self.device_id, attribute.model_dump_json(indent=2))
self.add_attribute(attribute=attribute)
else:
msg = f"Device: {self.device_id}: Could not find "\
f"attribute: \n {attribute.model_dump_json(indent=2)}"
raise KeyError(msg)
[docs] def delete_attribute(self, attribute: Union[DeviceAttribute,
LazyDeviceAttribute,
StaticDeviceAttribute,
DeviceCommand]):
"""
Deletes attribute from device
Args:
attribute: ()
Returns:
"""
try:
if type(attribute) == DeviceAttribute:
self.attributes.remove(attribute)
elif type(attribute) == LazyDeviceAttribute:
self.lazy.remove(attribute)
elif type(attribute) == StaticDeviceAttribute:
self.static_attributes.remove(attribute)
elif type(attribute) == DeviceCommand:
self.commands.remove(attribute)
else:
raise ValueError
except ValueError:
logger.warning("Device: %s: Could not delete "
"attribute: \n %s",
self.device_id, attribute.model_dump_json(indent=2))
raise
logger.info("Device: %s: Attribute deleted! \n %s",
self.device_id, attribute.model_dump_json(indent=2))
[docs] def get_command(self, command_name: str):
"""
Short for self.get_attributes
Args:
command_name (str):
Returns:
"""
return self.get_attribute(attribute_name=command_name)
[docs] def add_command(self, command: DeviceCommand, update: bool = False):
"""
Short for self.add_attribute
Args:
command (DeviceCommand):
update (bool): Update command if it already exists
Returns:
"""
self.add_attribute(attribute=command, update=update)
[docs] def update_command(self, command: DeviceCommand, append: bool = False):
"""
Short for self.update_attribute
Args:
command:
append:
Returns:
"""
self.update_attribute(attribute=command, append=append)
[docs] def delete_command(self, command: DeviceCommand):
"""
Short for self.delete_attribute
Args:
command:
Returns:
None
"""
self.delete_attribute(attribute=command)