Source code for rdflib.plugins.serializers.turtle

Turtle RDF graph serializer for RDFLib.
See <> for syntax specification.

from collections import defaultdict
from functools import cmp_to_key

from rdflib.exceptions import Error
from rdflib.namespace import RDF, RDFS
from rdflib.serializer import Serializer
from rdflib.term import BNode, Literal, URIRef

__all__ = ["RecursiveSerializer", "TurtleSerializer"]

def _object_comparator(a, b):
    for nice clean output we sort the objects of triples,
    some of them are literals,
    these are sorted according to the sort order of the underlying python objects
    in py3 not all things are comparable.
    This falls back on comparing string representations when not.

        if a > b:
            return 1
        if a < b:
            return -1
        return 0

    except TypeError:
        a = str(a)
        b = str(b)
        return (a > b) - (a < b)

[docs]class RecursiveSerializer(Serializer): topClasses = [RDFS.Class] predicateOrder = [RDF.type, RDFS.label] maxDepth = 10 indentString = " " roundtrip_prefixes = ()
[docs] def __init__(self, store): super(RecursiveSerializer, self).__init__(store) = None self.reset()
[docs] def addNamespace(self, prefix, uri): if prefix in self.namespaces and self.namespaces[prefix] != uri: raise Exception( "Trying to override namespace prefix %s => %s, but it's already bound to %s" % (prefix, uri, self.namespaces[prefix]) ) self.namespaces[prefix] = uri
[docs] def checkSubject(self, subject): """Check to see if the subject should be serialized yet""" if ( (self.isDone(subject)) or (subject not in self._subjects) or ((subject in self._topLevels) and (self.depth > 1)) or (isinstance(subject, URIRef) and (self.depth >= self.maxDepth)) ): return False return True
[docs] def isDone(self, subject): """Return true if subject is serialized""" return subject in self._serialized
[docs] def orderSubjects(self): seen = {} subjects = [] for classURI in self.topClasses: members = list(, classURI)) members.sort() subjects.extend(members) for member in members: self._topLevels[member] = True seen[member] = True recursable = [ (isinstance(subject, BNode), self._references[subject], subject) for subject in self._subjects if subject not in seen ] recursable.sort() subjects.extend([subject for (isbnode, refs, subject) in recursable]) return subjects
[docs] def preprocess(self): for triple in, None, None)): self.preprocessTriple(triple)
[docs] def preprocessTriple(self, spo): s, p, o = spo self._references[o] += 1 self._subjects[s] = True
[docs] def reset(self): self.depth = 0 self.lists = {} self.namespaces = {} self._references = defaultdict(int) self._serialized = {} self._subjects = {} self._topLevels = {} if self.roundtrip_prefixes: if hasattr(self.roundtrip_prefixes, "__iter__"): for prefix, ns in if prefix in self.roundtrip_prefixes: self.addNamespace(prefix, ns) else: for prefix, ns in self.addNamespace(prefix, ns)
[docs] def buildPredicateHash(self, subject): """ Build a hash key by predicate to a list of objects for the given subject """ properties = {} for s, p, o in, None, None)): oList = properties.get(p, []) oList.append(o) properties[p] = oList return properties
[docs] def sortProperties(self, properties): """Take a hash from predicate uris to lists of values. Sort the lists of values. Return a sorted list of properties.""" # Sort object lists for prop, objects in properties.items(): objects.sort(key=cmp_to_key(_object_comparator)) # Make sorted list of properties propList = [] seen = {} for prop in self.predicateOrder: if (prop in properties) and (prop not in seen): propList.append(prop) seen[prop] = True props = list(properties.keys()) props.sort() for prop in props: if prop not in seen: propList.append(prop) seen[prop] = True return propList
[docs] def subjectDone(self, subject): """Mark a subject as done.""" self._serialized[subject] = True
[docs] def indent(self, modifier=0): """Returns indent string multiplied by the depth""" return (self.depth + modifier) * self.indentString
[docs] def write(self, text): """Write text in given encoding.""", "replace"))
[docs]class TurtleSerializer(RecursiveSerializer): short_name = "turtle" indentString = " "
[docs] def __init__(self, store): self._ns_rewrite = {} super(TurtleSerializer, self).__init__(store) self.keywords = {RDF.type: "a"} self.reset() = None self._spacious = _SPACIOUS_OUTPUT
[docs] def addNamespace(self, prefix, namespace): # Turtle does not support prefix that start with _ # if they occur in the graph, rewrite to p_blah # this is more complicated since we need to make sure p_blah # does not already exist. And we register namespaces as we go, i.e. # we may first see a triple with prefix _9 - rewrite it to p_9 # and then later find a triple with a "real" p_9 prefix # so we need to keep track of ns rewrites we made so far. if (prefix > "" and prefix[0] == "_") or self.namespaces.get( prefix, namespace ) != namespace: if prefix not in self._ns_rewrite: p = "p" + prefix while p in self.namespaces: p = "p" + p self._ns_rewrite[prefix] = p prefix = self._ns_rewrite.get(prefix, prefix) super(TurtleSerializer, self).addNamespace(prefix, namespace) return prefix
[docs] def reset(self): super(TurtleSerializer, self).reset() self._shortNames = {} self._started = False self._ns_rewrite = {}
[docs] def serialize(self, stream, base=None, encoding=None, spacious=None, **args): self.reset() = stream # if base is given here, use that, if not and a base is set for the graph use that if base is not None: self.base = base elif is not None: self.base = if spacious is not None: self._spacious = spacious self.preprocess() subjects_list = self.orderSubjects() self.startDocument() firstTime = True for subject in subjects_list: if self.isDone(subject): continue if firstTime: firstTime = False if self.statement(subject) and not firstTime: self.write("\n") self.endDocument() stream.write("\n".encode("latin-1")) self.base = None
[docs] def preprocessTriple(self, triple): super(TurtleSerializer, self).preprocessTriple(triple) for i, node in enumerate(triple): if i == VERB and node in self.keywords: # predicate is a keyword continue # Don't use generated prefixes for subjects and objects self.getQName(node, gen_prefix=(i == VERB)) if isinstance(node, Literal) and node.datatype: self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT) p = triple[1] if isinstance(p, BNode): # hmm - when is P ever a bnode? self._references[p] += 1
# TODO: Rename to get_pname
[docs] def getQName(self, uri, gen_prefix=True): if not isinstance(uri, URIRef): return None parts = None try: parts =, generate=gen_prefix) except: # is the uri a namespace in itself? pfx = if pfx is not None: parts = (pfx, uri, "") else: # nothing worked return None prefix, namespace, local = parts local = local.replace(r"(", r"\(").replace(r")", r"\)") # QName cannot end with . if local.endswith("."): return None prefix = self.addNamespace(prefix, namespace) return "%s:%s" % (prefix, local)
[docs] def startDocument(self): self._started = True ns_list = sorted(self.namespaces.items()) if self.base: self.write(self.indent() + "@base <%s> .\n" % self.base) for prefix, uri in ns_list: self.write(self.indent() + "@prefix %s: <%s> .\n" % (prefix, uri)) if ns_list and self._spacious: self.write("\n")
[docs] def endDocument(self): if self._spacious: self.write("\n")
[docs] def statement(self, subject): self.subjectDone(subject) return self.s_squared(subject) or self.s_default(subject)
[docs] def s_default(self, subject): self.write("\n" + self.indent()) self.path(subject, SUBJECT) self.predicateList(subject) self.write(" .") return True
[docs] def s_squared(self, subject): if (self._references[subject] > 0) or not isinstance(subject, BNode): return False self.write("\n" + self.indent() + "[]") self.predicateList(subject) self.write(" .") return True
[docs] def path(self, node, position, newline=False): if not ( self.p_squared(node, position, newline) or self.p_default(node, position, newline) ): raise Error("Cannot serialize node '%s'" % (node,))
[docs] def p_default(self, node, position, newline=False): if position != SUBJECT and not newline: self.write(" ") self.write(self.label(node, position)) return True
[docs] def label(self, node, position): if node == RDF.nil: return "()" if position is VERB and node in self.keywords: return self.keywords[node] if isinstance(node, Literal): return node._literal_n3( use_plain=True, qname_callback=lambda dt: self.getQName(dt, _GEN_QNAME_FOR_DT), ) else: node = self.relativize(node) return self.getQName(node, position == VERB) or node.n3()
[docs] def p_squared(self, node, position, newline=False): if ( not isinstance(node, BNode) or node in self._serialized or self._references[node] > 1 or position == SUBJECT ): return False if not newline: self.write(" ") if self.isValidList(node): # this is a list self.write("(") self.depth += 1 # 2 self.doList(node) self.depth -= 1 # 2 self.write(" )") else: self.subjectDone(node) self.depth += 2 # self.write('[\n' + self.indent()) self.write("[") self.depth -= 1 # self.predicateList(node, newline=True) self.predicateList(node, newline=False) # self.write('\n' + self.indent() + ']') self.write(" ]") self.depth -= 1 return True
[docs] def isValidList(self, l_): """ Checks if l is a valid RDF list, i.e. no nodes have other properties. """ try: if, RDF.first) is None: return False except: return False while l_: if l_ != RDF.nil and len(list( != 2: return False l_ =, return True
[docs] def doList(self, l_): while l_: item =, RDF.first) if item is not None: self.path(item, OBJECT) self.subjectDone(l_) l_ =,
[docs] def predicateList(self, subject, newline=False): properties = self.buildPredicateHash(subject) propList = self.sortProperties(properties) if len(propList) == 0: return self.verb(propList[0], newline=newline) self.objectList(properties[propList[0]]) for predicate in propList[1:]: self.write(" ;\n" + self.indent(1)) self.verb(predicate, newline=True) self.objectList(properties[predicate])
[docs] def verb(self, node, newline=False): self.path(node, VERB, newline)
[docs] def objectList(self, objects): count = len(objects) if count == 0: return depthmod = (count == 1) and 0 or 1 self.depth += depthmod self.path(objects[0], OBJECT) for obj in objects[1:]: self.write(",\n" + self.indent(1)) self.path(obj, OBJECT, newline=True) self.depth -= depthmod