Source code for filip.semantics.vocabulary.relation

"""Vocabulary Models for Relations"""

from typing import Set, Dict, TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from . import Vocabulary

from aenum import Enum
from typing import List, TYPE_CHECKING

from pydantic import BaseModel, Field

from .source import DependencyStatement

if TYPE_CHECKING:
    from . import Vocabulary, IdType


class StatementType(str, Enum):
    """Kind of node inside a target-statement tree.

    A statement is either a leaf that holds an IRI/label, or it is a logical
    combination (``or`` / ``and``) of further statements.
    """

    OR = "or"      # union of the combined statements
    AND = "and"    # intersection of the combined statements
    LEAF = "leaf"  # terminal statement holding an IRI or a fixed value
class TargetStatement(BaseModel):
    """Target part of a relation statement.

    The target statement is the part of a relation that follows the
    restriction type. Example: in ``consists_of some Device or Sensor`` the
    target statement is ``Device or Sensor``, as it sits behind ``some``.

    A target statement is built recursively. It is either a leaf (a single
    IRI or a fixed data value) or a union (``or``) / intersection (``and``)
    of further target statements.

    The combination is purely logical, not numerical: ``device and device``
    is already true if a single device is given; two separate devices are
    not required.
    """

    # Fixed value of a "hasValue" relation; only meaningful for LEAF nodes.
    target_data_value: Optional[str] = Field(
        description="Holds the value if the relation is a hasValue (LEAF only)",
        default=None,
    )
    # IRI of the targeted class/datatype; only meaningful for LEAF nodes.
    target_iri: str = Field(
        description="The IRI of the target (LEAF only)",
        default="",
    )
    # Child statements; only meaningful for AND / OR nodes.
    target_statements: List['TargetStatement'] = Field(
        description="The targetstatements that are combined with this targetstatement (and/or only)",
        default=[],
    )
    # Node kind; a freshly created statement is a LEAF.
    type: StatementType = Field(
        description="Statement types",
        default=StatementType.LEAF,
    )
def set_target(self, target_iri: str, target_data_value: str = None):
    """Turn this statement into a LEAF statement with the given target.

    Args:
        target_iri (str): IRI of the target (class or datatype IRI).
        target_data_value (str): Fixed value of the target statement, used
            when a predefined value is given instead of an IRI.

    Returns:
        None
    """
    # The three assignments are independent of each other.
    self.target_data_value = target_data_value
    self.target_iri = target_iri
    self.type = StatementType.LEAF
def get_all_targets(self) -> List[List[str]]:
    """Extract the possible targets of this statement in OR-of-AND form.

    Interpretation of the result: ``[[a, b], [c]]`` means ``(a and b) or c``,
    i.e. the target needs to have as ancestors (or be) both ``a`` and ``b``,
    or alternatively ``c``. Items inside an inner list are connected via
    AND; the inner lists themselves are connected via OR.

    A LEAF that carries a fixed data value contributes no IRI and yields
    ``[[]]``.

    For an AND statement the alternatives of all children are distributed:
    ``[[[1], [2]], [[3, 4]]]`` becomes ``[[1, 3, 4], [2, 3, 4]]``.
    Example: ``(a and b) and (c or d) and (e or f)`` yields
    ``[[a,b,c,e], [a,b,c,f], [a,b,d,e], [a,b,d,f]]``.

    Returns:
        List[List[str]]
    """
    from itertools import product

    if self.type == StatementType.LEAF:
        if self.target_data_value is not None:
            return [[]]
        return [[self.target_iri]]

    # One entry per child statement; each entry has the form [[..], [..]].
    collection = [statement.get_all_targets()
                  for statement in self.target_statements]

    if self.type == StatementType.OR:
        # OR: the alternatives of all children stand side by side.
        return [alternative
                for sublist in collection
                for alternative in sublist]

    # AND: pick one alternative from every child (cartesian product) and
    # merge the picked inner lists into a single conjunction. The previous
    # hand-rolled index arithmetic broke as soon as an inner list held more
    # than one IRI.
    return [
        [iri for alternative in combination for iri in alternative]
        for combination in product(*collection)
    ]
def to_string(self, vocabulary: 'Vocabulary') -> str:
    """Get a string representation of the target statement.

    A LEAF is rendered as its label, falling back to the IRI if the label
    is the empty string. An AND / OR statement is rendered as its children
    joined by the statement type and wrapped in exactly one pair of
    parentheses, e.g. ``(A or B or C)``.

    Args:
        vocabulary (Vocabulary): vocabulary of the project

    Returns:
        str
    """
    if self.type == StatementType.LEAF:
        label = self.retrieve_label(vocabulary)
        if label == "":
            return self.target_iri
        return label

    # Joining keeps the parentheses balanced for any number of children
    # (previously a ")" was appended per additional child and an empty
    # child list raised an IndexError).
    separator = " " + self.type + " "
    parts = [statement.to_string(vocabulary)
             for statement in self.target_statements]
    return "(" + separator.join(parts) + ")"
def is_fulfilled_by_iri_value(self, value: str, ancestor_values: List[str]) \
        -> bool:
    """Test if a value fulfills the target statement.

    Only for object relations.

    Args:
        value (str): value to check: class IRI of the instance/individual
        ancestor_values (List[str]): IRIs of the ancestors of ``value``.
            The list is only read, never modified.

    Returns:
        bool: True if at least one AND-group of targets is completely
        contained in the value together with its ancestors.
    """
    # Work on a private collection: appending to ``ancestor_values`` itself
    # would leak ``value`` into the caller's list.
    present = set(ancestor_values)
    present.add(value)

    # targets has the form [[a, b], [c], [d, a], ...] meaning
    # (a and b) or c or (d and a); one inner list needs to be fulfilled.
    return any(
        all(item in present for item in sublist)
        for sublist in self.get_all_targets()
    )
def is_fulfilled_by_data_value(self, value: str, vocabulary: 'Vocabulary') \
        -> bool:
    """Test if a value fulfills the target statement.

    Only for data relations.

    Args:
        value (str): value to check
        vocabulary (Vocabulary): vocabulary of the project

    Returns:
        bool
    """
    # A data target statement theoretically only has one statement.
    fixed_value = self.target_data_value
    if fixed_value is not None:
        # a predefined value must be matched exactly
        return value == fixed_value

    from .vocabulary import IdType

    if vocabulary.is_id_of_type(self.target_iri, IdType.datatype):
        # delegate the check to the datatype behind the target IRI
        return vocabulary.get_datatype(self.target_iri).value_is_valid(value)
    return False
def retrieve_label(self, vocabulary: 'Vocabulary') -> str:
    """Get the label of this statement's target.

    Only meaningful for LEAF statements; any other statement type yields
    the empty string. A LEAF with a fixed data value returns that value,
    otherwise the label is looked up in the vocabulary via the target IRI.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project

    Returns:
        str
    """
    if self.type != StatementType.LEAF:
        return ""
    if self.target_data_value is None:
        return vocabulary.get_label_for_entity_iri(self.target_iri)
    return self.target_data_value
def get_dependency_statements(
        self, vocabulary: 'Vocabulary', ontology_iri: str, class_iri: str) \
        -> List[DependencyStatement]:
    """Collect the dependency statements of this target statement.

    Every LEAF that points to an IRI (and not to a fixed data value or a
    predefined datatype) produces one statement whose ``fulfilled`` flag
    tells whether the IRI is present in the vocabulary's classes,
    datatypes or individuals. AND / OR statements collect the statements
    of all their children. Purging is done in class.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project
        ontology_iri (str): IRI of the source ontology
        class_iri (str): IRI of class (legacy)

    Returns:
        List[DependencyStatement]: one statement per checked target IRI
    """
    if self.type != StatementType.LEAF:
        # combined statement: gather the results of all children
        collected = []
        for child in self.target_statements:
            collected.extend(child.get_dependency_statements(
                vocabulary, ontology_iri, class_iri))
        return collected

    # a given data value means there is no IRI to depend on
    if self.target_data_value is not None:
        return []
    # predefined datatypes are always available
    if vocabulary.iri_is_predefined_datatype(self.target_iri):
        return []

    iri = self.target_iri
    known = (iri in vocabulary.classes
             or iri in vocabulary.datatypes
             or iri in vocabulary.individuals)
    return [DependencyStatement(type="Relation Target",
                                class_iri=class_iri,
                                dependency_iri=iri,
                                fulfilled=known)]
# target_statements is a forward reference, so that the class can refer to # itself this forward reference need to be resolved after the class has fully # loaded TargetStatement.model_rebuild()
class RestrictionType(str, Enum):
    """Restriction types, as defined for OWL.

    Every member is declared as a ``(value, description)`` pair; the
    ``_init_`` line names the attributes (``value`` and ``__doc__``) that
    these two elements are bound to.
    """

    _init_ = "value __doc__"

    some = "some", "at least 1 value of that target"
    only = "only", "only value of that target"
    min = "min", "min n values of that target"
    max = "max", "max n values of that target"
    exactly = "exactly", "exactly n values of that target"
    value = "value", "predefined value"
class Relation(BaseModel):
    """Relation defined in the source for a class.

    A relation has the form::

        RestrictionType property target_statement

    It defines a set of allowed/required values which each instance of the
    class can/should have under this property.

    A relation is defined in an OWL class, but all child classes of that
    class inherit it.
    """

    # no default given: the id has to be supplied on creation
    id: str = Field(
        description="Unique generated Relation ID, for internal use",
    )
    restriction_type: RestrictionType = Field(
        description="Restriction type of this relation",
        default=None,
    )
    # -1 is the default and stands for "no cardinality given"
    restriction_cardinality: int = Field(
        description="Only needed for min, max, equaly states the 'n'",
        default=-1,
    )
    property_iri: str = Field(
        description="IRI of the property (data- or object-)",
        default="",
    )
    target_statement: TargetStatement = Field(
        description="Complex statement which classes/datatype_catalogue are allowed/required",
        default=None,
    )
def get_targets(self) -> List[List[str]]:
    """Get all targets of the target statement in AND-OR notation.

    Returns:
        List[List[str]]: e.g. ``[[a, b], [c]]`` meaning that either both
        a and b need to be present, or c.
    """
    statement = self.target_statement
    targets = statement.get_all_targets()
    return targets
def to_string(self, vocabulary: 'Vocabulary') -> str:
    """Get a string representation of the relation.

    The cardinality is only part of the text if it differs from the
    ``-1`` default.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        str
    """
    if self.restriction_cardinality != -1:
        # "<restriction> <n> <target>"
        prefix = self.restriction_type + " " + \
            str(self.restriction_cardinality) + " "
        return prefix + self.target_statement.to_string(vocabulary)

    # "<restriction> <target>"
    return f"{self.restriction_type} {self.target_statement.to_string(vocabulary)}"
def is_restriction_fulfilled(self, number_of_fulfilling_values: int,
                             total_number_of_values: int) -> bool:
    """Test if the restriction type is fulfilled.

    Compares the number of fulfilling values against the total number of
    values given, according to the restriction type of this relation.

    Args:
        number_of_fulfilling_values (int): Number of values that fulfill
            the relation
        total_number_of_values (int): the number of values given for this
            relation

    Returns:
        bool: False if the restriction type is unset or unknown, so that
        the result is always a real boolean.
    """
    kind = self.restriction_type

    # "some" and "value" both need at least one fulfilling value
    if kind == RestrictionType.some or kind == RestrictionType.value:
        return number_of_fulfilling_values >= 1
    if kind == RestrictionType.only:
        return number_of_fulfilling_values == total_number_of_values
    if kind == RestrictionType.min:
        return number_of_fulfilling_values >= \
            int(self.restriction_cardinality)
    if kind == RestrictionType.max:
        return number_of_fulfilling_values <= \
            int(self.restriction_cardinality)
    if kind == RestrictionType.exactly:
        return number_of_fulfilling_values == \
            int(self.restriction_cardinality)

    # Previously the method fell off the end and returned None here.
    return False
def get_dependency_statements(
        self, vocabulary: 'Vocabulary', ontology_iri: str, class_iri: str) \
        -> List[DependencyStatement]:
    """Collect the dependency statements of this relation.

    The first statement describes the property of the relation and whether
    it is present in the vocabulary's object or data properties; it is
    followed by the statements of the target statement. Purging is done in
    class.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project
        ontology_iri (str): IRI of the source ontology
        class_iri (str): IRI of class (legacy)

    Returns:
        List[DependencyStatement]: property statement followed by the
        target statements
    """
    property_known = (self.property_iri in vocabulary.object_properties
                      or self.property_iri in vocabulary.data_properties)
    property_statement = DependencyStatement(
        type="Relation Property",
        class_iri=class_iri,
        dependency_iri=self.property_iri,
        fulfilled=property_known,
    )
    target_statements = self.target_statement.get_dependency_statements(
        vocabulary, ontology_iri, class_iri)
    return [property_statement, *target_statements]
def is_fulfilled_with_iris(
        self, vocabulary: 'Vocabulary', values: List[str],
        ancestor_values: List[List[str]]) -> bool:
    """Test if a set of values fulfills the rules of the relation.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project
        values (List[str]): List of values to check
        ancestor_values (List[List[str]]): List containing the ancestor
            IRIs for each value (linked over index)

    Returns:
        bool
    """
    # count the values accepted by the target statement; the ancestors of
    # a value are found under the same index as the value itself
    hits = sum(
        1
        for index, value in enumerate(values)
        if self.target_statement.is_fulfilled_by_iri_value(
            value, ancestor_values[index])
    )
    return self.is_restriction_fulfilled(hits, len(values))
def is_fulfilled_with_values(self, vocabulary: 'Vocabulary',
                             values: List[str]) -> bool:
    """Test if a set of values fulfills the rules of the relation.

    Used if the property is a data property.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project
        values (List[str]): List of values to check

    Returns:
        bool
    """
    # count the values accepted by the target statement
    hits = sum(
        1
        for value in values
        if self.target_statement.is_fulfilled_by_data_value(value,
                                                            vocabulary)
    )
    return self.is_restriction_fulfilled(hits, len(values))
def get_all_possible_target_class_iris(self, vocabulary: 'Vocabulary') \
        -> Set[str]:
    """Get the class IRIs that are possible values for an object relation.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        Set[str]: class_iris
    """
    # A relation of type "value" only defines that the relation has the
    # given values, not that it could have some more.
    if self.restriction_type == RestrictionType.value:
        return set()

    candidates = set()
    for required_classes in self.get_targets():
        for candidate in vocabulary.get_classes():
            if not candidate.is_child_of_all_classes(required_classes):
                continue
            # the matching class itself and all of its children qualify
            candidates.add(candidate.iri)
            candidates.update(
                vocabulary.get_class_by_iri(candidate.iri).child_class_iris)
    return candidates
def get_possible_enum_target_values(self, vocabulary: 'Vocabulary') -> \
        List[str]:
    """Get all allowed enum target values for a data relation.

    Args:
        vocabulary (Vocabulary): Vocabulary of this project

    Returns:
        List[str]: enum values of all targeted datatypes; empty if the
        property of this relation is not a data property
    """
    targets: List[List[str]] = self.target_statement.get_all_targets()

    from .vocabulary import IdType

    # the method only makes sense for data relations
    if not vocabulary.is_id_of_type(self.property_iri,
                                    IdType.data_property):
        return []

    # As it is a data relation the targets should only contain single
    # lists; to stay flexible with changes all inner lists are visited.
    enum_values = []
    for target_group in targets:
        for entry_iri in target_group:
            if not vocabulary.is_id_of_type(entry_iri, IdType.datatype):
                continue
            enum_values.extend(vocabulary.get_datatype(entry_iri).enum_values)
    return enum_values
def get_all_target_iris(self) -> Set[str]:
    """Get the IRIs of all targets of this relation.

    Walks the target statement tree and gathers the non-empty IRI of every
    LEAF statement.

    Returns:
        Set[str]
    """
    found = set()
    pending = [self.target_statement]  # statements still to be visited
    while pending:
        current = pending.pop()
        if current.type != StatementType.LEAF:
            # combined statement: visit its children later
            pending.extend(current.target_statements)
        elif current.target_iri != "":
            found.add(current.target_iri)
    return found
def export_rule(self, vocabulary: 'Vocabulary') -> (str, str):
    """Get the rule of this relation in exportable form.

    Args:
        vocabulary (Vocabulary): Vocabulary of the project

    Returns:
        A pair: first the quoted restriction type, extended by
        ``|<cardinality>`` if the cardinality is greater than 0; second
        the targets in AND-OR notation with every IRI replaced by its
        label.
    """
    # NOTE: the second element is a list of lists of labels, not a str as
    # the return annotation suggests.
    targets = [
        [vocabulary.get_label_for_entity_iri(iri) for iri in inner_list]
        for inner_list in self.get_targets()
    ]

    if int(self.restriction_cardinality) > 0:
        rule = f'"{self.restriction_type.value}|{self.restriction_cardinality}"'
    else:
        rule = f'"{self.restriction_type.value}"'
    return rule, targets