#!/usr/bin/env python
# -*- coding: utf-8 -*-
__doc__ = """RDFLib Python binding for OWL Abstract Syntax
see: http://www.w3.org/TR/owl-semantics/syntax.html
http://owl-workshop.man.ac.uk/acceptedLong/submission_9.pdf
3.2.3 Axioms for complete classes without using owl:equivalentClass
Named class description of type 2 (with owl:oneOf) or type 4-6
(with owl:intersectionOf, owl:unionOf or owl:complementOf
Uses Manchester Syntax for __repr__
>>> exNs = Namespace('http://example.com/')
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', exNs, override=False)
>>> namespace_manager.bind('owl', OWL, override=False)
>>> g = Graph()
>>> g.namespace_manager = namespace_manager
Now we have an empty graph, we can construct OWL classes in it
using the Python classes defined in this module
>>> a = Class(exNs.Opera, graph=g)
Now we can assert rdfs:subClassOf and owl:equivalentClass relationships
(in the underlying graph) with other classes using the 'subClassOf'
and 'equivalentClass' descriptors which can be set to a list
of objects for the corresponding predicates.
>>> a.subClassOf = [exNs.MusicalWork]
We can then access the rdfs:subClassOf relationships
>>> print(list(a.subClassOf))
[Class: ex:MusicalWork ]
This can also be used against already populated graphs:
>>> owlGraph = Graph().parse(str(OWL))
>>> namespace_manager.bind('owl', OWL, override=False)
>>> owlGraph.namespace_manager = namespace_manager
>>> list(Class(OWL.Class, graph=owlGraph).subClassOf)
[Class: rdfs:Class ]
Operators are also available. For instance we can add ex:Opera to the extension
of the ex:CreativeWork class via the '+=' operator
>>> a
Class: ex:Opera SubClassOf: ex:MusicalWork
>>> b = Class(exNs.CreativeWork, graph=g)
>>> b += a
>>> print(sorted(a.subClassOf, key=lambda c:c.identifier))
[Class: ex:CreativeWork , Class: ex:MusicalWork ]
And we can then remove it from the extension as well
>>> b -= a
>>> a
Class: ex:Opera SubClassOf: ex:MusicalWork
Boolean class constructions can also be created with Python operators.
For example, The | operator can be used to construct a class consisting of a
owl:unionOf the operands:
>>> c = a | b | Class(exNs.Work, graph=g)
>>> c
( ex:Opera OR ex:CreativeWork OR ex:Work )
Boolean class expressions can also be operated as lists (using python list
operators)
>>> del c[c.index(Class(exNs.Work, graph=g))]
>>> c
( ex:Opera OR ex:CreativeWork )
The '&' operator can be used to construct class intersection:
>>> woman = Class(exNs.Female, graph=g) & Class(exNs.Human, graph=g)
>>> woman.identifier = exNs.Woman
>>> woman
( ex:Female AND ex:Human )
>>> len(woman)
2
Enumerated classes can also be manipulated
>>> contList = [Class(exNs.Africa, graph=g), Class(exNs.NorthAmerica, graph=g)]
>>> EnumeratedClass(members=contList, graph=g)
{ ex:Africa ex:NorthAmerica }
owl:Restrictions can also be instantiated:
>>> Restriction(exNs.hasParent, graph=g, allValuesFrom=exNs.Human)
( ex:hasParent ONLY ex:Human )
Restrictions can also be created using Manchester OWL syntax in 'colloquial'
Python
>>> exNs.hasParent << some >> Class(exNs.Physician, graph=g)
( ex:hasParent SOME ex:Physician )
>>> Property(exNs.hasParent, graph=g) << max >> Literal(1)
( ex:hasParent MAX 1 )
>>> print(g.serialize(format='pretty-xml')) #doctest: +SKIP
"""
import itertools
import logging
from rdflib import RDF, RDFS, BNode, Literal, Namespace, URIRef, Variable
from rdflib.collection import Collection
from rdflib.graph import Graph
from rdflib.namespace import OWL, XSD, NamespaceManager
from rdflib.term import Identifier
from rdflib.util import first
logger = logging.getLogger(__name__)
"""
From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/384122
Python has the wonderful "in" operator and it would be nice to have additional
infix operator like this. This recipe shows how (almost) arbitrary infix
operators can be defined.
"""
__all__ = [
"nsBinds",
"ACE_NS",
"CLASS_RELATIONS",
"some",
"only",
"max",
"min",
"exactly",
"value",
"PropertyAbstractSyntax",
"AllClasses",
"AllDifferent",
"AllProperties",
"AnnotatableTerms",
"BooleanClass",
"Callable",
"CastClass",
"Class",
"ClassNamespaceFactory",
"classOrIdentifier",
"classOrTerm",
"CommonNSBindings",
"ComponentTerms",
"DeepClassClear",
"EnumeratedClass",
"generateQName",
"GetIdentifiedClasses",
"Individual",
"Infix",
"MalformedClass",
"manchesterSyntax",
"Ontology",
"OWLRDFListProxy",
"Property",
"propertyOrIdentifier",
"Restriction",
"termDeletionDecorator",
]
# definition of an Infix operator class
# this recipe also works in jython
# calling sequence for the infix is either:
# x |op| y
# or:
# x <<op>> y
[docs]class Infix:
[docs] def __init__(self, function):
self.function = function
[docs] def __ror__(self, other):
return Infix(lambda x, self=self, other=other: self.function(other, x))
[docs] def __or__(self, other):
return self.function(other)
[docs] def __rlshift__(self, other):
return Infix(lambda x, self=self, other=other: self.function(other, x))
[docs] def __rshift__(self, other):
return self.function(other)
[docs] def __rmatmul__(self, other):
return Infix(lambda x, self=self, other=other: self.function(other, x))
[docs] def __matmul__(self, other):
return self.function(other)
[docs] def __call__(self, value1, value2):
return self.function(value1, value2) # pragma: no cover
nsBinds = { # noqa: N816
"skos": "http://www.w3.org/2004/02/skos/core#",
"rdf": RDF,
"rdfs": RDFS,
"owl": OWL,
"list": URIRef("http://www.w3.org/2000/10/swap/list#"),
"dc": "http://purl.org/dc/elements/1.1/",
}
[docs]def generateQName(graph, uri): # noqa: N802
prefix, uri, localname = graph.compute_qname(classOrIdentifier(uri))
return ":".join([prefix, localname])
[docs]def classOrTerm(thing): # noqa: N802
if isinstance(thing, Class):
return thing.identifier
else:
assert isinstance(thing, (URIRef, BNode, Literal))
return thing
[docs]def classOrIdentifier(thing): # noqa: N802
if isinstance(thing, (Property, Class)):
return thing.identifier
else:
assert isinstance(thing, (URIRef, BNode)), (
"Expecting a Class, Property, URIRef, or BNode.. not a %s" % thing
)
return thing
[docs]def propertyOrIdentifier(thing): # noqa: N802
if isinstance(thing, Property):
return thing.identifier
else:
assert isinstance(thing, URIRef)
return thing
[docs]def manchesterSyntax( # noqa: N802
thing, store, boolean=None, transientList=False # noqa: N803
):
"""
Core serialization
"""
assert thing is not None
if boolean:
if transientList:
livechildren = iter(thing)
children = [manchesterSyntax(child, store) for child in thing]
else:
livechildren = iter(Collection(store, thing))
children = [
manchesterSyntax(child, store) for child in Collection(store, thing)
]
if boolean == OWL.intersectionOf:
childlist = []
named = []
for child in livechildren:
if isinstance(child, URIRef):
named.append(child)
else:
childlist.append(child)
if named:
def castToQName(x): # noqa: N802
prefix, uri, localname = store.compute_qname(x)
return ":".join([prefix, localname])
if len(named) > 1:
prefix = "( " + " AND ".join(map(castToQName, named)) + " )"
else:
prefix = manchesterSyntax(named[0], store)
if childlist:
return (
str(prefix)
+ " THAT "
+ " AND ".join(
[str(manchesterSyntax(x, store)) for x in childlist]
)
)
else:
return prefix
else:
return "( " + " AND ".join([str(c) for c in children]) + " )"
elif boolean == OWL.unionOf:
return "( " + " OR ".join([str(c) for c in children]) + " )"
elif boolean == OWL.oneOf:
return "{ " + " ".join([str(c) for c in children]) + " }"
else:
assert boolean == OWL.complementOf
elif OWL.Restriction in store.objects(subject=thing, predicate=RDF.type):
prop = list(store.objects(subject=thing, predicate=OWL.onProperty))[0]
prefix, uri, localname = store.compute_qname(prop)
propstring = ":".join([prefix, localname])
label = first(store.objects(subject=prop, predicate=RDFS.label))
if label:
propstring = "'%s'" % label
for onlyclass in store.objects(subject=thing, predicate=OWL.allValuesFrom):
return "( %s ONLY %s )" % (propstring, manchesterSyntax(onlyclass, store))
for val in store.objects(subject=thing, predicate=OWL.hasValue):
return "( %s VALUE %s )" % (propstring, manchesterSyntax(val, store))
for someclass in store.objects(subject=thing, predicate=OWL.someValuesFrom):
return "( %s SOME %s )" % (propstring, manchesterSyntax(someclass, store))
cardlookup = {
OWL.maxCardinality: "MAX",
OWL.minCardinality: "MIN",
OWL.cardinality: "EQUALS",
}
for s, p, o in store.triples_choices((thing, list(cardlookup.keys()), None)):
return "( %s %s %s )" % (propstring, cardlookup[p], o)
compl = list(store.objects(subject=thing, predicate=OWL.complementOf))
if compl:
return "( NOT %s )" % (manchesterSyntax(compl[0], store))
else:
prolog = "\n".join(["PREFIX %s: <%s>" % (k, nsBinds[k]) for k in nsBinds])
qstr = (
prolog
+ "\nSELECT ?p ?bool WHERE {?class a owl:Class; ?p ?bool ."
+ "?bool rdf:first ?foo }"
)
initb = {Variable("?class"): thing}
for boolprop, col in store.query(qstr, processor="sparql", initBindings=initb):
if not isinstance(thing, URIRef):
return manchesterSyntax(col, store, boolean=boolprop)
try:
prefix, uri, localname = store.compute_qname(thing)
qname = ":".join([prefix, localname])
except Exception:
if isinstance(thing, BNode):
return thing.n3()
return "<" + thing + ">"
label = first(Class(thing, graph=store).label)
if label:
return label
else:
return qname
[docs]def GetIdentifiedClasses(graph): # noqa: N802
for c in graph.subjects(predicate=RDF.type, object=OWL.Class):
if isinstance(c, URIRef):
yield Class(c)
[docs]def termDeletionDecorator(prop): # noqa: N802
def someFunc(func): # noqa: N802
func.property = prop
return func
return someFunc
class TermDeletionHelper:
def __init__(self, prop):
self.prop = prop
def __call__(self, f):
def _remover(inst):
inst.graph.remove((inst.identifier, self.prop, None))
return _remover
[docs]class Individual(object):
"""
A typed individual
"""
factoryGraph = Graph() # noqa: N815
[docs] def serialize(self, graph):
for fact in self.factoryGraph.triples((self.identifier, None, None)):
graph.add(fact)
[docs] def __init__(self, identifier=None, graph=None):
self.__identifier = identifier is not None and identifier or BNode()
if graph is None:
self.graph = self.factoryGraph
else:
self.graph = graph
self.qname = None
if not isinstance(self.identifier, BNode):
try:
prefix, uri, localname = self.graph.compute_qname(self.identifier)
self.qname = ":".join([prefix, localname])
except Exception: # pragma: no cover
pass # pragma: no cover
[docs] def clearInDegree(self): # noqa: N802
self.graph.remove((None, None, self.identifier))
[docs] def clearOutDegree(self): # noqa: N802
self.graph.remove((self.identifier, None, None))
[docs] def delete(self):
self.clearInDegree()
self.clearOutDegree()
[docs] def replace(self, other):
for s, p, o in self.graph.triples((None, None, self.identifier)):
self.graph.add((s, p, classOrIdentifier(other)))
self.delete()
def _get_type(self):
for _t in self.graph.objects(subject=self.identifier, predicate=RDF.type):
yield _t
def _set_type(self, kind):
if not kind:
return
if isinstance(kind, (Individual, Identifier)):
self.graph.add((self.identifier, RDF.type, classOrIdentifier(kind)))
else:
for c in kind:
assert isinstance(c, (Individual, Identifier))
self.graph.add((self.identifier, RDF.type, classOrIdentifier(c)))
@TermDeletionHelper(RDF.type)
def _delete_type(self):
"""
>>> g = Graph()
>>> b = Individual(OWL.Restriction, g)
>>> b.type = RDFS.Resource
>>> len(list(b.type))
1
>>> del b.type
>>> len(list(b.type))
0
"""
pass # pragma: no cover
type = property(_get_type, _set_type, _delete_type)
def _get_identifier(self):
return self.__identifier
def _set_identifier(self, i):
assert i
if i != self.__identifier:
oldstatements_out = [
(p, o)
for s, p, o in self.graph.triples((self.__identifier, None, None))
]
oldstatements_in = [
(s, p)
for s, p, o in self.graph.triples((None, None, self.__identifier))
]
for p1, o1 in oldstatements_out:
self.graph.remove((self.__identifier, p1, o1))
for s1, p1 in oldstatements_in:
self.graph.remove((s1, p1, self.__identifier))
self.__identifier = i
self.graph.addN([(i, p1, o1, self.graph) for p1, o1 in oldstatements_out])
self.graph.addN([(s1, p1, i, self.graph) for s1, p1 in oldstatements_in])
if not isinstance(i, BNode):
try:
prefix, uri, localname = self.graph.compute_qname(i)
self.qname = ":".join([prefix, localname])
except Exception: # pragma: no cover
pass # pragma: no cover
identifier = property(_get_identifier, _set_identifier)
def _get_sameAs(self): # noqa: N802
for _t in self.graph.objects(subject=self.identifier, predicate=OWL.sameAs):
yield _t
def _set_sameAs(self, term): # noqa: N802
# if not kind:
# return
if isinstance(term, (Individual, Identifier)):
self.graph.add((self.identifier, OWL.sameAs, classOrIdentifier(term)))
else:
for c in term:
assert isinstance(c, (Individual, Identifier))
self.graph.add((self.identifier, OWL.sameAs, classOrIdentifier(c)))
@TermDeletionHelper(OWL.sameAs)
def _delete_sameAs(self): # noqa: N802
pass # pragma: no cover
sameAs = property(_get_sameAs, _set_sameAs, _delete_sameAs) # noqa: N815
ACE_NS = Namespace("http://attempto.ifi.uzh.ch/ace_lexicon#")
[docs]class AnnotatableTerms(Individual):
"""
Terms in an OWL ontology with rdfs:label and rdfs:comment
"""
[docs] def __init__(
self,
identifier,
graph=None,
nameAnnotation=None, # noqa: N803
nameIsLabel=False, # noqa: N803
):
super(AnnotatableTerms, self).__init__(identifier, graph)
if nameAnnotation:
self.setupACEAnnotations()
self.PN_sgprop.extent = [
(self.identifier, self.handleAnnotation(nameAnnotation))
]
if nameIsLabel:
self.label = [nameAnnotation]
[docs] def handleAnnotation(self, val): # noqa: N802
return val if isinstance(val, Literal) else Literal(val)
[docs] def setupACEAnnotations(self): # noqa: N802
self.graph.bind("ace", ACE_NS, override=False)
# PN_sg singular form of a proper name ()
self.PN_sgprop = Property(
ACE_NS.PN_sg, baseType=OWL.AnnotationProperty, graph=self.graph
)
# CN_sg singular form of a common noun
self.CN_sgprop = Property(
ACE_NS.CN_sg, baseType=OWL.AnnotationProperty, graph=self.graph
)
# CN_pl plural form of a common noun
self.CN_plprop = Property(
ACE_NS.CN_pl, baseType=OWL.AnnotationProperty, graph=self.graph
)
# singular form of a transitive verb
self.tv_sgprop = Property(
ACE_NS.TV_sg, baseType=OWL.AnnotationProperty, graph=self.graph
)
# plural form of a transitive verb
self.tv_plprop = Property(
ACE_NS.TV_pl, baseType=OWL.AnnotationProperty, graph=self.graph
)
# past participle form a transitive verb
self.tv_vbgprop = Property(
ACE_NS.TV_vbg, baseType=OWL.AnnotationProperty, graph=self.graph
)
def _get_comment(self):
for comment in self.graph.objects(
subject=self.identifier, predicate=RDFS.comment
):
yield comment
def _set_comment(self, comment):
if not comment:
return
if isinstance(comment, Identifier):
self.graph.add((self.identifier, RDFS.comment, comment))
else:
for c in comment:
self.graph.add((self.identifier, RDFS.comment, c))
@TermDeletionHelper(RDFS.comment)
def _del_comment(self):
pass # pragma: no cover
comment = property(_get_comment, _set_comment, _del_comment)
def _get_seealso(self):
for seealso in self.graph.objects(
subject=self.identifier, predicate=RDFS.seeAlso
):
yield seealso
def _set_seealso(self, seealsos):
if not seealsos:
return
for s in seealsos:
self.graph.add((self.identifier, RDFS.seeAlso, s))
@TermDeletionHelper(RDFS.seeAlso)
def _del_seealso(self):
pass # pragma: no cover
seeAlso = property(_get_seealso, _set_seealso, _del_seealso) # noqa: N815
def _get_label(self):
for label in self.graph.objects(subject=self.identifier, predicate=RDFS.label):
yield label
def _set_label(self, label):
if not label:
return
if isinstance(label, Identifier):
self.graph.add((self.identifier, RDFS.label, label))
else:
for l_ in label:
self.graph.add((self.identifier, RDFS.label, l_))
@TermDeletionHelper(RDFS.label)
def _delete_label(self):
"""
>>> g = Graph()
>>> b = Individual(OWL.Restriction,g)
>>> b.label = Literal('boo')
>>> len(list(b.label))
1
>>> del b.label
>>> len(list(b.label))
0
"""
pass # pragma: no cover
label = property(_get_label, _set_label, _delete_label)
[docs]class Ontology(AnnotatableTerms):
"""The owl ontology metadata"""
[docs] def __init__(self, identifier=None, imports=None, comment=None, graph=None):
super(Ontology, self).__init__(identifier, graph)
self.imports = [] if imports is None else imports
self.comment = [] if comment is None else comment
if (self.identifier, RDF.type, OWL.Ontology) not in self.graph:
self.graph.add((self.identifier, RDF.type, OWL.Ontology))
[docs] def setVersion(self, version): # noqa: N802
self.graph.set((self.identifier, OWL.versionInfo, version))
def _get_imports(self):
for owl in self.graph.objects(
subject=self.identifier, predicate=OWL["imports"]
):
yield owl
def _set_imports(self, other):
if not other:
return
for o in other:
self.graph.add((self.identifier, OWL["imports"], o))
@TermDeletionHelper(OWL["imports"])
def _del_imports(self):
pass # pragma: no cover
imports = property(_get_imports, _set_imports, _del_imports)
[docs]def AllClasses(graph): # noqa: N802
prevclasses = set()
for c in graph.subjects(predicate=RDF.type, object=OWL.Class):
if c not in prevclasses:
prevclasses.add(c)
yield Class(c)
[docs]def AllProperties(graph): # noqa: N802
prevprops = set()
for s, p, o in graph.triples_choices(
(
None,
RDF.type,
[
OWL.SymmetricProperty,
OWL.FunctionalProperty,
OWL.InverseFunctionalProperty,
OWL.TransitiveProperty,
OWL.DatatypeProperty,
OWL.ObjectProperty,
OWL.AnnotationProperty,
],
)
):
if o in [
OWL.SymmetricProperty,
OWL.InverseFunctionalProperty,
OWL.TransitiveProperty,
OWL.ObjectProperty,
]:
bType = OWL.ObjectProperty # noqa: N806
else:
bType = OWL.DatatypeProperty # noqa: N806
if s not in prevprops:
prevprops.add(s)
yield Property(s, graph=graph, baseType=bType)
[docs]class ClassNamespaceFactory(Namespace):
[docs] def term(self, name):
return Class(URIRef(self + name))
[docs] def __getitem__(self, key, default=None):
return self.term(key)
[docs] def __getattr__(self, name):
if name.startswith("__"): # ignore any special Python names!
raise AttributeError
else:
return self.term(name)
CLASS_RELATIONS = set(
Namespace("http://www.w3.org/2002/07/owl#resourceProperties")
).difference(
[
OWL.onProperty,
OWL.allValuesFrom,
OWL.hasValue,
OWL.someValuesFrom,
OWL.inverseOf,
OWL.imports,
OWL.versionInfo,
OWL.backwardCompatibleWith,
OWL.incompatibleWith,
OWL.unionOf,
OWL.intersectionOf,
OWL.oneOf,
]
)
[docs]def ComponentTerms(cls): # noqa: N802
"""
Takes a Class instance and returns a generator over the classes that
are involved in its definition, ignoring unnamed classes
"""
if OWL.Restriction in cls.type:
try:
cls = CastClass(cls, Individual.factoryGraph)
for s, p, inner_class_id in cls.factoryGraph.triples_choices(
(cls.identifier, [OWL.allValuesFrom, OWL.someValuesFrom], None)
):
inner_class = Class(inner_class_id, skipOWLClassMembership=True)
if isinstance(inner_class_id, BNode):
for _c in ComponentTerms(inner_class):
yield _c
else:
yield inner_class
except Exception: # pragma: no cover
pass # pragma: no cover
else:
cls = CastClass(cls, Individual.factoryGraph)
if isinstance(cls, BooleanClass):
for _cls in cls:
_cls = Class(_cls, skipOWLClassMembership=True)
if isinstance(_cls.identifier, BNode):
for _c in ComponentTerms(_cls):
yield _c
else:
yield _cls
else:
for inner_class in cls.subClassOf:
if isinstance(inner_class.identifier, BNode):
for _c in ComponentTerms(inner_class):
yield _c
else:
yield inner_class
for s, p, o in cls.factoryGraph.triples_choices(
(classOrIdentifier(cls), CLASS_RELATIONS, None)
):
if isinstance(o, BNode):
for _c in ComponentTerms(CastClass(o, Individual.factoryGraph)):
yield _c
else:
yield inner_class
[docs]def DeepClassClear(class_to_prune): # noqa: N802
"""
Recursively clear the given class, continuing
where any related class is an anonymous class
>>> EX = Namespace('http://example.com/')
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', EX, override=False)
>>> namespace_manager.bind('owl', OWL, override=False)
>>> g = Graph()
>>> g.namespace_manager = namespace_manager
>>> Individual.factoryGraph = g
>>> classB = Class(EX.B)
>>> classC = Class(EX.C)
>>> classD = Class(EX.D)
>>> classE = Class(EX.E)
>>> classF = Class(EX.F)
>>> anonClass = EX.someProp << some >> classD
>>> classF += anonClass
>>> list(anonClass.subClassOf)
[Class: ex:F ]
>>> classA = classE | classF | anonClass
>>> classB += classA
>>> classA.equivalentClass = [Class()]
>>> classB.subClassOf = [EX.someProp << some >> classC]
>>> classA
( ex:E OR ex:F OR ( ex:someProp SOME ex:D ) )
>>> DeepClassClear(classA)
>>> classA
( )
>>> list(anonClass.subClassOf)
[]
>>> classB
Class: ex:B SubClassOf: ( ex:someProp SOME ex:C )
>>> otherClass = classD | anonClass
>>> otherClass
( ex:D OR ( ex:someProp SOME ex:D ) )
>>> DeepClassClear(otherClass)
>>> otherClass
( )
>>> otherClass.delete()
>>> list(g.triples((otherClass.identifier, None, None)))
[]
"""
def deepClearIfBNode(_class): # noqa: N802
if isinstance(classOrIdentifier(_class), BNode):
DeepClassClear(_class)
class_to_prune = CastClass(class_to_prune, Individual.factoryGraph)
for c in class_to_prune.subClassOf:
deepClearIfBNode(c)
class_to_prune.graph.remove((class_to_prune.identifier, RDFS.subClassOf, None))
for c in class_to_prune.equivalentClass:
deepClearIfBNode(c)
class_to_prune.graph.remove((class_to_prune.identifier, OWL.equivalentClass, None))
inverse_class = class_to_prune.complementOf
if inverse_class:
class_to_prune.graph.remove((class_to_prune.identifier, OWL.complementOf, None))
deepClearIfBNode(inverse_class)
if isinstance(class_to_prune, BooleanClass):
for c in class_to_prune:
deepClearIfBNode(c)
class_to_prune.clear()
class_to_prune.graph.remove(
(class_to_prune.identifier, class_to_prune._operator, None)
)
[docs]def CastClass(c, graph=None): # noqa: N802
graph = graph is None and c.factoryGraph or graph
for kind in graph.objects(subject=classOrIdentifier(c), predicate=RDF.type):
if kind == OWL.Restriction:
kwargs = {"identifier": classOrIdentifier(c), "graph": graph}
for s, p, o in graph.triples((classOrIdentifier(c), None, None)):
if p != RDF.type:
if p == OWL.onProperty:
kwargs["onProperty"] = o
else:
if p not in Restriction.restrictionKinds:
continue
kwargs[str(p.split(str(OWL))[-1])] = o
if not set(
[str(i.split(str(OWL))[-1]) for i in Restriction.restrictionKinds]
).intersection(kwargs):
raise MalformedClass("Malformed owl:Restriction")
return Restriction(**kwargs)
else:
for s, p, o in graph.triples_choices(
(
classOrIdentifier(c),
[OWL.intersectionOf, OWL.unionOf, OWL.oneOf],
None,
)
):
if p == OWL.oneOf:
return EnumeratedClass(classOrIdentifier(c), graph=graph)
else:
return BooleanClass(classOrIdentifier(c), operator=p, graph=graph)
# assert (classOrIdentifier(c),RDF.type,OWL.Class) in graph
return Class(classOrIdentifier(c), graph=graph, skipOWLClassMembership=True)
[docs]class Class(AnnotatableTerms):
"""
'General form' for classes:
The Manchester Syntax (supported in Protege) is used as the basis for the
form of this class
See: http://owl-workshop.man.ac.uk/acceptedLong/submission_9.pdf:
[Annotation]
‘Class:’ classID {Annotation
( (‘SubClassOf:’ ClassExpression)
| (‘EquivalentTo’ ClassExpression)
| (’DisjointWith’ ClassExpression)) }
Appropriate excerpts from OWL Reference:
".. Subclass axioms provide us with partial definitions: they represent
necessary but not sufficient conditions for establishing class
membership of an individual."
".. A class axiom may contain (multiple) owl:equivalentClass statements"
"..A class axiom may also contain (multiple) owl:disjointWith statements.."
"..An owl:complementOf property links a class to precisely one class
description."
"""
def _serialize(self, graph):
for cl in self.subClassOf:
CastClass(cl, self.graph).serialize(graph)
for cl in self.equivalentClass:
CastClass(cl, self.graph).serialize(graph)
for cl in self.disjointWith:
CastClass(cl, self.graph).serialize(graph)
if self.complementOf:
CastClass(self.complementOf, self.graph).serialize(graph)
[docs] def serialize(self, graph):
for fact in self.graph.triples((self.identifier, None, None)):
graph.add(fact)
self._serialize(graph)
[docs] def setupNounAnnotations(self, noun_annotations): # noqa: N802
if isinstance(noun_annotations, tuple):
cn_sgprop, cn_plprop = noun_annotations
else:
cn_sgprop = noun_annotations
cn_plprop = noun_annotations
if cn_sgprop:
self.CN_sgprop.extent = [
(self.identifier, self.handleAnnotation(cn_sgprop))
]
if cn_plprop:
self.CN_plprop.extent = [
(self.identifier, self.handleAnnotation(cn_plprop))
]
[docs] def __init__(
self,
identifier=None,
subClassOf=None, # noqa: N803
equivalentClass=None,
disjointWith=None,
complementOf=None,
graph=None,
skipOWLClassMembership=False,
comment=None,
nounAnnotations=None,
nameAnnotation=None,
nameIsLabel=False,
):
super(Class, self).__init__(identifier, graph, nameAnnotation, nameIsLabel)
if nounAnnotations:
self.setupNounAnnotations(nounAnnotations)
if (
not skipOWLClassMembership
and (self.identifier, RDF.type, OWL.Class) not in self.graph
and (self.identifier, RDF.type, OWL.Restriction) not in self.graph
):
self.graph.add((self.identifier, RDF.type, OWL.Class))
self.subClassOf = [] if subClassOf is None else subClassOf
self.equivalentClass = [] if equivalentClass is None else equivalentClass
self.disjointWith = [] if disjointWith is None else disjointWith
if complementOf:
self.complementOf = complementOf
self.comment = [] if comment is None else comment
def _get_extent(self, graph=None):
for member in (graph is None and self.graph or graph).subjects(
predicate=RDF.type, object=self.identifier
):
yield member
def _set_extent(self, other):
if not other:
return
for m in other:
self.graph.add((classOrIdentifier(m), RDF.type, self.identifier))
@TermDeletionHelper(RDF.type)
def _del_type(self):
pass # pragma: no cover
extent = property(_get_extent, _set_extent, _del_type)
def _get_annotation(self, term=RDFS.label):
for annotation in self.graph.objects(subject=self.identifier, predicate=term):
yield annotation
annotation = property(_get_annotation, lambda x: x) # type: ignore[arg-type,misc]
def _get_extentquery(self):
return (Variable("CLASS"), RDF.type, self.identifier)
def _set_extentquery(self, other):
pass # pragma: no cover
extentQuery = property(_get_extentquery, _set_extentquery) # noqa: N815
[docs] def __hash__(self):
"""
>>> b = Class(OWL.Restriction)
>>> c = Class(OWL.Restriction)
>>> len(set([b,c]))
1
"""
return hash(self.identifier)
[docs] def __eq__(self, other):
assert isinstance(other, Class), repr(other)
return self.identifier == other.identifier
[docs] def __iadd__(self, other):
assert isinstance(other, Class)
other.subClassOf = [self]
return self
[docs] def __isub__(self, other):
assert isinstance(other, Class)
self.graph.remove((classOrIdentifier(other), RDFS.subClassOf, self.identifier))
return self
[docs] def __invert__(self):
"""
Shorthand for Manchester syntax's not operator
"""
return Class(complementOf=self)
[docs] def __or__(self, other):
"""
Construct an anonymous class description consisting of the union of
this class and 'other' and return it
"""
return BooleanClass(
operator=OWL.unionOf, members=[self, other], graph=self.graph
)
[docs] def __and__(self, other):
"""
Construct an anonymous class description consisting of the
intersection of this class and 'other' and return it
>>> exNs = Namespace('http://example.com/')
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', exNs, override=False)
>>> namespace_manager.bind('owl', OWL, override=False)
>>> g = Graph()
>>> g.namespace_manager = namespace_manager
Chaining 3 intersections
>>> female = Class(exNs.Female, graph=g)
>>> human = Class(exNs.Human, graph=g)
>>> youngPerson = Class(exNs.YoungPerson, graph=g)
>>> youngWoman = female & human & youngPerson
>>> youngWoman #doctest: +SKIP
ex:YoungPerson THAT ( ex:Female AND ex:Human )
>>> isinstance(youngWoman, BooleanClass)
True
>>> isinstance(youngWoman.identifier, BNode)
True
"""
return BooleanClass(
operator=OWL.intersectionOf, members=[self, other], graph=self.graph
)
def _get_subclassof(self):
for anc in self.graph.objects(
subject=self.identifier, predicate=RDFS.subClassOf
):
yield Class(anc, graph=self.graph, skipOWLClassMembership=True)
def _set_subclassof(self, other):
if not other:
return
for sc in other:
self.graph.add((self.identifier, RDFS.subClassOf, classOrIdentifier(sc)))
@TermDeletionHelper(RDFS.subClassOf)
def _del_subclassof(self):
pass # pragma: no cover
subClassOf = property( # noqa: N815
_get_subclassof, _set_subclassof, _del_subclassof
)
def _get_equivalentclass(self):
for ec in self.graph.objects(
subject=self.identifier, predicate=OWL.equivalentClass
):
yield Class(ec, graph=self.graph)
def _set_equivalentclass(self, other):
if not other:
return
for sc in other:
self.graph.add(
(self.identifier, OWL.equivalentClass, classOrIdentifier(sc))
)
@TermDeletionHelper(OWL.equivalentClass)
def _del_equivalentclass(self):
pass # pragma: no cover
equivalentClass = property( # noqa: N815
_get_equivalentclass, _set_equivalentclass, _del_equivalentclass
)
def _get_disjointwith(self):
for dc in self.graph.objects(
subject=self.identifier, predicate=OWL.disjointWith
):
yield Class(dc, graph=self.graph)
def _set_disjointwith(self, other):
if not other:
return
for c in other:
self.graph.add((self.identifier, OWL.disjointWith, classOrIdentifier(c)))
@TermDeletionHelper(OWL.disjointWith)
def _del_disjointwith(self):
pass # pragma: no cover
disjointWith = property( # noqa: N815
_get_disjointwith, _set_disjointwith, _del_disjointwith
)
def _get_complementof(self):
comp = list(
self.graph.objects(subject=self.identifier, predicate=OWL.complementOf)
)
if not comp:
return None
elif len(comp) == 1:
return Class(comp[0], graph=self.graph)
else:
raise Exception(len(comp))
def _set_complementof(self, other):
if not other:
return
self.graph.add((self.identifier, OWL.complementOf, classOrIdentifier(other)))
@TermDeletionHelper(OWL.complementOf)
def _del_complementof(self):
pass # pragma: no cover
complementOf = property( # noqa: N815
_get_complementof, _set_complementof, _del_complementof
)
def _get_parents(self):
"""
computed attributes that returns a generator over taxonomic 'parents'
by disjunction, conjunction, and subsumption
>>> from rdflib.util import first
>>> exNs = Namespace('http://example.com/')
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', exNs, override=False)
>>> namespace_manager.bind('owl', OWL, override=False)
>>> g = Graph()
>>> g.namespace_manager = namespace_manager
>>> Individual.factoryGraph = g
>>> brother = Class(exNs.Brother)
>>> sister = Class(exNs.Sister)
>>> sibling = brother | sister
>>> sibling.identifier = exNs.Sibling
>>> sibling
( ex:Brother OR ex:Sister )
>>> first(brother.parents)
Class: ex:Sibling EquivalentTo: ( ex:Brother OR ex:Sister )
>>> parent = Class(exNs.Parent)
>>> male = Class(exNs.Male)
>>> father = parent & male
>>> father.identifier = exNs.Father
>>> list(father.parents)
[Class: ex:Parent , Class: ex:Male ]
"""
for parent in itertools.chain(self.subClassOf, self.equivalentClass):
yield parent
link = first(self.factoryGraph.subjects(RDF.first, self.identifier))
if link:
siblingslist = list(self.factoryGraph.transitive_subjects(RDF.rest, link))
if siblingslist:
collectionhead = siblingslist[-1]
else:
collectionhead = link
for disjointclass in self.factoryGraph.subjects(
OWL.unionOf, collectionhead
):
if isinstance(disjointclass, URIRef):
yield Class(disjointclass, skipOWLClassMembership=True)
for rdf_list in self.factoryGraph.objects(self.identifier, OWL.intersectionOf):
for member in OWLRDFListProxy([rdf_list], graph=self.factoryGraph):
if isinstance(member, URIRef):
yield Class(member, skipOWLClassMembership=True)
parents = property(_get_parents)
[docs] def isPrimitive(self): # noqa: N802
if (self.identifier, RDF.type, OWL.Restriction) in self.graph:
return False
# sc = list(self.subClassOf)
ec = list(self.equivalentClass)
for boolclass, p, rdf_list in self.graph.triples_choices(
(self.identifier, [OWL.intersectionOf, OWL.unionOf], None)
):
ec.append(manchesterSyntax(rdf_list, self.graph, boolean=p))
for e in ec:
return False
if self.complementOf:
return False
return True
[docs] def subSumpteeIds(self): # noqa: N802
for s in self.graph.subjects(predicate=RDFS.subClassOf, object=self.identifier):
yield s
# def __iter__(self):
# for s in self.graph.subjects(
# predicate=RDFS.subClassOf,object=self.identifier):
# yield Class(s,skipOWLClassMembership=True)
[docs] def __repr__(self, full=False, normalization=True):
"""
Returns the Manchester Syntax equivalent for this class
"""
exprs = []
sc = list(self.subClassOf)
ec = list(self.equivalentClass)
for boolclass, p, rdf_list in self.graph.triples_choices(
(self.identifier, [OWL.intersectionOf, OWL.unionOf], None)
):
ec.append(manchesterSyntax(rdf_list, self.graph, boolean=p))
dc = list(self.disjointWith)
c = self.complementOf
if c:
dc.append(c)
klasskind = ""
label = list(self.graph.objects(self.identifier, RDFS.label))
label = label and "(" + label[0] + ")" or ""
if sc:
if full:
scjoin = "\n "
else:
scjoin = ", "
nec_statements = [
isinstance(s, Class)
and isinstance(self.identifier, BNode)
and repr(CastClass(s, self.graph))
or # noqa: W504
# repr(BooleanClass(classOrIdentifier(s),
# operator=None,
# graph=self.graph)) or
manchesterSyntax(classOrIdentifier(s), self.graph)
for s in sc
]
if nec_statements:
klasskind = "Primitive Type %s" % label
exprs.append(
"SubClassOf: %s" % scjoin.join([str(n) for n in nec_statements])
)
if full:
exprs[-1] = "\n " + exprs[-1]
if ec:
nec_suff_statements = [
isinstance(s, str)
and s
or manchesterSyntax(classOrIdentifier(s), self.graph)
for s in ec
]
if nec_suff_statements:
klasskind = "A Defined Class %s" % label
exprs.append("EquivalentTo: %s" % ", ".join(nec_suff_statements))
if full:
exprs[-1] = "\n " + exprs[-1]
if dc:
exprs.append(
"DisjointWith %s\n"
% "\n ".join(
[manchesterSyntax(classOrIdentifier(s), self.graph) for s in dc]
)
)
if full:
exprs[-1] = "\n " + exprs[-1]
descr = list(self.graph.objects(self.identifier, RDFS.comment))
if full and normalization:
klassdescr = (
klasskind
and "\n ## %s ##" % klasskind
+ (descr and "\n %s" % descr[0] or "")
+ " . ".join(exprs)
or " . ".join(exprs)
)
else:
klassdescr = (
full
and (descr and "\n %s" % descr[0] or "")
or "" + " . ".join(exprs)
)
return (
isinstance(self.identifier, BNode)
and "Some Class "
or "Class: %s " % self.qname
) + klassdescr
[docs]class OWLRDFListProxy(object):
[docs] def __init__(self, rdf_list, members=None, graph=None):
if graph:
self.graph = graph
members = [] if members is None else members
if rdf_list:
self._rdfList = Collection(self.graph, rdf_list[0])
for member in members:
if member not in self._rdfList:
self._rdfList.append(classOrIdentifier(member))
else:
self._rdfList = Collection(
self.graph, BNode(), [classOrIdentifier(m) for m in members]
)
self.graph.add((self.identifier, self._operator, self._rdfList.uri))
[docs] def __eq__(self, other):
"""
Equivalence of boolean class constructors is determined by
equivalence of its members
"""
assert isinstance(other, Class), repr(other) + repr(type(other))
if isinstance(other, BooleanClass):
length = len(self)
if length != len(other):
return False
else:
for idx in range(length):
if self[idx] != other[idx]:
return False
return True
else:
return self.identifier == other.identifier
# Redirect python list accessors to the underlying Collection instance
[docs] def __len__(self):
return len(self._rdfList)
[docs] def index(self, item):
return self._rdfList.index(classOrIdentifier(item))
[docs] def __getitem__(self, key):
return self._rdfList[key]
[docs] def __setitem__(self, key, value):
self._rdfList[key] = classOrIdentifier(value)
[docs] def __delitem__(self, key):
del self._rdfList[key]
[docs] def clear(self):
self._rdfList.clear()
[docs] def __iter__(self):
for item in self._rdfList:
yield item
[docs] def __contains__(self, item):
for i in self._rdfList:
if i == classOrIdentifier(item):
return 1
return 0
[docs] def append(self, item):
self._rdfList.append(item)
[docs] def __iadd__(self, other):
self._rdfList.append(classOrIdentifier(other))
return self
[docs]class EnumeratedClass(OWLRDFListProxy, Class):
"""
Class for owl:oneOf forms:
OWL Abstract Syntax is used
axiom ::= 'EnumeratedClass('
classID ['Deprecated'] { annotation } { individualID } ')'
>>> exNs = Namespace('http://example.com/')
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', exNs, override=False)
>>> namespace_manager.bind('owl', OWL, override=False)
>>> g = Graph()
>>> g.namespace_manager = namespace_manager
>>> Individual.factoryGraph = g
>>> ogbujiBros = EnumeratedClass(exNs.ogbujicBros,
... members=[exNs.chime,
... exNs.uche,
... exNs.ejike])
>>> ogbujiBros #doctest: +SKIP
{ ex:chime ex:uche ex:ejike }
>>> col = Collection(g, first(
... g.objects(predicate=OWL.oneOf, subject=ogbujiBros.identifier)))
>>> sorted([g.qname(item) for item in col])
['ex:chime', 'ex:ejike', 'ex:uche']
>>> print(g.serialize(format='n3')) #doctest: +SKIP
@prefix ex: <http://example.com/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
<BLANKLINE>
ex:ogbujicBros a owl:Class;
owl:oneOf ( ex:chime ex:uche ex:ejike ) .
<BLANKLINE>
<BLANKLINE>
"""
_operator = OWL.oneOf
[docs] def isPrimitive(self): # noqa: N802
return False
[docs] def __init__(self, identifier=None, members=None, graph=None):
Class.__init__(self, identifier, graph=graph)
members = [] if members is None else members
rdfList = list( # noqa: N806
self.graph.objects(predicate=OWL.oneOf, subject=self.identifier)
)
OWLRDFListProxy.__init__(self, rdfList, members)
[docs] def __repr__(self):
"""
Returns the Manchester Syntax equivalent for this class
"""
return manchesterSyntax(self._rdfList.uri, self.graph, boolean=self._operator)
[docs] def serialize(self, graph):
clonedlist = Collection(graph, BNode())
for cl in self._rdfList:
clonedlist.append(cl)
CastClass(cl, self.graph).serialize(graph)
graph.add((self.identifier, self._operator, clonedlist.uri))
for s, p, o in self.graph.triples((self.identifier, None, None)):
if p != self._operator:
graph.add((s, p, o))
self._serialize(graph)
BooleanPredicates = [OWL.intersectionOf, OWL.unionOf]
class BooleanClassExtentHelper:
"""
>>> testGraph = Graph()
>>> Individual.factoryGraph = testGraph
>>> EX = Namespace("http://example.com/")
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', EX, override=False)
>>> testGraph.namespace_manager = namespace_manager
>>> fire = Class(EX.Fire)
>>> water = Class(EX.Water)
>>> testClass = BooleanClass(members=[fire, water])
>>> testClass2 = BooleanClass(
... operator=OWL.unionOf, members=[fire, water])
>>> for c in BooleanClass.getIntersections():
... print(c) #doctest: +SKIP
( ex:Fire AND ex:Water )
>>> for c in BooleanClass.getUnions():
... print(c) #doctest: +SKIP
( ex:Fire OR ex:Water )
"""
def __init__(self, operator):
self.operator = operator
def __call__(self, f):
def _getExtent(): # noqa: N802
for c in Individual.factoryGraph.subjects(self.operator):
yield BooleanClass(c, operator=self.operator)
return _getExtent
[docs]class Callable:
[docs] def __init__(self, anycallable):
self.__call__ = anycallable
[docs]class BooleanClass(OWLRDFListProxy, Class):
"""
See: http://www.w3.org/TR/owl-ref/#Boolean
owl:complementOf is an attribute of Class, however
"""
[docs] @BooleanClassExtentHelper(OWL.intersectionOf)
@Callable
def getIntersections(): # type: ignore[misc] # noqa: N802
pass # pragma: no cover
getIntersections = Callable(getIntersections) # noqa: N815
[docs] @BooleanClassExtentHelper(OWL.unionOf)
@Callable
def getUnions(): # type: ignore[misc] # noqa: N802
pass # pragma: no cover
getUnions = Callable(getUnions) # noqa: N815
[docs] def __init__(
self, identifier=None, operator=OWL.intersectionOf, members=None, graph=None
):
if operator is None:
props = []
for s, p, o in graph.triples_choices(
(identifier, [OWL.intersectionOf, OWL.unionOf], None)
):
props.append(p)
operator = p
assert len(props) == 1, repr(props)
Class.__init__(self, identifier, graph=graph)
assert operator in [OWL.intersectionOf, OWL.unionOf], str(operator)
self._operator = operator
rdf_list = list(self.graph.objects(predicate=operator, subject=self.identifier))
assert (
not members or not rdf_list
), "This is a previous boolean class description!" + repr(
Collection(self.graph, rdf_list[0]).n3()
)
OWLRDFListProxy.__init__(self, rdf_list, members)
[docs] def copy(self):
"""
Create a copy of this class
"""
copy_of_class = BooleanClass(
operator=self._operator, members=list(self), graph=self.graph
)
return copy_of_class
[docs] def serialize(self, graph):
clonedlist = Collection(graph, BNode())
for cl in self._rdfList:
clonedlist.append(cl)
CastClass(cl, self.graph).serialize(graph)
graph.add((self.identifier, self._operator, clonedlist.uri))
for s, p, o in self.graph.triples((self.identifier, None, None)):
if p != self._operator:
graph.add((s, p, o))
self._serialize(graph)
[docs] def isPrimitive(self): # noqa: N802
return False
[docs] def changeOperator(self, newOperator): # noqa: N802, N803
"""
Converts a unionOf / intersectionOf class expression into one
that instead uses the given operator
>>> testGraph = Graph()
>>> Individual.factoryGraph = testGraph
>>> EX = Namespace("http://example.com/")
>>> namespace_manager = NamespaceManager(Graph())
>>> namespace_manager.bind('ex', EX, override=False)
>>> testGraph.namespace_manager = namespace_manager
>>> fire = Class(EX.Fire)
>>> water = Class(EX.Water)
>>> testClass = BooleanClass(members=[fire,water])
>>> testClass #doctest: +SKIP
( ex:Fire AND ex:Water )
>>> testClass.changeOperator(OWL.unionOf)
>>> testClass #doctest: +SKIP
( ex:Fire OR ex:Water )
>>> try: testClass.changeOperator(OWL.unionOf)
... except Exception as e: print(e)
The new operator is already being used!
"""
assert newOperator != self._operator, "The new operator is already being used!"
self.graph.remove((self.identifier, self._operator, self._rdfList.uri))
self.graph.add((self.identifier, newOperator, self._rdfList.uri))
self._operator = newOperator
[docs] def __repr__(self):
"""
Returns the Manchester Syntax equivalent for this class
"""
return manchesterSyntax(self._rdfList.uri, self.graph, boolean=self._operator)
[docs] def __or__(self, other):
"""
Adds other to the list and returns self
"""
assert self._operator == OWL.unionOf
self._rdfList.append(classOrIdentifier(other))
return self
[docs]def AllDifferent(members): # noqa: N802
"""
DisjointClasses(' description description { description } ')'
"""
pass # pragma: no cover
[docs]class Restriction(Class):
"""
restriction ::= 'restriction('
datavaluedPropertyID dataRestrictionComponent
{ dataRestrictionComponent } ')'
| 'restriction(' individualvaluedPropertyID
individualRestrictionComponent
{ individualRestrictionComponent } ')'
"""
restrictionKinds = [ # noqa: N815
OWL.allValuesFrom,
OWL.someValuesFrom,
OWL.hasValue,
OWL.maxCardinality,
OWL.minCardinality,
]
[docs] def __init__(
self,
onProperty, # noqa: N803
graph=Graph(),
allValuesFrom=None,
someValuesFrom=None,
value=None,
cardinality=None,
maxCardinality=None,
minCardinality=None,
identifier=None,
):
super(Restriction, self).__init__(
identifier, graph=graph, skipOWLClassMembership=True
)
if (
self.identifier,
OWL.onProperty,
propertyOrIdentifier(onProperty),
) not in graph:
graph.add(
(self.identifier, OWL.onProperty, propertyOrIdentifier(onProperty))
)
self.onProperty = onProperty
restr_types = [
(allValuesFrom, OWL.allValuesFrom),
(someValuesFrom, OWL.someValuesFrom),
(value, OWL.hasValue),
(cardinality, OWL.cardinality),
(maxCardinality, OWL.maxCardinality),
(minCardinality, OWL.minCardinality),
]
valid_restr_props = [(i, oterm) for (i, oterm) in restr_types if i is not None]
assert len(valid_restr_props)
restriction_range, restriction_type = valid_restr_props.pop()
self.restrictionType = restriction_type
if isinstance(restriction_range, Identifier):
self.restrictionRange = restriction_range
elif isinstance(restriction_range, Class):
self.restrictionRange = classOrIdentifier(restriction_range)
else:
self.restrictionRange = first(
self.graph.objects(self.identifier, restriction_type)
)
if (
self.identifier,
restriction_type,
self.restrictionRange,
) not in self.graph:
self.graph.add((self.identifier, restriction_type, self.restrictionRange))
assert self.restrictionRange is not None, Class(self.identifier)
if (self.identifier, RDF.type, OWL.Restriction) not in self.graph:
self.graph.add((self.identifier, RDF.type, OWL.Restriction))
self.graph.remove((self.identifier, RDF.type, OWL.Class))
[docs] def serialize(self, graph):
"""
>>> g1 = Graph()
>>> g2 = Graph()
>>> EX = Namespace("http://example.com/")
>>> namespace_manager = NamespaceManager(g1)
>>> namespace_manager.bind('ex', EX, override=False)
>>> namespace_manager = NamespaceManager(g2)
>>> namespace_manager.bind('ex', EX, override=False)
>>> Individual.factoryGraph = g1
>>> prop = Property(EX.someProp, baseType=OWL.DatatypeProperty)
>>> restr1 = (Property(
... EX.someProp,
... baseType=OWL.DatatypeProperty)) << some >> (Class(EX.Foo))
>>> restr1 #doctest: +SKIP
( ex:someProp SOME ex:Foo )
>>> restr1.serialize(g2)
>>> Individual.factoryGraph = g2
>>> list(Property(
... EX.someProp,baseType=None).type
... ) #doctest: +NORMALIZE_WHITESPACE +SKIP
[rdflib.term.URIRef(
'http://www.w3.org/2002/07/owl#DatatypeProperty')]
"""
Property(self.onProperty, graph=self.graph, baseType=None).serialize(graph)
for s, p, o in self.graph.triples((self.identifier, None, None)):
graph.add((s, p, o))
if p in [OWL.allValuesFrom, OWL.someValuesFrom]:
CastClass(o, self.graph).serialize(graph)
[docs] def isPrimitive(self): # noqa: N802
return False
[docs] def __hash__(self):
return hash((self.onProperty, self.restrictionRange))
[docs] def __eq__(self, other):
"""
Equivalence of restrictions is determined by equivalence of the
property in question and the restriction 'range'
"""
assert isinstance(other, Class), repr(other) + repr(type(other))
if isinstance(other, Restriction):
return (
other.onProperty == self.onProperty
and other.restriction_range == self.restrictionRange
)
else:
return False
def _get_onproperty(self):
return list(
self.graph.objects(subject=self.identifier, predicate=OWL.onProperty)
)[0]
def _set_onproperty(self, prop):
triple = (self.identifier, OWL.onProperty, propertyOrIdentifier(prop))
if not prop:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.onProperty)
def _del_onproperty(self):
pass # pragma: no cover
onProperty = property( # noqa: N815
_get_onproperty, _set_onproperty, _del_onproperty
)
def _get_allvaluesfrom(self):
for i in self.graph.objects(
subject=self.identifier, predicate=OWL.allValuesFrom
):
return Class(i, graph=self.graph)
return None
def _set_allvaluesfrom(self, other):
triple = (self.identifier, OWL.allValuesFrom, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.allValuesFrom)
def _del_allvaluesfrom(self):
pass # pragma: no cover
allValuesFrom = property( # noqa: N815
_get_allvaluesfrom, _set_allvaluesfrom, _del_allvaluesfrom
)
def _get_somevaluesfrom(self):
for i in self.graph.objects(
subject=self.identifier, predicate=OWL.someValuesFrom
):
return Class(i, graph=self.graph)
return None
def _set_somevaluesfrom(self, other):
triple = (self.identifier, OWL.someValuesFrom, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.someValuesFrom)
def _del_somevaluesfrom(self):
pass # pragma: no cover
someValuesFrom = property( # noqa: N815
_get_somevaluesfrom, _set_somevaluesfrom, _del_somevaluesfrom
)
def _get_hasvalue(self):
for i in self.graph.objects(subject=self.identifier, predicate=OWL.hasValue):
return Class(i, graph=self.graph)
return None
def _set_hasvalue(self, other):
triple = (self.identifier, OWL.hasValue, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.hasValue)
def _del_hasvalue(self):
pass # pragma: no cover
hasValue = property(_get_hasvalue, _set_hasvalue, _del_hasvalue) # noqa: N815
def _get_cardinality(self):
for i in self.graph.objects(subject=self.identifier, predicate=OWL.cardinality):
return Class(i, graph=self.graph)
return None
def _set_cardinality(self, other):
triple = (self.identifier, OWL.cardinality, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.cardinality)
def _del_cardinality(self):
pass # pragma: no cover
cardinality = property(_get_cardinality, _set_cardinality, _del_cardinality)
def _get_maxcardinality(self):
for i in self.graph.objects(
subject=self.identifier, predicate=OWL.maxCardinality
):
return Class(i, graph=self.graph)
return None
def _set_maxcardinality(self, other):
triple = (self.identifier, OWL.maxCardinality, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.maxCardinality)
def _del_maxcardinality(self):
pass # pragma: no cover
maxCardinality = property( # noqa: N815
_get_maxcardinality, _set_maxcardinality, _del_maxcardinality
)
def _get_mincardinality(self):
for i in self.graph.objects(
subject=self.identifier, predicate=OWL.minCardinality
):
return Class(i, graph=self.graph)
return None
def _set_mincardinality(self, other):
triple = (self.identifier, OWL.minCardinality, classOrIdentifier(other))
if not other:
return
elif triple in self.graph:
return
else:
self.graph.set(triple)
@TermDeletionHelper(OWL.minCardinality)
def _del_mincardinality(self):
pass # pragma: no cover
minCardinality = property( # noqa: N815
_get_mincardinality, _set_mincardinality, _del_mincardinality
)
[docs] def restrictionKind(self): # noqa: N802
for p in self.graph.triple_choices(
(self.identifier, self.restrictionKinds, None)
):
return p.split(OWL)[-1]
raise
[docs] def __repr__(self):
"""
Returns the Manchester Syntax equivalent for this restriction
"""
return manchesterSyntax(self.identifier, self.graph)
# Infix Operators #
some = Infix(
lambda prop, _class: Restriction(prop, graph=_class.graph, someValuesFrom=_class)
)
only = Infix(
lambda prop, _class: Restriction(prop, graph=_class.graph, allValuesFrom=_class)
)
max = Infix(
lambda prop, _class: Restriction(prop, graph=prop.graph, maxCardinality=_class)
)
min = Infix(
lambda prop, _class: Restriction(prop, graph=prop.graph, minCardinality=_class)
)
exactly = Infix(
lambda prop, _class: Restriction(prop, graph=prop.graph, cardinality=_class)
)
value = Infix(lambda prop, _class: Restriction(prop, graph=prop.graph, value=_class))
PropertyAbstractSyntax = """
%s( %s { %s }
%s
{ 'super(' datavaluedPropertyID ')'} ['Functional']
{ domain( %s ) } { range( %s ) } )"""
[docs]class Property(AnnotatableTerms):
"""
axiom ::= 'DatatypeProperty(' datavaluedPropertyID ['Deprecated']
{ annotation }
{ 'super(' datavaluedPropertyID ')'} ['Functional']
{ 'domain(' description ')' } { 'range(' dataRange ')' } ')'
| 'ObjectProperty(' individualvaluedPropertyID ['Deprecated']
{ annotation }
{ 'super(' individualvaluedPropertyID ')' }
[ 'inverseOf(' individualvaluedPropertyID ')' ] [ 'Symmetric' ]
[ 'Functional' | 'InverseFunctional' |
'Functional' 'InverseFunctional' |
'Transitive' ]
{ 'domain(' description ')' } { 'range(' description ')' } ')
"""
[docs] def setupVerbAnnotations(self, verb_annotations): # noqa: N802
if isinstance(verb_annotations, tuple):
tv_sgprop, tv_plprop, tv_vbg = verb_annotations
else:
tv_sgprop = verb_annotations
tv_plprop = verb_annotations
tv_vbg = verb_annotations
if tv_sgprop:
self.tv_sgprop.extent = [
(self.identifier, self.handleAnnotation(tv_sgprop))
]
if tv_plprop:
self.tv_plprop.extent = [
(self.identifier, self.handleAnnotation(tv_plprop))
]
if tv_vbg:
self.tv_vbgprop.extent = [(self.identifier, self.handleAnnotation(tv_vbg))]
[docs] def __init__(
self,
identifier=None,
graph=None,
baseType=OWL.ObjectProperty, # noqa: N803
subPropertyOf=None,
domain=None,
range=None,
inverseOf=None,
otherType=None,
equivalentProperty=None,
comment=None,
verbAnnotations=None,
nameAnnotation=None,
nameIsLabel=False,
):
super(Property, self).__init__(identifier, graph, nameAnnotation, nameIsLabel)
if verbAnnotations:
self.setupVerbAnnotations(verbAnnotations)
assert not isinstance(self.identifier, BNode)
if baseType is None:
# None give, determine via introspection
self._baseType = first(Individual(self.identifier, graph=self.graph).type)
else:
if (self.identifier, RDF.type, baseType) not in self.graph:
self.graph.add((self.identifier, RDF.type, baseType))
self._baseType = baseType
self.subPropertyOf = subPropertyOf
self.inverseOf = inverseOf
self.domain = domain
self.range = range
self.comment = [] if comment is None else comment
[docs] def serialize(self, graph):
for fact in self.graph.triples((self.identifier, None, None)):
graph.add(fact)
for p in itertools.chain(self.subPropertyOf, self.inverseOf):
p.serialize(graph)
for c in itertools.chain(self.domain, self.range):
CastClass(c, self.graph).serialize(graph)
def _get_extent(self, graph=None):
for triple in (graph is None and self.graph or graph).triples(
(None, self.identifier, None)
):
yield triple
def _set_extent(self, other):
if not other:
return
for subj, obj in other:
self.graph.add((subj, self.identifier, obj))
extent = property(_get_extent, _set_extent)
[docs] def __repr__(self):
rt = []
if OWL.ObjectProperty in self.type:
rt.append(
"ObjectProperty( %s annotation(%s)"
% (self.qname, first(self.comment) and first(self.comment) or "")
)
if first(self.inverseOf):
two_link_inverse = first(first(self.inverseOf).inverseOf)
if two_link_inverse and two_link_inverse.identifier == self.identifier:
inverserepr = first(self.inverseOf).qname
else:
inverserepr = repr(first(self.inverseOf))
rt.append(
" inverseOf( %s )%s"
% (
inverserepr,
OWL.SymmetricProperty in self.type and " Symmetric" or "",
)
)
for s, p, roletype in self.graph.triples_choices(
(
self.identifier,
RDF.type,
[
OWL.FunctionalProperty,
OWL.InverseFunctionalProperty,
OWL.TransitiveProperty,
],
)
):
rt.append(str(roletype.split(OWL)[-1]))
else:
rt.append(
"DatatypeProperty( %s %s"
% (self.qname, first(self.comment) and first(self.comment) or "")
)
for s, p, roletype in self.graph.triples(
(self.identifier, RDF.type, OWL.FunctionalProperty)
):
rt.append(" Functional")
def canonicalName(term, g): # noqa: N802
normalized_name = classOrIdentifier(term)
if isinstance(normalized_name, BNode):
return term
elif normalized_name.startswith(XSD):
return str(term)
elif first(
g.triples_choices(
(normalized_name, [OWL.unionOf, OWL.intersectionOf], None)
)
):
return repr(term)
else:
return str(term.qname)
rt.append(
" ".join(
[
" super( %s )" % canonicalName(super_property, self.graph)
for super_property in self.subPropertyOf
]
)
)
rt.append(
" ".join(
[
" domain( %s )" % canonicalName(domain, self.graph)
for domain in self.domain
]
)
)
rt.append(
" ".join(
[
" range( %s )" % canonicalName(range, self.graph)
for range in self.range
]
)
)
rt = "\n".join([expr for expr in rt if expr])
rt += "\n)"
return rt
def _get_subpropertyof(self):
for anc in self.graph.objects(
subject=self.identifier, predicate=RDFS.subPropertyOf
):
yield Property(anc, graph=self.graph, baseType=None)
def _set_subpropertyof(self, other):
if not other:
return
for subproperty in other:
self.graph.add(
(self.identifier, RDFS.subPropertyOf, classOrIdentifier(subproperty))
)
@TermDeletionHelper(RDFS.subPropertyOf)
def _del_subpropertyof(self):
pass # pragma: no cover
subPropertyOf = property( # noqa: N815
_get_subpropertyof, _set_subpropertyof, _del_subpropertyof
)
def _get_inverseof(self):
for anc in self.graph.objects(subject=self.identifier, predicate=OWL.inverseOf):
yield Property(anc, graph=self.graph, baseType=None)
def _set_inverseof(self, other):
if not other:
return
self.graph.add((self.identifier, OWL.inverseOf, classOrIdentifier(other)))
@TermDeletionHelper(OWL.inverseOf)
def _del_inverseof(self):
pass # pragma: no cover
inverseOf = property(_get_inverseof, _set_inverseof, _del_inverseof) # noqa: N815
def _get_domain(self):
for dom in self.graph.objects(subject=self.identifier, predicate=RDFS.domain):
yield Class(dom, graph=self.graph)
def _set_domain(self, other):
if not other:
return
if isinstance(other, (Individual, Identifier)):
self.graph.add((self.identifier, RDFS.domain, classOrIdentifier(other)))
else:
for dom in other:
self.graph.add((self.identifier, RDFS.domain, classOrIdentifier(dom)))
@TermDeletionHelper(RDFS.domain)
def _del_domain(self):
pass # pragma: no cover
domain = property(_get_domain, _set_domain, _del_domain)
def _get_range(self):
for ran in self.graph.objects(subject=self.identifier, predicate=RDFS.range):
yield Class(ran, graph=self.graph)
def _set_range(self, ranges):
if not ranges:
return
if isinstance(ranges, (Individual, Identifier)):
self.graph.add((self.identifier, RDFS.range, classOrIdentifier(ranges)))
else:
for range in ranges:
self.graph.add((self.identifier, RDFS.range, classOrIdentifier(range)))
@TermDeletionHelper(RDFS.range)
def _del_range(self):
pass # pragma: no cover
range = property(_get_range, _set_range, _del_range)
[docs] def replace(self, other):
# extension = []
for s, p, o in self.extent:
self.graph.add((s, propertyOrIdentifier(other), o))
self.graph.remove((None, self.identifier, None))
[docs]def CommonNSBindings(graph, additionalNS={}): # noqa: N802, N803
"""
Takes a graph and binds the common namespaces (rdf,rdfs, & owl)
"""
namespace_manager = NamespaceManager(graph)
namespace_manager.bind("rdfs", RDFS)
namespace_manager.bind("rdf", RDF)
namespace_manager.bind("owl", OWL)
for prefix, uri in list(additionalNS.items()):
namespace_manager.bind(prefix, uri, override=False)
graph.namespace_manager = namespace_manager