Source code for rdflib.plugins.stores.sparqlconnector
import logging
import threading
import requests
import os
from io import BytesIO
from rdflib.query import Result
log = logging.getLogger(__name__)
[docs]class SPARQLConnectorException(Exception):
pass
# TODO: Pull in these from the result implementation plugins?
_response_mime_types = {
'xml': 'application/sparql-results+xml, application/rdf+xml',
'json': 'application/sparql-results+json',
'csv': 'text/csv',
'tsv': 'text/tab-separated-values',
'application/rdf+xml': 'application/rdf+xml',
}
[docs]class SPARQLConnector(object):
"""
this class deals with nitty gritty details of talking to a SPARQL server
"""
[docs] def __init__(self, query_endpoint=None, update_endpoint=None, returnFormat='xml', method='GET', **kwargs):
"""
Any additional keyword arguments will be passed to requests, and can be used to setup timesouts, basic auth, etc.
"""
self.returnFormat = returnFormat
self.query_endpoint = query_endpoint
self.update_endpoint = update_endpoint
self.kwargs = kwargs
self.method = method
# it is recommended to have one session object per thread/process. This assures that is the case.
# https://github.com/kennethreitz/requests/issues/1871
self._session = threading.local()
@property
def session(self):
k = 'session_%d' % os.getpid()
self._session.__dict__.setdefault(k, requests.Session())
log.debug('Session %s %s', os.getpid(), id(self._session.__dict__[k]))
return self._session.__dict__[k]
@property
def method(self):
return self._method
@method.setter
def method(self, method):
if method not in ('GET', 'POST'):
raise SPARQLConnectorException('Method must be "GET" or "POST"')
self._method = method
[docs] def query(self, query, default_graph=None):
if not self.query_endpoint:
raise SPARQLConnectorException("Query endpoint not set!")
params = {'query': query}
if default_graph:
params["default-graph-uri"] = default_graph
headers = {'Accept': _response_mime_types[self.returnFormat]}
args = dict(self.kwargs)
args.update(url=self.query_endpoint)
# merge params/headers dicts
args.setdefault('params', {})
args.setdefault('headers', {})
args['headers'].update(headers)
if self.method == 'GET':
args['params'].update(params)
elif self.method == 'POST':
args['data'] = params
else:
raise SPARQLConnectorException("Unknown method %s" % self.method)
res = self.session.request(self.method, **args)
res.raise_for_status()
return Result.parse(BytesIO(res.content), content_type=res.headers['Content-type'])
[docs] def update(self, update, default_graph=None):
if not self.update_endpoint:
raise SPARQLConnectorException("Query endpoint not set!")
params = {}
if default_graph:
params["using-graph-uri"] = default_graph
headers = {'Accept': _response_mime_types[self.returnFormat]}
args = dict(self.kwargs)
args.update(url=self.update_endpoint,
data=update.encode('utf-8'))
# merge params/headers dicts
args.setdefault('params', {})
args['params'].update(params)
args.setdefault('headers', {})
args['headers'].update(headers)
res = self.session.post(**args)
res.raise_for_status()
[docs] def close(self):
self.session.close()