"""
LongTurtle RDF graph serializer for RDFLib.
See <http://www.w3.org/TeamSubmission/turtle/> for syntax specification.
This variant, longturtle as opposed to just turtle, makes some small format changes
to turtle - the original turtle serializer. It:
* uses PREFIX instead of @prefix
* uses BASE instead of @base
* adds a new line at RDF.type, or 'a'
* adds a newline and an indent for all triples with more than one object (object list)
* adds a new line and ';' for the last triple in a set with '.'
on the start of the next line
* uses the default encoding (encode()) instead of "latin-1"
- Nicholas Car, 2023
"""
from rdflib.exceptions import Error
from rdflib.namespace import RDF
from rdflib.term import BNode, Literal, URIRef
from .turtle import RecursiveSerializer
# Names exported by ``from <this module> import *``.
__all__ = ["LongTurtleSerializer"]
# Index of each term within a triple. The serializer also passes these as the
# ``position`` argument when writing a node, to say which role the node plays.
SUBJECT = 0
VERB = 1
OBJECT = 2
# Passed as the ``gen_prefix`` argument when a literal's datatype IRI is
# resolved to a QName.
_GEN_QNAME_FOR_DT = False
# Initial value of the serializer's ``_spacious`` flag; ``serialize()`` can
# override it through its ``spacious`` argument.
_SPACIOUS_OUTPUT = False
class LongTurtleSerializer(RecursiveSerializer):
    """Serialize a graph using the "longturtle" layout of Turtle.

    Compared with the plain Turtle serializer this class writes ``PREFIX`` /
    ``BASE`` directives, puts each predicate on its own line, terminates every
    predicate list with `` ;`` and places the closing ``.`` on a line of its
    own.
    """

    short_name = "longturtle"
    # NOTE(review): reproduced exactly as it appears in the reviewed text,
    # where runs of whitespace look collapsed - confirm the intended width.
    indentString = " "

    def __init__(self, store):
        """Create a serializer for ``store``."""
        # Assigned before the base initialiser runs.
        # NOTE(review): presumably the base initialiser can reach code that
        # reads this mapping - confirm before reordering.
        self._ns_rewrite = {}
        super().__init__(store)
        self.keywords = {RDF.type: "a"}
        self.reset()
        self.stream = None
        self._spacious = _SPACIOUS_OUTPUT

    def addNamespace(self, prefix, namespace):
        """Register ``prefix`` for ``namespace`` and return the prefix used.

        Turtle does not support prefixes that start with ``_``, and a prefix
        may already be bound to a different namespace. In both cases the
        prefix is rewritten by prepending ``p`` until it is unused. Rewrites
        are remembered because namespaces are registered as triples are seen:
        ``_9`` may be rewritten to ``p_9`` before a "real" ``p_9`` shows up.
        """
        bound_elsewhere = self.namespaces.get(prefix, namespace) != namespace
        if prefix.startswith("_") or bound_elsewhere:
            if prefix not in self._ns_rewrite:
                candidate = "p" + prefix
                while candidate in self.namespaces:
                    candidate = "p" + candidate
                self._ns_rewrite[prefix] = candidate
        prefix = self._ns_rewrite.get(prefix, prefix)
        super().addNamespace(prefix, namespace)
        return prefix

    def reset(self):
        """Clear per-run state, including recorded prefix rewrites."""
        super().reset()
        self._shortNames = {}
        self._started = False
        self._ns_rewrite = {}

    def serialize(self, stream, base=None, encoding=None, spacious=None, **args):
        """Write the whole graph to ``stream``.

        :param stream: destination handed to ``self.write`` via ``self.stream``.
        :param base: base IRI to emit; when ``None`` the store's own base is
            used if it has one.
        :param encoding: accepted for interface compatibility; not read here.
        :param spacious: when not ``None``, replaces the ``_spacious`` flag
            (the new value is kept for later calls as well).
        :param args: accepted for interface compatibility; not read here.
        """
        self.reset()
        self.stream = stream
        # An explicit base wins over one configured on the graph.
        if base is not None:
            self.base = base
        elif self.store.base is not None:
            self.base = self.store.base
        if spacious is not None:
            self._spacious = spacious

        try:
            self.preprocess()
            subjects_list = self.orderSubjects()
            self.startDocument()
            for subject in subjects_list:
                if self.isDone(subject):
                    continue
                # Every written statement is followed by a newline. The
                # earlier "first time" flag was always False by the time it
                # was tested, so it never suppressed this write.
                if self.statement(subject):
                    self.write("\n")
            self.endDocument()
        finally:
            # Do not leak this run's base into a later run, even on failure.
            self.base = None

    def preprocessTriple(self, triple):
        """Register the namespaces needed to write ``triple``."""
        super().preprocessTriple(triple)
        for i, node in enumerate(triple):
            if node in self.keywords:
                continue
            # Don't use generated prefixes for subjects and objects
            self.getQName(node, gen_prefix=(i == VERB))
            if isinstance(node, Literal) and node.datatype:
                self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT)
        predicate = triple[VERB]
        if isinstance(predicate, BNode):  # hmm - when is P ever a bnode?
            self._references[predicate] += 1

    def getQName(self, uri, gen_prefix=True):
        """Return ``prefix:local`` for ``uri``, or ``None`` if not possible.

        ``None`` is returned when ``uri`` is not a ``URIRef``, when no prefix
        can be found for it, or when the local part would end with ``.``.
        On success the prefix is also registered through ``addNamespace``.
        """
        if not isinstance(uri, URIRef):
            return None
        try:
            parts = self.store.compute_qname(uri, generate=gen_prefix)
        except Exception:
            # is the uri a namespace in itself?
            pfx = self.store.store.prefix(uri)
            if pfx is None:
                # nothing worked
                return None
            parts = (pfx, uri, "")
        prefix, namespace, local = parts
        # QName cannot end with .
        if local.endswith("."):
            return None
        prefix = self.addNamespace(prefix, namespace)
        return "%s:%s" % (prefix, local)

    def startDocument(self):
        """Write the ``BASE`` line (if any) and the sorted ``PREFIX`` lines."""
        self._started = True
        ns_list = sorted(self.namespaces.items())
        if self.base:
            self.write(self.indent() + "BASE <%s>\n" % self.base)
        for prefix, uri in ns_list:
            self.write(self.indent() + "PREFIX %s: <%s>\n" % (prefix, uri))
        if ns_list and self._spacious:
            self.write("\n")

    def endDocument(self):
        """Write a trailing newline when spacious output is enabled."""
        if self._spacious:
            self.write("\n")

    def statement(self, subject):
        """Mark ``subject`` as done and write all of its triples."""
        self.subjectDone(subject)
        return self.s_squared(subject) or self.s_default(subject)

    def s_default(self, subject):
        """Write ``subject``, its predicate list and a closing ``.`` line."""
        self.write("\n" + self.indent())
        self.path(subject, SUBJECT)
        self.write("\n" + self.indent())
        self.predicateList(subject)
        self.write("\n.")
        return True

    def s_squared(self, subject):
        """Write an unreferenced blank-node subject using ``[]``.

        Returns ``False`` without writing when ``subject`` is not a ``BNode``
        or is referenced elsewhere.
        """
        if (self._references[subject] > 0) or not isinstance(subject, BNode):
            return False
        self.write("\n" + self.indent() + "[]")
        self.predicateList(subject, newline=False)
        # NOTE(review): predicateList already ends a non-empty list with
        # " ;", so this produces " ; ;". Left unchanged to keep the emitted
        # text stable - confirm whether the extra ";" is wanted.
        self.write(" ;\n.")
        return True

    def path(self, node, position, newline=False):
        """Write ``node``, nested if possible, otherwise as a plain label.

        :raises Error: if neither writer reports success.
        """
        if not (
            self.p_squared(node, position) or self.p_default(node, position, newline)
        ):
            raise Error("Cannot serialize node '%s'" % (node,))

    def p_default(self, node, position, newline=False):
        """Write ``node`` as a label, preceded by a space unless it is a
        subject or ``newline`` is true."""
        if position != SUBJECT and not newline:
            self.write(" ")
        self.write(self.label(node, position))
        return True

    def label(self, node, position):
        """Return the text used for ``node`` at ``position``."""
        if node == RDF.nil:
            return "()"
        # Integer positions are compared by value, not identity.
        if position == VERB and node in self.keywords:
            return self.keywords[node]
        if isinstance(node, Literal):
            return node._literal_n3(
                use_plain=True,
                qname_callback=lambda dt: self.getQName(dt, _GEN_QNAME_FOR_DT),
            )
        node = self.relativize(node)
        return self.getQName(node, position == VERB) or node.n3()

    def p_squared(
        self,
        node,
        position,
    ):
        """Write a blank node inline as ``( ... )`` or ``[ ... ]``.

        Returns ``False`` without writing when ``node`` is not a ``BNode``,
        is already serialized, is referenced more than once, or is in subject
        position.
        """
        if (
            not isinstance(node, BNode)
            or node in self._serialized
            or self._references[node] > 1
            or position == SUBJECT
        ):
            return False
        if self.isValidList(node):
            # this is a list
            # NOTE(review): the depth change surrounds a write that does not
            # call indent(); kept as found.
            self.depth += 2
            self.write(" (\n")
            self.depth -= 2
            self.doList(node)
            self.write("\n" + self.indent() + ")")
        else:
            # this is a Blank Node
            self.subjectDone(node)
            self.write("\n" + self.indent(1) + "[\n")
            self.depth += 1
            self.predicateList(node)
            self.depth -= 1
            self.write("\n" + self.indent(1) + "]")
        return True

    def isValidList(self, l_):
        """
        Checks if l is a valid RDF list, i.e. no nodes have other properties.

        A structure whose ``rdf:rest`` chain leads back to a node already
        visited is not a valid list; it is rejected instead of being followed
        forever.
        """
        try:
            if self.store.value(l_, RDF.first) is None:
                return False
        except Exception:
            return False
        visited = set()
        while l_:
            if l_ in visited:
                return False
            visited.add(l_)
            if l_ != RDF.nil and len(list(self.store.predicate_objects(l_))) != 2:
                return False
            l_ = self.store.value(l_, RDF.rest)
        return True

    def doList(self, l_):
        """Write the items of the RDF list starting at ``l_``, one per line."""
        i = 0
        while l_:
            item = self.store.value(l_, RDF.first)
            if item is not None:
                # The opening "(\n" already ended the line before item 0.
                if i == 0:
                    self.write(self.indent(1))
                else:
                    self.write("\n" + self.indent(1))
                self.path(item, OBJECT, newline=True)
                self.subjectDone(l_)
            l_ = self.store.value(l_, RDF.rest)
            i += 1

    def predicateList(self, subject, newline=False):
        """Write every predicate of ``subject`` with its objects.

        Predicates are separated by `` ;`` plus a newline and the list ends
        with `` ;``. Nothing is written when ``subject`` has no properties.
        ``newline`` is accepted but not read by this method.
        """
        properties = self.buildPredicateHash(subject)
        propList = self.sortProperties(properties)
        if not propList:
            return
        self.write(self.indent(1))
        self.verb(propList[0], newline=True)
        self.objectList(properties[propList[0]])
        for predicate in propList[1:]:
            self.write(" ;\n" + self.indent(1))
            self.verb(predicate, newline=True)
            self.objectList(properties[predicate])
        self.write(" ;")

    def verb(self, node, newline=False):
        """Write ``node`` in predicate position."""
        self.path(node, VERB, newline)

    def objectList(self, objects):
        """Write the objects of one predicate, separated by `` ,``.

        When there is more than one object, each non-blank-node object is
        placed on its own indented line.
        """
        count = len(objects)
        if count == 0:
            return
        # This used to be "(count == 1) and 0 or 1", which is always 1
        # because 0 is falsy. The constant keeps the emitted layout
        # unchanged while making the actual behaviour visible.
        depthmod = 1
        self.depth += depthmod
        first_nl = False
        if count > 1:
            if not isinstance(objects[0], BNode):
                self.write("\n" + self.indent(1))
            first_nl = True
        self.path(objects[0], OBJECT, newline=first_nl)
        for obj in objects[1:]:
            self.write(" ,")
            if not isinstance(obj, BNode):
                self.write("\n" + self.indent(1))
            self.path(obj, OBJECT, newline=True)
        self.depth -= depthmod