# Source code for rdflib.plugins.serializers.longturtle

"""
LongTurtle RDF graph serializer for RDFLib.
See <http://www.w3.org/TeamSubmission/turtle/> for syntax specification.

This variant, longturtle as opposed to just turtle, makes some small format changes
to turtle - the original turtle serializer. It:

* uses PREFIX instead of @prefix
* uses BASE instead of @base
* adds a new line at RDF.type, or 'a'
* adds a newline and an indent for all triples with more than one object (object list)
* ends the last triple in a set with ' ;' and a new line, and puts the closing '.'
    at the start of the next line
* uses the default encoding (encode()) instead of "latin-1"

- Nicholas Car, 2023
"""

from rdflib.exceptions import Error
from rdflib.namespace import RDF
from rdflib.term import BNode, Literal, URIRef

from .turtle import RecursiveSerializer

__all__ = ["LongTurtleSerializer"]

# Position markers passed as the ``position`` argument of path(), label(),
# p_default() and p_squared(). The values match the index of each term in a
# (subject, predicate, object) triple; preprocessTriple() compares the
# enumerate() index of a triple against VERB.
SUBJECT = 0
VERB = 1
OBJECT = 2

# Passed as ``gen_prefix`` when a QName is requested for a literal's datatype
# URI (see preprocessTriple() and label()).
_GEN_QNAME_FOR_DT = False
# Initial value of the serializer's ``_spacious`` flag, which controls the
# extra blank lines written by startDocument() and endDocument().
_SPACIOUS_OUTPUT = False


class LongTurtleSerializer(RecursiveSerializer):
    """Serialize a graph in the "longturtle" flavour of Turtle.

    Namespace declarations are written as ``PREFIX`` / ``BASE`` lines, every
    predicate starts on its own line and is terminated with `` ;``, and each
    top-level statement is closed by a ``.`` on a line of its own.
    """

    short_name = "longturtle"
    # NOTE(review): one-space indent unit as found; confirm the intended width.
    indentString = " "

    def __init__(self, store):
        """Create a serializer for *store* and initialise its state."""
        # Created before the base initialiser runs; presumably because the
        # base class may call reset()/addNamespace() during construction --
        # keep this ordering.
        self._ns_rewrite = {}
        super(LongTurtleSerializer, self).__init__(store)
        self.keywords = {RDF.type: "a"}
        self.reset()
        self.stream = None
        self._spacious = _SPACIOUS_OUTPUT

    def addNamespace(self, prefix, namespace):
        """Register *prefix* for *namespace* and return the prefix actually used.

        The returned prefix differs from the requested one when the requested
        prefix starts with ``_`` or is already bound to another namespace.
        """
        # Turtle does not support prefixes that start with _
        # if they occur in the graph, rewrite to p_blah
        # this is more complicated since we need to make sure p_blah
        # does not already exist. And we register namespaces as we go, i.e.
        # we may first see a triple with prefix _9 - rewrite it to p_9
        # and then later find a triple with a "real" p_9 prefix
        # so we need to keep track of ns rewrites we made so far.
        needs_rewrite = (
            prefix.startswith("_")
            or self.namespaces.get(prefix, namespace) != namespace
        )
        if needs_rewrite:
            if prefix not in self._ns_rewrite:
                candidate = "p" + prefix
                while candidate in self.namespaces:
                    candidate = "p" + candidate
                self._ns_rewrite[prefix] = candidate
            prefix = self._ns_rewrite.get(prefix, prefix)

        super(LongTurtleSerializer, self).addNamespace(prefix, namespace)
        return prefix

    def reset(self):
        """Clear per-run state, including the prefix rewrites made so far."""
        super(LongTurtleSerializer, self).reset()
        self._shortNames = {}
        self._started = False
        self._ns_rewrite = {}

    def serialize(self, stream, base=None, encoding=None, spacious=None, **args):
        """Write the whole graph to *stream*.

        :param stream: output object stored on ``self.stream``.
        :param base: base IRI to declare; when ``None`` the store's own
            ``base`` is used if it is set.
        :param encoding: accepted for interface compatibility; not read here.
        :param spacious: when not ``None``, replaces the ``_spacious`` flag
            (the new value also persists for later calls).
        :param args: accepted for interface compatibility; not read here.
        """
        self.reset()
        self.stream = stream
        # if base is given here, use, if not and a base is set for the graph use that
        if base is not None:
            self.base = base
        elif self.store.base is not None:
            self.base = self.store.base

        if spacious is not None:
            self._spacious = spacious

        try:
            self.preprocess()
            subjects_list = self.orderSubjects()

            self.startDocument()

            for subject in subjects_list:
                if self.isDone(subject):
                    continue
                # Every emitted statement is followed by a newline.
                if self.statement(subject):
                    self.write("\n")

            self.endDocument()
        finally:
            # Always drop the base, even on failure, so a later call without
            # a base does not silently reuse a stale one.
            self.base = None

    def preprocessTriple(self, triple):
        """Register the namespaces needed to abbreviate the terms of *triple*."""
        super(LongTurtleSerializer, self).preprocessTriple(triple)
        for i, node in enumerate(triple):
            if node in self.keywords:
                continue
            # Don't use generated prefixes for subjects and objects
            self.getQName(node, gen_prefix=(i == VERB))
            if isinstance(node, Literal) and node.datatype:
                self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT)
        p = triple[1]
        if isinstance(p, BNode):  # hmm - when is P ever a bnode?
            self._references[p] += 1

    def getQName(self, uri, gen_prefix=True):
        """Return ``prefix:local`` for *uri*, or ``None`` if it cannot be abbreviated.

        ``None`` is returned when *uri* is not a ``URIRef``, when no prefix
        can be determined for it, or when the local part ends with ``.``.
        A successful lookup also registers the namespace via addNamespace().
        """
        if not isinstance(uri, URIRef):
            return None

        try:
            parts = self.store.compute_qname(uri, generate=gen_prefix)
        except Exception:
            # Deliberately broad: any failure to split the URI falls back to
            # treating the URI as a namespace in itself.
            pfx = self.store.store.prefix(uri)
            if pfx is None:
                # nothing worked
                return None
            parts = (pfx, uri, "")

        prefix, namespace, local = parts

        # QName cannot end with .
        if local.endswith("."):
            return None

        prefix = self.addNamespace(prefix, namespace)

        return "%s:%s" % (prefix, local)

    def startDocument(self):
        """Write the ``BASE`` line (if a base is set) and sorted ``PREFIX`` lines."""
        self._started = True
        ns_list = sorted(self.namespaces.items())

        if self.base:
            self.write(self.indent() + "BASE <%s>\n" % self.base)
        for prefix, uri in ns_list:
            self.write(self.indent() + "PREFIX %s: <%s>\n" % (prefix, uri))
        if ns_list and self._spacious:
            self.write("\n")

    def endDocument(self):
        """Write a trailing newline when spacious output is enabled."""
        if self._spacious:
            self.write("\n")

    def statement(self, subject):
        """Mark *subject* as done and write its statement; return a truthy value."""
        self.subjectDone(subject)
        return self.s_squared(subject) or self.s_default(subject)

    def s_default(self, subject):
        """Write *subject* in the regular ``subject / predicates / .`` layout."""
        self.write("\n" + self.indent())
        self.path(subject, SUBJECT)
        self.write("\n" + self.indent())
        self.predicateList(subject)
        self.write("\n.")
        return True

    def s_squared(self, subject):
        """Write an unreferenced blank-node subject using the ``[]`` form.

        Returns ``False`` without writing anything when *subject* is not a
        blank node or is referenced elsewhere.
        """
        if (self._references[subject] > 0) or not isinstance(subject, BNode):
            return False
        self.write("\n" + self.indent() + "[]")
        self.predicateList(subject, newline=False)
        self.write(" ;\n.")
        return True

    def path(self, node, position, newline=False):
        """Write *node*, preferring the inline blank-node form over a label.

        :raises Error: if neither p_squared() nor p_default() handled the node.
        """
        if not (
            self.p_squared(node, position) or self.p_default(node, position, newline)
        ):
            raise Error("Cannot serialize node '%s'" % (node,))

    def p_default(self, node, position, newline=False):
        """Write the label of *node*, preceded by a space unless it starts a line."""
        if position != SUBJECT and not newline:
            self.write(" ")
        self.write(self.label(node, position))
        return True

    def label(self, node, position):
        """Return the textual form of *node* for the given *position*."""
        if node == RDF.nil:
            return "()"
        # Equality, not identity: matches the other position checks in this
        # class and does not rely on small-int object identity.
        if position == VERB and node in self.keywords:
            return self.keywords[node]
        if isinstance(node, Literal):
            return node._literal_n3(
                use_plain=True,
                qname_callback=lambda dt: self.getQName(dt, _GEN_QNAME_FOR_DT),
            )
        node = self.relativize(node)
        return self.getQName(node, position == VERB) or node.n3()

    def p_squared(self, node, position):
        """Try to write blank node *node* inline as ``( ... )`` or ``[ ... ]``.

        Returns ``False`` without writing when *node* is not a blank node, is
        already serialized, is referenced more than once, or is in subject
        position.
        """
        if (
            not isinstance(node, BNode)
            or node in self._serialized
            or self._references[node] > 1
            or position == SUBJECT
        ):
            return False

        if self.isValidList(node):
            # this is a list
            # The depth change brackets only this write; kept as found since
            # write() is defined outside this class.
            self.depth += 2
            self.write(" (\n")
            self.depth -= 2
            self.doList(node)
            self.write("\n" + self.indent() + ")")
        else:
            # this is a Blank Node
            self.subjectDone(node)
            self.write("\n" + self.indent(1) + "[\n")
            self.depth += 1
            self.predicateList(node)
            self.depth -= 1
            self.write("\n" + self.indent(1) + "]")

        return True

    def isValidList(self, l_):
        """
        Checks if l_ is a valid RDF list, i.e. no nodes have other properties.

        Returns ``False`` when *l_* has no ``rdf:first`` value (or looking it
        up raises), when any non-nil node on the ``rdf:rest`` chain does not
        have exactly two predicate/object pairs, or when the chain loops back
        onto a node that was already visited.
        """
        try:
            if self.store.value(l_, RDF.first) is None:
                return False
        except Exception:
            return False

        visited = set()
        while l_:
            if l_ in visited:
                # A cyclic rdf:rest chain never terminates, so it is not a
                # list; without this guard the loop would never end.
                return False
            visited.add(l_)
            if l_ != RDF.nil and len(list(self.store.predicate_objects(l_))) != 2:
                return False
            l_ = self.store.value(l_, RDF.rest)
        return True

    def doList(self, l_):
        """Write the members of the RDF list starting at *l_*, one per line."""
        i = 0
        while l_:
            item = self.store.value(l_, RDF.first)
            if item is not None:
                if i == 0:
                    self.write(self.indent(1))
                else:
                    self.write("\n" + self.indent(1))
                self.path(item, OBJECT, newline=True)
                self.subjectDone(l_)
            l_ = self.store.value(l_, RDF.rest)
            i += 1

    def predicateList(self, subject, newline=False):
        """Write every predicate of *subject* with its objects.

        Predicates are separated by `` ;`` plus a newline, and the last one
        is also followed by `` ;``. Nothing is written when *subject* has no
        properties. *newline* is accepted for interface compatibility and is
        not read here.
        """
        properties = self.buildPredicateHash(subject)
        propList = self.sortProperties(properties)
        if not propList:
            return
        self.write(self.indent(1))
        self.verb(propList[0], newline=True)
        self.objectList(properties[propList[0]])
        for predicate in propList[1:]:
            self.write(" ;\n" + self.indent(1))
            self.verb(predicate, newline=True)
            self.objectList(properties[predicate])
        self.write(" ;")

    def verb(self, node, newline=False):
        """Write *node* in predicate position."""
        self.path(node, VERB, newline)

    def objectList(self, objects):
        """Write the objects of one predicate, separated by `` ,``.

        With more than one object, each non-blank-node object is placed on
        its own indented line.
        """
        count = len(objects)
        if count == 0:
            return
        # NOTE: this used to be ``(count == 1) and 0 or 1``, which always
        # evaluated to 1 because ``0`` is falsy and the ``or`` branch won.
        # The value 1 is kept so the emitted layout does not change.
        depthmod = 1
        self.depth += depthmod
        first_nl = False
        if count > 1:
            if not isinstance(objects[0], BNode):
                self.write("\n" + self.indent(1))
            first_nl = True
        self.path(objects[0], OBJECT, newline=first_nl)
        for obj in objects[1:]:
            self.write(" ,")
            if not isinstance(obj, BNode):
                self.write("\n" + self.indent(1))
            self.path(obj, OBJECT, newline=True)
        self.depth -= depthmod