Source code for owlapy.util

from functools import singledispatchmethod, total_ordering
from typing import Iterable, List, Type, TypeVar, Generic, Tuple, cast, Optional, Union, overload

from owlapy.model import HasIndex, HasIRI, OWLClassExpression, OWLClass, OWLObjectCardinalityRestriction, \
    OWLObjectComplementOf, OWLNothing, OWLPropertyRange, OWLRestriction, OWLThing, OWLObjectSomeValuesFrom, \
    OWLObjectHasValue, OWLObjectMinCardinality, OWLObjectMaxCardinality, OWLObjectExactCardinality, OWLObjectHasSelf, \
    OWLObjectOneOf, OWLDataMaxCardinality, OWLDataMinCardinality, OWLDataExactCardinality, OWLDataHasValue, \
    OWLDataAllValuesFrom, OWLDataSomeValuesFrom, OWLObjectAllValuesFrom, HasFiller, HasCardinality, HasOperands, \
    OWLObjectInverseOf, OWLDatatypeRestriction, OWLDataComplementOf, OWLDatatype, OWLDataUnionOf, \
    OWLDataIntersectionOf, OWLDataOneOf, OWLFacetRestriction, OWLLiteral, OWLObjectIntersectionOf, \
    OWLDataCardinalityRestriction, OWLNaryBooleanClassExpression, OWLNaryDataRange, OWLObjectUnionOf, \
    OWLDataRange, OWLObject


_HasIRI = TypeVar('_HasIRI', bound=HasIRI)  #:
_HasIndex = TypeVar('_HasIndex', bound=HasIndex)  #:
_O = TypeVar('_O')  #:
_Enc = TypeVar('_Enc')
_Con = TypeVar('_Con')
_K = TypeVar('_K')
_V = TypeVar('_V')


[docs]@total_ordering class OrderedOWLObject: """Holder of OWL Objects that can be used for Python sorted The Ordering is dependent on the type_index of the impl. classes recursively followed by all components of the OWL Object. """ __slots__ = 'o', '_chain' o: _HasIndex # o: Intersection[OWLObject, HasIndex] _chain: Optional[Tuple] # we are limited by https://github.com/python/typing/issues/213 # o: Intersection[OWLObject, HasIndex] def __init__(self, o: _HasIndex): """OWL Object holder with a defined sort order Args: o: OWL Object """ self.o = o self._chain = None def _comparison_chain(self): if self._chain is None: c = [self.o.type_index] if isinstance(self.o, OWLRestriction): c.append(OrderedOWLObject(as_index(self.o.get_property()))) if isinstance(self.o, OWLObjectInverseOf): c.append(self.o.get_named_property().get_iri().as_str()) if isinstance(self.o, HasFiller): c.append(OrderedOWLObject(self.o.get_filler())) if isinstance(self.o, HasCardinality): c.append(self.o.get_cardinality()) if isinstance(self.o, HasOperands): c.append(tuple(map(OrderedOWLObject, self.o.operands()))) if isinstance(self.o, HasIRI): c.append(self.o.get_iri().as_str()) if isinstance(self.o, OWLDataComplementOf): c.append(OrderedOWLObject(self.o.get_data_range())) if isinstance(self.o, OWLDatatypeRestriction): c.append((OrderedOWLObject(self.o.get_datatype()), tuple(map(OrderedOWLObject, self.o.get_facet_restrictions())))) if isinstance(self.o, OWLFacetRestriction): c.append((self.o.get_facet().get_iri().as_str(), self.o.get_facet_value().get_literal())) if isinstance(self.o, OWLLiteral): c.append(self.o.get_literal()) if len(c) == 1: raise NotImplementedError(type(self.o)) self._chain = tuple(c) return self._chain def __lt__(self, other): if self.o.type_index < other.o.type_index: return True elif self.o.type_index > other.o.type_index: return False else: return self._comparison_chain() < other._comparison_chain() def __eq__(self, other): return self.o == other.o
def _sort_by_ordered_owl_object(i: Iterable[_O]) -> Iterable[_O]: return sorted(i, key=OrderedOWLObject)
[docs]class NNF: """This class contains functions to transform a Class Expression into Negation Normal Form"""
[docs] @singledispatchmethod def get_class_nnf(self, ce: OWLClassExpression, negated: bool = False) -> OWLClassExpression: """Convert a Class Expression to Negation Normal Form. Operands will be sorted. Args: ce: Class Expression negated: whether the result should be negated Returns: Class Expression in Negation Normal Form """ raise NotImplementedError
@get_class_nnf.register def _(self, ce: OWLClass, negated: bool = False): if negated: if ce.is_owl_thing(): return OWLNothing if ce.is_owl_nothing(): return OWLThing return OWLObjectComplementOf(ce) return ce @get_class_nnf.register def _(self, ce: OWLObjectIntersectionOf, negated: bool = False): ops = map(lambda _: self.get_class_nnf(_, negated), _sort_by_ordered_owl_object(ce.operands())) if negated: return OWLObjectUnionOf(ops) return OWLObjectIntersectionOf(ops) @get_class_nnf.register def _(self, ce: OWLObjectUnionOf, negated: bool = False): ops = map(lambda _: self.get_class_nnf(_, negated), _sort_by_ordered_owl_object(ce.operands())) if negated: return OWLObjectIntersectionOf(ops) return OWLObjectUnionOf(ops) @get_class_nnf.register def _(self, ce: OWLObjectComplementOf, negated: bool = False): return self.get_class_nnf(ce.get_operand(), not negated) @get_class_nnf.register def _(self, ce: OWLObjectSomeValuesFrom, negated: bool = False): filler = self.get_class_nnf(ce.get_filler(), negated) if negated: return OWLObjectAllValuesFrom(ce.get_property(), filler) return OWLObjectSomeValuesFrom(ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLObjectAllValuesFrom, negated: bool = False): filler = self.get_class_nnf(ce.get_filler(), negated) if negated: return OWLObjectSomeValuesFrom(ce.get_property(), filler) return OWLObjectAllValuesFrom(ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLObjectHasValue, negated: bool = False): return self.get_class_nnf(ce.as_some_values_from(), negated) @get_class_nnf.register def _(self, ce: OWLObjectMinCardinality, negated: bool = False): card = ce.get_cardinality() if negated: card = max(0, card - 1) filler = self.get_class_nnf(ce.get_filler(), negated=False) if negated: return OWLObjectMaxCardinality(card, ce.get_property(), filler) return OWLObjectMinCardinality(card, ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLObjectExactCardinality, negated: bool = False): return self.get_class_nnf(ce.as_intersection_of_min_max(), negated) @get_class_nnf.register def _(self, ce: OWLObjectMaxCardinality, negated: bool = False): card = ce.get_cardinality() if negated: card = card + 1 filler = self.get_class_nnf(ce.get_filler(), negated=False) if negated: return OWLObjectMinCardinality(card, ce.get_property(), filler) return OWLObjectMaxCardinality(card, ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLObjectHasSelf, negated: bool = False): if negated: return ce.get_object_complement_of() return ce @get_class_nnf.register def _(self, ce: OWLObjectOneOf, negated: bool = False): union = ce.as_object_union_of() if isinstance(union, OWLObjectOneOf): if negated: return ce.get_object_complement_of() return ce return self.get_class_nnf(union, negated) @get_class_nnf.register def _(self, ce: OWLDataSomeValuesFrom, negated: bool = False): filler = self.get_class_nnf(ce.get_filler(), negated) if negated: return OWLDataAllValuesFrom(ce.get_property(), filler) return OWLDataSomeValuesFrom(ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLDataAllValuesFrom, negated: bool = False): filler = self.get_class_nnf(ce.get_filler(), negated) if negated: return OWLDataSomeValuesFrom(ce.get_property(), filler) return OWLDataAllValuesFrom(ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLDatatypeRestriction, negated: bool = False): if negated: return OWLDataComplementOf(ce) return ce @get_class_nnf.register def _(self, ce: OWLDatatype, negated: bool = False): if negated: return OWLDataComplementOf(ce) return ce @get_class_nnf.register def _(self, ce: OWLDataComplementOf, negated: bool = False): return self.get_class_nnf(ce.get_data_range(), not negated) @get_class_nnf.register def _(self, ce: OWLDataHasValue, negated: bool = False): return self.get_class_nnf(ce.as_some_values_from(), negated) @get_class_nnf.register def _(self, ce: OWLDataOneOf, negated: bool = False): if len(list(ce.values())) == 1: if negated: return OWLDataComplementOf(ce) return ce union = OWLDataUnionOf([OWLDataOneOf(v) for v in ce.values()]) return self.get_class_nnf(union, negated) @get_class_nnf.register def _(self, ce: OWLDataIntersectionOf, negated: bool = False): ops = map(lambda _: self.get_class_nnf(_, negated), _sort_by_ordered_owl_object(ce.operands())) if negated: return OWLDataUnionOf(ops) return OWLDataIntersectionOf(ops) @get_class_nnf.register def _(self, ce: OWLDataUnionOf, negated: bool = False): ops = map(lambda _: self.get_class_nnf(_, negated), _sort_by_ordered_owl_object(ce.operands())) if negated: return OWLDataIntersectionOf(ops) return OWLDataUnionOf(ops) @get_class_nnf.register def _(self, ce: OWLDataExactCardinality, negated: bool = False): return self.get_class_nnf(ce.as_intersection_of_min_max(), negated) @get_class_nnf.register def _(self, ce: OWLDataMinCardinality, negated: bool = False): card = ce.get_cardinality() if negated: card = max(0, card - 1) filler = self.get_class_nnf(ce.get_filler(), negated=False) if negated: return OWLDataMaxCardinality(card, ce.get_property(), filler) return OWLDataMinCardinality(card, ce.get_property(), filler) @get_class_nnf.register def _(self, ce: OWLDataMaxCardinality, negated: bool = False): card = ce.get_cardinality() if negated: card = card + 1 filler = self.get_class_nnf(ce.get_filler(), negated=False) if negated: return OWLDataMinCardinality(card, ce.get_property(), filler) return OWLDataMaxCardinality(card, ce.get_property(), filler)
# OWL-APy custom util start
[docs]class TopLevelCNF: """This class contains functions to transform a class expression into Top-Level Conjunctive Normal Form"""
[docs] def get_top_level_cnf(self, ce: OWLClassExpression) -> OWLClassExpression: """Convert a class expression into Top-Level Conjunctive Normal Form. Operands will be sorted. Args: ce: Class Expression Returns: Class Expression in Top-Level Conjunctive Normal Form """ c = _get_top_level_form(ce.get_nnf(), OWLObjectUnionOf, OWLObjectIntersectionOf) return combine_nary_expressions(c)
[docs]class TopLevelDNF: """This class contains functions to transform a class expression into Top-Level Disjunctive Normal Form"""
[docs] def get_top_level_dnf(self, ce: OWLClassExpression) -> OWLClassExpression: """Convert a class expression into Top-Level Disjunctive Normal Form. Operands will be sorted. Args: ce: Class Expression Returns: Class Expression in Top-Level Disjunctive Normal Form """ c = _get_top_level_form(ce.get_nnf(), OWLObjectIntersectionOf, OWLObjectUnionOf) return combine_nary_expressions(c)
def _get_top_level_form(ce: OWLClassExpression, type_a: Type[OWLNaryBooleanClassExpression], type_b: Type[OWLNaryBooleanClassExpression]) -> OWLClassExpression: """ Transforms a class expression (that's already in NNF) into Top-Level Conjunctive/Disjunctive Normal Form. Here type_a specifies the operand which should be distributed inwards over type_b. Conjunctive Normal form: type_a = OWLObjectUnionOf type_b = OWLObjectIntersectionOf Disjunctive Normal form: type_a = OWLObjectIntersectionOf type_b = OWLObjectUnionOf """ def distributive_law(a: OWLClassExpression, b: OWLNaryBooleanClassExpression) -> OWLNaryBooleanClassExpression: return type_b(type_a([a, op]) for op in b.operands()) if isinstance(ce, type_a): ce = cast(OWLNaryBooleanClassExpression, combine_nary_expressions(ce)) type_b_exprs = [op for op in ce.operands() if isinstance(op, type_b)] non_type_b_exprs = [op for op in ce.operands() if not isinstance(op, type_b)] if not len(type_b_exprs): return ce if len(non_type_b_exprs): expr = non_type_b_exprs[0] if len(non_type_b_exprs) == 1 \ else type_a(non_type_b_exprs) expr = distributive_law(expr, type_b_exprs[0]) else: expr = type_b_exprs[0] if len(type_b_exprs) == 1: return _get_top_level_form(expr, type_a, type_b) for type_b_expr in type_b_exprs[1:]: expr = distributive_law(type_b_expr, expr) return _get_top_level_form(expr, type_a, type_b) elif isinstance(ce, type_b): return type_b(_get_top_level_form(op, type_a, type_b) for op in ce.operands()) elif isinstance(ce, OWLClassExpression): return ce else: raise ValueError('Top-Level CNF/DNF only applicable on class expressions', ce) @overload def combine_nary_expressions(ce: OWLClassExpression) -> OWLClassExpression: ... @overload def combine_nary_expressions(ce: OWLDataRange) -> OWLDataRange: ...
[docs]def combine_nary_expressions(ce: OWLPropertyRange) -> OWLPropertyRange: ''' Shortens an OWLClassExpression or OWLDataRange by combining all nested nary expressions of the same type. Operands will be sorted. E.g. OWLObjectUnionOf(A, OWLObjectUnionOf(C, B)) -> OWLObjectUnionOf(A, B, C) ''' if isinstance(ce, (OWLNaryBooleanClassExpression, OWLNaryDataRange)): expressions: List[OWLPropertyRange] = [] for op in ce.operands(): expr = combine_nary_expressions(op) if type(expr) is type(ce): expr = cast(Union[OWLNaryBooleanClassExpression, OWLNaryDataRange], expr) expressions.extend(expr.operands()) else: expressions.append(expr) return type(ce)(_sort_by_ordered_owl_object(expressions)) # type: ignore elif isinstance(ce, OWLObjectComplementOf): return OWLObjectComplementOf(combine_nary_expressions(ce.get_operand())) elif isinstance(ce, OWLDataComplementOf): return OWLDataComplementOf(combine_nary_expressions(ce.get_data_range())) elif isinstance(ce, OWLObjectCardinalityRestriction): return type(ce)(ce.get_cardinality(), ce.get_property(), combine_nary_expressions(ce.get_filler())) elif isinstance(ce, OWLDataCardinalityRestriction): return type(ce)(ce.get_cardinality(), ce.get_property(), combine_nary_expressions(ce.get_filler())) elif isinstance(ce, (OWLObjectSomeValuesFrom, OWLObjectAllValuesFrom)): return type(ce)(ce.get_property(), combine_nary_expressions(ce.get_filler())) elif isinstance(ce, (OWLDataSomeValuesFrom, OWLDataAllValuesFrom)): return type(ce)(ce.get_property(), combine_nary_expressions(ce.get_filler())) elif isinstance(ce, OWLObjectOneOf): return OWLObjectOneOf(_sort_by_ordered_owl_object(ce.operands())) elif isinstance(ce, OWLDataOneOf): return OWLDataOneOf(_sort_by_ordered_owl_object(ce.operands())) elif isinstance(ce, OWLPropertyRange): return ce else: raise ValueError(f'({expr}) is not an OWLObject.')
[docs]def iter_count(i: Iterable) -> int: """Count the number of elements in an iterable""" return sum(1 for _ in i)
[docs]def as_index(o: OWLObject) -> HasIndex: """Cast OWL Object to HasIndex""" i = cast(HasIndex, o) assert type(i).type_index return i
# adapted from functools.lru_cache
[docs]class LRUCache(Generic[_K, _V]): # Constants shared by all lru cache instances: sentinel = object() # unique object used to signal cache misses PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields def __init__(self, maxsize: Optional[int] = None): from _thread import RLock self.cache = {} self.hits = self.misses = 0 self.full = False self.cache_get = self.cache.get # bound method to lookup a key or return None self.cache_len = self.cache.__len__ # get cache size without calling len() self.lock = RLock() # because linkedlist updates aren't threadsafe self.root = [] # root of the circular doubly linked list self.root[:] = [self.root, self.root, None, None] # initialize by pointing to self self.maxsize = maxsize def __contains__(self, item: _K) -> bool: with self.lock: link = self.cache_get(item) if link is not None: self.hits += 1 return True self.misses += 1 return False def __getitem__(self, item: _K) -> _V: with self.lock: link = self.cache_get(item) if link is not None: # Move the link to the front of the circular queue link_prev, link_next, _key, result = link link_prev[LRUCache.NEXT] = link_next link_next[LRUCache.PREV] = link_prev last = self.root[LRUCache.PREV] last[LRUCache.NEXT] = self.root[LRUCache.PREV] = link link[LRUCache.PREV] = last link[LRUCache.NEXT] = self.root return result def __setitem__(self, key: _K, value: _V): with self.lock: if key in self.cache: # Getting here means that this same key was added to the # cache while the lock was released. Since the link # update is already done, we need only return the # computed result and update the count of misses. pass elif self.full: # Use the old root to store the new key and result. oldroot = self.root oldroot[LRUCache.KEY] = key oldroot[LRUCache.RESULT] = value # Empty the oldest link and make it the new root. # Keep a reference to the old key and old result to # prevent their ref counts from going to zero during the # update. That will prevent potentially arbitrary object # clean-up code (i.e. __del__) from running while we're # still adjusting the links. self.root = oldroot[LRUCache.NEXT] oldkey = self.root[LRUCache.KEY] _oldresult = self.root[LRUCache.RESULT] # noqa: F841 self.root[LRUCache.KEY] = self.root[LRUCache.RESULT] = None # Now update the cache dictionary. del self.cache[oldkey] # Save the potentially reentrant cache[key] assignment # for last, after the root and links have been put in # a consistent state. self.cache[key] = oldroot else: # Put result in a new link at the front of the queue. last = self.root[LRUCache.PREV] link = [last, self.root, key, value] last[LRUCache.NEXT] = self.root[LRUCache.PREV] = self.cache[key] = link # Use the cache_len bound method instead of the len() function # which could potentially be wrapped in an lru_cache itself. if self.maxsize is not None: self.full = (self.cache_len() >= self.maxsize)
[docs] def cache_info(self): """Report cache statistics""" with self.lock: from collections import namedtuple return namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])( self.hits, self.misses, self.maxsize, self.cache_len())
[docs] def cache_clear(self): """Clear the cache and cache statistics""" with self.lock: self.cache.clear() self.root[:] = [self.root, self.root, None, None] self.hits = self.misses = 0 self.full = False