Source code for filip.semantics.vocabulary.entities

"""Vocabulary Models for Ontology Entities"""

from collections import Counter
from enum import Enum
from typing import List, TYPE_CHECKING, Dict, Union, Set, Any

from pydantic import BaseModel, Field

from .source import DependencyStatement

if TYPE_CHECKING:
    from . import \
        CombinedObjectRelation, \
        CombinedDataRelation, \
        CombinedRelation, \
        Relation, \
        Vocabulary, \
        Source


class Entity(BaseModel):
    """
    Common base model of all OWL entities (Class, Datatype, DataProperty,
    ObjectProperty, Individual).

    An entity is identified by its unique IRI and stems from one or more
    sources. Its label (displayname) is used as field key in FIWARE and
    therefore has to be unique; the user can override the extracted label
    for that purpose.
    """

    iri: str = Field(
        description="Unique Internationalized Resource Identifier")
    label: str = Field(
        description="Label (displayname) extracted from source file "
                    "(multiple Entities could have the same label)",
        default="")
    user_set_label: Any = Field(
        description="Given by user and overwrites 'label'."
                    " Needed to make labels unique",
        default="")
    comment: str = Field(
        description="Comment extracted from the ontology/source",
        default="")
    source_ids: Set[str] = Field(
        description="IDs of the sources that influenced this class",
        default=set())
    predefined: bool = Field(
        description="Stats if the entity is not extracted from a source, "
                    "but predefined in the program (Standard Datatypes)",
        default=False)
def get_label(self) -> str:
    """
    Return the label that is displayed for this entity.

    A label set by the user takes precedence; if none was set, the label
    given by get_original_label() is used.

    Returns:
        str
    """
    if self.user_set_label == "":
        return self.get_original_label()
    return self.user_set_label
def set_label(self, label: str):
    """
    Override the display label of the entity.

    The value is stored in ``user_set_label``; the label extracted from
    the source stays untouched.

    Args:
        label (str): New display label of the entity
    """
    self.user_set_label = label
def get_ontology_iri(self) -> str:
    """
    Get the IRI of the ontology that this entity belongs to.

    The ontology IRI is the part of the entity IRI in front of the first
    "#". If the IRI contains no "#", the IRI is returned unchanged
    (previously its last character was cut off by accident).

    Returns:
        str
    """
    # partition() yields the whole string as head when the separator is
    # missing, which avoids the ``iri[:-1]`` pitfall of ``str.find``
    ontology_iri, _, _ = self.iri.partition("#")
    return ontology_iri
def get_source_names(self, vocabulary: 'Vocabulary') -> List[str]:
    """
    Collect the names of all sources listed in ``source_ids``.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project, used to look
            up the sources by id

    Returns:
        List[str]: One name per source id, in the iteration order of
        ``source_ids``
    """
    names = []
    for source_id in self.source_ids:
        source = vocabulary.get_source(source_id)
        names.append(source.get_name())
    return names
def get_sources(self, vocabulary: 'Vocabulary') -> List['Source']:
    """
    Get the source objects of all ids listed in ``source_ids``.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project, used to look
            up the sources by id

    Returns:
        List[Source]: Sources sorted ascending by their ``source_name``
    """
    looked_up = (vocabulary.get_source(source_id)
                 for source_id in self.source_ids)
    return sorted(looked_up, key=lambda source: source.source_name)
def _lists_are_identical(self, a: List, b: List) -> bool: """ Methode to test if to lists contain the same entries Args: a (List): first list b (List): second list Returns: bool """ return len(set(a).intersection(b)) == len(set(a)) and len(a) == len(b)
def is_renamed(self) -> bool:
    """
    Tell whether the user assigned an own label to the entity, i.e.
    whether ``user_set_label`` differs from the empty string.

    Returns:
        bool
    """
    return self.user_set_label != ""
def get_original_label(self) -> str:
    """
    Get the label as it is defined in the source.

    If the source provided no label, the part of the IRI behind the first
    "#" is used; an IRI without "#" is returned as a whole.

    Returns:
        str
    """
    if self.label != "":
        return self.label

    _, separator, fragment = self.iri.partition("#")
    # without a "#" the complete iri serves as label
    return fragment if separator else self.iri
class Class(Entity):
    """
    Representation of OWL:CLASS

    A class owns a set of relations, which are merged into
    CombinedRelations. Instances are instantiations of a class.
    A class can represent Devices, Agents, None or both.
    """

    # Only ids/iris are stored here; the referenced objects can be looked
    # up in the vocabulary this class belongs to

    child_class_iris: List[str] = Field(
        description="All class_iris of classes that inherit from this class",
        default=[])
    ancestor_class_iris: List[str] = Field(
        description="All class_iris of classes from which this class inherits",
        default=[])
    parent_class_iris: List[str] = Field(
        description="All class_iris of classes that are direct parents of "
                    "this class",
        default=[])
    relation_ids: List[str] = Field(
        description="All ids of relations defined for this class",
        default=[])
    combined_object_relation_ids: List[str] = Field(
        description="All combined_object_relations ids defined for this class",
        default=[])
    combined_data_relation_ids: List[str] = Field(
        description="All combined_data_relations ids defined for this class",
        default=[])
def get_relation_ids(self) -> List[str]:
    """
    Get the ids of all relations that belong to this class.

    Returns:
        List[str]: The ``relation_ids`` list itself, not a copy
    """
    return self.relation_ids
def get_relations(self, vocabulary: 'Vocabulary') -> List['Relation']:
    """
    Resolve all ``relation_ids`` of this class to relation objects.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[Relation]: Relations in the order of ``relation_ids``
    """
    return [vocabulary.get_relation_by_id(relation_id)
            for relation_id in self.relation_ids]
def get_combined_object_relations(self, vocabulary: 'Vocabulary') -> \
        List['CombinedObjectRelation']:
    """
    Resolve all ``combined_object_relation_ids`` of this class to objects.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[CombinedObjectRelation]: In the order of the stored ids
    """
    return [vocabulary.get_combined_object_relation_by_id(cor_id)
            for cor_id in self.combined_object_relation_ids]
def get_combined_data_relations(self, vocabulary: 'Vocabulary') -> \
        List['CombinedDataRelation']:
    """
    Resolve all ``combined_data_relation_ids`` of this class to objects.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[CombinedDataRelation]: In the order of the stored ids
    """
    return [vocabulary.get_combined_data_relation_by_id(cdr_id)
            for cdr_id in self.combined_data_relation_ids]
def get_combined_relations(self, vocabulary: 'Vocabulary') -> \
        List['CombinedRelation']:
    """
    Get all combined relations of this class: first the combined object
    relations, followed by the combined data relations.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[CombinedRelation]
    """
    object_relations = self.get_combined_object_relations(vocabulary)
    data_relations = self.get_combined_data_relations(vocabulary)
    return [*object_relations, *data_relations]
def is_child_of_all_classes(self, target_list: List[str]) -> bool:
    """
    Test if this class is a child of every class in the given list.

    A target that equals the iri of this class itself counts as
    fulfilled; all other targets have to be listed in
    ``ancestor_class_iris``.

    Args:
        target_list (List[str]): List of ancestor class_iris

    Returns:
        bool: True also for an empty target list
    """
    return all(
        target == self.iri or target in self.ancestor_class_iris
        for target in target_list)
def get_combined_object_relation_with_property_iri(
        self, obj_prop_iri: str, vocabulary: 'Vocabulary') \
        -> 'CombinedObjectRelation':
    """
    Find the CombinedObjectRelation of this class that belongs to the
    given ObjectProperty.

    Args:
        obj_prop_iri (str): Iri of the ObjectProperty
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        CombinedObjectRelation: First relation whose ``property_iri``
        matches, None if no relation matches
    """
    candidates = self.get_combined_object_relations(vocabulary)
    return next(
        (cor for cor in candidates if cor.property_iri == obj_prop_iri),
        None)
def get_combined_data_relation_with_property_iri(self, property_iri,
                                                 vocabulary):
    """
    Find the CombinedDataRelation of this class that belongs to the
    given DataProperty.

    Args:
        property_iri (str): Iri of the DataProperty
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        CombinedDataRelation: First relation whose ``property_iri``
        matches, None if no relation matches
    """
    candidates = self.get_combined_data_relations(vocabulary)
    return next(
        (cdr for cdr in candidates if cdr.property_iri == property_iri),
        None)
def get_combined_relation_with_property_iri(self, property_iri, vocabulary)\
        -> Union['CombinedRelation', None]:
    """
    Find the CombinedRelation of this class that belongs to the given
    Property.

    The combined data relations are searched first, the combined object
    relations only if no data relation matched. Prefer the more specific
    access functions where possible to save runtime.

    Args:
        property_iri (str): Iri of the Property
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        CombinedRelation, None if iri is unknown
    """
    # the getters are called lazily, one after the other
    getters = (self.get_combined_data_relations,
               self.get_combined_object_relations)
    for fetch_relations in getters:
        for combined_relation in fetch_relations(vocabulary):
            if combined_relation.property_iri == property_iri:
                return combined_relation
    return None
def get_ancestor_classes(self, vocabulary: 'Vocabulary') -> List['Class']:
    """
    Resolve all ``ancestor_class_iris`` of this class to class objects.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[Class]: Ancestors in the order of ``ancestor_class_iris``
    """
    return [vocabulary.get_class_by_iri(ancestor_iri)
            for ancestor_iri in self.ancestor_class_iris]
def get_parent_classes(self,
                       vocabulary: 'Vocabulary',
                       remove_redundancy: bool = False) -> List['Class']:
    """
    Get all parent classes of this class.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project
        remove_redundancy (bool): if true, parents that are listed as
            child of another parent are not included

    Returns:
        List[Class]: Parents in the order of ``parent_class_iris``
    """
    parents = [vocabulary.get_class_by_iri(parent_iri)
               for parent_iri in self.parent_class_iris]
    if not remove_redundancy:
        return parents

    # iris of all classes that are a child of at least one parent
    child_iris = set()
    for parent in parents:
        child_iris.update(parent.child_class_iris)

    # Build a new list instead of removing from ``parents`` while
    # iterating over it; the in-place removal skipped the element that
    # followed each removed parent and so kept redundant parents
    return [parent for parent in parents if parent.iri not in child_iris]
def treat_dependency_statements(self, vocabulary: 'Vocabulary') -> \
        List[DependencyStatement]:
    """
    Purge and list all pointers/iris that are not contained in the
    vocabulary.

    Parent iris that are unknown to ``vocabulary.classes`` are removed
    from ``parent_class_iris``. Relations with at least one unfulfilled
    dependency statement are removed from ``relation_ids`` and deleted
    from ``vocabulary.relations``.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[DependencyStatement]: One statement per parent class plus
        all statements reported by the relations of this class
    """
    statements = []

    # parent classes: report each parent, remember the unknown ones
    missing_parent_iris = []
    for parent_iri in self.parent_class_iris:
        is_known = parent_iri in vocabulary.classes
        statements.append(
            DependencyStatement(type="Parent Class",
                                class_iri=self.iri,
                                dependency_iri=parent_iri,
                                fulfilled=is_known))
        if not is_known:
            missing_parent_iris.append(parent_iri)

    # purge after the loop, so the iterated list is not modified
    for missing_iri in missing_parent_iris:
        self.parent_class_iris.remove(missing_iri)

    # relations: collect their statements, remember the broken ones
    broken_relation_ids = set()
    for relation in self.get_relations(vocabulary):
        relation_statements = relation.get_dependency_statements(
            vocabulary, self.get_ontology_iri(), self.iri)
        for statement in relation_statements:
            if statement.fulfilled == False:
                broken_relation_ids.add(relation.id)
        statements.extend(relation_statements)

    for relation_id in broken_relation_ids:
        self.relation_ids.remove(relation_id)
        del vocabulary.relations[relation_id]

    return statements
def get_next_combined_relation_id(self, current_cr_id: str,
                                  object_relations: bool) -> str:
    """
    Get the id that follows the given one in the list of combined
    relation ids (presumably sorted alphabetically by property label;
    the ordering is not established here). After the last id the first
    one is returned.

    Args:
        current_cr_id (str): ID of the CombinedRelation of which the next
            should be found
        object_relations (bool): True to search the
            CombinedObjectRelations, else the CombinedDataRelations

    Returns:
        str: ID of next CR
    """
    cr_ids = self.combined_data_relation_ids
    if object_relations:
        cr_ids = self.combined_object_relation_ids

    # the modulo wraps the position behind the last id back to the start
    next_index = (cr_ids.index(current_cr_id) + 1) % len(cr_ids)
    return cr_ids[next_index]
def get_previous_combined_relation_id(self, current_cr_id: str,
                                      object_relations: bool) -> str:
    """
    Get the id that precedes the given one in the list of combined
    relation ids (presumably sorted alphabetically by property label;
    the ordering is not established here). Before the first id the last
    one is returned.

    Args:
        current_cr_id (str): ID of the CombinedRelation of which the
            previous should be found
        object_relations (bool): True to search the
            CombinedObjectRelations, else the CombinedDataRelations

    Returns:
        str: ID of previous CR
    """
    cr_ids = self.combined_data_relation_ids
    if object_relations:
        cr_ids = self.combined_object_relation_ids

    position = cr_ids.index(current_cr_id)
    previous_index = position - 1 if position > 0 else len(cr_ids) - 1
    return cr_ids[previous_index]
def is_logically_equivalent_to(self, class_: 'Class',
                               vocabulary: 'Vocabulary',
                               old_vocabulary: 'Vocabulary') -> bool:
    """
    Test if a class is logically equivalent in two vocabularies.

    Compared are the parent class iris, the ids of the combined object
    and data relations, and for each combined relation the string
    representations of its relations.

    Args:
        class_ (Class): Class to be tested against, from the old_vocabulary
        vocabulary (Vocabulary): New project vocabulary
        old_vocabulary (Vocabulary): Old project vocabulary

    Returns:
        bool
    """
    # parents and combined relation ids have to match pairwise
    compared_attributes = ("parent_class_iris",
                           "combined_object_relation_ids",
                           "combined_data_relation_ids")
    for attribute in compared_attributes:
        if not self._lists_are_identical(getattr(class_, attribute),
                                         getattr(self, attribute)):
            return False

    # the relations inside each combined relation have to match as well
    for new_cr in self.get_combined_relations(vocabulary):
        old_cr = old_vocabulary.get_combined_relation_by_id(new_cr.id)

        new_strings = [relation.to_string(vocabulary)
                       for relation in new_cr.get_relations(vocabulary)]
        # NOTE(review): the old relations are rendered with the new
        # vocabulary, not with old_vocabulary - confirm this is intended
        old_strings = [relation.to_string(vocabulary)
                       for relation in old_cr.get_relations(old_vocabulary)]

        if not self._lists_are_identical(new_strings, old_strings):
            return False

    return True
def is_iot_class(self, vocabulary: 'Vocabulary') -> bool:
    """
    Test if this class is an iot/device class.

    That is the case if the data property of at least one combined data
    relation has a field type other than ``DataFieldType.simple``, i.e.
    it is marked as device attribute or command.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project

    Returns:
        bool
    """
    return any(
        vocabulary.get_data_property(
            vocabulary.get_combined_data_relation_by_id(cdr_id).property_iri
        ).field_type != DataFieldType.simple
        for cdr_id in self.combined_data_relation_ids)
class DatatypeType(str, Enum):
    """
    Kinds of values a Datatype can describe.

    The members are strings as well, so they compare equal to their
    plain string value.
    """

    string = 'string'
    number = 'number'
    date = 'date'
    enum = 'enum'
class DatatypeFields(BaseModel):
    """
    Key fields that describe a Datatype.

    Which of the fields are meaningful depends on ``type``, see the
    descriptions of the individual fields.
    """

    type: DatatypeType = Field(
        description="Type of the datatype",
        default=DatatypeType.string)
    number_has_range: Any = Field(
        description="If Type==Number: Does the datatype define a range",
        default=False)
    # "/" marks a range limit that is not set
    number_range_min: Union[int, str] = Field(
        description="If Type==Number: Min value of the datatype range, "
                    "if a range is defined",
        default="/")
    number_range_max: Union[int, str] = Field(
        description="If Type==Number: Max value of the datatype range, "
                    "if a range is defined",
        default="/")
    number_decimal_allowed: bool = Field(
        description="If Type==Number: Are decimal numbers allowed?",
        default=False)
    forbidden_chars: List[str] = Field(
        description="If Type==String: Blacklisted chars",
        default=[])
    allowed_chars: List[str] = Field(
        description="If Type==String: Whitelisted chars",
        default=[])
    enum_values: List[str] = Field(
        description="If Type==Enum: Enum values",
        default=[])
class Datatype(Entity, DatatypeFields):
    """
    Represents OWL:Datatype

    A Datatype is the target of a DataRelation and states which values
    are valid. This can be an ENUM, a number range, or a check for
    black/whitelisted chars.

    Predefined datatypes (datatype_catalogue) are added to the vocabulary
    in the parsing PostProcessor.
    """
def export(self) -> Dict[str,str]:
    """
    Export the describing fields of the datatype as dict.

    Fields that still hold their default value are left out, only
    ``type`` is always contained, as plain string value.

    Returns:
        Dict[str,str]
    """
    exported_fields = {
        'type',
        'number_has_range',
        'number_range_min',
        'number_range_max',
        'number_decimal_allowed',
        'forbidden_chars',
        'allowed_chars',
        'enum_values',
    }
    exported = self.model_dump(include=exported_fields,
                               exclude_defaults=True)
    # set the type explicitly: always present, as value not as enum member
    exported['type'] = self.type.value
    return exported
def value_is_valid(self, value: str) -> bool:
    """
    Test if value is valid for this datatype.

    Numbers are also given as strings. The check depends on ``type``:

    - string: every char has to be whitelisted (an empty whitelist allows
      all chars) and no blacklisted char may be contained
    - number: the value has to be convertible to float (decimals allowed)
      or int, and has to lie within the range limits that are set
    - enum: the value has to be one of ``enum_values``
    - date: the value has to be parsable by ``dateutil``

    Args:
        value (str): value to be tested

    Returns:
        bool
    """
    if self.type == DatatypeType.string:
        # an empty whitelist means that all chars are allowed
        if self.allowed_chars and \
                any(char not in self.allowed_chars for char in value):
            return False
        return not any(char in value for char in self.forbidden_chars)

    if self.type == DatatypeType.number:
        convert = float if self.number_decimal_allowed else int
        try:
            number = convert(value)
        # only conversion failures mark the value as invalid; a bare
        # except would also swallow KeyboardInterrupt and SystemExit
        except (TypeError, ValueError, OverflowError):
            return False

        # "/" marks a range limit that is not set
        if self.number_range_min != "/" and number < self.number_range_min:
            return False
        if self.number_range_max != "/" and number > self.number_range_max:
            return False
        return True

    if self.type == DatatypeType.enum:
        return value in self.enum_values

    if self.type == DatatypeType.date:
        from dateutil.parser import parse
        try:
            parse(value, fuzzy=False)
        # dateutil raises ParserError (a ValueError) for unknown formats,
        # OverflowError for dates out of range and TypeError for input
        # that is no string
        except (ValueError, OverflowError, TypeError):
            return False
        return True

    return True
def is_logically_equivalent_to(self, datatype: 'Datatype',
                               vocabulary: 'Vocabulary',
                               old_vocabulary: 'Vocabulary') -> bool:
    """
    Test if this datatype is logically equivalent to the given datatype.

    Compared are ``type``, ``number_has_range`` and ``enum_values``.

    Args:
        datatype (Datatype): Datatype to compare against
        vocabulary (Vocabulary): Not used, but needed to keep signature
            the same as other entities
        old_vocabulary (Vocabulary): Not used, but needed to keep
            signature the same as other entities

    Returns:
        bool
    """
    return bool(
        self.type == datatype.type
        and self.number_has_range == datatype.number_has_range
        and self.enum_values == datatype.enum_values)
class Individual(Entity):
    """
    Represents OWL:Individual

    An individual is a predefined "instance" of a class. Here individuals
    are only used as values for Relations: they are no instances, no
    value can be assigned to them, and they are no agents or devices.
    """

    parent_class_iris: List[str] = Field(
        description="List of all parent class iris, "
                    "an individual can have multiple parents",
        default=[])
def to_string(self) -> str:
    """
    Get a string representation of the Individual: its label with the
    prefix "(Individual)".

    Returns:
        str
    """
    prefix = "(Individual)"
    return prefix + self.get_label()
def get_ancestor_iris(self, vocabulary: 'Vocabulary') -> List[str]:
    """
    Get the iris of all ancestor classes: the parent classes themselves
    plus all ancestors of each parent class.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project

    Returns:
        List[str]: Iris without duplicates, in no particular order
    """
    collected_iris = set()
    for parent_iri in self.parent_class_iris:
        collected_iris.add(parent_iri)
        parent_class = vocabulary.get_class_by_iri(parent_iri)
        collected_iris.update(parent_class.ancestor_class_iris)
    return list(collected_iris)
def get_parent_classes(self, vocabulary: 'Vocabulary') -> List['Class']:
    """
    Resolve all ``parent_class_iris`` of the individual to class objects.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project

    Returns:
        List[Class]: Parents in the order of ``parent_class_iris``
    """
    return [vocabulary.get_class_by_iri(parent_iri)
            for parent_iri in self.parent_class_iris]
def is_logically_equivalent_to(self, individual: 'Individual',
                               vocabulary: 'Vocabulary',
                               old_vocabulary: 'Vocabulary') -> bool:
    """
    Test if this individual is logically equivalent in two vocabularies,
    i.e. if both individuals list the same parent class iris.

    Args:
        individual (Individual): Individual to be tested against, from the
            old vocabulary
        vocabulary (Vocabulary): New project vocabulary, not used but
            needed to keep signature the same
        old_vocabulary (Vocabulary): Old project vocabulary, not used but
            needed to keep signature the same

    Returns:
        bool
    """
    same_parents = self._lists_are_identical(self.parent_class_iris,
                                             individual.parent_class_iris)
    return bool(same_parents)
def treat_dependency_statements(self, vocabulary: 'Vocabulary') -> \
        List[DependencyStatement]:
    """
    Purge and list all pointers/iris that are not contained in the
    vocabulary.

    Every parent class iri is reported with one statement. Parent iris
    that are unknown to ``vocabulary.classes`` are removed from
    ``parent_class_iris``.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[DependencyStatement]: One statement per parent class iri,
        ``fulfilled`` tells if the parent is contained in the vocabulary
    """
    statements = []
    known_parent_iris = []

    for parent_iri in self.parent_class_iris:
        found = parent_iri in vocabulary.classes
        statements.append(DependencyStatement(type="Parent Class",
                                              class_iri=self.iri,
                                              dependency_iri=parent_iri,
                                              fulfilled=found))
        if found:
            known_parent_iris.append(parent_iri)

    # Purge only after the loop: removing entries from the list while
    # iterating over it skipped the entry that followed each removed one,
    # so it was neither reported nor purged. The slice assignment keeps
    # the list object and only replaces its content.
    self.parent_class_iris[:] = known_parent_iris

    return statements
class DataFieldType(str, Enum):
    """
    Kinds of fields by which a DataProperty can be represented.

    The members are strings as well, so they compare equal to their
    plain string value.
    """

    command = "command"
    device_attribute = "device_attribute"
    simple = "simple"
class DataProperty(Entity):
    """
    Representation of OWL:DataProperty

    In addition to the entity fields a data property stores the type of
    the field that represents it.
    """

    field_type: DataFieldType = Field(
        description="Type of the dataproperty; set by the user while "
                    "configuring the vocabulary",
        default=DataFieldType.simple)
class ObjectProperty(Entity):
    """
    Representation of OWL:ObjectProperty

    In addition to the entity fields an object property stores the iris
    of its inverse properties.
    """

    inverse_property_iris: Set[str] = Field(
        description="List of property iris that are inverse:Of; "
                    "If an instance i2 is added in an instance i1 "
                    "for this property. Then i1 is added to i2 under the"
                    " inverseProperty (if the class has that property)",
        default=set())
def add_inverse_property_iri(self, iri: str):
    """
    Register a further inverse property of this property.

    The iri is added to the set ``inverse_property_iris``, adding a
    known iri again has no effect.

    Args:
        iri (str): Iri of the inverse objectProperty

    Returns:
        None
    """
    self.inverse_property_iris.add(iri)
def is_logically_equivalent_to(self, object_property: 'ObjectProperty',
                               vocabulary: 'Vocabulary',
                               old_vocabulary: 'Vocabulary') -> bool:
    """
    Test if this property of the new vocabulary is logically equivalent
    to the given property of the old vocabulary, i.e. if both have the
    same ``inverse_property_iris``.

    Args:
        object_property (ObjectProperty): ObjectProperty to be tested
            against, from the old vocabulary
        vocabulary (Vocabulary): New project vocabulary, not used but
            needed to keep signature the same
        old_vocabulary (Vocabulary): Old project vocabulary, not used but
            needed to keep signature the same

    Returns:
        bool
    """
    own_inverse_iris = self.inverse_property_iris
    return bool(own_inverse_iris == object_property.inverse_property_iris)