Source code for rdflib.plugins.serializers.turtle

"""
Turtle RDF graph serializer for RDFLib.
See <http://www.w3.org/TeamSubmission/turtle/> for syntax specification.
"""

from __future__ import annotations

from collections import defaultdict
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    DefaultDict,
    Dict,
    List,
    Mapping,
    Optional,
    Sequence,
    Tuple,
)

from rdflib.exceptions import Error
from rdflib.graph import Graph
from rdflib.namespace import RDF, RDFS
from rdflib.serializer import Serializer
from rdflib.term import BNode, Literal, Node, URIRef

if TYPE_CHECKING:
    from rdflib.graph import _PredicateType, _SubjectType, _TripleType

__all__ = ["RecursiveSerializer", "TurtleSerializer"]


[docs]class RecursiveSerializer(Serializer): topClasses = [RDFS.Class] predicateOrder = [RDF.type, RDFS.label] maxDepth = 10 indentString = " " roundtrip_prefixes: Tuple[Any, ...] = ()
[docs] def __init__(self, store: Graph): super(RecursiveSerializer, self).__init__(store) self.stream: Optional[IO[bytes]] = None self.reset()
[docs] def addNamespace(self, prefix: str, uri: URIRef) -> None: if prefix in self.namespaces and self.namespaces[prefix] != uri: raise Exception( "Trying to override namespace prefix %s => %s, but it's already bound to %s" % (prefix, uri, self.namespaces[prefix]) ) self.namespaces[prefix] = uri
[docs] def checkSubject(self, subject: _SubjectType) -> bool: """Check to see if the subject should be serialized yet""" if ( (self.isDone(subject)) or (subject not in self._subjects) or ((subject in self._topLevels) and (self.depth > 1)) or (isinstance(subject, URIRef) and (self.depth >= self.maxDepth)) ): return False return True
[docs] def isDone(self, subject: _SubjectType) -> bool: """Return true if subject is serialized""" return subject in self._serialized
[docs] def orderSubjects(self) -> List[_SubjectType]: seen: Dict[_SubjectType, bool] = {} subjects: List[_SubjectType] = [] for classURI in self.topClasses: members = list(self.store.subjects(RDF.type, classURI)) members.sort() subjects.extend(members) for member in members: self._topLevels[member] = True seen[member] = True recursable = [ (isinstance(subject, BNode), self._references[subject], subject) for subject in self._subjects if subject not in seen ] recursable.sort() subjects.extend([subject for (isbnode, refs, subject) in recursable]) return subjects
[docs] def preprocess(self) -> None: for triple in self.store.triples((None, None, None)): self.preprocessTriple(triple)
[docs] def preprocessTriple(self, spo: _TripleType) -> None: s, p, o = spo self._references[o] += 1 self._subjects[s] = True
[docs] def reset(self) -> None: self.depth = 0 # Typed none because nothing is using it ... self.lists: Dict[None, None] = {} self.namespaces: Dict[str, URIRef] = {} self._references: DefaultDict[Node, int] = defaultdict(int) self._serialized: Dict[_SubjectType, bool] = {} self._subjects: Dict[_SubjectType, bool] = {} self._topLevels: Dict[_SubjectType, bool] = {} if self.roundtrip_prefixes: if hasattr(self.roundtrip_prefixes, "__iter__"): for prefix, ns in self.store.namespaces(): if prefix in self.roundtrip_prefixes: self.addNamespace(prefix, ns) else: for prefix, ns in self.store.namespaces(): self.addNamespace(prefix, ns)
[docs] def buildPredicateHash( self, subject: _SubjectType ) -> Mapping[_PredicateType, List[Node]]: """ Build a hash key by predicate to a list of objects for the given subject """ properties: Dict[_PredicateType, List[Node]] = {} for s, p, o in self.store.triples((subject, None, None)): oList = properties.get(p, []) oList.append(o) properties[p] = oList return properties
[docs] def sortProperties( self, properties: Mapping[_PredicateType, List[Node]] ) -> List[_PredicateType]: """Take a hash from predicate uris to lists of values. Sort the lists of values. Return a sorted list of properties.""" # Sort object lists for prop, objects in properties.items(): objects.sort() # Make sorted list of properties propList: List[_PredicateType] = [] seen: Dict[_PredicateType, bool] = {} for prop in self.predicateOrder: if (prop in properties) and (prop not in seen): propList.append(prop) seen[prop] = True props = list(properties.keys()) props.sort() for prop in props: if prop not in seen: propList.append(prop) seen[prop] = True return propList
[docs] def subjectDone(self, subject: _SubjectType) -> None: """Mark a subject as done.""" self._serialized[subject] = True
[docs] def indent(self, modifier: int = 0) -> str: """Returns indent string multiplied by the depth""" return (self.depth + modifier) * self.indentString
[docs] def write(self, text: str) -> None: """Write text in given encoding.""" # type error: Item "None" of "Optional[IO[bytes]]" has no attribute "write" self.stream.write(text.encode(self.encoding, "replace")) # type: ignore[union-attr]
SUBJECT = 0 VERB = 1 OBJECT = 2 _GEN_QNAME_FOR_DT = False _SPACIOUS_OUTPUT = False
[docs]class TurtleSerializer(RecursiveSerializer): short_name = "turtle" indentString = " "
[docs] def __init__(self, store: Graph): self._ns_rewrite: Dict[str, str] = {} super(TurtleSerializer, self).__init__(store) self.keywords: Dict[Node, str] = {RDF.type: "a"} self.reset() self.stream = None self._spacious = _SPACIOUS_OUTPUT
# type error: Return type "str" of "addNamespace" incompatible with return type "None" in supertype "RecursiveSerializer"
[docs] def addNamespace(self, prefix: str, namespace: URIRef) -> str: # type: ignore[override] # Turtle does not support prefix that start with _ # if they occur in the graph, rewrite to p_blah # this is more complicated since we need to make sure p_blah # does not already exist. And we register namespaces as we go, i.e. # we may first see a triple with prefix _9 - rewrite it to p_9 # and then later find a triple with a "real" p_9 prefix # so we need to keep track of ns rewrites we made so far. if (prefix > "" and prefix[0] == "_") or self.namespaces.get( prefix, namespace ) != namespace: if prefix not in self._ns_rewrite: p = "p" + prefix while p in self.namespaces: p = "p" + p self._ns_rewrite[prefix] = p prefix = self._ns_rewrite.get(prefix, prefix) super(TurtleSerializer, self).addNamespace(prefix, namespace) return prefix
[docs] def reset(self) -> None: super(TurtleSerializer, self).reset() # typing as Dict[None, None] because nothing seems to be using it self._shortNames: Dict[None, None] = {} self._started = False self._ns_rewrite = {}
[docs] def serialize( self, stream: IO[bytes], base: Optional[str] = None, encoding: Optional[str] = None, spacious: Optional[bool] = None, **args: Any, ) -> None: self.reset() self.stream = stream # if base is given here, use that, if not and a base is set for the graph use that if base is not None: self.base = base elif self.store.base is not None: self.base = self.store.base if spacious is not None: self._spacious = spacious self.preprocess() subjects_list = self.orderSubjects() self.startDocument() firstTime = True for subject in subjects_list: if self.isDone(subject): continue if firstTime: firstTime = False if self.statement(subject) and not firstTime: self.write("\n") self.endDocument() stream.write("\n".encode("latin-1")) self.base = None
[docs] def preprocessTriple(self, triple: _TripleType) -> None: super(TurtleSerializer, self).preprocessTriple(triple) for i, node in enumerate(triple): if i == VERB and node in self.keywords: # predicate is a keyword continue # Don't use generated prefixes for subjects and objects self.getQName(node, gen_prefix=(i == VERB)) if isinstance(node, Literal) and node.datatype: self.getQName(node.datatype, gen_prefix=_GEN_QNAME_FOR_DT) p = triple[1] if isinstance(p, BNode): # hmm - when is P ever a bnode? self._references[p] += 1
# TODO: Rename to get_pname
[docs] def getQName(self, uri: Node, gen_prefix: bool = True) -> Optional[str]: if not isinstance(uri, URIRef): return None parts = None try: parts = self.store.compute_qname(uri, generate=gen_prefix) except Exception: # is the uri a namespace in itself? pfx = self.store.store.prefix(uri) if pfx is not None: parts = (pfx, uri, "") else: # nothing worked return None prefix, namespace, local = parts local = local.replace(r"(", r"\(").replace(r")", r"\)") # QName cannot end with . if local.endswith("."): return None prefix = self.addNamespace(prefix, namespace) return "%s:%s" % (prefix, local)
[docs] def startDocument(self) -> None: self._started = True ns_list = sorted(self.namespaces.items()) if self.base: self.write(self.indent() + "@base <%s> .\n" % self.base) for prefix, uri in ns_list: self.write(self.indent() + "@prefix %s: <%s> .\n" % (prefix, uri)) if ns_list and self._spacious: self.write("\n")
[docs] def endDocument(self) -> None: if self._spacious: self.write("\n")
[docs] def statement(self, subject: _SubjectType) -> bool: self.subjectDone(subject) return self.s_squared(subject) or self.s_default(subject)
[docs] def s_default(self, subject: _SubjectType) -> bool: self.write("\n" + self.indent()) self.path(subject, SUBJECT) self.predicateList(subject) self.write(" .") return True
[docs] def s_squared(self, subject: _SubjectType) -> bool: if (self._references[subject] > 0) or not isinstance(subject, BNode): return False self.write("\n" + self.indent() + "[]") self.predicateList(subject) self.write(" .") return True
[docs] def path(self, node: Node, position: int, newline: bool = False) -> None: if not ( self.p_squared(node, position, newline) or self.p_default(node, position, newline) ): raise Error("Cannot serialize node '%s'" % (node,))
[docs] def p_default(self, node: Node, position: int, newline: bool = False) -> bool: if position != SUBJECT and not newline: self.write(" ") self.write(self.label(node, position)) return True
[docs] def label(self, node: Node, position: int) -> str: if node == RDF.nil: return "()" if position is VERB and node in self.keywords: return self.keywords[node] if isinstance(node, Literal): return node._literal_n3( use_plain=True, qname_callback=lambda dt: self.getQName(dt, _GEN_QNAME_FOR_DT), ) else: node = self.relativize(node) # type: ignore[type-var] return self.getQName(node, position == VERB) or node.n3()
[docs] def p_squared(self, node: Node, position: int, newline: bool = False) -> bool: if ( not isinstance(node, BNode) or node in self._serialized or self._references[node] > 1 or position == SUBJECT ): return False if not newline: self.write(" ") if self.isValidList(node): # this is a list self.write("(") self.depth += 1 # 2 self.doList(node) self.depth -= 1 # 2 self.write(" )") else: self.subjectDone(node) self.depth += 2 # self.write('[\n' + self.indent()) self.write("[") self.depth -= 1 # self.predicateList(node, newline=True) self.predicateList(node, newline=False) # self.write('\n' + self.indent() + ']') self.write(" ]") self.depth -= 1 return True
[docs] def isValidList(self, l_: Node) -> bool: """ Checks if l is a valid RDF list, i.e. no nodes have other properties. """ try: if self.store.value(l_, RDF.first) is None: return False except Exception: return False while l_: if l_ != RDF.nil and len(list(self.store.predicate_objects(l_))) != 2: return False # type error: Incompatible types in assignment (expression has type "Optional[Node]", variable has type "Node") l_ = self.store.value(l_, RDF.rest) # type: ignore[assignment] return True
[docs] def doList(self, l_: Node) -> None: while l_: item = self.store.value(l_, RDF.first) if item is not None: self.path(item, OBJECT) self.subjectDone(l_) # type error: Incompatible types in assignment (expression has type "Optional[Node]", variable has type "Node") l_ = self.store.value(l_, RDF.rest) # type: ignore[assignment]
[docs] def predicateList(self, subject: Node, newline: bool = False) -> None: properties = self.buildPredicateHash(subject) propList = self.sortProperties(properties) if len(propList) == 0: return self.verb(propList[0], newline=newline) self.objectList(properties[propList[0]]) for predicate in propList[1:]: self.write(" ;\n" + self.indent(1)) self.verb(predicate, newline=True) self.objectList(properties[predicate])
[docs] def verb(self, node: Node, newline: bool = False) -> None: self.path(node, VERB, newline)
[docs] def objectList(self, objects: Sequence[Node]) -> None: count = len(objects) if count == 0: return depthmod = (count == 1) and 0 or 1 self.depth += depthmod self.path(objects[0], OBJECT) for obj in objects[1:]: self.write(",\n" + self.indent(1)) self.path(obj, OBJECT, newline=True) self.depth -= depthmod