from __future__ import absolute_import
import collections
import itertools
import datetime
import isodate
from six import text_type, iteritems
from rdflib.compat import Mapping, MutableMapping
from rdflib.namespace import NamespaceManager
from rdflib import Variable, BNode, Graph, ConjunctiveGraph, URIRef, Literal
from rdflib.term import Node
from rdflib.plugins.sparql.parserutils import CompValue
import rdflib.plugins.sparql
class SPARQLError(Exception):
    """Base class of the SPARQL-specific exceptions defined in this module."""

    def __init__(self, msg=None):
        """Create the error; ``msg`` is handed to ``Exception`` unchanged."""
        super(SPARQLError, self).__init__(msg)
class NotBoundError(SPARQLError):
    """A ``SPARQLError`` subclass; by its name, signals an unbound variable."""

    def __init__(self, msg=None):
        """Create the error with an optional message."""
        super(NotBoundError, self).__init__(msg)
class AlreadyBound(SPARQLError):
    """Raised when trying to bind a variable that is already bound!"""

    def __init__(self):
        """Create the error; it takes no message."""
        super(AlreadyBound, self).__init__()
class SPARQLTypeError(SPARQLError):
    """A ``SPARQLError`` subclass whose message is mandatory."""

    def __init__(self, msg):
        """Create the error with the required message ``msg``."""
        super(SPARQLTypeError, self).__init__(msg)
class Bindings(MutableMapping):
    """
    A single level of a stack of variable-value bindings.

    Each level keeps a reference to the level below it (``outer``);
    any lookup that fails on this level is propagated to the outer one.

    In python 3.3 this could be a collections.ChainMap
    """

    def __init__(self, outer=None, d=()):
        """
        :param outer: the enclosing ``Bindings`` level, or ``None`` when
            this is the bottom of the stack
        :param d: initial bindings of this level, in any form ``dict()``
            accepts; the data is copied
        """
        self._d = dict(d)
        self.outer = outer

    def __getitem__(self, key):
        """Return the value bound to ``key`` on this or any outer level.

        :raises KeyError: if no level binds ``key``
        """
        try:
            return self._d[key]
        except KeyError:
            # Compare with None explicitly: truth-testing ``outer`` would
            # call its __len__, which walks the whole chain of levels.
            if self.outer is None:
                raise
        return self.outer[key]

    def __contains__(self, key):
        """Tell whether ``key`` is bound on this or any outer level."""
        try:
            self[key]
        except KeyError:
            return False
        return True

    def __setitem__(self, key, value):
        """Bind ``key`` on this level only; outer levels are not touched."""
        self._d[key] = value

    def __delitem__(self, key):
        """Unbinding is not supported; always raises ``Exception``."""
        raise Exception("DelItem is not implemented!")

    def __len__(self):
        """Count the keys produced by iteration (see ``__iter__``)."""
        return sum(1 for _ in self)

    def __iter__(self):
        """Yield the keys of this level, then those of each outer level.

        A key bound on several levels is yielded once per level.
        """
        level = self
        while level is not None:
            for key in level._d:
                yield key
            level = level.outer

    def __str__(self):
        """Render all visible bindings as ``Bindings({key: value, ...})``."""
        # str.join() only accepts strings, so every (key, value) pair has to
        # be formatted first; joining the raw tuples raises TypeError.
        pairs = ", ".join(
            "{!r}: {!r}".format(key, self[key]) for key in self)
        return "Bindings({" + pairs + "})"

    def __repr__(self):
        """Same text as ``__str__``."""
        return self.__str__()
class FrozenDict(Mapping):
    """
    An immutable hashable dict

    Taken from http://stackoverflow.com/a/2704866/81121
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``dict``; the data is copied."""
        self._d = dict(*args, **kwargs)
        self._hash = None  # computed lazily by __hash__

    def __iter__(self):
        return iter(self._d)

    def __len__(self):
        return len(self._d)

    def __getitem__(self, key):
        return self._d[key]

    def __hash__(self):
        """Return the XOR of the hashes of all keys and values (cached).

        :raises TypeError: if a key or value is unhashable
        """
        # It would have been simpler and maybe more obvious to
        # use hash(tuple(sorted(self._d.iteritems()))) from this discussion
        # so far, but this solution is O(n). I don't know what kind of
        # n we are going to run into, but sometimes it's hard to resist the
        # urge to optimize when it will gain improved algorithmic performance.
        if self._hash is None:
            # Accumulate in a local and publish only the finished value:
            # if hashing a value raises, nothing half-computed is cached,
            # so a later hash() call raises again instead of returning
            # a partial result.
            acc = 0
            for key, value in self.items():
                acc ^= hash(key)
                acc ^= hash(value)
            self._hash = acc
        return self._hash

    def project(self, vars):
        """Return a new ``FrozenDict`` with only the keys found in ``vars``."""
        return FrozenDict(
            (x for x in self.items() if x[0] in vars))

    def disjointDomain(self, other):
        """Tell whether ``self`` and ``other`` have no key in common."""
        return set(self).isdisjoint(other)

    def compatible(self, other):
        """Tell whether every key shared with ``other`` has an equal value.

        Keys missing from ``other`` (lookup raises ``KeyError``) are ignored.
        """
        for k in self:
            try:
                if self[k] != other[k]:
                    return False
            except KeyError:
                pass
        return True

    def merge(self, other):
        """Return a new ``FrozenDict`` with the items of both mappings.

        For a key present in both, the value from ``other`` wins because
        its items come last in the chain.
        """
        return FrozenDict(
            itertools.chain(self.items(), other.items()))

    def __str__(self):
        return str(self._d)

    def __repr__(self):
        return repr(self._d)
class FrozenBindings(FrozenDict):
    """
    A ``FrozenDict`` of bindings that also carries the context (``ctx``)
    it was created with; lookups of unbound variables fall back to
    ``ctx.initBindings``.
    """

    def __init__(self, ctx, *args, **kwargs):
        """
        :param ctx: object providing ``initBindings``, ``now``, ``bnodes``
            and ``prologue`` (a ``QueryContext`` in this module)

        The remaining arguments are passed to ``FrozenDict``.
        """
        FrozenDict.__init__(self, *args, **kwargs)
        self.ctx = ctx

    def __getitem__(self, key):
        """Return the value bound to ``key``.

        A key that is not a ``Node`` is wrapped in ``Variable`` first.
        A node that is neither a ``BNode`` nor a ``Variable`` is returned
        as is. Variables missing from this mapping are looked up in
        ``ctx.initBindings``.

        :raises KeyError: if the variable is bound in neither place
        """
        if not isinstance(key, Node):
            key = Variable(key)

        if type(key) not in (BNode, Variable):
            return key

        if key in self._d:
            return self._d[key]

        init_bindings = self.ctx.initBindings
        # initBindings may be None (or empty): report the unbound variable
        # as the KeyError a mapping lookup is expected to raise, rather
        # than failing with a TypeError on None[key].
        if not init_bindings:
            raise KeyError(key)
        return init_bindings[key]

    def project(self, vars):
        """Return the bindings restricted to the keys found in ``vars``."""
        return FrozenBindings(
            self.ctx, (x for x in iteritems(self) if x[0] in vars))

    def merge(self, other):
        """Return the bindings of ``self`` and ``other`` combined.

        For a key present in both, the value from ``other`` wins.
        """
        return FrozenBindings(
            self.ctx, itertools.chain(iteritems(self), iteritems(other)))

    def _now(self):
        return self.ctx.now

    def _bnodes(self):
        return self.ctx.bnodes

    def _prologue(self):
        return self.ctx.prologue

    # read-only views onto the attributes of the owning context
    prologue = property(_prologue)
    bnodes = property(_bnodes)
    now = property(_now)

    def forget(self, before, _except=None):
        """
        return a frozen dict only of bindings made in self
        since before

        :param before: object whose item lookup returns ``None`` for
            variables it does not bind (a ``QueryContext`` in this module)
        :param _except: optional collection of variables to keep
            regardless of ``before``
        """
        keep = _except or ()
        # initBindings may be None; treat that as "no initial bindings"
        init_bindings = self.ctx.initBindings or ()

        # bindings from initBindings are never forgotten
        return FrozenBindings(
            self.ctx, (
                x for x in iteritems(self) if (
                    x[0] in keep or
                    x[0] in init_bindings or
                    before[x[0]] is None)))

    def remember(self, these):
        """
        return a frozen dict only of bindings in these
        """
        return FrozenBindings(
            self.ctx, (x for x in iteritems(self) if x[0] in these))
class QueryContext(object):
    """
    Query context - passed along when evaluating the query
    """

    def __init__(self, graph=None, bindings=None, initBindings=None):
        """
        :param graph: the graph to query; a ``ConjunctiveGraph`` is also
            kept as the dataset
        :param bindings: initial variable bindings, in any form ``dict()``
            accepts (they are copied into a fresh ``Bindings``)
        :param initBindings: bindings that are added on top of
            ``bindings`` and remembered on the context
        """
        self.initBindings = initBindings
        self.bindings = Bindings(d=bindings or [])
        if initBindings:
            self.bindings.update(initBindings)

        if isinstance(graph, ConjunctiveGraph):
            self._dataset = graph
            if rdflib.plugins.sparql.SPARQL_DEFAULT_GRAPH_UNION:
                self.graph = self.dataset
            else:
                self.graph = self.dataset.default_context
        else:
            self._dataset = None
            self.graph = graph

        self.prologue = None
        self.now = datetime.datetime.now(isodate.tzinfo.UTC)

        # hands out a new BNode the first time each key is looked up
        self.bnodes = collections.defaultdict(BNode)

    def clone(self, bindings=None):
        """Return a new context sharing this one's graph, dataset,
        prologue, bnode map and timestamp.

        :param bindings: bindings for the clone. NOTE: any falsy value
            (``None``, but also an empty mapping or list) falls back to a
            copy of the current bindings.
        """
        r = QueryContext(
            self._dataset if self._dataset is not None else self.graph,
            bindings or self.bindings,
            initBindings=self.initBindings)
        r.prologue = self.prologue
        r.graph = self.graph
        r.bnodes = self.bnodes
        # keep one timestamp for the whole evaluation instead of the fresh
        # one the constructor has just taken
        r.now = self.now
        return r

    def _get_dataset(self):
        if self._dataset is None:
            raise Exception(
                'You performed a query operation requiring ' +
                'a dataset (i.e. ConjunctiveGraph), but ' +
                'operating currently on a single graph.')
        return self._dataset

    dataset = property(_get_dataset, doc="current dataset")

    def load(self, source, default=False, **kwargs):
        """Load ``source`` into the default graph or into the dataset.

        :param source: what to load; passed to ``graph.load``
        :param default: if true, target the current graph, otherwise the
            dataset
        :param kwargs: extra keyword arguments for ``graph.load``
        :raises Exception: if none of the attempted formats can be loaded
        """

        def _load(graph, source):
            # Try the loader's default format first, then N3, then
            # N-Triples. Only Exception is caught so that
            # KeyboardInterrupt / SystemExit are not swallowed.
            try:
                return graph.load(source, **kwargs)
            except Exception:
                pass
            try:
                return graph.load(source, format='n3', **kwargs)
            except Exception:
                pass
            try:
                return graph.load(source, format='nt', **kwargs)
            except Exception:
                raise Exception(
                    "Could not load %s as either RDF/XML, N3 or NTriples" % (
                        source,))

        if not rdflib.plugins.sparql.SPARQL_LOAD_GRAPHS:
            # we are not loading - if we already know the graph
            # being "loaded", just add it to the default-graph
            if default:
                self.graph += self.dataset.get_context(source)
        else:
            if default:
                _load(self.graph, source)
            else:
                _load(self.dataset, source)

    def __getitem__(self, key):
        """Return the value bound to ``key``, or ``None`` if it is unbound.

        Anything that is not exactly a ``BNode`` or ``Variable`` is
        returned unchanged.
        """
        # in SPARQL BNodes are just labels
        if type(key) not in (BNode, Variable):
            return key
        try:
            return self.bindings[key]
        except KeyError:
            return None

    def get(self, key, default=None):
        """Like ``self[key]``, but return ``default`` for an unbound key."""
        # self[key] never raises KeyError (it returns None instead), so the
        # bindings are consulted directly to make ``default`` effective.
        if type(key) not in (BNode, Variable):
            return key
        try:
            return self.bindings[key]
        except KeyError:
            return default

    def solution(self, vars=None):
        """
        Return a static copy of the current variable bindings as dict

        :param vars: if given and non-empty, only these variables are
            included
        """
        if vars:
            return FrozenBindings(
                self, ((k, v)
                       for k, v in iteritems(self.bindings)
                       if k in vars))
        else:
            return FrozenBindings(self, iteritems(self.bindings))

    def __setitem__(self, key, value):
        """Bind ``key`` to ``value``.

        :raises AlreadyBound: if ``key`` is already bound to a different
            value
        """
        if key in self.bindings and self.bindings[key] != value:
            raise AlreadyBound()

        self.bindings[key] = value

    def pushGraph(self, graph):
        """Return a clone of this context that queries ``graph``."""
        r = self.clone()
        r.graph = graph
        return r

    def push(self):
        """Return a clone whose bindings start from the current ones."""
        r = self.clone(Bindings(self.bindings))
        return r

    def clean(self):
        """Return a clone holding no bindings except ``initBindings``."""
        # clone([]) cannot be used here: clone() treats the falsy empty
        # list as "no bindings given" and would copy the current bindings.
        r = self.clone()
        r.bindings = Bindings()
        if self.initBindings:
            r.bindings.update(self.initBindings)
        return r

    # def pop(self):
    #     self.bindings = self.bindings.outer
    #     if self.bindings is None:
    #         raise Exception("We've bottomed out of the bindings stack!")

    def thaw(self, frozenbindings):
        """
        Create a new read/write query context from the given solution
        """
        c = self.clone(frozenbindings)

        return c
class Prologue(object):
    """
    Holder for the base URI and the prefix bindings
    """

    def __init__(self):
        """Start with no base and an empty namespace manager."""
        self.base = None
        # ns man needs a store
        backing_graph = Graph()
        self.namespace_manager = NamespaceManager(backing_graph)

    def resolvePName(self, prefix, localname):
        """Expand a prefixed name to a ``URIRef``.

        A missing prefix or local name is treated as the empty string.
        Raises ``Exception`` when the prefix has no namespace bound.
        """
        namespace = self.namespace_manager.store.namespace(prefix or "")
        if namespace is not None:
            return URIRef(namespace + (localname or ""))
        raise Exception('Unknown namespace prefix : %s' % prefix)

    def bind(self, prefix, uri):
        """Bind ``prefix`` to ``uri``, replacing any existing binding."""
        self.namespace_manager.bind(prefix, uri, replace=True)

    def absolutize(self, iri):
        """
        Apply BASE / PREFIXes to URIs
        (and to datatypes in Literals)

        TODO: Move resolving URIs to pre-processing
        """
        if not isinstance(iri, CompValue):
            # a URIRef without any ':' is resolved against the base;
            # everything else is passed through untouched
            if isinstance(iri, URIRef) and ':' not in iri:
                return URIRef(iri, base=self.base)
            return iri

        if iri.name == 'pname':
            return self.resolvePName(iri.prefix, iri.localname)

        if iri.name == 'literal':
            return Literal(
                iri.string,
                lang=iri.lang,
                datatype=self.absolutize(iri.datatype))

        return iri
class Query(object):
    """
    A query after parsing and translation: a prologue plus an algebra
    """

    def __init__(self, prologue, algebra):
        """Store ``prologue`` and ``algebra`` on the instance as given."""
        self.prologue, self.algebra = prologue, algebra