import logging
from typing import IO, Optional
from xml.dom import XML_NAMESPACE
from xml.sax.saxutils import XMLGenerator
from xml.sax.xmlreader import AttributesNSImpl
from rdflib import BNode, Literal, URIRef, Variable
from rdflib.compat import etree
from rdflib.query import Result, ResultException, ResultParser, ResultSerializer
SPARQL_XML_NAMESPACE = "http://www.w3.org/2005/sparql-results#"
RESULTS_NS_ET = "{%s}" % SPARQL_XML_NAMESPACE
log = logging.getLogger(__name__)
"""A Parser for SPARQL results in XML:
http://www.w3.org/TR/rdf-sparql-XMLres/
Bits and pieces borrowed from:
http://projects.bigasterisk.com/sparqlhttp/
Authors: Drew Perttula, Gunnar Aastrand Grimnes
"""
[docs]class XMLResultParser(ResultParser):
# TODO FIXME: content_type should be a keyword only arg.
[docs] def parse(self, source, content_type: Optional[str] = None): # type: ignore[override]
return XMLResult(source)
[docs]class XMLResult(Result):
[docs] def __init__(self, source, content_type: Optional[str] = None):
try:
# try use as if etree is from lxml, and if not use it as normal.
parser = etree.XMLParser(huge_tree=True) # type: ignore[call-arg]
tree = etree.parse(source, parser)
except TypeError:
tree = etree.parse(source)
boolean = tree.find(RESULTS_NS_ET + "boolean")
results = tree.find(RESULTS_NS_ET + "results")
if boolean is not None:
type_ = "ASK"
elif results is not None:
type_ = "SELECT"
else:
raise ResultException("No RDF result-bindings or boolean answer found!")
Result.__init__(self, type_)
if type_ == "SELECT":
self.bindings = []
for result in results: # type: ignore[union-attr]
r = {}
for binding in result:
# type error: error: Argument 1 to "Variable" has incompatible type "Union[str, None, Any]"; expected "str"
# NOTE on type error: Element.get() can return None, and
# this will invariably fail if passed into Variable
# constructor as value
r[Variable(binding.get("name"))] = parseTerm(binding[0]) # type: ignore[arg-type] # FIXME
self.bindings.append(r)
self.vars = [
# type error: Argument 1 to "Variable" has incompatible type "Optional[str]"; expected "str"
# NOTE on type error: Element.get() can return None, and this
# will invariably fail if passed into Variable constructor as
# value
Variable(x.get("name")) # type: ignore[arg-type] # FIXME
for x in tree.findall(
"./%shead/%svariable" % (RESULTS_NS_ET, RESULTS_NS_ET)
)
]
else:
self.askAnswer = boolean.text.lower().strip() == "true" # type: ignore[union-attr]
[docs]def parseTerm(element):
"""rdflib object (Literal, URIRef, BNode) for the given
elementtree element"""
tag, text = element.tag, element.text
if tag == RESULTS_NS_ET + "literal":
if text is None:
text = ""
datatype = None
lang = None
if element.get("datatype", None):
datatype = URIRef(element.get("datatype"))
elif element.get("{%s}lang" % XML_NAMESPACE, None):
lang = element.get("{%s}lang" % XML_NAMESPACE)
ret = Literal(text, datatype=datatype, lang=lang)
return ret
elif tag == RESULTS_NS_ET + "uri":
return URIRef(text)
elif tag == RESULTS_NS_ET + "bnode":
return BNode(text)
else:
raise TypeError("unknown binding type %r" % element)
[docs]class XMLResultSerializer(ResultSerializer):
[docs] def __init__(self, result):
ResultSerializer.__init__(self, result)
[docs] def serialize(self, stream: IO, encoding: str = "utf-8", **kwargs):
writer = SPARQLXMLWriter(stream, encoding)
if self.result.type == "ASK":
writer.write_header([])
writer.write_ask(self.result.askAnswer)
else:
writer.write_header(self.result.vars)
writer.write_results_header()
for b in self.result.bindings:
writer.write_start_result()
for key, val in b.items():
writer.write_binding(key, val)
writer.write_end_result()
writer.close()
# TODO: Rewrite with ElementTree?
[docs]class SPARQLXMLWriter:
"""
Python saxutils-based SPARQL XML Writer
"""
[docs] def __init__(self, output, encoding="utf-8"):
writer = XMLGenerator(output, encoding)
writer.startDocument()
writer.startPrefixMapping("", SPARQL_XML_NAMESPACE)
writer.startPrefixMapping("xml", XML_NAMESPACE)
writer.startElementNS(
(SPARQL_XML_NAMESPACE, "sparql"), "sparql", AttributesNSImpl({}, {})
)
self.writer = writer
self._output = output
self._encoding = encoding
self._results = False
[docs] def write_ask(self, val):
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "boolean"), "boolean", AttributesNSImpl({}, {})
)
self.writer.characters(str(val).lower())
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "boolean"), "boolean")
[docs] def write_start_result(self):
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "result"), "result", AttributesNSImpl({}, {})
)
self._resultStarted = True
[docs] def write_end_result(self):
assert self._resultStarted
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "result"), "result")
self._resultStarted = False
[docs] def write_binding(self, name, val):
assert self._resultStarted
attr_vals = {
(None, "name"): str(name),
}
attr_qnames = {
(None, "name"): "name",
}
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "binding"),
"binding",
AttributesNSImpl(attr_vals, attr_qnames),
)
if isinstance(val, URIRef):
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "uri"), "uri", AttributesNSImpl({}, {})
)
self.writer.characters(val)
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "uri"), "uri")
elif isinstance(val, BNode):
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "bnode"), "bnode", AttributesNSImpl({}, {})
)
self.writer.characters(val)
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "bnode"), "bnode")
elif isinstance(val, Literal):
attr_vals = {}
attr_qnames = {}
if val.language:
attr_vals[(XML_NAMESPACE, "lang")] = val.language
attr_qnames[(XML_NAMESPACE, "lang")] = "xml:lang"
elif val.datatype:
attr_vals[(None, "datatype")] = val.datatype
attr_qnames[(None, "datatype")] = "datatype"
self.writer.startElementNS(
(SPARQL_XML_NAMESPACE, "literal"),
"literal",
AttributesNSImpl(attr_vals, attr_qnames),
)
self.writer.characters(val)
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "literal"), "literal")
else:
raise Exception("Unsupported RDF term: %s" % val)
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "binding"), "binding")
[docs] def close(self):
if self._results:
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "results"), "results")
self.writer.endElementNS((SPARQL_XML_NAMESPACE, "sparql"), "sparql")
self.writer.endDocument()