"""Vocabulary Models for Ontology Entities"""
from collections import Counter
from enum import Enum
from typing import List, TYPE_CHECKING, Dict, Union, Set, Any

from pydantic import BaseModel, Field

from .source import DependencyStatement

if TYPE_CHECKING:
    from . import \
        CombinedObjectRelation, \
        CombinedDataRelation, \
        CombinedRelation, \
        Relation, \
        Vocabulary, \
        Source
class Entity(BaseModel):
    """
    Representing an OWL Entity (Class, Datatype, DataProperty, ObjectProperty,
    Individual)

    An Entity is characterised by a unique IRI and originates from a source.

    An Entity needs a unique label (displayname) as it is used in FIWARE as
    field key. The user can overwrite the given label.
    """

    iri: str = Field(description="Unique Internationalized Resource Identifier")
    label: str = Field(
        default="",
        description="Label (displayname) extracted from source file "
                    "(multiple Entities could have the same label)")
    user_set_label: Any = Field(
        default="",
        description="Given by user and overwrites 'label'."
                    " Needed to make labels unique")
    comment: str = Field(
        default="",
        description="Comment extracted from the ontology/source")
    source_ids: Set[str] = Field(
        default=set(),
        description="IDs of the sources that influenced this class")
    predefined: bool = Field(
        default=False,
        description="Stats if the entity is not extracted from a source, "
                    "but predefined in the program (Standard Datatypes)")

    def get_label(self) -> str:
        """Get the display label of the entity.

        The user-set label wins if one was given; otherwise the label
        extracted from the source (or derived from the IRI) is returned.

        Returns:
            str
        """
        if self.user_set_label != "":
            return self.user_set_label
        return self.get_original_label()

    def set_label(self, label: str):
        """Change the display label of the entity.

        Args:
            label (str): New display label; stored as the user-set label

        Returns:
            None
        """
        self.user_set_label = label

    def get_ontology_iri(self) -> str:
        """Get the IRI of the ontology that this entity belongs to.

        The ontology IRI is the part of the entity IRI in front of the first
        "#". If the IRI contains no "#", the IRI is returned unchanged
        (previously the last character was silently cut off in that case).

        Returns:
            str
        """
        # partition() yields the complete string as first item when the
        # separator is missing, which is exactly the fallback wanted here
        ontology_iri, _, _ = self.iri.partition("#")
        return ontology_iri

    def get_source_names(self, vocabulary: 'Vocabulary') -> List[str]:
        """Get the names of all sources that influenced this entity.

        The names are looked up for each id in ``source_ids``; as that is a
        set, the order of the result is not defined.

        Args:
            vocabulary (Vocabulary): Vocabulary of the project

        Returns:
            List[str]
        """
        return [vocabulary.get_source(source_id).get_name()
                for source_id in self.source_ids]

    def get_sources(self, vocabulary: 'Vocabulary') -> List['Source']:
        """Get all the source objects that influenced this entity.

        The sources are sorted ascending by their ``source_name``.

        Args:
            vocabulary (Vocabulary): Vocabulary of the project

        Returns:
            List[Source]
        """
        sources = [vocabulary.get_source(source_id)
                   for source_id in self.source_ids]
        sources.sort(key=lambda source: source.source_name)
        return sources

    def _lists_are_identical(self, a: List, b: List) -> bool:
        """Test if two lists contain the same entries, ignoring their order.

        Entries are compared including their multiplicity, so ``[1, 1]`` and
        ``[1, 2]`` are not identical. The entries need to be hashable.

        Args:
            a (List): first list
            b (List): second list

        Returns:
            bool
        """
        return Counter(a) == Counter(b)

    def is_renamed(self) -> bool:
        """Check if the entity was renamed by the user.

        Returns:
            bool: True if a non-empty user-set label exists
        """
        return self.user_set_label != ""

    def get_original_label(self) -> str:
        """Get the label as defined in the source.

        If the source label is empty, the label is extracted from the IRI:
        the part behind the first "#", or the whole IRI if it has no "#".

        Returns:
            str
        """
        if self.label != "":
            return self.label
        _, separator, fragment = self.iri.partition("#")
        return fragment if separator else self.iri
class Class(Entity):
    """
    Representation of OWL:CLASS

    A class has a set of relations that are combined into CombinedRelations

    Instances are instantiations of a class

    A class can represent Devices, Agents, None or both
    """

    # The objects whose ids/iris are listed here can be looked up in the
    # vocabulary of this class
    child_class_iris: List[str] = Field(
        default=[],
        description="All class_iris of classes that inherit from this class")
    ancestor_class_iris: List[str] = Field(
        default=[],
        description="All class_iris of classes from which this class inherits")
    parent_class_iris: List[str] = Field(
        default=[],
        description="All class_iris of classes that are direct parents of this "
                    "class")
    relation_ids: List[str] = Field(
        default=[],
        description="All ids of relations defined for this class")
    combined_object_relation_ids: List[str] = Field(
        default=[],
        description="All combined_object_relations ids defined for this class")
    combined_data_relation_ids: List[str] = Field(
        default=[],
        description="All combined_data_relations ids defined for this class")

    def get_relation_ids(self) -> List[str]:
        """Get all ids of relations belonging to this class.

        The stored list itself is returned, not a copy.

        Returns:
            List[str]
        """
        return self.relation_ids

    def get_relations(self, vocabulary: 'Vocabulary') -> List['Relation']:
        """Get all relations belonging to this class.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[Relation]
        """
        return [vocabulary.get_relation_by_id(relation_id)
                for relation_id in self.relation_ids]

    def get_combined_object_relations(self, vocabulary: 'Vocabulary') -> \
            List['CombinedObjectRelation']:
        """Get all combined object relations belonging to this class.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[CombinedObjectRelation]
        """
        return [vocabulary.get_combined_object_relation_by_id(cor_id)
                for cor_id in self.combined_object_relation_ids]

    def get_combined_data_relations(self, vocabulary: 'Vocabulary') -> \
            List['CombinedDataRelation']:
        """Get all combined data relations belonging to this class.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[CombinedDataRelation]
        """
        return [vocabulary.get_combined_data_relation_by_id(cdr_id)
                for cdr_id in self.combined_data_relation_ids]

    def get_combined_relations(self, vocabulary: 'Vocabulary') -> \
            List['CombinedRelation']:
        """Get all combined relations belonging to this class.

        The combined object relations come first, followed by the combined
        data relations.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[CombinedRelation]
        """
        combined = self.get_combined_object_relations(vocabulary)
        combined.extend(self.get_combined_data_relations(vocabulary))
        return combined

    def is_child_of_all_classes(self, target_list: List[str]) -> bool:
        """Test if this class is a child class for each of the given classes.

        A target that equals the iri of this class itself counts as
        fulfilled.

        Args:
            target_list (List[str]): List of ancestor class_iris

        Returns:
            bool
        """
        return all(target == self.iri or target in self.ancestor_class_iris
                   for target in target_list)

    def get_combined_object_relation_with_property_iri(
            self, obj_prop_iri: str, vocabulary: 'Vocabulary') \
            -> Union['CombinedObjectRelation', None]:
        """
        Get the CombinedObjectRelation of this class that combines the
        relations of the given ObjectProperty.

        Args:
            obj_prop_iri (str): Iri of the ObjectProperty
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            CombinedObjectRelation, None if this class has no combined
            object relation for the given iri
        """
        for cor in self.get_combined_object_relations(vocabulary):
            if cor.property_iri == obj_prop_iri:
                return cor
        return None

    def get_combined_data_relation_with_property_iri(
            self, property_iri: str, vocabulary: 'Vocabulary') \
            -> Union['CombinedDataRelation', None]:
        """
        Get the CombinedDataRelation of this class that combines the
        relations of the given DataProperty.

        Args:
            property_iri (str): Iri of the DataProperty
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            CombinedDataRelation, None if this class has no combined data
            relation for the given iri
        """
        for cdr in self.get_combined_data_relations(vocabulary):
            if cdr.property_iri == property_iri:
                return cdr
        return None

    def get_combined_relation_with_property_iri(
            self, property_iri: str, vocabulary: 'Vocabulary') \
            -> Union['CombinedRelation', None]:
        """
        Get the CombinedRelation of this class that combines the relations
        of the given Property.

        The combined data relations are searched first, then the combined
        object relations.
        If possible use the more specific access functions to save runtime.

        Args:
            property_iri (str): Iri of the Property
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            CombinedRelation, None if iri is unknown
        """
        for cdr in self.get_combined_data_relations(vocabulary):
            if cdr.property_iri == property_iri:
                return cdr
        for cor in self.get_combined_object_relations(vocabulary):
            if cor.property_iri == property_iri:
                return cor
        return None

    def get_ancestor_classes(self, vocabulary: 'Vocabulary') -> List['Class']:
        """Get all ancestor classes of this class.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[Class]
        """
        return [vocabulary.get_class_by_iri(ancestor_iri)
                for ancestor_iri in self.ancestor_class_iris]

    def get_parent_classes(self,
                           vocabulary: 'Vocabulary',
                           remove_redundancy: bool = False) -> List['Class']:
        """Get all parent classes of this class.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project
            remove_redundancy (bool): if true the parents that are child of
                other parents are not included

        Returns:
            List[Class]
        """
        parents = [vocabulary.get_class_by_iri(parent_iri)
                   for parent_iri in self.parent_class_iris]

        if remove_redundancy:
            # iris of every class that is listed as child of one of the
            # parents
            child_iris = set()
            for parent in parents:
                child_iris.update(parent.child_class_iris)
            # Build a new list instead of removing while iterating; removing
            # in place skipped the element following each removed parent.
            parents = [parent for parent in parents
                       if parent.iri not in child_iris]

        return parents

    def treat_dependency_statements(self, vocabulary: 'Vocabulary') -> \
            List[DependencyStatement]:
        """
        Purge and list all pointers/iris that are not contained in
        the vocabulary.

        Parent class iris that are unknown to the vocabulary are removed
        from this class. Relations with at least one unfulfilled dependency
        are removed from this class and deleted from the vocabulary.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[DependencyStatement]: One statement per checked dependency,
            fulfilled or not
        """
        statements = []

        # parent classes: collect first, remove afterwards, so that the list
        # is not modified while it is iterated
        parents_to_purge = []
        for parent_iri in self.parent_class_iris:
            found = parent_iri in vocabulary.classes
            statements.append(DependencyStatement(type="Parent Class",
                                                  class_iri=self.iri,
                                                  dependency_iri=parent_iri,
                                                  fulfilled=found
                                                  ))
            if not found:
                parents_to_purge.append(parent_iri)
        for iri in parents_to_purge:
            self.parent_class_iris.remove(iri)

        # relations: a set is used, as one relation can yield several
        # unfulfilled statements but must only be purged once
        relation_ids_to_purge = set()
        for relation in self.get_relations(vocabulary):
            relation_statements = relation.get_dependency_statements(
                vocabulary, self.get_ontology_iri(), self.iri)
            if any(not statement.fulfilled
                   for statement in relation_statements):
                relation_ids_to_purge.add(relation.id)
            statements.extend(relation_statements)

        for relation_id in relation_ids_to_purge:
            self.relation_ids.remove(relation_id)
            del vocabulary.relations[relation_id]

        return statements

    def get_next_combined_relation_id(self, current_cr_id: str,
                                      object_relations: bool) -> str:
        """Get the id of the CombinedRelation following the given one.

        The stored id list is walked in its stored order (presumably kept
        sorted by property label - verify against the code that fills the
        lists). If no CR is after the given one, the first is returned.

        Args:
            current_cr_id (str): ID of the CombinedRelation of which the next
                should be found
            object_relations (bool):
                True if Searching for CombinedObjectRelations

        Returns:
            str: ID of next CR

        Raises:
            ValueError: if current_cr_id is not contained in the searched
                list
        """
        if object_relations:
            cr_ids = self.combined_object_relation_ids
        else:
            cr_ids = self.combined_data_relation_ids

        # the modulo wraps around from the last entry to the first
        next_index = (cr_ids.index(current_cr_id) + 1) % len(cr_ids)
        return cr_ids[next_index]

    def get_previous_combined_relation_id(self, current_cr_id: str,
                                          object_relations: bool) -> str:
        """Get the id of the CombinedRelation preceding the given one.

        The stored id list is walked in its stored order (presumably kept
        sorted by property label - verify against the code that fills the
        lists). If no CR is before the given one, the last is returned.

        Args:
            current_cr_id (str): ID of the CombinedRelation of which the
                previous should be found
            object_relations (bool): True if Searching for
                CombinedObjectRelations

        Returns:
            str: ID of previous CR

        Raises:
            ValueError: if current_cr_id is not contained in the searched
                list
        """
        if object_relations:
            cr_ids = self.combined_object_relation_ids
        else:
            cr_ids = self.combined_data_relation_ids

        # the modulo wraps around from the first entry to the last
        previous_index = (cr_ids.index(current_cr_id) - 1) % len(cr_ids)
        return cr_ids[previous_index]

    def is_logically_equivalent_to(self, class_: 'Class',
                                   vocabulary: 'Vocabulary',
                                   old_vocabulary: 'Vocabulary') -> bool:
        """Test if a class is logically equivalent in two vocabularies.

        Compared are the parent class iris, the ids of the combined
        object/data relations and the string representations of the
        relations of each combined relation.

        Args:
            class_ (Class): Class to be tested against, from the old_vocabulary
            vocabulary (Vocabulary): New project vocabulary
            old_vocabulary (Vocabulary): Old project vocabulary

        Returns:
            bool
        """
        # test if parent classes and combined relation ids are identical
        id_list_pairs = (
            (class_.parent_class_iris, self.parent_class_iris),
            (class_.combined_object_relation_ids,
             self.combined_object_relation_ids),
            (class_.combined_data_relation_ids,
             self.combined_data_relation_ids),
        )
        for old_ids, new_ids in id_list_pairs:
            if not self._lists_are_identical(old_ids, new_ids):
                return False

        # test if combined relations are identical
        for cr in self.get_combined_relations(vocabulary):
            old_cr = old_vocabulary.get_combined_relation_by_id(cr.id)

            relation_strings = [relation.to_string(vocabulary)
                                for relation in cr.get_relations(vocabulary)]
            # NOTE(review): the old relations are fetched from the old
            # vocabulary but rendered with the NEW vocabulary - presumably
            # so that both sides resolve labels the same way; confirm that
            # this is intended and not a typo for old_vocabulary.
            old_relation_strings = [
                old_relation.to_string(vocabulary)
                for old_relation in old_cr.get_relations(old_vocabulary)]

            if not self._lists_are_identical(relation_strings,
                                             old_relation_strings):
                return False

        return True

    def is_iot_class(self, vocabulary: 'Vocabulary') -> bool:
        """
        A class is an iot/device class if it contains one CDR, where the
        relation is marked as a device relation: DeviceAttribute/Command,
        i.e. the field type of the data property is not "simple".

        Args:
            vocabulary (Vocabulary): Vocabulary of the project

        Returns:
            bool
        """
        for cdr_id in self.combined_data_relation_ids:
            cdr = vocabulary.get_combined_data_relation_by_id(cdr_id)
            data_property = vocabulary.get_data_property(cdr.property_iri)
            if data_property.field_type != DataFieldType.simple:
                return True
        return False
class DatatypeType(str, Enum):
    """
    Types of a Datatype

    The members are strings as well (str mixin), so they compare equal to
    their plain string values.
    """
    string = 'string'
    number = 'number'
    date = 'date'
    enum = 'enum'
class DatatypeFields(BaseModel):
    """Key Fields describing a Datatype

    Which of the fields are relevant depends on ``type``; see the
    description of each field.
    """

    type: DatatypeType = Field(
        default=DatatypeType.string,
        description="Type of the datatype")

    # --- fields used for number datatypes ---
    number_has_range: Any = Field(
        default=False,
        description="If Type==Number: Does the datatype define a range")
    # "/" is the placeholder for "no bound defined"
    number_range_min: Union[int, str] = Field(
        default="/",
        description="If Type==Number: Min value of the datatype range, "
                    "if a range is defined")
    number_range_max: Union[int, str] = Field(
        default="/",
        description="If Type==Number: Max value of the datatype range, "
                    "if a range is defined")
    number_decimal_allowed: bool = Field(
        default=False,
        description="If Type==Number: Are decimal numbers allowed?")

    # --- fields used for string datatypes ---
    forbidden_chars: List[str] = Field(
        default=[],
        description="If Type==String: Blacklisted chars")
    allowed_chars: List[str] = Field(
        default=[],
        description="If Type==String: Whitelisted chars")

    # --- fields used for enum datatypes ---
    enum_values: List[str] = Field(
        default=[],
        description="If Type==Enum: Enum values")
class Datatype(Entity, DatatypeFields):
    """
    Represents OWL:Datatype

    A Datatype is the target of a DataRelation. The Datatype stats a set of
    values that are valid.
    This can be an ENUM, a number range, or a check for black/whitelisted chars

    In the Parsing PostProcesseor predefined datatype_catalogue are added to the
    vocabulary
    """

    def export(self) -> Dict[str, Any]:
        """Export the datatype as dict.

        Only the datatype describing fields that differ from their default
        are contained; ``type`` is always contained, as its plain string
        value.

        Returns:
            Dict[str, Any]
        """
        res = self.model_dump(include={'type', 'number_has_range',
                                       'number_range_min', 'number_range_max',
                                       'number_decimal_allowed',
                                       'forbidden_chars',
                                       'allowed_chars', 'enum_values'},
                              exclude_defaults=True)
        # set explicitly: exclude_defaults drops the type if it is the
        # default one, and the export should hold the string value
        res['type'] = self.type.value
        return res

    def value_is_valid(self, value: str) -> bool:
        """Test if value is valid for this datatype.

        Numbers are also given as strings.

        Args:
            value (str): value to be tested

        Returns:
            bool
        """
        if self.type == DatatypeType.string:
            # if allowed chars is empty all chars are allowed
            if len(self.allowed_chars) > 0:
                if any(char not in self.allowed_chars for char in value):
                    return False
            return not any(forbidden in value
                           for forbidden in self.forbidden_chars)

        if self.type == DatatypeType.number:
            converter = float if self.number_decimal_allowed else int
            try:
                number = converter(value)
            except (TypeError, ValueError, OverflowError):
                # not parsable as number; only conversion errors are
                # caught, a bare except would also hide e.g.
                # KeyboardInterrupt
                return False

            # "/" marks a bound that is not defined
            if self.number_range_min != "/":
                if number < self.number_range_min:
                    return False
            if self.number_range_max != "/":
                if number > self.number_range_max:
                    return False
            return True

        if self.type == DatatypeType.enum:
            return value in self.enum_values

        if self.type == DatatypeType.date:
            # imported locally, as in the original code, so that dateutil
            # is only needed when a date is validated
            from dateutil.parser import parse
            try:
                parse(value, fuzzy=False)
            except (ValueError, OverflowError):
                # dateutil raises OverflowError if the parsed date exceeds
                # the largest valid C integer of the system
                return False
            return True

        # unknown type: nothing to check against
        return True

    def is_logically_equivalent_to(self, datatype: 'Datatype',
                                   vocabulary: 'Vocabulary',
                                   old_vocabulary: 'Vocabulary') -> bool:
        """Test if this datatype is logically equivalent to the given datatype

        Only ``type``, ``number_has_range`` and ``enum_values`` are
        compared.

        Args:
            datatype (Datatype): Datatype to compare against
            vocabulary (Vocabulary): Not used, but needed to keep signature the
                same as other entities
            old_vocabulary (Vocabulary): Not used, but needed to keep signature
                the same as other entities

        Returns:
            bool
        """
        # NOTE(review): range bounds, decimal flag and the char lists are
        # not part of the comparison - confirm that this is intended.
        return (self.type == datatype.type
                and self.number_has_range == datatype.number_has_range
                and self.enum_values == datatype.enum_values)
class Individual(Entity):
    """
    Represents OWL:Individual

    An individual is a predefined "instance" of a class
    But they are here only used as values for Relations

    They are not instances, no value can be assigned to them, they are no
    agents or devices
    """

    parent_class_iris: List[str] = Field(
        default=[],
        description="List of all parent class iris, "
                    "an individual can have multiple parents")

    def to_string(self) -> str:
        """Get a string representation of the Individual.

        Returns:
            str: "(Individual)" followed by the display label
        """
        return "(Individual)" + self.get_label()

    def get_ancestor_iris(self, vocabulary: 'Vocabulary') -> List[str]:
        """Get all iris of ancestor classes.

        These are the parent classes plus the ancestors of each parent
        class. Each iri is contained once; the order is not defined.

        Args:
            vocabulary (Vocabulary): Vocabulary of the project

        Returns:
            List[str]
        """
        ancestor_iris = set()
        for parent_iri in self.parent_class_iris:
            ancestor_iris.add(parent_iri)
            parent_class = vocabulary.get_class_by_iri(parent_iri)
            ancestor_iris.update(parent_class.ancestor_class_iris)
        return list(ancestor_iris)

    def get_parent_classes(self, vocabulary: 'Vocabulary') -> List['Class']:
        """Get all parent class objects.

        Args:
            vocabulary (Vocabulary): Vocabulary of the project

        Returns:
            List[Class]
        """
        return [vocabulary.get_class_by_iri(parent_iri)
                for parent_iri in self.parent_class_iris]

    def is_logically_equivalent_to(self, individual: 'Individual',
                                   vocabulary: 'Vocabulary',
                                   old_vocabulary: 'Vocabulary') -> bool:
        """Test if this individual is logically equivalent in two vocabularies.

        Two individuals are equivalent if they have the same parent class
        iris.

        Args:
            individual (Individual): Individual to be tested against, from the
                old vocabulary
            vocabulary (Vocabulary): New project vocabulary, not used but needed
                to keep signature the same
            old_vocabulary (Vocabulary): Old project vocabulary, not used but
                needed to keep signature the same

        Returns:
            bool
        """
        return self._lists_are_identical(self.parent_class_iris,
                                         individual.parent_class_iris)

    def treat_dependency_statements(self, vocabulary: 'Vocabulary') -> \
            List[DependencyStatement]:
        """Purge and list all pointers/iris that are not contained in the
        vocabulary.

        Parent class iris that are unknown to the vocabulary are removed
        from this individual.

        Args:
            vocabulary (Vocabulary): Vocabulary of this project

        Returns:
            List[DependencyStatement]: One statement per parent class iri,
            fulfilled or not
        """
        statements = []
        # Collect first and remove afterwards: removing from the list while
        # iterating over it skipped the entry following each removed iri.
        parents_to_purge = []
        for parent_iri in self.parent_class_iris:
            found = parent_iri in vocabulary.classes
            statements.append(DependencyStatement(type="Parent Class",
                                                  class_iri=self.iri,
                                                  dependency_iri=parent_iri,
                                                  fulfilled=found
                                                  ))
            if not found:
                parents_to_purge.append(parent_iri)

        for iri in parents_to_purge:
            self.parent_class_iris.remove(iri)

        return statements
class DataFieldType(str, Enum):
    """Type of the field that represents the DataProperty

    The members are strings as well (str mixin), so they compare equal to
    their plain string values.
    """
    command = "command"
    device_attribute = "device_attribute"
    simple = "simple"
class DataProperty(Entity):
    """
    Representation of OWL:DataProperty

    On top of the common Entity fields, a data property holds the type of
    the field that represents it.
    """

    # defaults to a simple field until set otherwise
    field_type: DataFieldType = Field(
        default=DataFieldType.simple,
        description="Type of the dataproperty; set by the user while "
                    "configuring the vocabulary")
class ObjectProperty(Entity):
    """
    Representation of OWL:ObjectProperty

    On top of the common Entity fields, an object property holds the iris
    of its inverse properties.
    """

    inverse_property_iris: Set[str] = Field(
        default=set(),
        description="List of property iris that are inverse:Of; "
                    "If an instance i2 is added in an instance i1 "
                    "for this property. Then i1 is added to i2 under the"
                    " inverseProperty (if the class has that property)")

    def add_inverse_property_iri(self, iri: str):
        """Register a further inverse property.

        As the iris are kept in a set, adding a known iri has no effect.

        Args:
            iri (str): Iri of the inverse objectProperty

        Returns:
            None
        """
        self.inverse_property_iris.add(iri)

    def is_logically_equivalent_to(self, object_property: 'ObjectProperty',
                                   vocabulary: 'Vocabulary',
                                   old_vocabulary: 'Vocabulary') -> bool:
        """Test if this Property in the new_vocabulary is logically equivalent
        to the object_property in the old_vocabulary.

        Two object properties are equivalent if their sets of inverse
        property iris are equal.

        Args:
            object_property (ObjectProperty): ObjectProperty to be tested
                against, from the old vocabulary
            vocabulary (Vocabulary): New project vocabulary, not used but
                needed to keep signature the same
            old_vocabulary (Vocabulary): Old project vocabulary, not used but
                needed to keep signature the same

        Returns:
            bool
        """
        own_inverse_iris = self.inverse_property_iris
        other_inverse_iris = object_property.inverse_property_iris
        return own_inverse_iris == other_inverse_iris