"""
Turtle RDF graph serializer for RDFLib.
See <http://www.w3.org/TeamSubmission/turtle/> for syntax specification.
"""
from collections import defaultdict
from functools import cmp_to_key
from rdflib.exceptions import Error
from rdflib.namespace import RDF, RDFS
from rdflib.serializer import Serializer
from rdflib.term import BNode, Literal, URIRef
__all__ = ["RecursiveSerializer", "TurtleSerializer"]
def _object_comparator(a, b):
"""
for nice clean output we sort the objects of triples,
some of them are literals,
these are sorted according to the sort order of the underlying python objects
in py3 not all things are comparable.
This falls back on comparing string representations when not.
"""
try:
if a > b:
return 1
if a < b:
return -1
return 0
except TypeError:
a = str(a)
b = str(b)
return (a > b) - (a < b)
class RecursiveSerializer(Serializer):
    """
    Serializer base class holding the bookkeeping shared by serializers
    that emit a graph one subject at a time: which nodes occur as subjects,
    how often each node is used as an object, which subjects were already
    written, the current nesting depth and the namespace prefixes in use.
    """

    # Instances of these classes are placed first by orderSubjects().
    topClasses = [RDFS.Class]
    # Predicates placed first, in this order, by sortProperties().
    predicateOrder = [RDF.type, RDFS.label]
    # checkSubject() rejects URIRef subjects once depth reaches this value.
    maxDepth = 10
    # Repeated once per nesting level by indent().
    indentString = " "
    # Falsy: reset() registers no store prefixes. Iterable: only the listed
    # prefixes are registered. Any other truthy value: all of them.
    roundtrip_prefixes = ()

    def __init__(self, store):
        """Attach the serializer to ``store`` and initialise its state."""
        super().__init__(store)
        self.stream = None
        self.reset()

    def addNamespace(self, prefix, uri):
        """
        Bind ``prefix`` to ``uri`` for this serialization.

        Raises ``Exception`` if ``prefix`` is already bound to another URI.
        """
        if prefix in self.namespaces:
            bound_uri = self.namespaces[prefix]
            if bound_uri != uri:
                raise Exception(
                    "Trying to override namespace prefix %s => %s, but it's already bound to %s"
                    % (prefix, uri, bound_uri)
                )
        self.namespaces[prefix] = uri

    def checkSubject(self, subject):
        """Check to see if the subject should be serialized yet"""
        if self.isDone(subject):
            return False
        if subject not in self._subjects:
            return False
        # Top-level subjects are only written at the outer levels.
        if subject in self._topLevels and self.depth > 1:
            return False
        if isinstance(subject, URIRef) and self.depth >= self.maxDepth:
            return False
        return True

    def isDone(self, subject):
        """Return true if subject is serialized"""
        already_written = self._serialized
        return subject in already_written

    def orderSubjects(self):
        """
        Return every subject in the order it should be serialized.

        Sorted instances of each class in ``topClasses`` come first and are
        recorded as top-level subjects. All other subjects follow, sorted so
        that non-blank nodes precede blank nodes, then by reference count,
        then by the node itself.
        """
        ordered = []
        placed = set()
        for class_uri in self.topClasses:
            instances = sorted(self.store.subjects(RDF.type, class_uri))
            ordered.extend(instances)
            for instance in instances:
                self._topLevels[instance] = True
                placed.add(instance)

        remaining = sorted(
            (isinstance(node, BNode), self._references[node], node)
            for node in self._subjects
            if node not in placed
        )
        ordered.extend(node for _, _, node in remaining)
        return ordered

    def preprocess(self):
        """Feed every triple of the store to preprocessTriple()."""
        every_triple = self.store.triples((None, None, None))
        for statement in every_triple:
            self.preprocessTriple(statement)

    def preprocessTriple(self, spo):
        """Count the object as referenced and record the subject."""
        subject, _predicate, obj = spo
        self._references[obj] += 1
        self._subjects[subject] = True

    def reset(self):
        """
        Clear all per-serialization state, then register the store's
        namespace prefixes selected by ``roundtrip_prefixes``.
        """
        self.depth = 0
        self.lists = {}
        self.namespaces = {}
        self._references = defaultdict(int)
        self._serialized = {}
        self._subjects = {}
        self._topLevels = {}

        if not self.roundtrip_prefixes:
            return
        # An iterable restricts round-tripping to the prefixes it contains;
        # any other truthy value round-trips every prefix of the store.
        restricted = hasattr(self.roundtrip_prefixes, "__iter__")
        for prefix, ns in self.store.namespaces():
            if not restricted or prefix in self.roundtrip_prefixes:
                self.addNamespace(prefix, ns)

    def buildPredicateHash(self, subject):
        """
        Return a dict mapping each predicate of ``subject`` to the list of
        its objects.
        """
        by_predicate = {}
        for _s, predicate, obj in self.store.triples((subject, None, None)):
            by_predicate.setdefault(predicate, []).append(obj)
        return by_predicate

    def sortProperties(self, properties):
        """Take a hash from predicate uris to lists of values.
        Sort the lists of values in place. Return a sorted list of
        properties, with those named in ``predicateOrder`` first."""
        object_key = cmp_to_key(_object_comparator)
        for values in properties.values():
            values.sort(key=object_key)

        ordered = []
        placed = set()
        # Preferred predicates first, in their configured order.
        for prop in self.predicateOrder:
            if prop in properties and prop not in placed:
                ordered.append(prop)
                placed.add(prop)
        # Then every remaining predicate in sorted order.
        ordered.extend(prop for prop in sorted(properties) if prop not in placed)
        return ordered

    def subjectDone(self, subject):
        """Mark a subject as done."""
        self._serialized[subject] = True

    def indent(self, modifier=0):
        """Returns indent string multiplied by the depth"""
        levels = self.depth + modifier
        return levels * self.indentString

    def write(self, text):
        """Write text in given encoding."""
        encoded = text.encode(self.encoding, "replace")
        self.stream.write(encoded)
# Position of a node within a triple. TurtleSerializer passes these as the
# ``position`` argument of its path/label methods and compares them with
# the index of each node while preprocessing a triple.
SUBJECT = 0
VERB = 1
OBJECT = 2
# Passed as ``gen_prefix`` when TurtleSerializer looks up a prefixed name
# for the datatype of a literal.
_GEN_QNAME_FOR_DT = False
# Initial value of TurtleSerializer._spacious, which controls the blank
# lines written after the prefix declarations and at the end of the output.
_SPACIOUS_OUTPUT = False
class TurtleSerializer(RecursiveSerializer):
    """
    Serializer that writes the graph as Turtle.

    Subjects are written one statement at a time. Blank nodes that are
    referenced at most once are written inline, either as ``[ ... ]``
    property lists or, for well-formed RDF lists, as ``( ... )``.
    """

    short_name = "turtle"
    indentString = " "

    def __init__(self, store):
        """Attach the serializer to ``store`` and initialise its state."""
        # Must exist before the base constructor runs: that constructor
        # calls reset(), which can call addNamespace(), which reads it.
        self._ns_rewrite = {}
        super(TurtleSerializer, self).__init__(store)
        # Predicates written as a keyword instead of a name.
        self.keywords = {RDF.type: "a"}
        self.reset()
        self.stream = None
        self._spacious = _SPACIOUS_OUTPUT

    def addNamespace(self, prefix, namespace):
        """
        Bind ``prefix`` to ``namespace`` and return the prefix actually used.

        The returned prefix differs from ``prefix`` when it had to be
        rewritten, either because it starts with ``_`` or because it is
        already bound to a different namespace.
        """
        # Turtle does not support prefixes that start with "_"; if they
        # occur in the graph they are rewritten to "p_blah". Namespaces are
        # registered as we go, so a rewritten "_9" -> "p_9" may later meet a
        # "real" "p_9" prefix; the rewrites made so far are therefore kept
        # in self._ns_rewrite and candidates already in use are skipped.
        starts_with_underscore = prefix > "" and prefix[0] == "_"
        bound_elsewhere = self.namespaces.get(prefix, namespace) != namespace
        if starts_with_underscore or bound_elsewhere:
            if prefix not in self._ns_rewrite:
                candidate = "p" + prefix
                while candidate in self.namespaces:
                    candidate = "p" + candidate
                self._ns_rewrite[prefix] = candidate
        prefix = self._ns_rewrite.get(prefix, prefix)
        super(TurtleSerializer, self).addNamespace(prefix, namespace)
        return prefix

    def reset(self):
        """Clear all per-serialization state, including prefix rewrites."""
        # Clear the rewrite table BEFORE the base reset: the base reset may
        # register round-tripped prefixes through addNamespace(), and the
        # rewrites recorded there must survive. Clearing afterwards lost
        # them, so the same namespace could later be bound a second time
        # under yet another rewritten prefix.
        self._ns_rewrite = {}
        super(TurtleSerializer, self).reset()
        self._shortNames = {}
        self._started = False

    def serialize(self, stream, base=None, encoding=None, spacious=None, **args):
        """
        Write the whole graph to ``stream`` as Turtle.

        :param stream: object with a ``write`` method accepting bytes.
        :param base: base URI to declare; when ``None`` the ``base`` of the
            store is used if it is set.
        :param encoding: accepted but not read by this method; ``write()``
            encodes with ``self.encoding``.
        :param spacious: when not ``None``, replaces the current spacious
            setting (it is not restored afterwards).
        :param args: accepted and ignored.
        """
        self.reset()
        self.stream = stream
        # if base is given here, use that, if not and a base is set for the
        # graph use that
        if base is not None:
            self.base = base
        elif self.store.base is not None:
            self.base = self.store.base

        if spacious is not None:
            self._spacious = spacious

        try:
            self.preprocess()
            subjects_list = self.orderSubjects()

            self.startDocument()

            for subject in subjects_list:
                if self.isDone(subject):
                    continue
                # A newline follows every statement, the first included.
                # (The former ``firstTime`` flag was always False when it
                # was tested, so it never suppressed anything.)
                if self.statement(subject):
                    self.write("\n")

            self.endDocument()
            stream.write("\n".encode("latin-1"))
        finally:
            # Do not leak this call's base into a later call, even when
            # serialization fails part-way through.
            self.base = None

    def preprocessTriple(self, triple):
        """
        Record ``triple`` in the bookkeeping and register the namespaces
        needed to write its nodes and literal datatypes as prefixed names.
        """
        super(TurtleSerializer, self).preprocessTriple(triple)
        for i, node in enumerate(triple):
            if i == VERB and node in self.keywords:
                # predicate is a keyword
                continue
            # Don't use generated prefixes for subjects and objects
            self.getQName(node, gen_prefix=(i == VERB))
            if isinstance(node, Literal) and node.datatype:
                self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT)
        p = triple[1]
        if isinstance(p, BNode):  # hmm - when is P ever a bnode?
            self._references[p] += 1

    # TODO: Rename to get_pname
    def getQName(self, uri, gen_prefix=True):
        """
        Return ``uri`` as a ``prefix:local`` name, registering its namespace.

        Returns ``None`` when ``uri`` is not a URIRef, when no prefix could
        be found for it, or when the local part ends with ``.``.

        :param gen_prefix: passed to the store's ``compute_qname`` as
            ``generate``.
        """
        if not isinstance(uri, URIRef):
            return None

        try:
            parts = self.store.compute_qname(uri, generate=gen_prefix)
        except Exception:
            # Deliberately broad (best effort): whatever went wrong, check
            # whether the uri is a namespace in itself.
            pfx = self.store.store.prefix(uri)
            if pfx is None:
                # nothing worked
                return None
            parts = (pfx, uri, "")

        prefix, namespace, local = parts

        local = local.replace(r"(", r"\(").replace(r")", r"\)")

        # QName cannot end with .
        if local.endswith("."):
            return None

        prefix = self.addNamespace(prefix, namespace)

        return "%s:%s" % (prefix, local)

    def startDocument(self):
        """Write the ``@base`` and ``@prefix`` declarations."""
        self._started = True
        ns_list = sorted(self.namespaces.items())

        if self.base:
            self.write(self.indent() + "@base <%s> .\n" % self.base)
        for prefix, uri in ns_list:
            self.write(self.indent() + "@prefix %s: <%s> .\n" % (prefix, uri))
        if ns_list and self._spacious:
            self.write("\n")

    def endDocument(self):
        """Write a trailing blank line when spacious output is enabled."""
        if self._spacious:
            self.write("\n")

    def statement(self, subject):
        """Mark ``subject`` as done and write its statement."""
        self.subjectDone(subject)
        return self.s_squared(subject) or self.s_default(subject)

    def s_default(self, subject):
        """Write ``subject`` followed by its predicates and a closing dot."""
        self.write("\n" + self.indent())
        self.path(subject, SUBJECT)
        self.predicateList(subject)
        self.write(" .")
        return True

    def s_squared(self, subject):
        """
        Write a blank node that nothing references as ``[] ... .``.

        Returns False without writing for any other subject.
        """
        if (self._references[subject] > 0) or not isinstance(subject, BNode):
            return False
        self.write("\n" + self.indent() + "[]")
        self.predicateList(subject)
        self.write(" .")
        return True

    def path(self, node, position, newline=False):
        """
        Write ``node``, inline when possible, by label otherwise.

        Raises ``Error`` if neither form could be written.
        """
        if not (
            self.p_squared(node, position, newline)
            or self.p_default(node, position, newline)
        ):
            raise Error("Cannot serialize node '%s'" % (node,))

    def p_default(self, node, position, newline=False):
        """Write the label of ``node``, preceded by a space where needed."""
        if position != SUBJECT and not newline:
            self.write(" ")
        self.write(self.label(node, position))
        return True

    def label(self, node, position):
        """Return the Turtle text for ``node`` at ``position``."""
        if node == RDF.nil:
            return "()"
        # Compare by value: ``is`` on ints only works through interning.
        if position == VERB and node in self.keywords:
            return self.keywords[node]
        if isinstance(node, Literal):
            return node._literal_n3(
                use_plain=True,
                qname_callback=lambda dt: self.getQName(dt, _GEN_QNAME_FOR_DT),
            )
        node = self.relativize(node)
        return self.getQName(node, position == VERB) or node.n3()

    def p_squared(self, node, position, newline=False):
        """
        Write a blank node inline as ``( ... )`` or ``[ ... ]``.

        Only applies to a blank node that is not yet serialized, is
        referenced at most once and is not in subject position; returns
        False without writing otherwise.
        """
        if (
            not isinstance(node, BNode)
            or node in self._serialized
            or self._references[node] > 1
            or position == SUBJECT
        ):
            return False

        if not newline:
            self.write(" ")

        if self.isValidList(node):
            # this is a list
            self.write("(")
            self.depth += 1
            self.doList(node)
            self.depth -= 1
            self.write(" )")
        else:
            self.subjectDone(node)
            # The bracket is opened two levels in; the predicates are
            # written one level in.
            self.depth += 2
            self.write("[")
            self.depth -= 1
            self.predicateList(node, newline=False)
            self.write(" ]")
            self.depth -= 1

        return True

    def isValidList(self, l_):
        """
        Checks if l is a valid RDF list, i.e. no nodes have other properties.

        A chain of ``rdf:rest`` links that loops back onto itself is not a
        valid list.
        """
        try:
            if self.store.value(l_, RDF.first) is None:
                return False
        except Exception:
            return False
        visited = set()
        while l_:
            if l_ in visited:
                # Cyclic rdf:rest chain: without this check the walk (and
                # doList() after it) would never terminate.
                return False
            visited.add(l_)
            if l_ != RDF.nil and len(list(self.store.predicate_objects(l_))) != 2:
                return False
            l_ = self.store.value(l_, RDF.rest)
        return True

    def doList(self, l_):
        """
        Write the items of the RDF list starting at ``l_`` and mark each
        list node as done. Callers check the list with isValidList() first.
        """
        while l_:
            item = self.store.value(l_, RDF.first)
            if item is not None:
                self.path(item, OBJECT)
                self.subjectDone(l_)
            l_ = self.store.value(l_, RDF.rest)

    def predicateList(self, subject, newline=False):
        """Write the predicates of ``subject``, separated by `` ;``."""
        properties = self.buildPredicateHash(subject)
        propList = self.sortProperties(properties)
        if not propList:
            return
        self.verb(propList[0], newline=newline)
        self.objectList(properties[propList[0]])
        for predicate in propList[1:]:
            self.write(" ;\n" + self.indent(1))
            self.verb(predicate, newline=True)
            self.objectList(properties[predicate])

    def verb(self, node, newline=False):
        """Write ``node`` in predicate position."""
        self.path(node, VERB, newline)

    def objectList(self, objects):
        """Write ``objects``, separated by ``,``, one nesting level deeper."""
        if not objects:
            return
        # NOTE(review): this used to be ``(count == 1) and 0 or 1``, which
        # always yields 1 because 0 is falsy. Kept at 1 so the emitted
        # indentation does not change; the apparent intent was 0 for a
        # single object.
        depthmod = 1
        self.depth += depthmod
        self.path(objects[0], OBJECT)
        for obj in objects[1:]:
            self.write(",\n" + self.indent(1))
            self.path(obj, OBJECT, newline=True)
        self.depth -= depthmod