Source code for rdflib.plugins.serializers.xmlwriter

from __future__ import annotations

import codecs
from typing import IO, TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
from xml.sax.saxutils import escape, quoteattr

from rdflib.term import URIRef

if TYPE_CHECKING:
    from rdflib.namespace import Namespace, NamespaceManager


__all__ = ["XMLWriter"]

ESCAPE_ENTITIES = {"\r": "
"}


[docs]class XMLWriter:
[docs] def __init__( self, stream: IO[bytes], namespace_manager: NamespaceManager, encoding: Optional[str] = None, decl: int = 1, extra_ns: Optional[Dict[str, Namespace]] = None, ): encoding = encoding or "utf-8" encoder, decoder, stream_reader, stream_writer = codecs.lookup(encoding) # NOTE on type ignores: this is mainly because the variable is being re-used. # type error: Incompatible types in assignment (expression has type "StreamWriter", variable has type "IO[bytes]") self.stream = stream = stream_writer(stream) # type: ignore[assignment] if decl: # type error: No overload variant of "write" of "IO" matches argument type "str" stream.write('<?xml version="1.0" encoding="%s"?>' % encoding) # type: ignore[call-overload] self.element_stack: List[str] = [] self.nm = namespace_manager self.extra_ns = extra_ns or {} self.closed = True
def __get_indent(self) -> str: return " " * len(self.element_stack) indent = property(__get_indent) def __close_start_tag(self) -> None: if not self.closed: # TODO: self.closed = True self.stream.write(">")
[docs] def push(self, uri: str) -> None: self.__close_start_tag() write = self.stream.write write("\n") write(self.indent) write("<%s" % self.qname(uri)) self.element_stack.append(uri) self.closed = False self.parent = False
[docs] def pop(self, uri: Optional[str] = None) -> None: top = self.element_stack.pop() if uri: assert uri == top write = self.stream.write if not self.closed: self.closed = True write("/>") else: if self.parent: write("\n") write(self.indent) write("</%s>" % self.qname(top)) self.parent = True
[docs] def element( self, uri: str, content: str, attributes: Dict[URIRef, str] = {} ) -> None: """Utility method for adding a complete simple element""" self.push(uri) for k, v in attributes.items(): self.attribute(k, v) self.text(content) self.pop()
[docs] def namespaces(self, namespaces: Iterable[Tuple[str, str]] = None) -> None: if not namespaces: namespaces = self.nm.namespaces() write = self.stream.write write("\n") for prefix, namespace in namespaces: if prefix: write(' xmlns:%s="%s"\n' % (prefix, namespace)) # Allow user-provided namespace bindings to prevail elif prefix not in self.extra_ns: write(' xmlns="%s"\n' % namespace) for prefix, namespace in self.extra_ns.items(): if prefix: write(' xmlns:%s="%s"\n' % (prefix, namespace)) else: write(' xmlns="%s"\n' % namespace)
[docs] def attribute(self, uri: str, value: str) -> None: write = self.stream.write write(" %s=%s" % (self.qname(uri), quoteattr(value)))
[docs] def text(self, text: str) -> None: self.__close_start_tag() if "<" in text and ">" in text and "]]>" not in text: self.stream.write("<![CDATA[") self.stream.write(text) self.stream.write("]]>") else: self.stream.write(escape(text, ESCAPE_ENTITIES))
[docs] def qname(self, uri: str) -> str: """Compute qname for a uri using our extra namespaces, or the given namespace manager""" for pre, ns in self.extra_ns.items(): if uri.startswith(ns): if pre != "": return ":".join([pre, uri[len(ns) :]]) else: return uri[len(ns) :] return self.nm.qname_strict(uri)