# Source code for owlapy.owlready2._base

import logging
from datetime import date, datetime
from enum import Enum, auto
from itertools import chain
from types import MappingProxyType
from typing import Iterable, Set, Final

import owlready2
from owlready2 import declare_datatype
from pandas import Timedelta

from owlapy.owlready2 import axioms
from owlapy import namespaces
from owlapy.ext import OWLReasonerEx
from owlapy.model import OWLObjectPropertyRangeAxiom, OWLOntologyManager, OWLDataProperty, OWLObjectProperty, \
    OWLNamedIndividual, OWLClassExpression, OWLObjectPropertyExpression, OWLOntologyID, OWLAxiom, OWLOntology, \
    OWLOntologyChange, AddImport, OWLThing, DoubleOWLDatatype, OWLObjectPropertyDomainAxiom, OWLLiteral, \
    OWLObjectInverseOf, BooleanOWLDatatype, IntegerOWLDatatype, DateOWLDatatype, DateTimeOWLDatatype, OWLClass, \
    DurationOWLDatatype, StringOWLDatatype, IRI, OWLDataPropertyRangeAxiom, OWLDataPropertyDomainAxiom
from owlapy.owlready2.utils import FromOwlready2

# Module-level logger named after this module; used below to report constructs that cannot be converted yet.
logger = logging.getLogger(__name__)

# Read-only lookup table (MappingProxyType) from Python types to the owlapy datatype constants.
# data_property_range_axioms consults it for every range value of a data property.
_Datatype_map: Final = MappingProxyType({
    int: IntegerOWLDatatype,
    float: DoubleOWLDatatype,
    bool: BooleanOWLDatatype,
    str: StringOWLDatatype,
    date: DateOWLDatatype,
    datetime: DateTimeOWLDatatype,
    Timedelta: DurationOWLDatatype,
})

# Bound converter methods, each taken from its own FromOwlready2 instance. They are applied below to
# Owlready2 classes/class constructs and to data range constructs to obtain the owlapy objects put into axioms.
_parse_concept_to_owlapy = FromOwlready2().map_concept
_parse_datarange_to_owlapy = FromOwlready2().map_datarange

# IRI "versionIRI" in the OWL namespace; get_ontology_id uses it to look up the version IRI triple of an ontology.
_VERSION_IRI: Final = IRI.create(namespaces.OWL, "versionIRI")


def _parse_duration_datatype(literal: str):
    return Timedelta(literal)


def _unparse_duration_datatype(literal: Timedelta):
    return literal.isoformat()


# Import-time side effect: register pandas' Timedelta with Owlready2 under the IRI "duration" of the XSD
# namespace, using the two helper functions above as parser (string -> Timedelta) and unparser
# (Timedelta -> string).
declare_datatype(Timedelta, IRI.create(namespaces.XSD, "duration").as_str(),
                 _parse_duration_datatype, _unparse_duration_datatype)


class BaseReasoner_Owlready2(Enum):
    """Choice of the external reasoner that ``_sync_reasoner`` hands the world to."""
    # The numbers carry no meaning; they are the values ``auto()`` assigns in declaration order.
    PELLET = 1
    HERMIT = 2


class OWLOntologyManager_Owlready2(OWLOntologyManager):
    """Ontology manager that keeps all of its ontologies in one Owlready2 ``World``."""
    __slots__ = '_world'

    _world: owlready2.namespace.World

    def __init__(self, world_store=None):
        """Create the manager together with its Owlready2 world.

        Args:
            world_store: when given, passed as ``filename`` to ``owlready2.World``; when ``None`` the world is
                created with default arguments
        """
        if world_store is not None:
            self._world = owlready2.World(filename=world_store)
        else:
            self._world = owlready2.World()

    def create_ontology(self, iri: IRI) -> 'OWLOntology_Owlready2':
        """Return an ontology wrapper for ``iri`` without loading it."""
        return OWLOntology_Owlready2(self, iri, load=False)

    def load_ontology(self, iri: IRI) -> 'OWLOntology_Owlready2':
        """Return an ontology wrapper for ``iri`` and load the ontology."""
        return OWLOntology_Owlready2(self, iri, load=True)

    def apply_change(self, change: OWLOntologyChange):
        """Apply an ontology change; only ``AddImport`` is supported.

        Raises:
            NotImplementedError: for every other kind of change
        """
        if not isinstance(change, AddImport):
            # TODO XXX
            raise NotImplementedError

        importing_iri = change.get_ontology().get_ontology_id().get_ontology_iri().as_str()
        ont_x: owlready2.namespace.Ontology = self._world.get_ontology(importing_iri)
        imported_iri = change.get_import_declaration().get_iri().as_str()
        ont_x.imported_ontologies.append(self._world.get_ontology(imported_iri))

    def add_axiom(self, ontology: OWLOntology, axiom: OWLAxiom):
        """Add ``axiom`` to ``ontology`` by delegating to ``axioms._add_axiom``."""
        axioms._add_axiom(axiom, ontology, self._world)

    def remove_axiom(self, ontology: OWLOntology, axiom: OWLAxiom):
        """Remove ``axiom`` from ``ontology`` by delegating to ``axioms._remove_axiom``."""
        axioms._remove_axiom(axiom, ontology, self._world)

    def save_ontology(self, ontology: OWLOntology, document_iri: IRI):
        """Write ``ontology`` to the file named by ``document_iri``.

        Args:
            ontology: ontology to save; it is looked up in the world by its ontology IRI
            document_iri: target location; its namespace has to start with ``file:/``

        Raises:
            NotImplementedError: if ``document_iri`` is not a ``file:/`` IRI
        """
        file_prefix = 'file:/'
        ont_x: owlready2.namespace.Ontology = self._world.get_ontology(
            ontology.get_ontology_id().get_ontology_iri().as_str()
        )
        if not document_iri.get_namespace().startswith(file_prefix):
            # TODO XXX
            raise NotImplementedError

        # Everything after the prefix is used as the file name.
        ont_x.save(file=document_iri.as_str()[len(file_prefix):])

    def save_world(self):
        """Call ``save()`` on the underlying world."""
        self._world.save()
class OWLOntology_Owlready2(OWLOntology):
    """A single Owlready2 ontology exposed through the owlapy ``OWLOntology`` interface.

    The wrapped ontology lives in the world of the manager that created this object.
    """
    __slots__ = '_manager', '_world', '_onto'

    _manager: OWLOntologyManager_Owlready2
    _onto: owlready2.Ontology
    _world: owlready2.World

    def __init__(self, manager: OWLOntologyManager_Owlready2, ontology_iri: IRI, load: bool):
        """Bind this object to the ontology named by ``ontology_iri``.

        Args:
            manager: manager whose world holds the ontology
            ontology_iri: IRI handed to the world's ``get_ontology``
            load: if true, ``load()`` is called on the ontology and its result is kept
        """
        self._manager = manager
        self._world = manager._world
        onto_x = self._world.get_ontology(ontology_iri.as_str())
        self._onto = onto_x.load() if load else onto_x

    def classes_in_signature(self) -> Iterable[OWLClass]:
        """Yield an ``OWLClass`` for every class reported by the wrapped ontology."""
        yield from (OWLClass(IRI.create(cls_x.iri)) for cls_x in self._onto.classes())

    def data_properties_in_signature(self) -> Iterable[OWLDataProperty]:
        """Yield an ``OWLDataProperty`` for every data property reported by the wrapped ontology."""
        yield from (OWLDataProperty(IRI.create(prop_x.iri)) for prop_x in self._onto.data_properties())

    def object_properties_in_signature(self) -> Iterable[OWLObjectProperty]:
        """Yield an ``OWLObjectProperty`` for every object property reported by the wrapped ontology."""
        yield from (OWLObjectProperty(IRI.create(prop_x.iri)) for prop_x in self._onto.object_properties())

    def individuals_in_signature(self) -> Iterable[OWLNamedIndividual]:
        """Yield an ``OWLNamedIndividual`` for every individual reported by the wrapped ontology."""
        yield from (OWLNamedIndividual(IRI.create(ind_x.iri)) for ind_x in self._onto.individuals())

    def get_owl_ontology_manager(self) -> OWLOntologyManager_Owlready2:
        """Return the manager this ontology was created by."""
        return self._manager

    def get_ontology_id(self) -> OWLOntologyID:
        """Build the ontology ID from the ontology IRI and, if one is stored, the version IRI.

        Both IRIs are read through the world's internal abbreviation tables; a missing value is passed on as
        ``None``.
        """
        world = self._world
        onto_iri = world._unabbreviate(self._onto.storid)
        version_storid = world._get_obj_triple_sp_o(self._onto.storid, world._abbreviate(_VERSION_IRI.as_str()))
        version_iri = None if version_storid is None else world._unabbreviate(version_storid)

        def as_iri(text):
            return None if text is None else IRI.create(text)

        return OWLOntologyID(as_iri(onto_iri), as_iri(version_iri))

    def _concept_axioms(self, pe, candidates: Set, axiom_type):
        """Yield ``axiom_type(pe, ...)`` for each class-like candidate, or a single one on ``OWLThing`` if the
        candidate set is empty."""
        if not candidates:
            yield axiom_type(pe, OWLThing)
            return
        for cand in candidates:
            if isinstance(cand, (owlready2.ThingClass, owlready2.ClassConstruct)):
                yield axiom_type(pe, _parse_concept_to_owlapy(cand))
            else:
                # XXX TODO
                logger.warning("Construct %s not implemented at %s", cand, pe)

    def data_property_domain_axioms(self, pe: OWLDataProperty) -> Iterable[OWLDataPropertyDomainAxiom]:
        """Yield one domain axiom per entry of ``domains_indirect()`` of the data property.

        Without any domain a single axiom on ``OWLThing`` is produced; entries that are neither a class nor a
        class construct are logged and skipped.
        """
        p_x: owlready2.DataPropertyClass = self._world[pe.get_iri().as_str()]
        yield from self._concept_axioms(pe, set(p_x.domains_indirect()), OWLDataPropertyDomainAxiom)

    def data_property_range_axioms(self, pe: OWLDataProperty) -> Iterable[OWLDataPropertyRangeAxiom]:
        """Yield one range axiom per range entry collected from ``ancestors()`` of the data property.

        Python types are translated through ``_Datatype_map``, class constructs through the data range
        converter; anything else is logged and skipped. Nothing is yielded when there is no range (TODO).
        """
        p_x: owlready2.DataPropertyClass = self._world[pe.get_iri().as_str()]
        ranges = {rng for super_prop in p_x.ancestors() for rng in super_prop.range}
        for rng in ranges:
            if rng in _Datatype_map:
                yield OWLDataPropertyRangeAxiom(pe, _Datatype_map[rng])
            elif isinstance(rng, owlready2.ClassConstruct):
                yield OWLDataPropertyRangeAxiom(pe, _parse_datarange_to_owlapy(rng))
            else:
                # XXX TODO
                logger.warning("Datatype %s not implemented at %s", rng, pe)

    def object_property_domain_axioms(self, pe: OWLObjectProperty) -> Iterable[OWLObjectPropertyDomainAxiom]:
        """Yield one domain axiom per entry of ``domains_indirect()`` of the object property.

        Without any domain a single axiom on ``OWLThing`` is produced; entries that are neither a class nor a
        class construct are logged and skipped.
        """
        p_x: owlready2.ObjectPropertyClass = self._world[pe.get_iri().as_str()]
        yield from self._concept_axioms(pe, set(p_x.domains_indirect()), OWLObjectPropertyDomainAxiom)

    def object_property_range_axioms(self, pe: OWLObjectProperty) -> Iterable[OWLObjectPropertyRangeAxiom]:
        """Yield one range axiom per range entry collected from ``ancestors()`` of the object property.

        Without any range a single axiom on ``OWLThing`` is produced; entries that are neither a class nor a
        class construct are logged and skipped.
        """
        p_x: owlready2.ObjectPropertyClass = self._world[pe.get_iri().as_str()]
        ranges = {rng for super_prop in p_x.ancestors() for rng in super_prop.range}
        yield from self._concept_axioms(pe, ranges, OWLObjectPropertyRangeAxiom)

    def __eq__(self, other):
        # Only objects of exactly this type are comparable; equality looks at the loaded flag and the base IRI.
        if not (type(other) == type(self)):
            return NotImplemented
        mine, theirs = self._onto, other._onto
        return mine.loaded == theirs.loaded and mine.base_iri == theirs.base_iri

    def __hash__(self):
        return hash(self._onto.base_iri)

    def __repr__(self):
        return f'OWLOntology_Owlready2({IRI.create(self._onto.base_iri)}, {self._onto.loaded})'


class OWLReasoner_Owlready2(OWLReasonerEx):
    """Reasoner interface that answers queries from the structures Owlready2 keeps for one ontology."""
    __slots__ = '_ontology', '_world'

    _ontology: OWLOntology_Owlready2
    _world: owlready2.World

    def __init__(self, ontology: OWLOntology_Owlready2):
        """Attach the reasoner to ``ontology`` and to the world of that ontology."""
        super().__init__(ontology)
        assert isinstance(ontology, OWLOntology_Owlready2)
        self._ontology = ontology
        self._world = ontology._world

    def _direct_then_inherited(self, expressions: Set, direct: bool) -> Iterable[OWLClassExpression]:
        """Yield the expressions that are not a super class of another one, then (unless ``direct``) the
        collected super classes."""
        inherited = set()
        for expr in expressions:
            # TODO: Remove if when super_classes is implemented for complex class expressions
            if isinstance(expr, OWLClass):
                inherited.update(self.super_classes(expr))
        yield from expressions - inherited
        if not direct:
            yield from inherited

    def data_property_domains(self, pe: OWLDataProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Yield the domains of a data property, taken from the domain axioms of the root ontology.

        Args:
            pe: the data property
            direct: if false, the super classes of the named domains are yielded as well
        """
        axioms_ = self.get_root_ontology().data_property_domain_axioms(pe)
        yield from self._direct_then_inherited({ax.get_domain() for ax in axioms_}, direct)

    def object_property_domains(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Yield the domains of an object property, taken from the domain axioms of the root ontology.

        Args:
            pe: the object property
            direct: if false, the super classes of the named domains are yielded as well
        """
        axioms_ = self.get_root_ontology().object_property_domain_axioms(pe)
        yield from self._direct_then_inherited({ax.get_domain() for ax in axioms_}, direct)

    def object_property_ranges(self, pe: OWLObjectProperty, direct: bool = False) -> Iterable[OWLClassExpression]:
        """Yield the ranges of an object property, taken from the range axioms of the root ontology.

        Args:
            pe: the object property
            direct: if false, the super classes of the named ranges are yielded as well
        """
        axioms_ = self.get_root_ontology().object_property_range_axioms(pe)
        yield from self._direct_then_inherited({ax.get_range() for ax in axioms_}, direct)

    def equivalent_classes(self, ce: OWLClassExpression) -> Iterable[OWLClass]:
        """Return the named classes that are directly equivalent to the class expression"""
        if not isinstance(ce, OWLClass):
            raise NotImplementedError("equivalent_classes for complex class expressions not implemented", ce)
        c_x: owlready2.ThingClass = self._world[ce.get_iri().as_str()]
        for eq_x in c_x.equivalent_to:
            # Anonymous classes are ignored
            if isinstance(eq_x, owlready2.ThingClass):
                yield OWLClass(IRI.create(eq_x.iri))

    def disjoint_classes(self, ce: OWLClassExpression) -> Iterable[OWLClass]:
        """Yield the named classes listed together with ``ce`` in one of its disjointness declarations."""
        if not isinstance(ce, OWLClass):
            raise NotImplementedError("disjoint_classes for complex class expressions not implemented", ce)
        c_x: owlready2.ThingClass = self._world[ce.get_iri().as_str()]
        for disjoint in c_x.disjoints():
            for other_x in disjoint.entities:
                # Anonymous classes are ignored
                if isinstance(other_x, owlready2.ThingClass) and other_x != c_x:
                    yield OWLClass(IRI.create(other_x.iri))

    def different_individuals(self, ind: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
        """Yield the individuals listed together with ``ind`` in one of its ``differents()`` declarations."""
        i: owlready2.Thing = self._world[ind.get_iri().as_str()]
        for different in i.differents():
            for d_i in different.entities:
                if isinstance(d_i, owlready2.Thing) and i != d_i:
                    yield OWLNamedIndividual(IRI.create(d_i.iri))

    def same_individuals(self, ind: OWLNamedIndividual) -> Iterable[OWLNamedIndividual]:
        """Yield the individuals found in ``equivalent_to`` of ``ind``."""
        i: owlready2.Thing = self._world[ind.get_iri().as_str()]
        for same_x in i.equivalent_to:
            if isinstance(same_x, owlready2.Thing):
                yield OWLNamedIndividual(IRI.create(same_x.iri))

    def data_property_values(self, ind: OWLNamedIndividual, pe: OWLDataProperty) -> Iterable[OWLLiteral]:
        """Yield the values of data property ``pe`` for individual ``ind`` as literals."""
        i: owlready2.Thing = self._world[ind.get_iri().as_str()]
        p: owlready2.DataPropertyClass = self._world[pe.get_iri().as_str()]
        yield from (OWLLiteral(val) for val in p._get_values_for_individual(i))

    def all_data_property_values(self, pe: OWLDataProperty) -> Iterable[OWLLiteral]:
        """Yield, as literals, the value of every relation of data property ``pe``."""
        p: owlready2.DataPropertyClass = self._world[pe.get_iri().as_str()]
        yield from (OWLLiteral(val) for _, val in p.get_relations())

    def object_property_values(self, ind: OWLNamedIndividual, pe: OWLObjectPropertyExpression) \
            -> Iterable[OWLNamedIndividual]:
        """Yield the individuals related to ``ind`` via ``pe``.

        Args:
            ind: the individual whose values are requested
            pe: a named object property, or the inverse of one (then the inverse values are read)

        Raises:
            NotImplementedError: for any other kind of property expression
        """
        if isinstance(pe, OWLObjectProperty):
            inverse = False
            named = pe
        elif isinstance(pe, OWLObjectInverseOf):
            inverse = True
            named = pe.get_named_property()
        else:
            raise NotImplementedError(pe)

        i: owlready2.Thing = self._world[ind.get_iri().as_str()]
        p: owlready2.ObjectPropertyClass = self._world[named.get_iri().as_str()]
        values = p._get_inverse_values_for_individual(i) if inverse else p._get_values_for_individual(i)
        for val in values:
            yield OWLNamedIndividual(IRI.create(val.iri))

    def flush(self) -> None:
        """Do nothing."""
        pass

    def instances(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLNamedIndividual]:
        """Yield the individuals that are instances of ``ce``.

        Args:
            ce: a named class (``owl:Thing`` gets special treatment when ``direct`` is false)
            direct: if true, ask the ontology for ``get_instances_of`` the class; otherwise use the class'
                ``instances()`` within this world, or all individuals in the signature for ``owl:Thing``

        Raises:
            NotImplementedError: for complex class expressions
        """
        if not direct and ce.is_owl_thing():
            yield from self._ontology.individuals_in_signature()
            return

        # elif isinstance(ce, OWLObjectSomeValuesFrom) and ce.get_filler().is_owl_thing()\
        #         and isinstance(ce.get_property(), OWLProperty):
        #     seen_set = set()
        #     p_x: owlready2.ObjectProperty = self._world[ce.get_property().get_named_property().get_iri().as_str()]
        #     for i, _ in p_x.get_relations():
        #         if isinstance(i, owlready2.Thing) and i not in seen_set:
        #             seen_set.add(i)
        #             yield OWLNamedIndividual(IRI.create(i.iri))
        if not isinstance(ce, OWLClass):
            raise NotImplementedError("instances for complex class expressions not implemented", ce)

        c_x: owlready2.ThingClass = self._world[ce.get_iri().as_str()]
        if direct:
            found = self._ontology._onto.get_instances_of(c_x)
        else:
            found = c_x.instances(world=self._world)
        for inst_x in found:
            if isinstance(inst_x, owlready2.Thing):
                yield OWLNamedIndividual(IRI.create(inst_x.iri))

    def _named_sub_classes_recursive(self, c: OWLClass, seen_set: Set) -> Iterable[OWLClass]:
        """Yield all named sub classes of ``c`` depth-first, recording visited Owlready2 classes in ``seen_set``."""
        c_x: owlready2.EntityClass = self._world[c.get_iri().as_str()]
        # work around issue in class equivalence detection in Owlready2
        start_points = [c_x]
        start_points.extend(c_x.equivalent_to)
        for start_x in start_points:
            if not isinstance(start_x, owlready2.ThingClass):
                continue
            for sub_x in start_x.subclasses(world=self._world):
                if not isinstance(sub_x, owlready2.ThingClass) or sub_x in seen_set:
                    continue
                seen_set.add(sub_x)
                owl_sub = OWLClass(IRI.create(sub_x.iri))
                yield owl_sub
                yield from self._named_sub_classes_recursive(owl_sub, seen_set)

    def sub_classes(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLClass]:
        """Yield the named sub classes of ``ce``.

        Args:
            ce: a named class
            direct: if true, only the classes returned by ``subclasses()``; otherwise all descendants

        Raises:
            NotImplementedError: for complex class expressions
        """
        if not isinstance(ce, OWLClass):
            raise NotImplementedError("sub classes for complex class expressions not implemented", ce)
        if not direct:
            # indirect
            yield from self._named_sub_classes_recursive(ce, set())
            return
        c_x: owlready2.ThingClass = self._world[ce.get_iri().as_str()]
        for sub_x in c_x.subclasses(world=self._world):
            # Anonymous classes are ignored
            if isinstance(sub_x, owlready2.ThingClass):
                yield OWLClass(IRI.create(sub_x.iri))

    def super_classes(self, ce: OWLClassExpression, direct: bool = False) -> Iterable[OWLClass]:
        """Yield the named super classes of ``ce``.

        Args:
            ce: a named class
            direct: if true, read ``is_a``; otherwise ``ancestors(include_self=False)``

        Raises:
            NotImplementedError: for complex class expressions
        """
        if not isinstance(ce, OWLClass):
            raise NotImplementedError("super classes for complex class expressions not implemented", ce)
        c_x: owlready2.ThingClass = self._world[ce.get_iri().as_str()]
        parents = c_x.is_a if direct else c_x.ancestors(include_self=False)
        for sup_x in parents:
            # Anonymous classes are ignored
            if isinstance(sup_x, owlready2.ThingClass):
                yield OWLClass(IRI.create(sup_x.iri))

    def equivalent_object_properties(self, op: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
        """Yield the object properties found in ``equivalent_to`` of the named property ``op``.

        Raises:
            NotImplementedError: if ``op`` is not a named object property
        """
        if not isinstance(op, OWLObjectProperty):
            raise NotImplementedError("equivalent properties of inverse properties not yet implemented", op)
        p_x: owlready2.ObjectPropertyClass = self._world[op.get_iri().as_str()]
        for ep_x in p_x.equivalent_to:
            if isinstance(ep_x, owlready2.ObjectPropertyClass):
                yield OWLObjectProperty(IRI.create(ep_x.iri))

    def equivalent_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
        """Yield the data properties found in ``equivalent_to`` of ``dp``."""
        p_x: owlready2.DataPropertyClass = self._world[dp.get_iri().as_str()]
        for ep_x in p_x.equivalent_to:
            if isinstance(ep_x, owlready2.DataPropertyClass):
                yield OWLDataProperty(IRI.create(ep_x.iri))

    def disjoint_object_properties(self, op: OWLObjectPropertyExpression) -> Iterable[OWLObjectPropertyExpression]:
        """Yield the object properties that share a disjoint-properties declaration of the root ontology with ``op``.

        Raises:
            NotImplementedError: if ``op`` is not a named object property
        """
        if not isinstance(op, OWLObjectProperty):
            raise NotImplementedError("disjoint object properties of inverse properties not yet implemented", op)
        p_x: owlready2.ObjectPropertyClass = self._world[op.get_iri().as_str()]
        ont_x: owlready2.Ontology = self.get_root_ontology()._onto
        for disjoint in ont_x.disjoint_properties():
            if p_x not in disjoint.entities:
                continue
            for o_p in disjoint.entities:
                if isinstance(o_p, owlready2.ObjectPropertyClass) and o_p != p_x:
                    yield OWLObjectProperty(IRI.create(o_p.iri))

    def disjoint_data_properties(self, dp: OWLDataProperty) -> Iterable[OWLDataProperty]:
        """Yield the data properties that share a disjoint-properties declaration of the root ontology with ``dp``."""
        p_x: owlready2.DataPropertyClass = self._world[dp.get_iri().as_str()]
        ont_x: owlready2.Ontology = self.get_root_ontology()._onto
        for disjoint in ont_x.disjoint_properties():
            if p_x not in disjoint.entities:
                continue
            for o_p in disjoint.entities:
                if isinstance(o_p, owlready2.DataPropertyClass) and o_p != p_x:
                    yield OWLDataProperty(IRI.create(o_p.iri))

    def _sub_data_properties_recursive(self, dp: OWLDataProperty, seen_set: Set) -> Iterable[OWLDataProperty]:
        """Yield all sub data properties of ``dp`` depth-first, recording visited properties in ``seen_set``."""
        p_x: owlready2.DataPropertyClass = self._world[dp.get_iri().as_str()]
        assert isinstance(p_x, owlready2.DataPropertyClass)
        for sub_x in p_x.subclasses(world=self._world):
            if not isinstance(sub_x, owlready2.DataPropertyClass) or sub_x in seen_set:
                continue
            seen_set.add(sub_x)
            owl_sub = OWLDataProperty(IRI.create(sub_x.iri))
            yield owl_sub
            yield from self._sub_data_properties_recursive(owl_sub, seen_set)

    def sub_data_properties(self, dp: OWLDataProperty, direct: bool = False) -> Iterable[OWLDataProperty]:
        """Yield the sub properties of data property ``dp``.

        Args:
            dp: the data property
            direct: if true, only the properties returned by ``subclasses()``; otherwise all descendants
        """
        assert isinstance(dp, OWLDataProperty)
        if not direct:
            yield from self._sub_data_properties_recursive(dp, set())
            return
        p_x: owlready2.DataPropertyClass = self._world[dp.get_iri().as_str()]
        for sub_x in p_x.subclasses(world=self._world):
            if isinstance(sub_x, owlready2.DataPropertyClass):
                yield OWLDataProperty(IRI.create(sub_x.iri))

    def _sub_object_properties_recursive(self, op: OWLObjectProperty, seen_set: Set) -> Iterable[OWLObjectProperty]:
        """Yield all sub object properties of ``op`` depth-first, recording visited properties in ``seen_set``."""
        p_x: owlready2.ObjectPropertyClass = self._world[op.get_iri().as_str()]
        assert isinstance(p_x, owlready2.ObjectPropertyClass)
        for sub_x in p_x.subclasses(world=self._world):
            if not isinstance(sub_x, owlready2.ObjectPropertyClass) or sub_x in seen_set:
                continue
            seen_set.add(sub_x)
            owl_sub = OWLObjectProperty(IRI.create(sub_x.iri))
            yield owl_sub
            yield from self._sub_object_properties_recursive(owl_sub, seen_set)

    def sub_object_properties(self, op: OWLObjectPropertyExpression, direct: bool = False) \
            -> Iterable[OWLObjectPropertyExpression]:
        """Yield the sub properties of the named object property ``op``.

        Args:
            op: the object property expression
            direct: if true, only the properties returned by ``subclasses()``; otherwise all descendants

        Raises:
            NotImplementedError: if ``op`` is not a named object property
        """
        assert isinstance(op, OWLObjectPropertyExpression)
        if not isinstance(op, OWLObjectProperty):
            raise NotImplementedError("sub properties of inverse properties not yet implemented", op)
        if not direct:
            yield from self._sub_object_properties_recursive(op, set())
            return
        p_x: owlready2.ObjectPropertyClass = self._world[op.get_iri().as_str()]
        for sub_x in p_x.subclasses(world=self._world):
            if isinstance(sub_x, owlready2.ObjectPropertyClass):
                yield OWLObjectProperty(IRI.create(sub_x.iri))

    def types(self, ind: OWLNamedIndividual, direct: bool = False) -> Iterable[OWLClass]:
        """Yield the named classes of individual ``ind``.

        Args:
            ind: the individual
            direct: if true, read ``is_a``; otherwise ``INDIRECT_is_a``
        """
        i: owlready2.Thing = self._world[ind.get_iri().as_str()]
        class_list = i.is_a if direct else i.INDIRECT_is_a
        for c_x in class_list:
            # Anonymous classes are ignored
            if isinstance(c_x, owlready2.ThingClass):
                yield OWLClass(IRI.create(c_x.iri))

    def _sync_reasoner(self, other_reasoner: BaseReasoner_Owlready2 = None,
                       infer_property_values: bool = True,
                       infer_data_property_values: bool = True) -> None:
        """Call Owlready2's sync_reasoner method, which spawns a Java process on a temp file to infer more

        Args:
            other_reasoner: set to BaseReasoner_Owlready2.PELLET (default) or BaseReasoner_Owlready2.HERMIT
            infer_property_values: whether to infer property values
            infer_data_property_values: whether to infer data property values (only for PELLET)
        """
        assert other_reasoner is None or isinstance(other_reasoner, BaseReasoner_Owlready2)
        use_hermit = other_reasoner == BaseReasoner_Owlready2.HERMIT
        # The reasoner is run inside the context of the root ontology.
        with self.get_root_ontology()._onto:
            if use_hermit:
                owlready2.sync_reasoner_hermit(self._world, infer_property_values=infer_property_values)
            else:
                owlready2.sync_reasoner_pellet(self._world,
                                               infer_property_values=infer_property_values,
                                               infer_data_property_values=infer_data_property_values)

    def get_root_ontology(self) -> OWLOntology:
        """Return the ontology this reasoner was created for."""
        return self._ontology