"""
A TriX parser for RDFLib
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Optional, Tuple
from xml.sax import handler, make_parser
from xml.sax.handler import ErrorHandler
from rdflib.exceptions import ParserError
from rdflib.graph import Graph
from rdflib.namespace import Namespace
from rdflib.parser import InputSource, Parser
from rdflib.store import Store
from rdflib.term import BNode, Identifier, Literal, URIRef
if TYPE_CHECKING:
# from xml.sax.expatreader import ExpatLocator
from xml.sax.xmlreader import AttributesImpl, Locator, XMLReader
__all__ = ["create_parser", "TriXHandler", "TriXParser"]
TRIXNS = Namespace("http://www.w3.org/2004/03/trix/trix-1/")
XMLNS = Namespace("http://www.w3.org/XML/1998/namespace")
[docs]class TriXHandler(handler.ContentHandler):
"""An Sax Handler for TriX. See http://sw.nokia.com/trix/"""
[docs] def __init__(self, store: Store):
self.store = store
self.preserve_bnode_ids = False
self.reset()
[docs] def reset(self) -> None:
self.bnode: Dict[str, BNode] = {}
self.graph: Optional[Graph] = None
self.triple: Optional[List[Identifier]] = None
self.state = 0
self.lang = None
self.datatype = None
# ContentHandler methods
[docs] def setDocumentLocator(self, locator: Locator):
self.locator = locator
[docs] def startDocument(self) -> None:
pass
[docs] def startPrefixMapping(self, prefix: Optional[str], namespace: str) -> None:
pass
[docs] def endPrefixMapping(self, prefix: Optional[str]) -> None:
pass
[docs] def startElementNS(
self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
) -> None:
if name[0] != str(TRIXNS):
self.error(
"Only elements in the TriX namespace are allowed. %s!=%s"
% (name[0], TRIXNS)
)
if name[1].lower() == "trix":
if self.state == 0:
self.state = 1
else:
self.error("Unexpected TriX element")
elif name[1] == "graph":
if self.state == 1:
self.state = 2
else:
self.error("Unexpected graph element")
elif name[1] == "uri":
if self.state == 2:
# the context uri
self.state = 3
elif self.state == 4:
# part of a triple
pass
else:
self.error("Unexpected uri element")
elif name[1] == "triple":
if self.state == 2:
if self.graph is None:
# anonymous graph, create one with random bnode id
self.graph = Graph(store=self.store)
# start of a triple
self.triple = []
self.state = 4
else:
self.error("Unexpected triple element")
elif name[1] == "typedLiteral":
if self.state == 4:
# part of triple
self.lang = None
self.datatype = None
try:
self.lang = attrs.getValue((str(XMLNS), "lang"))
except Exception:
# language not required - ignore
pass
try:
self.datatype = attrs.getValueByQName("datatype")
except KeyError:
self.error("No required attribute 'datatype'")
else:
self.error("Unexpected typedLiteral element")
elif name[1] == "plainLiteral":
if self.state == 4:
# part of triple
self.lang = None
self.datatype = None
try:
self.lang = attrs.getValue((str(XMLNS), "lang"))
except Exception:
# language not required - ignore
pass
else:
self.error("Unexpected plainLiteral element")
elif name[1] == "id":
if self.state == 2:
# the context uri
self.state = 3
elif self.state == 4:
# part of triple
pass
else:
self.error("Unexpected id element")
else:
self.error("Unknown element %s in TriX namespace" % name[1])
self.chars = ""
[docs] def endElementNS(self, name: Tuple[Optional[str], str], qname) -> None:
if TYPE_CHECKING:
assert self.triple is not None
if name[0] != str(TRIXNS):
self.error(
"Only elements in the TriX namespace are allowed. %s!=%s"
% (name[0], TRIXNS)
)
if name[1] == "uri":
if self.state == 3:
self.graph = Graph(
store=self.store, identifier=URIRef(self.chars.strip())
)
self.state = 2
elif self.state == 4:
self.triple += [URIRef(self.chars.strip())]
else:
self.error(
"Illegal internal self.state - This should never "
+ "happen if the SAX parser ensures XML syntax correctness"
)
elif name[1] == "id":
if self.state == 3:
self.graph = Graph(
self.store, identifier=self.get_bnode(self.chars.strip())
)
self.state = 2
elif self.state == 4:
self.triple += [self.get_bnode(self.chars.strip())]
else:
self.error(
"Illegal internal self.state - This should never "
+ "happen if the SAX parser ensures XML syntax correctness"
)
elif name[1] == "plainLiteral" or name[1] == "typedLiteral":
if self.state == 4:
self.triple += [
Literal(self.chars, lang=self.lang, datatype=self.datatype)
]
else:
self.error(
"This should never happen if the SAX parser "
+ "ensures XML syntax correctness"
)
elif name[1] == "triple":
if self.state == 4:
if len(self.triple) != 3:
self.error(
"Triple has wrong length, got %d elements: %s"
% (len(self.triple), self.triple)
)
# type error: Item "None" of "Optional[Graph]" has no attribute "add"
# type error: Argument 1 to "add" of "Graph" has incompatible type "List[Identifier]"; expected "Tuple[Node, Node, Node]"
self.graph.add(self.triple) # type: ignore[union-attr, arg-type]
# self.store.store.add(self.triple,context=self.graph)
# self.store.addN([self.triple+[self.graph]])
self.state = 2
else:
self.error(
"This should never happen if the SAX parser "
+ "ensures XML syntax correctness"
)
elif name[1] == "graph":
self.graph = None
self.state = 1
elif name[1].lower() == "trix":
self.state = 0
else:
self.error("Unexpected close element")
[docs] def get_bnode(self, label: str) -> BNode:
if self.preserve_bnode_ids:
bn = BNode(label)
else:
if label in self.bnode:
bn = self.bnode[label]
else:
bn = BNode(label)
self.bnode[label] = bn
return bn
[docs] def characters(self, content: str) -> None:
self.chars += content
[docs] def ignorableWhitespace(self, content) -> None:
pass
[docs] def processingInstruction(self, target, data) -> None:
pass
[docs] def error(self, message: str) -> NoReturn:
locator = self.locator
info = "%s:%s:%s: " % (
locator.getSystemId(),
locator.getLineNumber(),
locator.getColumnNumber(),
)
raise ParserError(info + message)
[docs]def create_parser(store: Store) -> XMLReader:
parser = make_parser()
try:
# Workaround for bug in expatreader.py. Needed when
# expatreader is trying to guess a prefix.
# type error: "XMLReader" has no attribute "start_namespace_decl"
parser.start_namespace_decl("xml", "http://www.w3.org/XML/1998/namespace") # type: ignore[attr-defined]
except AttributeError:
pass # Not present in Jython (at least)
parser.setFeature(handler.feature_namespaces, 1)
trix = TriXHandler(store)
parser.setContentHandler(trix)
parser.setErrorHandler(ErrorHandler())
return parser
[docs]class TriXParser(Parser):
"""A parser for TriX. See http://sw.nokia.com/trix/"""
[docs] def __init__(self):
pass
[docs] def parse(self, source: InputSource, sink: Graph, **args: Any) -> None:
assert (
sink.store.context_aware
), "TriXParser must be given a context aware store."
self._parser = create_parser(sink.store)
content_handler = self._parser.getContentHandler()
preserve_bnode_ids = args.get("preserve_bnode_ids", None)
if preserve_bnode_ids is not None:
content_handler.preserve_bnode_ids = preserve_bnode_ids
# We're only using it once now
# content_handler.reset()
# self._parser.reset()
self._parser.parse(source)