Source code for rdflib.extras.infixowl

"""RDFLib Python binding for OWL Abstract Syntax

OWL Constructor     DL Syntax       Manchester OWL Syntax   Example
====================================================================================
intersectionOf      C ∩ D              C AND D             Human AND Male
unionOf             C ∪ D              C OR D              Man OR Woman
complementOf         ¬ C               NOT C               NOT Male
oneOf             {a} ∪ {b}...        {a b ...}            {England Italy Spain}
someValuesFrom      ∃ R C              R SOME C            hasColleague SOME Professor
allValuesFrom       ∀ R C              R ONLY C            hasColleague ONLY Professor
minCardinality      ≥ N R              R MIN 3             hasColleague MIN 3
maxCardinality      ≤ N R              R MAX 3             hasColleague MAX 3
cardinality         = N R              R EXACTLY 3         hasColleague EXACTLY 3
hasValue             ∃ R               {a} R VALUE a       hasColleague VALUE Matthew

see: http://www.w3.org/TR/owl-semantics/syntax.html
     http://owl-workshop.man.ac.uk/acceptedLong/submission_9.pdf

3.2.3 Axioms for complete classes without using owl:equivalentClass

Named class description of type 2 (with owl:oneOf) or type 4-6
(with owl:intersectionOf, owl:unionOf or owl:complementOf

Uses Manchester Syntax for __repr__

>>> exNs = Namespace("http://example.com/")
>>> g = Graph()
>>> g.bind("ex", exNs, override=False)

Now we have an empty graph, we can construct OWL classes in it
using the Python classes defined in this module

>>> a = Class(exNs.Opera, graph=g)

Now we can assert rdfs:subClassOf and owl:equivalentClass relationships
(in the underlying graph) with other classes using the 'subClassOf'
and 'equivalentClass' descriptors which can be set to a list
of objects for the corresponding predicates.

>>> a.subClassOf = [exNs.MusicalWork]

We can then access the rdfs:subClassOf relationships

>>> print(list(a.subClassOf))
[Class: ex:MusicalWork ]

This can also be used against already populated graphs:

>>> owlGraph = Graph().parse(str(OWL))
>>> list(Class(OWL.Class, graph=owlGraph).subClassOf)
[Class: rdfs:Class ]

Operators are also available. For instance we can add ex:Opera to the extension
of the ex:CreativeWork class via the '+=' operator

>>> a
Class: ex:Opera SubClassOf: ex:MusicalWork
>>> b = Class(exNs.CreativeWork, graph=g)
>>> b += a
>>> print(sorted(a.subClassOf, key=lambda c:c.identifier))
[Class: ex:CreativeWork , Class: ex:MusicalWork ]

And we can then remove it from the extension as well

>>> b -= a
>>> a
Class: ex:Opera SubClassOf: ex:MusicalWork

Boolean class constructions can also  be created with Python operators.
For example, The | operator can be used to construct a class consisting of a
owl:unionOf the operands:

>>> c =  a | b | Class(exNs.Work, graph=g)
>>> c
( ex:Opera OR ex:CreativeWork OR ex:Work )

Boolean class expressions can also be operated as lists (using python list
operators)

>>> del c[c.index(Class(exNs.Work, graph=g))]
>>> c
( ex:Opera OR ex:CreativeWork )

The '&' operator can be used to construct class intersection:

>>> woman = Class(exNs.Female, graph=g) & Class(exNs.Human, graph=g)
>>> woman.identifier = exNs.Woman
>>> woman
( ex:Female AND ex:Human )
>>> len(woman)
2

Enumerated classes can also be manipulated

>>> contList = [Class(exNs.Africa, graph=g), Class(exNs.NorthAmerica, graph=g)]
>>> EnumeratedClass(members=contList, graph=g)
{ ex:Africa ex:NorthAmerica }

owl:Restrictions can also be instantiated:

>>> Restriction(exNs.hasParent, graph=g, allValuesFrom=exNs.Human)
( ex:hasParent ONLY ex:Human )

Restrictions can also be created using Manchester OWL syntax in 'colloquial'
Python
>>> exNs.hasParent @ some @ Class(exNs.Physician, graph=g)
( ex:hasParent SOME ex:Physician )

>>> Property(exNs.hasParent, graph=g) @ max @ Literal(1)
( ex:hasParent MAX 1 )

>>> print(g.serialize(format='pretty-xml'))  # doctest: +SKIP

"""

from __future__ import annotations

import itertools
import logging
from typing import Iterable, Union

from rdflib.collection import Collection
from rdflib.graph import Graph, _ObjectType
from rdflib.namespace import OWL, RDF, RDFS, XSD, Namespace, NamespaceManager
from rdflib.term import BNode, Identifier, Literal, URIRef, Variable
from rdflib.util import first

logger = logging.getLogger(__name__)


"""
From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/384122

Python has the wonderful "in" operator and it would be nice to have additional
infix operator like this. This recipe shows how (almost) arbitrary infix
operators can be defined.

"""

__all__ = [
    "ACE_NS",
    "AllClasses",
    "AllDifferent",
    "AllProperties",
    "AnnotatableTerms",
    "BooleanClass",
    "CLASS_RELATIONS",
    "Callable",
    "CastClass",
    "Class",
    "ClassNamespaceFactory",
    "CommonNSBindings",
    "ComponentTerms",
    "DeepClassClear",
    "EnumeratedClass",
    "GetIdentifiedClasses",
    "Individual",
    "Infix",
    "MalformedClass",
    "MalformedClassError",
    "OWLRDFListProxy",
    "Ontology",
    "Property",
    "PropertyAbstractSyntax",
    "Restriction",
    "classOrIdentifier",
    "classOrTerm",
    "exactly",
    "generateQName",
    "manchesterSyntax",
    "max",
    "min",
    "nsBinds",
    "only",
    "propertyOrIdentifier",
    "some",
    "value",
]

# definition of an Infix operator class
# this recipe also works in jython
# calling sequence for the infix is:
#  x @ op @ y


[docs]class Infix:
[docs] def __init__(self, function): self.function = function
[docs] def __rlshift__(self, other): return Infix(lambda x, self=self, other=other: self.function(other, x))
[docs] def __rshift__(self, other): return self.function(other)
[docs] def __rmatmul__(self, other): return Infix(lambda x, self=self, other=other: self.function(other, x))
[docs] def __matmul__(self, other): return self.function(other)
[docs] def __call__(self, value1, value2): return self.function(value1, value2) # pragma: no cover
nsBinds = { # noqa: N816 "skos": "http://www.w3.org/2004/02/skos/core#", "rdf": RDF, "rdfs": RDFS, "owl": OWL, "list": URIRef("http://www.w3.org/2000/10/swap/list#"), "dc": "http://purl.org/dc/elements/1.1/", }
[docs]def generateQName(graph, uri): # noqa: N802 prefix, uri, localname = graph.compute_qname(classOrIdentifier(uri)) return ":".join([prefix, localname])
[docs]def classOrTerm(thing): # noqa: N802 if isinstance(thing, Class): return thing.identifier else: assert isinstance(thing, (URIRef, BNode, Literal)) return thing
[docs]def classOrIdentifier(thing): # noqa: N802 if isinstance(thing, (Property, Class)): return thing.identifier else: assert isinstance(thing, (URIRef, BNode)), ( "Expecting a Class, Property, URIRef, or BNode.. not a %s" % thing ) return thing
[docs]def propertyOrIdentifier(thing): # noqa: N802 if isinstance(thing, Property): return thing.identifier else: assert isinstance(thing, URIRef) return thing
[docs]def manchesterSyntax( # noqa: N802 thing, store, boolean=None, transientList=False # noqa: N803 ): """ Core serialization thing is a Class and is processed as a subject store is an RDFLib Graph to be queried about thing """ assert thing is not None if boolean: if transientList: livechildren = iter(thing) children = [manchesterSyntax(child, store) for child in thing] else: livechildren = iter(Collection(store, thing)) children = [ manchesterSyntax(child, store) for child in Collection(store, thing) ] if boolean == OWL.intersectionOf: childlist = [] named = [] for child in livechildren: if isinstance(child, URIRef): named.append(child) else: childlist.append(child) if named: def castToQName(x): # noqa: N802 prefix, uri, localname = store.compute_qname(x) return ":".join([prefix, localname]) if len(named) > 1: prefix = "( " + " AND ".join(map(castToQName, named)) + " )" else: prefix = manchesterSyntax(named[0], store) if childlist: return ( str(prefix) + " THAT " + " AND ".join( [str(manchesterSyntax(x, store)) for x in childlist] ) ) else: return prefix else: return "( " + " AND ".join([str(c) for c in children]) + " )" elif boolean == OWL.unionOf: return "( " + " OR ".join([str(c) for c in children]) + " )" elif boolean == OWL.oneOf: return "{ " + " ".join([str(c) for c in children]) + " }" else: assert boolean == OWL.complementOf elif OWL.Restriction in store.objects(subject=thing, predicate=RDF.type): prop = list(store.objects(subject=thing, predicate=OWL.onProperty))[0] prefix, uri, localname = store.compute_qname(prop) propstring = ":".join([prefix, localname]) label = first(store.objects(subject=prop, predicate=RDFS.label)) if label: propstring = "'%s'" % label for onlyclass in store.objects(subject=thing, predicate=OWL.allValuesFrom): return "( %s ONLY %s )" % (propstring, manchesterSyntax(onlyclass, store)) for val in store.objects(subject=thing, predicate=OWL.hasValue): return "( %s VALUE %s )" % (propstring, manchesterSyntax(val, store)) for someclass in store.objects(subject=thing, predicate=OWL.someValuesFrom): return "( %s SOME %s )" % (propstring, manchesterSyntax(someclass, store)) cardlookup = { OWL.maxCardinality: "MAX", OWL.minCardinality: "MIN", OWL.cardinality: "EQUALS", } for _s, p, o in store.triples_choices((thing, list(cardlookup.keys()), None)): return "( %s %s %s )" % (propstring, cardlookup[p], o) # is thing a complement of anything compl = list(store.objects(subject=thing, predicate=OWL.complementOf)) if compl: return "( NOT %s )" % (manchesterSyntax(compl[0], store)) else: prolog = "\n".join(["PREFIX %s: <%s>" % (k, nsBinds[k]) for k in nsBinds]) qstr = ( prolog + "\nSELECT ?p ?bool WHERE {?class a owl:Class; ?p ?bool ." + "?bool rdf:first ?foo }" ) initb = {Variable("?class"): thing} for boolprop, col in store.query(qstr, processor="sparql", initBindings=initb): if not isinstance(thing, URIRef): return manchesterSyntax(col, store, boolean=boolprop) try: prefix, uri, localname = store.compute_qname(thing) qname = ":".join([prefix, localname]) except Exception: if isinstance(thing, BNode): return thing.n3() # Expect the unexpected return thing.identifier if not isinstance(thing, str) else thing label = first(Class(thing, graph=store).label) if label: return label else: return qname
[docs]def GetIdentifiedClasses(graph): # noqa: N802 for c in graph.subjects(predicate=RDF.type, object=OWL.Class): if isinstance(c, URIRef): yield Class(c)
class TermDeletionHelper: def __init__(self, prop): self.prop = prop def __call__(self, f): def _remover(inst): inst.graph.remove((inst.identifier, self.prop, None)) return _remover
[docs]class Individual: """ A typed individual, the base class of the InfixOWL classes. """ factoryGraph = Graph() # noqa: N815
[docs] def serialize(self, graph): for fact in self.factoryGraph.triples((self.identifier, None, None)): graph.add(fact)
[docs] def __init__(self, identifier=None, graph=None): self.__identifier = identifier is not None and identifier or BNode() if graph is None: self.graph = self.factoryGraph else: self.graph = graph self.qname = None if not isinstance(self.identifier, BNode): try: prefix, uri, localname = self.graph.compute_qname(self.identifier) self.qname = ":".join([prefix, localname]) except Exception: # pragma: no cover pass # pragma: no cover
[docs] def clearInDegree(self): # noqa: N802 """ Remove references to this individual as an object in the backing store. """ self.graph.remove((None, None, self.identifier))
[docs] def clearOutDegree(self): # noqa: N802 """ Remove all statements to this individual as a subject in the backing store. Note that this only removes the statements themselves, not the blank node closure so there is a chance that this will cause orphaned blank nodes to remain in the graph. """ self.graph.remove((self.identifier, None, None))
[docs] def delete(self): """ Delete the individual from the graph, clearing the in and out degrees. """ self.clearInDegree() self.clearOutDegree()
[docs] def replace(self, other): """ Replace the individual in the graph with the given other, causing all triples that refer to it to be changed and then delete the individual. >>> g = Graph() >>> b = Individual(OWL.Restriction, g) >>> b.type = RDFS.Resource >>> len(list(b.type)) 1 >>> del b.type >>> len(list(b.type)) 0 """ for s, p, _o in self.graph.triples((None, None, self.identifier)): self.graph.add((s, p, classOrIdentifier(other))) self.delete()
def _get_type(self) -> Iterable[_ObjectType]: for _t in self.graph.objects(subject=self.identifier, predicate=RDF.type): yield _t def _set_type(self, kind: Union[Individual, Identifier, Iterable[_ObjectType]]): if not kind: return if isinstance(kind, (Individual, Identifier)): self.graph.add((self.identifier, RDF.type, classOrIdentifier(kind))) else: for c in kind: assert isinstance(c, (Individual, Identifier)) self.graph.add((self.identifier, RDF.type, classOrIdentifier(c))) @TermDeletionHelper(RDF.type) def _delete_type(self): """ >>> g = Graph() >>> b = Individual(OWL.Restriction, g) >>> b.type = RDFS.Resource >>> len(list(b.type)) 1 >>> del b.type >>> len(list(b.type)) 0 """ pass # pragma: no cover type = property(_get_type, _set_type, _delete_type) def _get_identifier(self) -> Identifier: return self.__identifier def _set_identifier(self, i: Identifier): assert i if i != self.__identifier: oldstatements_out = [ (p, o) for s, p, o in self.graph.triples((self.__identifier, None, None)) ] oldstatements_in = [ (s, p) for s, p, o in self.graph.triples((None, None, self.__identifier)) ] for p1, o1 in oldstatements_out: self.graph.remove((self.__identifier, p1, o1)) for s1, p1 in oldstatements_in: self.graph.remove((s1, p1, self.__identifier)) self.__identifier = i self.graph.addN([(i, p1, o1, self.graph) for p1, o1 in oldstatements_out]) self.graph.addN([(s1, p1, i, self.graph) for s1, p1 in oldstatements_in]) if not isinstance(i, BNode): try: prefix, uri, localname = self.graph.compute_qname(i) self.qname = ":".join([prefix, localname]) except Exception: # pragma: no cover pass # pragma: no cover identifier = property(_get_identifier, _set_identifier) def _get_sameAs(self) -> Iterable[_ObjectType]: # noqa: N802 for _t in self.graph.objects(subject=self.identifier, predicate=OWL.sameAs): yield _t def _set_sameAs( # noqa: N802 self, term: Union[Individual, Identifier, Iterable[_ObjectType]] ): # if not kind: # return if isinstance(term, (Individual, Identifier)): self.graph.add((self.identifier, OWL.sameAs, classOrIdentifier(term))) else: for c in term: assert isinstance(c, (Individual, Identifier)) self.graph.add((self.identifier, OWL.sameAs, classOrIdentifier(c))) @TermDeletionHelper(OWL.sameAs) def _delete_sameAs(self): # noqa: N802 pass # pragma: no cover sameAs = property(_get_sameAs, _set_sameAs, _delete_sameAs) # noqa: N815
ACE_NS = Namespace("http://attempto.ifi.uzh.ch/ace_lexicon#")
[docs]class AnnotatableTerms(Individual): """ Terms in an OWL ontology with rdfs:label and rdfs:comment ## Interface with ATTEMPTO (http://attempto.ifi.uzh.ch/site) ### Verbalisation of OWL entity IRIS #### How are OWL entity IRIs verbalized? The OWL verbalizer maps OWL entity IRIs to ACE content words such that - OWL individuals map to ACE proper names (PN) - OWL classes map to ACE common nouns (CN) - OWL properties map to ACE transitive verbs (TV) There are 6 morphological categories that determine the surface form of an IRI: - singular form of a proper name (e.g. John) - singular form of a common noun (e.g. man) - plural form of a common noun (e.g. men) - singular form of a transitive verb (e.g. mans) - plural form of a transitive verb (e.g. man) - past participle form a transitive verb (e.g. manned) The user has full control over the eventual surface forms of the IRIs but has to choose them in terms of the above categories. Furthermore, - the surface forms must be legal ACE content words (e.g. they should not contain punctuation symbols); - the mapping of IRIs to surface forms must be bidirectional within the same word class, in order to be able to (if needed) parse the verbalization back into OWL in a semantics preserving way. ### Using the lexicon It is possible to specify the mapping of IRIs to surface forms using the following annotation properties: .. code-block:: none http://attempto.ifi.uzh.ch/ace_lexicon#PN_sg http://attempto.ifi.uzh.ch/ace_lexicon#CN_sg http://attempto.ifi.uzh.ch/ace_lexicon#CN_pl http://attempto.ifi.uzh.ch/ace_lexicon#TV_sg http://attempto.ifi.uzh.ch/ace_lexicon#TV_pl http://attempto.ifi.uzh.ch/ace_lexicon#TV_vbg For example, the following axioms state that if the IRI "#man" is used as a plural common noun, then the wordform men must be used by the verbalizer. If, however, it is used as a singular transitive verb, then mans must be used. .. code-block:: none <AnnotationAssertion> <AnnotationProperty IRI="http://attempto.ifi.uzh.ch/ace_lexicon#CN_pl"/> <IRI>#man</IRI> <Literal datatypeIRI="&xsd;string">men</Literal> </AnnotationAssertion> <AnnotationAssertion> <AnnotationProperty IRI="http://attempto.ifi.uzh.ch/ace_lexicon#TV_sg"/> <IRI>#man</IRI> <Literal datatypeIRI="&xsd;string">mans</Literal> </AnnotationAssertion> """
[docs] def __init__( self, identifier, graph=None, nameAnnotation=None, # noqa: N803 nameIsLabel=False, # noqa: N803 ): super(AnnotatableTerms, self).__init__(identifier, graph) if nameAnnotation: self.setupACEAnnotations() self.PN_sgprop.extent = [ (self.identifier, self.handleAnnotation(nameAnnotation)) ] if nameIsLabel: self.label = [nameAnnotation]
[docs] def handleAnnotation(self, val): # noqa: N802 return val if isinstance(val, Literal) else Literal(val)
[docs] def setupACEAnnotations(self): # noqa: N802 self.graph.bind("ace", ACE_NS, override=False) # PN_sg singular form of a proper name () self.PN_sgprop = Property( ACE_NS.PN_sg, baseType=OWL.AnnotationProperty, graph=self.graph ) # CN_sg singular form of a common noun self.CN_sgprop = Property( ACE_NS.CN_sg, baseType=OWL.AnnotationProperty, graph=self.graph ) # CN_pl plural form of a common noun self.CN_plprop = Property( ACE_NS.CN_pl, baseType=OWL.AnnotationProperty, graph=self.graph ) # singular form of a transitive verb self.tv_sgprop = Property( ACE_NS.TV_sg, baseType=OWL.AnnotationProperty, graph=self.graph ) # plural form of a transitive verb self.tv_plprop = Property( ACE_NS.TV_pl, baseType=OWL.AnnotationProperty, graph=self.graph ) # past participle form a transitive verb self.tv_vbgprop = Property( ACE_NS.TV_vbg, baseType=OWL.AnnotationProperty, graph=self.graph )
def _get_comment(self): for comment in self.graph.objects( subject=self.identifier, predicate=RDFS.comment ): yield comment def _set_comment(self, comment): if not comment: return if isinstance(comment, Identifier): self.graph.add((self.identifier, RDFS.comment, comment)) else: for c in comment: self.graph.add((self.identifier, RDFS.comment, c)) @TermDeletionHelper(RDFS.comment) def _del_comment(self): pass # pragma: no cover comment = property(_get_comment, _set_comment, _del_comment) def _get_seealso(self): for seealso in self.graph.objects( subject=self.identifier, predicate=RDFS.seeAlso ): yield seealso def _set_seealso(self, seealsos): if not seealsos: return for s in seealsos: self.graph.add((self.identifier, RDFS.seeAlso, s)) @TermDeletionHelper(RDFS.seeAlso) def _del_seealso(self): pass # pragma: no cover seeAlso = property(_get_seealso, _set_seealso, _del_seealso) # noqa: N815 def _get_label(self): for label in self.graph.objects(subject=self.identifier, predicate=RDFS.label): yield label def _set_label(self, label): if not label: return if isinstance(label, Identifier): self.graph.add((self.identifier, RDFS.label, label)) else: for l_ in label: self.graph.add((self.identifier, RDFS.label, l_)) @TermDeletionHelper(RDFS.label) def _delete_label(self): """ >>> g = Graph() >>> b = Individual(OWL.Restriction,g) >>> b.label = Literal('boo') >>> len(list(b.label)) 1 >>> del b.label >>> len(list(b.label)) 0 """ pass # pragma: no cover label = property(_get_label, _set_label, _delete_label)
[docs]class Ontology(AnnotatableTerms): """The owl ontology metadata"""
[docs] def __init__(self, identifier=None, imports=None, comment=None, graph=None): super(Ontology, self).__init__(identifier, graph) self.imports = [] if imports is None else imports self.comment = [] if comment is None else comment if (self.identifier, RDF.type, OWL.Ontology) not in self.graph: self.graph.add((self.identifier, RDF.type, OWL.Ontology))
[docs] def setVersion(self, version): # noqa: N802 self.graph.set((self.identifier, OWL.versionInfo, version))
def _get_imports(self): for owl in self.graph.objects( subject=self.identifier, predicate=OWL["imports"] ): yield owl def _set_imports(self, other): if not other: return for o in other: self.graph.add((self.identifier, OWL["imports"], o)) @TermDeletionHelper(OWL["imports"]) def _del_imports(self): pass # pragma: no cover imports = property(_get_imports, _set_imports, _del_imports)
[docs]def AllClasses(graph): # noqa: N802 for c in set(graph.subjects(predicate=RDF.type, object=OWL.Class)): yield Class(c)
[docs]def AllProperties(graph): # noqa: N802 prevprops = set() for s, _p, o in graph.triples_choices( ( None, RDF.type, [ OWL.SymmetricProperty, OWL.FunctionalProperty, OWL.InverseFunctionalProperty, OWL.TransitiveProperty, OWL.DatatypeProperty, OWL.ObjectProperty, OWL.AnnotationProperty, ], ) ): if o in [ OWL.SymmetricProperty, OWL.InverseFunctionalProperty, OWL.TransitiveProperty, OWL.ObjectProperty, ]: bType = OWL.ObjectProperty # noqa: N806 else: bType = OWL.DatatypeProperty # noqa: N806 if s not in prevprops: prevprops.add(s) yield Property(s, graph=graph, baseType=bType)
[docs]class ClassNamespaceFactory(Namespace):
[docs] def term(self, name): return Class(URIRef(self + name))
[docs] def __getitem__(self, key, default=None): return self.term(key)
[docs] def __getattr__(self, name): if name.startswith("__"): # ignore any special Python names! raise AttributeError else: return self.term(name)
CLASS_RELATIONS = set( Namespace("http://www.w3.org/2002/07/owl#resourceProperties") ).difference( [ OWL.onProperty, OWL.allValuesFrom, OWL.hasValue, OWL.someValuesFrom, OWL.inverseOf, OWL.imports, OWL.versionInfo, OWL.backwardCompatibleWith, OWL.incompatibleWith, OWL.unionOf, OWL.intersectionOf, OWL.oneOf, ] )
[docs]def ComponentTerms(cls): # noqa: N802 """ Takes a Class instance and returns a generator over the classes that are involved in its definition, ignoring unnamed classes """ if OWL.Restriction in cls.type: try: cls = CastClass(cls, Individual.factoryGraph) for _s, _p, inner_class_id in cls.factoryGraph.triples_choices( (cls.identifier, [OWL.allValuesFrom, OWL.someValuesFrom], None) ): inner_class = Class(inner_class_id, skipOWLClassMembership=True) if isinstance(inner_class_id, BNode): for _c in ComponentTerms(inner_class): yield _c else: yield inner_class except Exception: # pragma: no cover pass # pragma: no cover else: cls = CastClass(cls, Individual.factoryGraph) if isinstance(cls, BooleanClass): for _cls in cls: _cls = Class(_cls, skipOWLClassMembership=True) if isinstance(_cls.identifier, BNode): for _c in ComponentTerms(_cls): yield _c else: yield _cls else: for inner_class in cls.subClassOf: if isinstance(inner_class.identifier, BNode): for _c in ComponentTerms(inner_class): yield _c else: yield inner_class for _s, _p, o in cls.factoryGraph.triples_choices( (classOrIdentifier(cls), CLASS_RELATIONS, None) ): if isinstance(o, BNode): for _c in ComponentTerms(CastClass(o, Individual.factoryGraph)): yield _c else: yield inner_class
[docs]def DeepClassClear(class_to_prune): # noqa: N802 """ Recursively clear the given class, continuing where any related class is an anonymous class >>> EX = Namespace("http://example.com/") >>> g = Graph() >>> g.bind("ex", EX, override=False) >>> Individual.factoryGraph = g >>> classB = Class(EX.B) >>> classC = Class(EX.C) >>> classD = Class(EX.D) >>> classE = Class(EX.E) >>> classF = Class(EX.F) >>> anonClass = EX.someProp @ some @ classD >>> classF += anonClass >>> list(anonClass.subClassOf) [Class: ex:F ] >>> classA = classE | classF | anonClass >>> classB += classA >>> classA.equivalentClass = [Class()] >>> classB.subClassOf = [EX.someProp @ some @ classC] >>> classA ( ex:E OR ex:F OR ( ex:someProp SOME ex:D ) ) >>> DeepClassClear(classA) >>> classA ( ) >>> list(anonClass.subClassOf) [] >>> classB Class: ex:B SubClassOf: ( ex:someProp SOME ex:C ) >>> otherClass = classD | anonClass >>> otherClass ( ex:D OR ( ex:someProp SOME ex:D ) ) >>> DeepClassClear(otherClass) >>> otherClass ( ) >>> otherClass.delete() >>> list(g.triples((otherClass.identifier, None, None))) [] """ def deepClearIfBNode(_class): # noqa: N802 if isinstance(classOrIdentifier(_class), BNode): DeepClassClear(_class) class_to_prune = CastClass(class_to_prune, Individual.factoryGraph) for c in class_to_prune.subClassOf: deepClearIfBNode(c) class_to_prune.graph.remove((class_to_prune.identifier, RDFS.subClassOf, None)) for c in class_to_prune.equivalentClass: deepClearIfBNode(c) class_to_prune.graph.remove((class_to_prune.identifier, OWL.equivalentClass, None)) inverse_class = class_to_prune.complementOf if inverse_class: class_to_prune.graph.remove((class_to_prune.identifier, OWL.complementOf, None)) deepClearIfBNode(inverse_class) if isinstance(class_to_prune, BooleanClass): for c in class_to_prune: deepClearIfBNode(c) class_to_prune.clear() class_to_prune.graph.remove( (class_to_prune.identifier, class_to_prune._operator, None) )
[docs]class MalformedClass(ValueError): # noqa: N818 """ .. deprecated:: TODO-NEXT-VERSION This class will be removed in version ``7.0.0``. """ pass
[docs]class MalformedClassError(MalformedClass):
[docs] def __init__(self, msg): self.msg = msg
[docs] def __repr__(self): return self.msg
[docs]def CastClass(c, graph=None): # noqa: N802 graph = graph is None and c.factoryGraph or graph for kind in graph.objects(subject=classOrIdentifier(c), predicate=RDF.type): if kind == OWL.Restriction: kwargs = {"identifier": classOrIdentifier(c), "graph": graph} for _s, p, o in graph.triples((classOrIdentifier(c), None, None)): if p != RDF.type: if p == OWL.onProperty: kwargs["onProperty"] = o else: if p not in Restriction.restrictionKinds: continue kwargs[str(p.split(str(OWL))[-1])] = o if not set( [str(i.split(str(OWL))[-1]) for i in Restriction.restrictionKinds] ).intersection(kwargs): raise MalformedClassError("Malformed owl:Restriction") return Restriction(**kwargs) else: for _s, p, _o in graph.triples_choices( ( classOrIdentifier(c), [OWL.intersectionOf, OWL.unionOf, OWL.oneOf], None, ) ): if p == OWL.oneOf: return EnumeratedClass(classOrIdentifier(c), graph=graph) else: return BooleanClass(classOrIdentifier(c), operator=p, graph=graph) # assert (classOrIdentifier(c),RDF.type,OWL.Class) in graph return Class(classOrIdentifier(c), graph=graph, skipOWLClassMembership=True)
[docs]class Class(AnnotatableTerms): """ 'General form' for classes: The Manchester Syntax (supported in Protege) is used as the basis for the form of this class See: http://owl-workshop.man.ac.uk/acceptedLong/submission_9.pdf: [Annotation] ‘Class:’ classID {Annotation ( (‘SubClassOf:’ ClassExpression) | (‘EquivalentTo’ ClassExpression) | (’DisjointWith’ ClassExpression)) } Appropriate excerpts from OWL Reference: ".. Subclass axioms provide us with partial definitions: they represent necessary but not sufficient conditions for establishing class membership of an individual." ".. A class axiom may contain (multiple) owl:equivalentClass statements" "..A class axiom may also contain (multiple) owl:disjointWith statements.." "..An owl:complementOf property links a class to precisely one class description." """ def _serialize(self, graph): for cl in self.subClassOf: CastClass(cl, self.graph).serialize(graph) for cl in self.equivalentClass: CastClass(cl, self.graph).serialize(graph) for cl in self.disjointWith: CastClass(cl, self.graph).serialize(graph) if self.complementOf: CastClass(self.complementOf, self.graph).serialize(graph)
[docs] def serialize(self, graph): for fact in self.graph.triples((self.identifier, None, None)): graph.add(fact) self._serialize(graph)
[docs] def setupNounAnnotations(self, noun_annotations): # noqa: N802 if isinstance(noun_annotations, tuple): cn_sgprop, cn_plprop = noun_annotations else: cn_sgprop = noun_annotations cn_plprop = noun_annotations if cn_sgprop: self.CN_sgprop.extent = [ (self.identifier, self.handleAnnotation(cn_sgprop)) ] if cn_plprop: self.CN_plprop.extent = [ (self.identifier, self.handleAnnotation(cn_plprop)) ]
[docs] def __init__( self, identifier=None, subClassOf=None, # noqa: N803 equivalentClass=None, # noqa: N803 disjointWith=None, # noqa: N803 complementOf=None, # noqa: N803 graph=None, skipOWLClassMembership=False, # noqa: N803 comment=None, nounAnnotations=None, # noqa: N803 nameAnnotation=None, # noqa: N803 nameIsLabel=False, # noqa: N803 ): super(Class, self).__init__(identifier, graph, nameAnnotation, nameIsLabel) if nounAnnotations: self.setupNounAnnotations(nounAnnotations) if ( not skipOWLClassMembership and (self.identifier, RDF.type, OWL.Class) not in self.graph and (self.identifier, RDF.type, OWL.Restriction) not in self.graph ): self.graph.add((self.identifier, RDF.type, OWL.Class)) self.subClassOf = [] if subClassOf is None else subClassOf self.equivalentClass = [] if equivalentClass is None else equivalentClass self.disjointWith = [] if disjointWith is None else disjointWith if complementOf: self.complementOf = complementOf self.comment = [] if comment is None else comment
def _get_extent(self, graph=None): for member in (graph is None and self.graph or graph).subjects( predicate=RDF.type, object=self.identifier ): yield member def _set_extent(self, other): if not other: return for m in other: self.graph.add((classOrIdentifier(m), RDF.type, self.identifier)) @TermDeletionHelper(RDF.type) def _del_type(self): pass # pragma: no cover extent = property(_get_extent, _set_extent, _del_type) def _get_annotation(self, term=RDFS.label): for annotation in self.graph.objects(subject=self.identifier, predicate=term): yield annotation annotation = property(_get_annotation, lambda x: x) # type: ignore[arg-type,misc] def _get_extentquery(self): return (Variable("CLASS"), RDF.type, self.identifier) def _set_extentquery(self, other): pass # pragma: no cover extentQuery = property(_get_extentquery, _set_extentquery) # noqa: N815
[docs] def __hash__(self): """ >>> b = Class(OWL.Restriction) >>> c = Class(OWL.Restriction) >>> len(set([b,c])) 1 """ return hash(self.identifier)
[docs] def __eq__(self, other): assert isinstance(other, Class), repr(other) return self.identifier == other.identifier
[docs] def __iadd__(self, other): assert isinstance(other, Class) other.subClassOf = [self] return self
[docs] def __isub__(self, other): assert isinstance(other, Class) self.graph.remove((classOrIdentifier(other), RDFS.subClassOf, self.identifier)) return self
[docs] def __invert__(self): """ Shorthand for Manchester syntax's not operator """ return Class(complementOf=self)
[docs] def __or__(self, other): """ Construct an anonymous class description consisting of the union of this class and 'other' and return it """ return BooleanClass( operator=OWL.unionOf, members=[self, other], graph=self.graph )
[docs] def __and__(self, other): """ Construct an anonymous class description consisting of the intersection of this class and 'other' and return it Chaining 3 intersections >>> exNs = Namespace("http://example.com/") >>> g = Graph() >>> g.bind("ex", exNs, override=False) >>> female = Class(exNs.Female, graph=g) >>> human = Class(exNs.Human, graph=g) >>> youngPerson = Class(exNs.YoungPerson, graph=g) >>> youngWoman = female & human & youngPerson >>> youngWoman # doctest: +SKIP ex:YoungPerson THAT ( ex:Female AND ex:Human ) >>> isinstance(youngWoman, BooleanClass) True >>> isinstance(youngWoman.identifier, BNode) True """ return BooleanClass( operator=OWL.intersectionOf, members=[self, other], graph=self.graph )
def _get_subclassof(self): for anc in self.graph.objects( subject=self.identifier, predicate=RDFS.subClassOf ): yield Class(anc, graph=self.graph, skipOWLClassMembership=True) def _set_subclassof(self, other): if not other: return for sc in other: self.graph.add((self.identifier, RDFS.subClassOf, classOrIdentifier(sc))) @TermDeletionHelper(RDFS.subClassOf) def _del_subclassof(self): pass # pragma: no cover subClassOf = property( # noqa: N815 _get_subclassof, _set_subclassof, _del_subclassof ) def _get_equivalentclass(self): for ec in self.graph.objects( subject=self.identifier, predicate=OWL.equivalentClass ): yield Class(ec, graph=self.graph) def _set_equivalentclass(self, other): if not other: return for sc in other: self.graph.add( (self.identifier, OWL.equivalentClass, classOrIdentifier(sc)) ) @TermDeletionHelper(OWL.equivalentClass) def _del_equivalentclass(self): pass # pragma: no cover equivalentClass = property( # noqa: N815 _get_equivalentclass, _set_equivalentclass, _del_equivalentclass ) def _get_disjointwith(self): for dc in self.graph.objects( subject=self.identifier, predicate=OWL.disjointWith ): yield Class(dc, graph=self.graph) def _set_disjointwith(self, other): if not other: return for c in other: self.graph.add((self.identifier, OWL.disjointWith, classOrIdentifier(c))) @TermDeletionHelper(OWL.disjointWith) def _del_disjointwith(self): pass # pragma: no cover disjointWith = property( # noqa: N815 _get_disjointwith, _set_disjointwith, _del_disjointwith ) def _get_complementof(self): comp = list( self.graph.objects(subject=self.identifier, predicate=OWL.complementOf) ) if not comp: return None elif len(comp) == 1: return Class(comp[0], graph=self.graph) else: raise Exception(len(comp)) def _set_complementof(self, other): if not other: return self.graph.add((self.identifier, OWL.complementOf, classOrIdentifier(other))) @TermDeletionHelper(OWL.complementOf) def _del_complementof(self): pass # pragma: no cover complementOf = property( # noqa: N815 _get_complementof, _set_complementof, _del_complementof ) def _get_parents(self): """ computed attributes that returns a generator over taxonomic 'parents' by disjunction, conjunction, and subsumption >>> from rdflib.util import first >>> exNs = Namespace('http://example.com/') >>> g = Graph() >>> g.bind("ex", exNs, override=False) >>> Individual.factoryGraph = g >>> brother = Class(exNs.Brother) >>> sister = Class(exNs.Sister) >>> sibling = brother | sister >>> sibling.identifier = exNs.Sibling >>> sibling ( ex:Brother OR ex:Sister ) >>> first(brother.parents) Class: ex:Sibling EquivalentTo: ( ex:Brother OR ex:Sister ) >>> parent = Class(exNs.Parent) >>> male = Class(exNs.Male) >>> father = parent & male >>> father.identifier = exNs.Father >>> list(father.parents) [Class: ex:Parent , Class: ex:Male ] """ for parent in itertools.chain(self.subClassOf, self.equivalentClass): yield parent link = first(self.factoryGraph.subjects(RDF.first, self.identifier)) if link: siblingslist = list(self.factoryGraph.transitive_subjects(RDF.rest, link)) if siblingslist: collectionhead = siblingslist[-1] else: collectionhead = link for disjointclass in self.factoryGraph.subjects( OWL.unionOf, collectionhead ): if isinstance(disjointclass, URIRef): yield Class(disjointclass, skipOWLClassMembership=True) for rdf_list in self.factoryGraph.objects(self.identifier, OWL.intersectionOf): for member in OWLRDFListProxy([rdf_list], graph=self.factoryGraph): if isinstance(member, URIRef): yield Class(member, skipOWLClassMembership=True) parents = property(_get_parents)
[docs] def isPrimitive(self): # noqa: N802 if (self.identifier, RDF.type, OWL.Restriction) in self.graph: return False # sc = list(self.subClassOf) ec = list(self.equivalentClass) for _boolclass, p, rdf_list in self.graph.triples_choices( # type error: Argument 1 to "triples_choices" of "Graph" has incompatible type "Tuple[Any, List[URIRef], None]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" (self.identifier, [OWL.intersectionOf, OWL.unionOf], None) # type: ignore[arg-type] ): ec.append(manchesterSyntax(rdf_list, self.graph, boolean=p)) for _e in ec: return False if self.complementOf: return False return True
[docs] def subSumpteeIds(self): # noqa: N802 for s in self.graph.subjects(predicate=RDFS.subClassOf, object=self.identifier): yield s
# def __iter__(self): # for s in self.graph.subjects( # predicate=RDFS.subClassOf,object=self.identifier): # yield Class(s,skipOWLClassMembership=True)
[docs] def __repr__(self, full=False, normalization=True): """ Returns the Manchester Syntax equivalent for this class """ exprs = [] sc = list(self.subClassOf) ec = list(self.equivalentClass) for _boolclass, p, rdf_list in self.graph.triples_choices( # type error: Argument 1 to "triples_choices" of "Graph" has incompatible type "Tuple[Any, List[URIRef], None]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" (self.identifier, [OWL.intersectionOf, OWL.unionOf], None) # type: ignore[arg-type] ): ec.append(manchesterSyntax(rdf_list, self.graph, boolean=p)) dc = list(self.disjointWith) c = self.complementOf if c: dc.append(c) klasskind = "" label = list(self.graph.objects(self.identifier, RDFS.label)) # type error: Incompatible types in assignment (expression has type "str", variable has type "List[Node]") # type error: Unsupported operand types for + ("str" and "Node") label = label and "(" + label[0] + ")" or "" # type: ignore[assignment, operator] if sc: if full: scjoin = "\n " else: scjoin = ", " nec_statements = [ isinstance(s, Class) and isinstance(self.identifier, BNode) and repr(CastClass(s, self.graph)) or # repr(BooleanClass(classOrIdentifier(s), # operator=None, # graph=self.graph)) or manchesterSyntax(classOrIdentifier(s), self.graph) for s in sc ] if nec_statements: klasskind = "Primitive Type %s" % label exprs.append( "SubClassOf: %s" % scjoin.join([str(n) for n in nec_statements]) ) if full: exprs[-1] = "\n " + exprs[-1] if ec: nec_suff_statements = [ isinstance(s, str) and s or manchesterSyntax(classOrIdentifier(s), self.graph) for s in ec ] if nec_suff_statements: klasskind = "A Defined Class %s" % label exprs.append("EquivalentTo: %s" % ", ".join(nec_suff_statements)) if full: exprs[-1] = "\n " + exprs[-1] if dc: exprs.append( "DisjointWith %s\n" % "\n ".join( [manchesterSyntax(classOrIdentifier(s), self.graph) for s in dc] ) ) if full: exprs[-1] = "\n " + exprs[-1] descr = list(self.graph.objects(self.identifier, RDFS.comment)) if full and normalization: klassdescr = ( klasskind and "\n ## %s ##" % klasskind + (descr and "\n %s" % descr[0] or "") + " . ".join(exprs) or " . ".join(exprs) ) else: klassdescr = ( full and (descr and "\n %s" % descr[0] or "") or "" + " . ".join(exprs) ) return ( isinstance(self.identifier, BNode) and "Some Class " or "Class: %s " % self.qname ) + klassdescr
[docs]class OWLRDFListProxy:
[docs] def __init__(self, rdf_list, members=None, graph=None): if graph: self.graph = graph members = [] if members is None else members if rdf_list: self._rdfList = Collection(self.graph, rdf_list[0]) for member in members: if member not in self._rdfList: self._rdfList.append(classOrIdentifier(member)) else: self._rdfList = Collection( self.graph, BNode(), [classOrIdentifier(m) for m in members] ) # type error: "OWLRDFListProxy" has no attribute "identifier" # type error: "OWLRDFListProxy" has no attribute "_operator" self.graph.add((self.identifier, self._operator, self._rdfList.uri)) # type: ignore[attr-defined]
[docs] def __eq__(self, other): """ Equivalence of boolean class constructors is determined by equivalence of its members """ assert isinstance(other, Class), repr(other) + repr(type(other)) if isinstance(other, BooleanClass): length = len(self) if length != len(other): return False else: for idx in range(length): if self[idx] != other[idx]: return False return True else: # type error: "OWLRDFListProxy" has no attribute "identifier" return self.identifier == other.identifier # type: ignore[attr-defined]
# Redirect python list accessors to the underlying Collection instance
[docs] def __len__(self): return len(self._rdfList)
[docs] def index(self, item): return self._rdfList.index(classOrIdentifier(item))
[docs] def __getitem__(self, key): return self._rdfList[key]
[docs] def __setitem__(self, key, value): self._rdfList[key] = classOrIdentifier(value)
[docs] def __delitem__(self, key): del self._rdfList[key]
[docs] def clear(self): self._rdfList.clear()
[docs] def __iter__(self): for item in self._rdfList: yield item
[docs] def __contains__(self, item): for i in self._rdfList: if i == classOrIdentifier(item): return 1 return 0
[docs] def append(self, item): self._rdfList.append(item)
[docs] def __iadd__(self, other): self._rdfList.append(classOrIdentifier(other)) return self
[docs]class EnumeratedClass(OWLRDFListProxy, Class): """ Class for owl:oneOf forms: OWL Abstract Syntax is used axiom ::= 'EnumeratedClass(' classID ['Deprecated'] { annotation } { individualID } ')' >>> exNs = Namespace("http://example.com/") >>> g = Graph() >>> g.bind("ex", exNs, override=False) >>> Individual.factoryGraph = g >>> ogbujiBros = EnumeratedClass(exNs.ogbujicBros, ... members=[exNs.chime, ... exNs.uche, ... exNs.ejike]) >>> ogbujiBros # doctest: +SKIP { ex:chime ex:uche ex:ejike } >>> col = Collection(g, first( ... g.objects(predicate=OWL.oneOf, subject=ogbujiBros.identifier))) >>> sorted([g.qname(item) for item in col]) ['ex:chime', 'ex:ejike', 'ex:uche'] >>> print(g.serialize(format='n3')) # doctest: +SKIP @prefix ex: <http://example.com/> . @prefix owl: <http://www.w3.org/2002/07/owl#> . @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . <BLANKLINE> ex:ogbujicBros a owl:Class; owl:oneOf ( ex:chime ex:uche ex:ejike ) . <BLANKLINE> <BLANKLINE> """ _operator = OWL.oneOf
[docs] def isPrimitive(self): # noqa: N802 return False
[docs] def __init__(self, identifier=None, members=None, graph=None): Class.__init__(self, identifier, graph=graph) members = [] if members is None else members rdfList = list( # noqa: N806 self.graph.objects(predicate=OWL.oneOf, subject=self.identifier) ) OWLRDFListProxy.__init__(self, rdfList, members)
[docs] def __repr__(self): """ Returns the Manchester Syntax equivalent for this class """ return manchesterSyntax(self._rdfList.uri, self.graph, boolean=self._operator)
[docs] def serialize(self, graph): clonedlist = Collection(graph, BNode()) for cl in self._rdfList: clonedlist.append(cl) CastClass(cl, self.graph).serialize(graph) graph.add((self.identifier, self._operator, clonedlist.uri)) for s, p, o in self.graph.triples((self.identifier, None, None)): if p != self._operator: graph.add((s, p, o)) self._serialize(graph)
BooleanPredicates = [OWL.intersectionOf, OWL.unionOf] class BooleanClassExtentHelper: """ >>> testGraph = Graph() >>> Individual.factoryGraph = testGraph >>> EX = Namespace("http://example.com/") >>> testGraph.bind("ex", EX, override=False) >>> fire = Class(EX.Fire) >>> water = Class(EX.Water) >>> testClass = BooleanClass(members=[fire, water]) >>> testClass2 = BooleanClass( ... operator=OWL.unionOf, members=[fire, water]) >>> for c in BooleanClass.getIntersections(): ... print(c) # doctest: +SKIP ( ex:Fire AND ex:Water ) >>> for c in BooleanClass.getUnions(): ... print(c) #doctest: +SKIP ( ex:Fire OR ex:Water ) """ def __init__(self, operator): self.operator = operator def __call__(self, f): def _getExtent(): # noqa: N802 for c in Individual.factoryGraph.subjects(self.operator): yield BooleanClass(c, operator=self.operator) return _getExtent
[docs]class Callable:
[docs] def __init__(self, anycallable): self._callfn = anycallable
[docs] def __call__(self, *args, **kwargs): return self._callfn(*args, **kwargs)
[docs]class BooleanClass(OWLRDFListProxy, Class): """ See: http://www.w3.org/TR/owl-ref/#Boolean owl:complementOf is an attribute of Class, however """
[docs] @BooleanClassExtentHelper(OWL.intersectionOf) @Callable def getIntersections(): # type: ignore[misc] # noqa: N802 pass # pragma: no cover
getIntersections = Callable(getIntersections) # noqa: N815
[docs] @BooleanClassExtentHelper(OWL.unionOf) @Callable def getUnions(): # type: ignore[misc] # noqa: N802 pass # pragma: no cover
getUnions = Callable(getUnions) # noqa: N815
[docs] def __init__( self, identifier=None, operator=OWL.intersectionOf, members=None, graph=None ): if operator is None: props = [] for _s, p, _o in graph.triples_choices( (identifier, [OWL.intersectionOf, OWL.unionOf], None) ): props.append(p) operator = p assert len(props) == 1, repr(props) Class.__init__(self, identifier, graph=graph) assert operator in [OWL.intersectionOf, OWL.unionOf], str(operator) self._operator = operator rdf_list = list(self.graph.objects(predicate=operator, subject=self.identifier)) assert ( not members or not rdf_list ), "This is a previous boolean class description." OWLRDFListProxy.__init__(self, rdf_list, members)
[docs] def copy(self): """ Create a copy of this class """ copy_of_class = BooleanClass( operator=self._operator, members=list(self), graph=self.graph ) return copy_of_class
[docs] def serialize(self, graph): clonedlist = Collection(graph, BNode()) for cl in self._rdfList: clonedlist.append(cl) CastClass(cl, self.graph).serialize(graph) graph.add((self.identifier, self._operator, clonedlist.uri)) for s, p, o in self.graph.triples((self.identifier, None, None)): if p != self._operator: graph.add((s, p, o)) self._serialize(graph)
[docs] def isPrimitive(self): # noqa: N802 return False
[docs] def changeOperator(self, newOperator): # noqa: N802, N803 """ Converts a unionOf / intersectionOf class expression into one that instead uses the given operator >>> testGraph = Graph() >>> Individual.factoryGraph = testGraph >>> EX = Namespace("http://example.com/") >>> testGraph.bind("ex", EX, override=False) >>> fire = Class(EX.Fire) >>> water = Class(EX.Water) >>> testClass = BooleanClass(members=[fire,water]) >>> testClass ( ex:Fire AND ex:Water ) >>> testClass.changeOperator(OWL.unionOf) >>> testClass ( ex:Fire OR ex:Water ) >>> try: ... testClass.changeOperator(OWL.unionOf) ... except Exception as e: ... print(e) # doctest: +SKIP The new operator is already being used! """ assert newOperator != self._operator, "The new operator is already being used!" self.graph.remove((self.identifier, self._operator, self._rdfList.uri)) self.graph.add((self.identifier, newOperator, self._rdfList.uri)) self._operator = newOperator
[docs] def __repr__(self): """ Returns the Manchester Syntax equivalent for this class """ return manchesterSyntax( self._rdfList.uri if isinstance(self._rdfList, Collection) else BNode(), self.graph, boolean=self._operator, )
[docs] def __or__(self, other): """ Adds other to the list and returns self """ assert self._operator == OWL.unionOf self._rdfList.append(classOrIdentifier(other)) return self
[docs]def AllDifferent(members): # noqa: N802 """ TODO: implement this function DisjointClasses(' description description { description } ')' """ pass # pragma: no cover
[docs]class Restriction(Class): """ restriction ::= 'restriction(' datavaluedPropertyID dataRestrictionComponent { dataRestrictionComponent } ')' | 'restriction(' individualvaluedPropertyID individualRestrictionComponent { individualRestrictionComponent } ')' """ restrictionKinds = [ # noqa: N815 OWL.allValuesFrom, OWL.someValuesFrom, OWL.hasValue, OWL.cardinality, OWL.maxCardinality, OWL.minCardinality, ]
[docs] def __init__( self, onProperty, # noqa: N803 graph=None, allValuesFrom=None, # noqa: N803 someValuesFrom=None, # noqa: N803 value=None, cardinality=None, maxCardinality=None, # noqa: N803 minCardinality=None, # noqa: N803 identifier=None, ): graph = Graph() if graph is None else graph super(Restriction, self).__init__( identifier, graph=graph, skipOWLClassMembership=True ) if ( self.identifier, OWL.onProperty, propertyOrIdentifier(onProperty), ) not in graph: graph.add( (self.identifier, OWL.onProperty, propertyOrIdentifier(onProperty)) ) self.onProperty = onProperty restr_types = [ (allValuesFrom, OWL.allValuesFrom), (someValuesFrom, OWL.someValuesFrom), (value, OWL.hasValue), (cardinality, OWL.cardinality), (maxCardinality, OWL.maxCardinality), (minCardinality, OWL.minCardinality), ] valid_restr_props = [(i, oterm) for (i, oterm) in restr_types if i is not None] if not len(valid_restr_props): raise ValueError( "Missing value. One of: allValuesFrom, someValuesFrom," "value, cardinality, maxCardinality or minCardinality" "must have a value." ) restriction_range, restriction_type = valid_restr_props.pop() self.restrictionType = restriction_type if isinstance(restriction_range, Identifier): self.restrictionRange = restriction_range elif isinstance(restriction_range, Class): self.restrictionRange = classOrIdentifier(restriction_range) else: # error: Incompatible types in assignment (expression has type "Optional[Identifier]", variable has type "Identifier") self.restrictionRange = first( # type: ignore[assignment] # type error: Argument 1 to "first" has incompatible type "Generator[Node, None, None]"; expected "Iterable[Identifier]" self.graph.objects(self.identifier, restriction_type) # type: ignore[arg-type] ) if ( self.identifier, restriction_type, self.restrictionRange, ) not in self.graph: self.graph.add((self.identifier, restriction_type, self.restrictionRange)) assert self.restrictionRange is not None, Class(self.identifier) if (self.identifier, RDF.type, OWL.Restriction) not in self.graph: self.graph.add((self.identifier, RDF.type, OWL.Restriction)) self.graph.remove((self.identifier, RDF.type, OWL.Class))
[docs] def serialize(self, graph): """ >>> g1 = Graph() >>> g2 = Graph() >>> EX = Namespace("http://example.com/") >>> g1.bind("ex", EX, override=False) >>> g2.bind("ex", EX, override=False) >>> Individual.factoryGraph = g1 >>> prop = Property(EX.someProp, baseType=OWL.DatatypeProperty) >>> restr1 = (Property( ... EX.someProp, ... baseType=OWL.DatatypeProperty)) @ some @ (Class(EX.Foo)) >>> restr1 # doctest: +SKIP ( ex:someProp SOME ex:Foo ) >>> restr1.serialize(g2) >>> Individual.factoryGraph = g2 >>> list(Property( ... EX.someProp,baseType=None).type ... ) #doctest: +NORMALIZE_WHITESPACE +SKIP [rdflib.term.URIRef( 'http://www.w3.org/2002/07/owl#DatatypeProperty')] """ Property(self.onProperty, graph=self.graph, baseType=None).serialize(graph) for s, p, o in self.graph.triples((self.identifier, None, None)): graph.add((s, p, o)) if p in [OWL.allValuesFrom, OWL.someValuesFrom]: CastClass(o, self.graph).serialize(graph)
[docs] def isPrimitive(self): # noqa: N802 return False
[docs] def __hash__(self): return hash((self.onProperty, self.restrictionRange))
[docs] def __eq__(self, other): """ Equivalence of restrictions is determined by equivalence of the property in question and the restriction 'range' """ assert isinstance(other, Class), repr(other) + repr(type(other)) if isinstance(other, Restriction): return ( other.onProperty == self.onProperty # type error: "Restriction" has no attribute "restriction_range"; maybe "restrictionRange"? and other.restriction_range == self.restrictionRange # type: ignore[attr-defined] ) else: return False
def _get_onproperty(self): return list( self.graph.objects(subject=self.identifier, predicate=OWL.onProperty) )[0] def _set_onproperty(self, prop): if not prop: return triple = (self.identifier, OWL.onProperty, propertyOrIdentifier(prop)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.onProperty) def _del_onproperty(self): pass # pragma: no cover onProperty = property( # noqa: N815 _get_onproperty, _set_onproperty, _del_onproperty ) def _get_allvaluesfrom(self): for i in self.graph.objects( subject=self.identifier, predicate=OWL.allValuesFrom ): return Class(i, graph=self.graph) return None def _set_allvaluesfrom(self, other): if not other: return triple = (self.identifier, OWL.allValuesFrom, classOrIdentifier(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.allValuesFrom) def _del_allvaluesfrom(self): pass # pragma: no cover allValuesFrom = property( # noqa: N815 _get_allvaluesfrom, _set_allvaluesfrom, _del_allvaluesfrom ) def _get_somevaluesfrom(self): for i in self.graph.objects( subject=self.identifier, predicate=OWL.someValuesFrom ): return Class(i, graph=self.graph) return None def _set_somevaluesfrom(self, other): if not other: return triple = (self.identifier, OWL.someValuesFrom, classOrIdentifier(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.someValuesFrom) def _del_somevaluesfrom(self): pass # pragma: no cover someValuesFrom = property( # noqa: N815 _get_somevaluesfrom, _set_somevaluesfrom, _del_somevaluesfrom ) def _get_hasvalue(self): for i in self.graph.objects(subject=self.identifier, predicate=OWL.hasValue): return Class(i, graph=self.graph) return None def _set_hasvalue(self, other): if not other: return triple = (self.identifier, OWL.hasValue, classOrIdentifier(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.hasValue) def _del_hasvalue(self): pass # pragma: no cover hasValue = property(_get_hasvalue, _set_hasvalue, _del_hasvalue) # noqa: N815 def _get_cardinality(self): for i in self.graph.objects(subject=self.identifier, predicate=OWL.cardinality): return Class(i, graph=self.graph) return None def _set_cardinality(self, other): if not other: return triple = (self.identifier, OWL.cardinality, classOrTerm(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.cardinality) def _del_cardinality(self): pass # pragma: no cover cardinality = property(_get_cardinality, _set_cardinality, _del_cardinality) def _get_maxcardinality(self): for i in self.graph.objects( subject=self.identifier, predicate=OWL.maxCardinality ): return Class(i, graph=self.graph) return None def _set_maxcardinality(self, other): if not other: return triple = (self.identifier, OWL.maxCardinality, classOrTerm(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.maxCardinality) def _del_maxcardinality(self): pass # pragma: no cover maxCardinality = property( # noqa: N815 _get_maxcardinality, _set_maxcardinality, _del_maxcardinality ) def _get_mincardinality(self): for i in self.graph.objects( subject=self.identifier, predicate=OWL.minCardinality ): return Class(i, graph=self.graph) return None def _set_mincardinality(self, other): if not other: return triple = (self.identifier, OWL.minCardinality, classOrIdentifier(other)) if triple in self.graph: return else: self.graph.set(triple) @TermDeletionHelper(OWL.minCardinality) def _del_mincardinality(self): pass # pragma: no cover minCardinality = property( # noqa: N815 _get_mincardinality, _set_mincardinality, _del_mincardinality )
[docs] def restrictionKind(self): # noqa: N802 for s, p, o in self.graph.triples_choices( # type error: Argument 1 to "triples_choices" of "Graph" has incompatible type "Tuple[Any, List[URIRef], None]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" (self.identifier, self.restrictionKinds, None) # type: ignore[arg-type] ): # type error: "Node" has no attribute "split" return p.split(str(OWL))[-1] # type: ignore[attr-defined] return None
[docs] def __repr__(self): """ Returns the Manchester Syntax equivalent for this restriction """ return manchesterSyntax(self.identifier, self.graph)
# Infix Operators # some = Infix( lambda prop, _class: Restriction(prop, graph=_class.graph, someValuesFrom=_class) ) only = Infix( lambda prop, _class: Restriction(prop, graph=_class.graph, allValuesFrom=_class) ) max = Infix( lambda prop, _class: Restriction(prop, graph=prop.graph, maxCardinality=_class) ) min = Infix( lambda prop, _class: Restriction(prop, graph=prop.graph, minCardinality=_class) ) exactly = Infix( lambda prop, _class: Restriction(prop, graph=prop.graph, cardinality=_class) ) value = Infix(lambda prop, _class: Restriction(prop, graph=prop.graph, value=_class)) # Unused PropertyAbstractSyntax = """ %s( %s { %s } %s { 'super(' datavaluedPropertyID ')'} ['Functional'] { domain( %s ) } { range( %s ) } )"""
[docs]class Property(AnnotatableTerms): """ axiom ::= 'DatatypeProperty(' datavaluedPropertyID ['Deprecated'] { annotation } { 'super(' datavaluedPropertyID ')'} ['Functional'] { 'domain(' description ')' } { 'range(' dataRange ')' } ')' | 'ObjectProperty(' individualvaluedPropertyID ['Deprecated'] { annotation } { 'super(' individualvaluedPropertyID ')' } [ 'inverseOf(' individualvaluedPropertyID ')' ] [ 'Symmetric' ] [ 'Functional' | 'InverseFunctional' | 'Functional' 'InverseFunctional' | 'Transitive' ] { 'domain(' description ')' } { 'range(' description ')' } ') """
[docs] def setupVerbAnnotations(self, verb_annotations): # noqa: N802 """ OWL properties map to ACE transitive verbs (TV) There are 6 morphological categories that determine the surface form of an IRI: singular form of a transitive verb (e.g. mans) plural form of a transitive verb (e.g. man) past participle form a transitive verb (e.g. manned) http://attempto.ifi.uzh.ch/ace_lexicon#TV_sg http://attempto.ifi.uzh.ch/ace_lexicon#TV_pl http://attempto.ifi.uzh.ch/ace_lexicon#TV_vbg """ if isinstance(verb_annotations, tuple): tv_sgprop, tv_plprop, tv_vbg = verb_annotations else: tv_sgprop = verb_annotations tv_plprop = verb_annotations tv_vbg = verb_annotations if tv_sgprop: self.tv_sgprop.extent = [ (self.identifier, self.handleAnnotation(tv_sgprop)) ] if tv_plprop: self.tv_plprop.extent = [ (self.identifier, self.handleAnnotation(tv_plprop)) ] if tv_vbg: self.tv_vbgprop.extent = [(self.identifier, self.handleAnnotation(tv_vbg))]
[docs] def __init__( self, identifier=None, graph=None, baseType=OWL.ObjectProperty, # noqa: N803 subPropertyOf=None, # noqa: N803 domain=None, range=None, inverseOf=None, # noqa: N803 otherType=None, # noqa: N803 equivalentProperty=None, # noqa: N803 comment=None, verbAnnotations=None, # noqa: N803 nameAnnotation=None, # noqa: N803 nameIsLabel=False, # noqa: N803 ): super(Property, self).__init__(identifier, graph, nameAnnotation, nameIsLabel) if verbAnnotations: self.setupVerbAnnotations(verbAnnotations) assert not isinstance(self.identifier, BNode) if baseType is None: # None give, determine via introspection self._baseType = first(Individual(self.identifier, graph=self.graph).type) else: if (self.identifier, RDF.type, baseType) not in self.graph: self.graph.add((self.identifier, RDF.type, baseType)) self._baseType = baseType self.subPropertyOf = subPropertyOf self.inverseOf = inverseOf self.domain = domain self.range = range self.comment = [] if comment is None else comment
[docs] def serialize(self, graph): for fact in self.graph.triples((self.identifier, None, None)): graph.add(fact) for p in itertools.chain(self.subPropertyOf, self.inverseOf): p.serialize(graph) for c in itertools.chain(self.domain, self.range): CastClass(c, self.graph).serialize(graph)
def _get_extent(self, graph=None): for triple in (graph is None and self.graph or graph).triples( (None, self.identifier, None) ): yield triple def _set_extent(self, other): if not other: return for subj, obj in other: self.graph.add((subj, self.identifier, obj)) extent = property(_get_extent, _set_extent)
[docs] def __repr__(self): rt = [] if OWL.ObjectProperty in self.type: rt.append( "ObjectProperty( %s annotation(%s)" % (self.qname, first(self.comment) and first(self.comment) or "") ) if first(self.inverseOf): # type error: Item "None" of "Optional[Any]" has no attribute "inverseOf" two_link_inverse = first(first(self.inverseOf).inverseOf) # type: ignore[union-attr] if two_link_inverse and two_link_inverse.identifier == self.identifier: # type error: Item "None" of "Optional[Any]" has no attribute "qname" inverserepr = first(self.inverseOf).qname # type: ignore[union-attr] else: inverserepr = repr(first(self.inverseOf)) rt.append( " inverseOf( %s )%s" % ( inverserepr, OWL.SymmetricProperty in self.type and " Symmetric" or "", ) ) for _s, _p, roletype in self.graph.triples_choices( # type error: Argument 1 to "triples_choices" of "Graph" has incompatible type "Tuple[Any, URIRef, List[URIRef]]"; expected "Union[Tuple[List[Node], Node, Node], Tuple[Node, List[Node], Node], Tuple[Node, Node, List[Node]]]" ( # type: ignore[arg-type] self.identifier, RDF.type, [ OWL.FunctionalProperty, OWL.InverseFunctionalProperty, OWL.TransitiveProperty, ], ) ): # type error: "Node" has no attribute "split" rt.append(str(roletype.split(str(OWL))[-1])) # type: ignore[attr-defined] else: rt.append( "DatatypeProperty( %s %s" % (self.qname, first(self.comment) and first(self.comment) or "") ) for _s, _p, roletype in self.graph.triples( (self.identifier, RDF.type, OWL.FunctionalProperty) ): rt.append(" Functional") def canonicalName(term, g): # noqa: N802 normalized_name = classOrIdentifier(term) if isinstance(normalized_name, BNode): return term elif normalized_name.startswith(XSD): return str(term) elif first( g.triples_choices( (normalized_name, [OWL.unionOf, OWL.intersectionOf], None) ) ): return repr(term) else: return str(term.qname) rt.append( " ".join( [ " super( %s )" % canonicalName(super_property, self.graph) for super_property in self.subPropertyOf ] ) ) rt.append( " ".join( [ " domain( %s )" % canonicalName(domain, self.graph) for domain in self.domain ] ) ) rt.append( " ".join( [ " range( %s )" % canonicalName(range, self.graph) for range in self.range ] ) ) # type error: Incompatible types in assignment (expression has type "str", variable has type "List[str]") rt = "\n".join([expr for expr in rt if expr]) # type: ignore[assignment] rt += "\n)" return rt
def _get_subpropertyof(self): for anc in self.graph.objects( subject=self.identifier, predicate=RDFS.subPropertyOf ): yield Property(anc, graph=self.graph, baseType=None) def _set_subpropertyof(self, other): if not other: return for subproperty in other: self.graph.add( (self.identifier, RDFS.subPropertyOf, classOrIdentifier(subproperty)) ) @TermDeletionHelper(RDFS.subPropertyOf) def _del_subpropertyof(self): pass # pragma: no cover subPropertyOf = property( # noqa: N815 _get_subpropertyof, _set_subpropertyof, _del_subpropertyof ) def _get_inverseof(self): for anc in self.graph.objects(subject=self.identifier, predicate=OWL.inverseOf): yield Property(anc, graph=self.graph, baseType=None) def _set_inverseof(self, other): if not other: return self.graph.add((self.identifier, OWL.inverseOf, classOrIdentifier(other))) @TermDeletionHelper(OWL.inverseOf) def _del_inverseof(self): pass # pragma: no cover inverseOf = property(_get_inverseof, _set_inverseof, _del_inverseof) # noqa: N815 def _get_domain(self): for dom in self.graph.objects(subject=self.identifier, predicate=RDFS.domain): yield Class(dom, graph=self.graph) def _set_domain(self, other): if not other: return if isinstance(other, (Individual, Identifier)): self.graph.add((self.identifier, RDFS.domain, classOrIdentifier(other))) else: for dom in other: self.graph.add((self.identifier, RDFS.domain, classOrIdentifier(dom))) @TermDeletionHelper(RDFS.domain) def _del_domain(self): pass # pragma: no cover domain = property(_get_domain, _set_domain, _del_domain) def _get_range(self): for ran in self.graph.objects(subject=self.identifier, predicate=RDFS.range): yield Class(ran, graph=self.graph) def _set_range(self, ranges): if not ranges: return if isinstance(ranges, (Individual, Identifier)): self.graph.add((self.identifier, RDFS.range, classOrIdentifier(ranges))) else: for range in ranges: self.graph.add((self.identifier, RDFS.range, classOrIdentifier(range))) @TermDeletionHelper(RDFS.range) def _del_range(self): pass # pragma: no cover range = property(_get_range, _set_range, _del_range)
[docs] def replace(self, other): # extension = [] for s, _p, o in self.extent: self.graph.add((s, propertyOrIdentifier(other), o)) self.graph.remove((None, self.identifier, None))
[docs]def CommonNSBindings(graph, additionalNS=None): # noqa: N802, N803 """ Takes a graph and binds the common namespaces (rdf,rdfs, & owl) """ additional_ns = {} if additionalNS is None else additionalNS namespace_manager = NamespaceManager(graph) namespace_manager.bind("rdfs", RDFS) namespace_manager.bind("rdf", RDF) namespace_manager.bind("owl", OWL) for prefix, uri in list(additional_ns.items()): namespace_manager.bind(prefix, uri, override=False) graph.namespace_manager = namespace_manager