Source code for rdflib.plugins.sparql.results.txtresults
from typing import IO, List, Optional
from rdflib import BNode, Literal, URIRef
from rdflib.namespace import NamespaceManager
from rdflib.query import ResultSerializer
from rdflib.term import Variable
def _termString(t, namespace_manager: Optional[NamespaceManager]):
if t is None:
return "-"
if namespace_manager:
if isinstance(t, URIRef):
return namespace_manager.normalizeUri(t)
elif isinstance(t, BNode):
return t.n3()
elif isinstance(t, Literal):
return t._literal_n3(qname_callback=namespace_manager.normalizeUri)
else:
return t.n3()
[docs]class TXTResultSerializer(ResultSerializer):
"""
A write only QueryResult serializer for text/ascii tables
"""
# TODO FIXME: class specific args should be keyword only.
[docs] def serialize( # type: ignore[override]
self,
stream: IO,
encoding: str,
namespace_manager: Optional[NamespaceManager] = None,
):
"""
return a text table of query results
"""
def c(s, w):
"""
center the string s in w wide string
"""
w -= len(s)
h1 = h2 = w // 2
if w % 2:
h2 += 1
return " " * h1 + s + " " * h2
if self.result.type != "SELECT":
raise Exception("Can only pretty print SELECT results!")
if not self.result:
return "(no results)\n"
else:
keys: List[Variable] = self.result.vars # type: ignore[assignment]
maxlen = [0] * len(keys)
b = [
[_termString(r[k], namespace_manager) for k in keys]
for r in self.result
]
for r in b:
for i in range(len(keys)):
maxlen[i] = max(maxlen[i], len(r[i]))
stream.write("|".join([c(k, maxlen[i]) for i, k in enumerate(keys)]) + "\n")
stream.write("-" * (len(maxlen) + sum(maxlen)) + "\n")
for r in sorted(b):
stream.write(
"|".join([t + " " * (i - len(t)) for i, t in zip(maxlen, r)]) + "\n"
)