Source code for rdflib.plugins.sparql.results.csvresults
"""
This module implements a parser and serializer for the CSV SPARQL result
formats
http://www.w3.org/TR/sparql11-results-csv-tsv/
"""
import codecs
import csv
from six import binary_type, PY3
from rdflib import Variable, BNode, URIRef, Literal
from rdflib.query import Result, ResultSerializer, ResultParser
[docs]class CSVResultParser(ResultParser):
[docs] def __init__(self):
self.delim = ","
[docs] def parse(self, source, content_type=None):
r = Result('SELECT')
if isinstance(source.read(0), binary_type):
# if reading from source returns bytes do utf-8 decoding
source = codecs.getreader('utf-8')(source)
reader = csv.reader(source, delimiter=self.delim)
r.vars = [Variable(x) for x in next(reader)]
r.bindings = []
for row in reader:
r.bindings.append(self.parseRow(row, r.vars))
return r
[docs] def parseRow(self, row, v):
return dict((var, val)
for var, val in zip(v, [self.convertTerm(t)
for t in row]) if val is not None)
[docs] def convertTerm(self, t):
if t == "":
return None
if t.startswith("_:"):
return BNode(t) # or generate new IDs?
if t.startswith("http://") or t.startswith("https://"): # TODO: more?
return URIRef(t)
return Literal(t)
[docs]class CSVResultSerializer(ResultSerializer):
[docs] def __init__(self, result):
ResultSerializer.__init__(self, result)
self.delim = ","
if result.type != "SELECT":
raise Exception(
"CSVSerializer can only serialize select query results")
[docs] def serialize(self, stream, encoding='utf-8'):
if PY3:
# the serialiser writes bytes in the given encoding
# in py3 csv.writer is unicode aware and writes STRINGS,
# so we encode afterwards
# in py2 it breaks when passed unicode strings,
# and must be passed utf8, so we encode before
import codecs
stream = codecs.getwriter(encoding)(stream)
out = csv.writer(stream, delimiter=self.delim)
vs = [self.serializeTerm(v, encoding) for v in self.result.vars]
out.writerow(vs)
for row in self.result.bindings:
out.writerow([self.serializeTerm(
row.get(v), encoding) for v in self.result.vars])
[docs] def serializeTerm(self, term, encoding):
if term is None:
return ""
if not PY3:
return term.encode(encoding)
else:
return term