Source code for owlapy.fast_instance_checker

from collections import defaultdict
import logging
import operator
from functools import singledispatchmethod, reduce
from itertools import repeat
from types import MappingProxyType, FunctionType
from typing import DefaultDict, Iterable, Dict, Mapping, Set, Type, TypeVar, Union, Optional, FrozenSet

from owlapy.ext import OWLReasonerEx
from owlapy.model import OWLDataRange, OWLObjectOneOf, OWLOntology, OWLNamedIndividual, OWLClass, OWLClassExpression, \
    OWLObjectProperty, OWLDataProperty, OWLObjectUnionOf, OWLObjectIntersectionOf, OWLObjectSomeValuesFrom, \
    OWLObjectPropertyExpression, OWLObjectComplementOf, OWLObjectAllValuesFrom, IRI, OWLObjectInverseOf, \
    OWLDataSomeValuesFrom, OWLDataPropertyExpression, OWLDatatypeRestriction, OWLLiteral, \
    OWLDataComplementOf, OWLDataAllValuesFrom, OWLDatatype, OWLDataHasValue, OWLDataOneOf, OWLReasoner, \
    OWLDataIntersectionOf, OWLDataUnionOf, OWLObjectCardinalityRestriction, OWLObjectMinCardinality, \
    OWLObjectMaxCardinality, OWLObjectExactCardinality, OWLObjectHasValue, OWLPropertyExpression, OWLFacetRestriction
from owlapy.util import LRUCache

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable bound to OWL property expressions; it appears in the class-level
# annotation of the per-property-type subject index (`_has_prop`).
_P = TypeVar('_P', bound=OWLPropertyExpression)


class OWLReasoner_FastInstanceChecker(OWLReasonerEx):
    """Tries to check instances fast (but maybe incomplete).

    Class-expression instance retrieval is computed locally with set operations over
    cached class instances and property values; every other reasoner query is
    forwarded unchanged to the wrapped base reasoner.
    """
    __slots__ = (
        '_ontology', '_base_reasoner',
        '_ind_set', '_cls_to_ind',
        '_has_prop',
        '_objectsomevalues_cache', '_datasomevalues_cache', '_objectcardinality_cache',
        '_property_cache',
        '_obj_prop', '_obj_prop_inv', '_data_prop',
        '_negation_default',
        '__warned',
    )

    _ontology: OWLOntology
    _base_reasoner: OWLReasoner
    # Class => individuals
    _cls_to_ind: Dict[OWLClass, FrozenSet[OWLNamedIndividual]]
    # Type => Property => individuals (only initialised when property_cache is False)
    _has_prop: Mapping[Type[_P], LRUCache[_P, FrozenSet[OWLNamedIndividual]]]
    # All individuals in the signature of the ontology
    _ind_set: FrozenSet[OWLNamedIndividual]
    # ObjectSomeValuesFrom => individuals
    _objectsomevalues_cache: LRUCache[OWLClassExpression, FrozenSet[OWLNamedIndividual]]
    # DataSomeValuesFrom => individuals
    _datasomevalues_cache: LRUCache[OWLClassExpression, FrozenSet[OWLNamedIndividual]]
    # ObjectCardinalityRestriction => individuals
    _objectcardinality_cache: LRUCache[OWLClassExpression, FrozenSet[OWLNamedIndividual]]
    # ObjectProperty => { individual => individuals } (only initialised when property_cache is True)
    _obj_prop: Dict[OWLObjectProperty, Mapping[OWLNamedIndividual, Set[OWLNamedIndividual]]]
    # ObjectProperty => { individual => individuals }, inverse direction
    # (only initialised when property_cache is True)
    _obj_prop_inv: Dict[OWLObjectProperty, Mapping[OWLNamedIndividual, Set[OWLNamedIndividual]]]
    # DataProperty => { individual => literals } (only initialised when property_cache is True)
    _data_prop: Dict[OWLDataProperty, Mapping[OWLNamedIndividual, Set[OWLLiteral]]]
    _property_cache: bool
    _negation_default: bool

    def __init__(self, ontology: OWLOntology, base_reasoner: OWLReasoner, *,
                 property_cache: bool = True, negation_default: bool = False):
        """Fast instance checker

        Args:
            ontology: Ontology to use
            base_reasoner: Reasoner to get instances/types from
            property_cache: Whether to cache property values
            negation_default: Whether to assume a missing fact means it is false ("closed world view")
        """
        super().__init__(ontology)
        self._ontology = ontology
        self._base_reasoner = base_reasoner
        self._property_cache = property_cache
        self._negation_default = negation_default
        # Bit flags of warnings already logged: 1 = complement not implemented, 2 = `direct` ignored.
        self.__warned = 0
        self._init()

    def _init(self, cache_size=128):
        """(Re-)create all caches.

        Args:
            cache_size: Maximum number of entries of each LRU cache.
        """
        self._cls_to_ind = dict()

        individuals = self._ontology.individuals_in_signature()
        self._ind_set = frozenset(individuals)

        self._objectsomevalues_cache = LRUCache(maxsize=cache_size)
        self._datasomevalues_cache = LRUCache(maxsize=cache_size)
        self._objectcardinality_cache = LRUCache(maxsize=cache_size)

        if self._property_cache:
            # Full property value tables, filled lazily per property.
            self._obj_prop = dict()
            self._obj_prop_inv = dict()
            self._data_prop = dict()
        else:
            # Only remember which individuals have any value for a property.
            self._has_prop = MappingProxyType({
                OWLDataProperty: LRUCache(maxsize=cache_size),
                OWLObjectProperty: LRUCache(maxsize=cache_size),
                OWLObjectInverseOf: LRUCache(maxsize=cache_size),
            })

    def reset(self):
        """The reset method shall reset any cached state"""
        self._init()

    def data_property_domains(self, pe: OWLDataProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.data_property_domains(pe, direct=direct)

    def data_property_ranges(self, pe: OWLDataProperty, direct: bool = False) -> Iterable[OWLDataRange]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.data_property_ranges(pe, direct=direct)

    def object_property_domains(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.object_property_domains(pe, direct=direct)

    def object_property_ranges(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.object_property_ranges(pe, direct=direct)

    def equivalent_classes(self, ce: OWLClassExpression) -> Iterable[OWLClass]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.equivalent_classes(ce)

    def disjoint_classes(self, ce: OWLClassExpression) -> Iterable[OWLClass]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.disjoint_classes(ce)

    def different_individuals(self, ce: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.different_individuals(ce)

    def same_individuals(self, ce: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.same_individuals(ce)

    def data_property_values(self, ind: OWLNamedIndividual, pe: OWLDataProperty) -> Iterable[OWLLiteral]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.data_property_values(ind, pe)

    def all_data_property_values(self, pe: OWLDataProperty) -> Iterable[OWLLiteral]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.all_data_property_values(pe)

    def object_property_values(self, ind: OWLNamedIndividual, pe: OWLObjectPropertyExpression) \
            -> Iterable[OWLNamedIndividual]:
        """Yield the object property values of an individual.

        With the property cache enabled the values come from the lazily built cache,
        otherwise the call is forwarded to the base reasoner.

        Raises:
            NotImplementedError: if the property cache is enabled and `pe` is neither a named
                object property nor an inverse of one.
        """
        if self._property_cache:
            self._lazy_cache_obj_prop(pe)
            if isinstance(pe, OWLObjectProperty):
                values_by_subject = self._obj_prop[pe]
            elif isinstance(pe, OWLObjectInverseOf):
                values_by_subject = self._obj_prop_inv[pe.get_named_property()]
            else:
                raise NotImplementedError
            # The cached mapping wraps a defaultdict: indexing it with an unknown individual would
            # insert an empty entry and thereby change what later cardinality queries iterate over.
            # `.get` reads without mutating the cache.
            yield from values_by_subject.get(ind, ())
        else:
            yield from self._base_reasoner.object_property_values(ind, pe)

    def flush(self) -> None:
        """Forward to the base reasoner."""
        self._base_reasoner.flush()

    def instances(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLNamedIndividual]:
        """Yield the individuals found to be instances of the class expression.

        Args:
            ce: Class expression to retrieve the instances of
            direct: Not implemented; a warning is logged once and the flag is ignored
        """
        if direct and not self.__warned & 2:
            logger.warning("direct not implemented")
            self.__warned |= 2
        yield from self._find_instances(ce)

    def sub_classes(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLClass]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.sub_classes(ce, direct=direct)

    def super_classes(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLClass]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.super_classes(ce, direct=direct)

    def types(self, ind: OWLNamedIndividual, direct: bool = False) -> Iterable[OWLClass]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.types(ind, direct=direct)

    def equivalent_object_properties(self, dp: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.equivalent_object_properties(dp)

    def equivalent_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.equivalent_data_properties(dp)

    def disjoint_object_properties(self, dp: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.disjoint_object_properties(dp)

    def disjoint_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.disjoint_data_properties(dp)

    def sub_data_properties(self, dp: OWLDataProperty, direct: bool = False) -> Iterable[OWLDataProperty]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.sub_data_properties(dp=dp, direct=direct)

    def sub_object_properties(self, op: OWLObjectPropertyExpression, direct: bool = False) \
            -> Iterable[OWLObjectPropertyExpression]:
        """Forward to the base reasoner."""
        yield from self._base_reasoner.sub_object_properties(op=op, direct=direct)

    def get_root_ontology(self) -> OWLOntology:
        """Return the ontology this reasoner was created with."""
        return self._ontology

    def _lazy_cache_obj_prop(self, pe: OWLObjectPropertyExpression) -> None:
        """Get all individuals involved in this object property and put them in a Dict

        Does nothing if the property (in the requested direction) is already cached.

        Raises:
            NotImplementedError: if `pe` is neither a named object property nor an inverse of one.
        """
        if isinstance(pe, OWLObjectInverseOf):
            inverse = True
            if pe.get_named_property() in self._obj_prop_inv:
                return
        elif isinstance(pe, OWLObjectProperty):
            inverse = False
            if pe in self._obj_prop:
                return
        else:
            raise NotImplementedError

        # Dict with Individual => Set[Individual]
        opc: DefaultDict[OWLNamedIndividual, Set[OWLNamedIndividual]] = defaultdict(set)

        # shortcut for owlready2 (imported locally so owlready2 is only needed when it is used)
        from owlapy.owlready2 import OWLOntology_Owlready2
        if isinstance(self._ontology, OWLOntology_Owlready2):
            import owlready2
            # _x => owlready2 objects
            p_x: owlready2.ObjectProperty = self._ontology._world[pe.get_named_property().get_iri().as_str()]
            for l_x, r_x in p_x.get_relations():
                # For the inverse direction subject and object swap their roles.
                s_x, o_x = (r_x, l_x) if inverse else (l_x, r_x)
                if isinstance(s_x, owlready2.Thing) and isinstance(o_x, owlready2.Thing):
                    s = OWLNamedIndividual(IRI.create(s_x.iri))
                    o = OWLNamedIndividual(IRI.create(o_x.iri))
                    opc[s].add(o)
        else:
            for s in self._ind_set:
                individuals = set(self._base_reasoner.object_property_values(s, pe))
                if individuals:
                    opc[s] = individuals

        if inverse:
            self._obj_prop_inv[pe.get_named_property()] = MappingProxyType(opc)
        else:
            self._obj_prop[pe] = MappingProxyType(opc)

    def _some_values_subject_index(self, pe: OWLPropertyExpression) -> FrozenSet[OWLNamedIndividual]:
        """Return (and cache) the individuals that have at least one value for the property.

        Uses the `_has_prop` caches, which `_init` only creates when the property cache is disabled.

        Raises:
            NotImplementedError: if `pe` is not a data property, object property or inverse
                object property.
        """
        if isinstance(pe, OWLDataProperty):
            typ = OWLDataProperty
        elif isinstance(pe, OWLObjectProperty):
            typ = OWLObjectProperty
        elif isinstance(pe, OWLObjectInverseOf):
            typ = OWLObjectInverseOf
        else:
            raise NotImplementedError

        cache = self._has_prop[typ]
        if pe in cache:
            return cache[pe]

        subs = set()

        # shortcut for owlready2 (imported locally so owlready2 is only needed when it is used)
        from owlapy.owlready2 import OWLOntology_Owlready2
        if isinstance(self._ontology, OWLOntology_Owlready2):
            if isinstance(pe, OWLObjectInverseOf):
                inverse = True
                iri = pe.get_named_property().get_iri()
            else:
                inverse = False
                iri = pe.get_iri()

            import owlready2
            # _x => owlready2 objects
            p_x: Union[owlready2.ObjectProperty, owlready2.DataProperty] = self._ontology._world[iri.as_str()]
            for s_x, o_x in p_x.get_relations():
                # The "subject" of an inverse property is the object of the stored relation.
                l_x = o_x if inverse else s_x
                if isinstance(l_x, owlready2.Thing):
                    subs.add(OWLNamedIndividual(IRI.create(l_x.iri)))
        else:
            if isinstance(pe, OWLDataProperty):
                func = self._base_reasoner.data_property_values
            else:
                func = self._base_reasoner.object_property_values

            for s in self._ind_set:
                # Only the existence of a first value matters; stop after it.
                for _ in func(s, pe):
                    subs.add(s)
                    break

        result = frozenset(subs)
        cache[pe] = result
        return result

    def _find_some_values(self, pe: OWLObjectPropertyExpression, filler_inds: Set[OWLNamedIndividual],
                          min_count: int = 1, max_count: Optional[int] = None) -> FrozenSet[OWLNamedIndividual]:
        """Get all individuals that have one of filler_inds as their object property value

        Args:
            pe: Object property (or inverse of one) to follow
            filler_inds: Individuals that count as matching property values
            min_count: Minimum number of matching values an individual must have
            max_count: Maximum number of matching values, or None for no upper bound

        Returns:
            The individuals whose number of values in `filler_inds` lies within the bounds.
        """
        def in_bounds(count: int) -> bool:
            return count >= min_count and (max_count is None or count <= max_count)

        ret = set()

        if self._property_cache:
            self._lazy_cache_obj_prop(pe)

            if isinstance(pe, OWLObjectInverseOf):
                ops = self._obj_prop_inv[pe.get_named_property()]
            elif isinstance(pe, OWLObjectProperty):
                ops = self._obj_prop[pe]
            else:
                raise ValueError

            subjects = ops
            if min_count == 1 and max_count is None:
                # Plain existential: a non-empty intersection is enough, no counting needed.
                ret.update(s for s, o_set in ops.items() if o_set & filler_inds)
            else:
                ret.update(s for s, o_set in ops.items() if in_bounds(len(o_set & filler_inds)))
        else:
            subjects = self._some_values_subject_index(pe)

            for s in subjects:
                count = 0
                for o in self._base_reasoner.object_property_values(s, pe):
                    if o in filler_inds:
                        count += 1
                        # Without an upper bound we can stop as soon as the minimum is reached.
                        if max_count is None and count >= min_count:
                            break
                if in_bounds(count):
                    ret.add(s)

        if min_count == 0:
            # Individuals without any value for the property have exactly zero matching values and
            # thus satisfy every restriction whose lower bound is zero; they never show up in the
            # loops above because those only visit individuals that have some value.
            ret.update(self._ind_set.difference(subjects))

        return frozenset(ret)

    def _lazy_cache_data_prop(self, pe: OWLDataPropertyExpression) -> None:
        """Get all individuals and values involved in this data property and put them in a Dict

        Does nothing if the property is already cached.
        """
        assert (isinstance(pe, OWLDataProperty))
        if pe in self._data_prop:
            return

        opc: Dict[OWLNamedIndividual, Set[OWLLiteral]] = dict()

        # shortcut for owlready2 (imported locally so owlready2 is only needed when it is used)
        from owlapy.owlready2 import OWLOntology_Owlready2
        if isinstance(self._ontology, OWLOntology_Owlready2):
            import owlready2
            # _x => owlready2 objects
            p_x: owlready2.DataProperty = self._ontology._world[pe.get_iri().as_str()]
            for s_x, o_x in p_x.get_relations():
                if isinstance(s_x, owlready2.Thing):
                    o_literal = OWLLiteral(o_x)
                    s = OWLNamedIndividual(IRI.create(s_x.iri))
                    opc.setdefault(s, set()).add(o_literal)
        else:
            for s in self._ind_set:
                values = set(self._base_reasoner.data_property_values(s, pe))
                if values:
                    opc[s] = values

        self._data_prop[pe] = MappingProxyType(opc)

    # single dispatch is still not implemented in mypy, see https://github.com/python/mypy/issues/2904
    @singledispatchmethod
    def _find_instances(self, ce: OWLClassExpression) -> FrozenSet[OWLNamedIndividual]:
        """Return the individuals found to be instances of `ce`, dispatching on the type of `ce`.

        Raises:
            NotImplementedError: if no handler is registered for the type of `ce`.
        """
        raise NotImplementedError(ce)

    @_find_instances.register
    def _(self, c: OWLClass) -> FrozenSet[OWLNamedIndividual]:
        # Named class: instances come from the base reasoner and are cached per class.
        self._lazy_cache_class(c)
        return self._cls_to_ind[c]

    @_find_instances.register
    def _(self, ce: OWLObjectUnionOf) -> FrozenSet[OWLNamedIndividual]:
        # Union: set union of the instances of all operands.
        return reduce(operator.or_, map(self._find_instances, ce.operands()))

    @_find_instances.register
    def _(self, ce: OWLObjectIntersectionOf) -> FrozenSet[OWLNamedIndividual]:
        # Intersection: set intersection of the instances of all operands.
        return reduce(operator.and_, map(self._find_instances, ce.operands()))

    @_find_instances.register
    def _(self, ce: OWLObjectSomeValuesFrom) -> FrozenSet[OWLNamedIndividual]:
        if ce in self._objectsomevalues_cache:
            return self._objectsomevalues_cache[ce]

        p = ce.get_property()
        assert isinstance(p, OWLObjectPropertyExpression)
        # Without the property cache, "some value of owl:Thing" is exactly the subject index.
        if not self._property_cache and ce.get_filler().is_owl_thing():
            return self._some_values_subject_index(p)

        filler_ind = self._find_instances(ce.get_filler())
        ind = self._find_some_values(p, filler_ind)

        self._objectsomevalues_cache[ce] = ind
        return ind

    @_find_instances.register
    def _(self, ce: OWLObjectComplementOf) -> FrozenSet[OWLNamedIndividual]:
        if self._negation_default:
            operand_ind = self._find_instances(ce.get_operand())
            # Set difference rather than symmetric difference: instances of the operand that are
            # not part of the ontology signature must not end up in the complement.
            return self._ind_set - operand_ind

        # TODO! XXX complement without the closed world assumption is not implemented
        if not self.__warned & 1:
            logger.warning("Object Complement Of not implemented at %s", ce)
            self.__warned |= 1
        return frozenset()

    @_find_instances.register
    def _(self, ce: OWLObjectAllValuesFrom) -> FrozenSet[OWLNamedIndividual]:
        # "all values from C" is rewritten as "not (some values from (not C))".
        negated_filler = ce.get_filler().get_object_complement_of().get_nnf()
        some_negated = OWLObjectSomeValuesFrom(property=ce.get_property(), filler=negated_filler)
        return self._find_instances(some_negated.get_object_complement_of())

    @_find_instances.register
    def _(self, ce: OWLObjectOneOf) -> FrozenSet[OWLNamedIndividual]:
        return frozenset(ce.individuals())

    @_find_instances.register
    def _(self, ce: OWLObjectHasValue) -> FrozenSet[OWLNamedIndividual]:
        return self._find_instances(ce.as_some_values_from())

    @_find_instances.register
    def _(self, ce: OWLObjectMinCardinality) -> FrozenSet[OWLNamedIndividual]:
        return self._get_instances_object_card_restriction(ce)

    @_find_instances.register
    def _(self, ce: OWLObjectMaxCardinality) -> FrozenSet[OWLNamedIndividual]:
        # "at most n" is evaluated as the complement of "at least n + 1".
        at_least_one_more = OWLObjectMinCardinality(cardinality=ce.get_cardinality() + 1,
                                                    property=ce.get_property(),
                                                    filler=ce.get_filler())
        min_ind = self._find_instances(at_least_one_more)
        # Set difference rather than symmetric difference: individuals outside the ontology
        # signature (e.g. from the owlready2 shortcut) must not end up in the result.
        return self._ind_set - min_ind

    @_find_instances.register
    def _(self, ce: OWLObjectExactCardinality) -> FrozenSet[OWLNamedIndividual]:
        return self._get_instances_object_card_restriction(ce)

    def _get_instances_object_card_restriction(self, ce: OWLObjectCardinalityRestriction):
        """Return (and cache) the instances of an object cardinality restriction.

        Raises:
            NotImplementedError: if `ce` is not a min, exact or max cardinality restriction.
        """
        if ce in self._objectcardinality_cache:
            return self._objectcardinality_cache[ce]

        p = ce.get_property()
        assert isinstance(p, OWLObjectPropertyExpression)

        if isinstance(ce, OWLObjectMinCardinality):
            min_count = ce.get_cardinality()
            max_count = None
        elif isinstance(ce, OWLObjectExactCardinality):
            min_count = max_count = ce.get_cardinality()
        elif isinstance(ce, OWLObjectMaxCardinality):
            min_count = 0
            max_count = ce.get_cardinality()
        else:
            assert isinstance(ce, OWLObjectCardinalityRestriction)
            raise NotImplementedError
        assert min_count >= 0
        assert max_count is None or max_count >= 0

        filler_ind = self._find_instances(ce.get_filler())
        ind = self._find_some_values(p, filler_ind, min_count=min_count, max_count=max_count)

        self._objectcardinality_cache[ce] = ind
        return ind

    @_find_instances.register
    def _(self, ce: OWLDataSomeValuesFrom) -> FrozenSet[OWLNamedIndividual]:
        if ce in self._datasomevalues_cache:
            return self._datasomevalues_cache[ce]

        pe = ce.get_property()
        filler = ce.get_filler()
        assert isinstance(pe, OWLDataProperty)

        property_cache = self._property_cache

        if property_cache:
            self._lazy_cache_data_prop(pe)
            dps = self._data_prop[pe]
            # Individuals that have at least one value for the property.
            subjects = dps.keys()

            def subject_literals():
                return dps.items()
        else:
            subjects = self._some_values_subject_index(pe)

            def subject_literals():
                # Values are fetched lazily so a match on the first literal stops the lookup.
                return ((s, self._base_reasoner.data_property_values(s, pe)) for s in subjects)

        if isinstance(filler, OWLDatatype):
            if property_cache:
                # TODO: Currently we just assume that the values are of the given type (also done in DLLearner)
                ind = set(subjects)
            else:
                ind = {s for s, literals in subject_literals()
                       if any(lit.get_datatype() == filler for lit in literals)}
        elif isinstance(filler, OWLDataOneOf):
            values = set(filler.values())
            ind = {s for s, literals in subject_literals()
                   if any(lit in values for lit in literals)}
        elif isinstance(filler, OWLDataComplementOf):
            # Individuals having the property, minus those having a value in the negated range.
            temp = self._find_instances(
                OWLDataSomeValuesFrom(property=pe, filler=filler.get_data_range()))
            ind = set(subjects).difference(temp)
        elif isinstance(filler, OWLDataUnionOf):
            operands = [OWLDataSomeValuesFrom(pe, op) for op in filler.operands()]
            ind = reduce(operator.or_, map(self._find_instances, operands))
        elif isinstance(filler, OWLDataIntersectionOf):
            operands = [OWLDataSomeValuesFrom(pe, op) for op in filler.operands()]
            ind = reduce(operator.and_, map(self._find_instances, operands))
        elif isinstance(filler, OWLDatatypeRestriction):
            def res_to_callable(res: OWLFacetRestriction):
                op = res.get_facet().operator
                v = res.get_facet_value()

                def inner(lv: OWLLiteral):
                    return op(lv, v)

                return inner

            facet_restrictions = tuple(map(res_to_callable, filler.get_facet_restrictions()))

            def include(lv: OWLLiteral):
                # The literal must be of the restricted datatype and satisfy every facet.
                return lv.get_datatype() == filler.get_datatype() and \
                    all(check(lv) for check in facet_restrictions)

            ind = {s for s, literals in subject_literals()
                   if any(include(lit) for lit in literals)}
        else:
            raise ValueError

        r = frozenset(ind)
        self._datasomevalues_cache[ce] = r
        return r

    @_find_instances.register
    def _(self, ce: OWLDataAllValuesFrom) -> FrozenSet[OWLNamedIndividual]:
        # "all values from D" is rewritten as "not (some values from (not D))"; a filler that is
        # already a complement is unwrapped instead of being negated twice.
        filler = ce.get_filler()
        if isinstance(filler, OWLDataComplementOf):
            filler = filler.get_data_range()
        else:
            filler = OWLDataComplementOf(filler)
        return self._find_instances(
            OWLDataSomeValuesFrom(
                property=ce.get_property(),
                filler=filler
            ).get_object_complement_of())

    @_find_instances.register
    def _(self, ce: OWLDataHasValue) -> FrozenSet[OWLNamedIndividual]:
        return self._find_instances(ce.as_some_values_from())

    def _lazy_cache_class(self, c: OWLClass) -> None:
        """Fetch the instances of a named class from the base reasoner unless already cached."""
        if c in self._cls_to_ind:
            return
        temp = self._base_reasoner.instances(c)
        self._cls_to_ind[c] = frozenset(temp)