"""
Converting the 'parse-tree' output of pyparsing to a SPARQL Algebra expression
http://www.w3.org/TR/sparql11-query/#sparqlQuery
"""
import functools
import operator
import collections
from rdflib import Literal, Variable, URIRef, BNode
from rdflib.plugins.sparql.sparql import Prologue, Query
from rdflib.plugins.sparql.parserutils import CompValue, Expr
from rdflib.plugins.sparql.operators import (
and_, TrueFilter, simplify as simplifyFilters)
from rdflib.paths import (
InvPath, AlternativePath, SequencePath, MulPath, NegatedPath)
from pyparsing import ParseResults
# ---------------------------
# Some convenience methods
def OrderBy(p, expr):
    """Build an ``OrderBy`` node with fields ``p`` and ``expr``."""
    node = CompValue('OrderBy', p=p, expr=expr)
    return node
def ToMultiSet(p):
    """Build a ``ToMultiSet`` node wrapping ``p``."""
    node = CompValue('ToMultiSet', p=p)
    return node
def Union(p1, p2):
    """Build a ``Union`` node over the two patterns ``p1`` and ``p2``."""
    node = CompValue('Union', p1=p1, p2=p2)
    return node
def Join(p1, p2):
    """Build a ``Join`` node over the two patterns ``p1`` and ``p2``."""
    node = CompValue('Join', p1=p1, p2=p2)
    return node
def Minus(p1, p2):
    """Build a ``Minus`` node with fields ``p1`` and ``p2``."""
    node = CompValue('Minus', p1=p1, p2=p2)
    return node
def Graph(term, graph):
    """Build a ``Graph`` node; ``graph`` is stored under the key ``p``."""
    node = CompValue('Graph', term=term, p=graph)
    return node
def BGP(triples=None):
    """Build a ``BGP`` node; a falsy ``triples`` becomes a fresh empty list."""
    if not triples:
        triples = []
    return CompValue('BGP', triples=triples)
def LeftJoin(p1, p2, expr):
    """Build a ``LeftJoin`` node with fields ``p1``, ``p2`` and ``expr``."""
    node = CompValue('LeftJoin', p1=p1, p2=p2, expr=expr)
    return node
def Filter(expr, p):
    """Build a ``Filter`` node with fields ``expr`` and ``p``."""
    node = CompValue('Filter', expr=expr, p=p)
    return node
def Extend(p, expr, var):
    """Build an ``Extend`` node with fields ``p``, ``expr`` and ``var``."""
    node = CompValue('Extend', p=p, expr=expr, var=var)
    return node
def Values(res):
    """Build a ``values`` node (lower-case name) holding ``res``."""
    node = CompValue('values', res=res)
    return node
def Project(p, PV):
    """Build a ``Project`` node with fields ``p`` and ``PV``."""
    node = CompValue('Project', p=p, PV=PV)
    return node
def Group(p, expr=None):
    """Build a ``Group`` node with field ``p`` and optional ``expr``."""
    node = CompValue('Group', p=p, expr=expr)
    return node
def _knownTerms(triple, varsknown, varscount):
    """
    Compute the sort key ``reorderTriples`` uses for one triple pattern.

    Returns a 3-tuple of:

    * the number of terms in ``triple`` that are a Variable or BNode and
      are not in ``varsknown``,
    * the negated sum of the ``varscount`` entries of the triple's terms
      (terms missing from ``varscount`` count as 0),
    * whether the third term is not a Literal.
    """
    # count with sum(): len(filter(...)) only works where filter returns a list
    unbound = sum(
        1 for x in triple
        if x not in varsknown and isinstance(x, (Variable, BNode)))
    return (unbound,
            -sum(varscount.get(x, 0) for x in triple),
            not isinstance(triple[2], Literal),
            )
def reorderTriples(l):
    """
    Reorder triple patterns so that we execute the
    ones with most bindings first

    ``l`` is an iterable of triple patterns; a new list containing the
    same triples is returned, ordered by repeated sorting on the
    ``_knownTerms`` key.
    """
    # decorate as (sort key, triple); the key is filled in by each sort pass
    decorated = [(None, triple) for triple in l]
    bound = set()

    # number of occurrences of each variable/bnode over all patterns
    occurrences = collections.defaultdict(int)
    for _, triple in decorated:
        for term in triple:
            if isinstance(term, (Variable, BNode)):
                occurrences[term] += 1

    # Done in steps: sort the remaining patterns, mark the variables of the
    # leading run (patterns sharing the best first key component) as bound,
    # then advance one position and re-sort the rest with that knowledge.
    pos = 0
    total = len(decorated)
    while pos < total:
        decorated[pos:] = sorted(
            (_knownTerms(triple, bound, occurrences), triple)
            for _, triple in decorated[pos:])
        best = decorated[pos][0][0]
        ahead = pos
        while ahead < total and decorated[ahead][0][0] == best:
            bound.update(term for term in decorated[ahead][1]
                         if isinstance(term, (Variable, BNode)))
            ahead += 1
        pos += 1

    # undecorate
    return [triple for _, triple in decorated]
def triples(l):
    """
    Flatten a sequence of term sequences into reordered triples.

    The elements of ``l`` are concatenated with ``+``; the result is cut
    into consecutive groups of three, which are passed to
    ``reorderTriples``.

    Raises ``Exception`` if the flattened length is not a multiple of 3.
    """
    # functools.reduce is available on Python 2.6+ and 3; the bare
    # ``reduce`` builtin exists on Python 2 only.
    l = functools.reduce(operator.add, l)
    if (len(l) % 3) != 0:
        raise Exception('these aint triples')
    return reorderTriples((l[x], l[x + 1], l[x + 2])
                          for x in range(0, len(l), 3))
def translatePName(p, prologue):
    """
    Expand prefixed/relative URIs

    ``pname`` nodes and plain URIRefs are passed through
    ``prologue.absolutize``; ``literal`` nodes become a Literal whose
    datatype has been absolutized. Anything else yields None.
    """
    if not isinstance(p, CompValue):
        if isinstance(p, URIRef):
            return prologue.absolutize(p)
        return None

    kind = p.name
    if kind == 'pname':
        return prologue.absolutize(p)
    if kind == 'literal':
        datatype = prologue.absolutize(p.datatype)
        return Literal(p.string, lang=p.lang, datatype=datatype)
    return None
def translatePath(p):
    """
    Translate PropertyPath expressions

    Returns the replacement for a path parse node, or None when ``p`` is
    not one of the handled node kinds.
    """
    if not isinstance(p, CompValue):
        return None

    kind = p.name

    if kind == 'PathAlternative':
        # a single alternative is just that path
        return p.part[0] if len(p.part) == 1 else AlternativePath(*p.part)

    if kind == 'PathSequence':
        # a single step is just that path
        return p.part[0] if len(p.part) == 1 else SequencePath(*p.part)

    if kind == 'PathElt':
        if not p.mod:
            return p.part
        inner = p.part
        if isinstance(inner, list):
            if len(inner) != 1:
                raise Exception('Denkfehler!')
            inner = inner[0]
        return MulPath(inner, p.mod)

    if kind == 'PathEltOrInverse':
        inner = p.part
        if isinstance(inner, list):
            if len(inner) != 1:
                raise Exception('Denkfehler!')
            inner = inner[0]
        return InvPath(inner)

    if kind == 'PathNegatedPropertySet':
        inner = p.part
        if isinstance(inner, list):
            inner = AlternativePath(*inner)
        return NegatedPath(inner)

    return None
def translateExists(e):
    """
    Translate the graph pattern used by EXISTS and NOT EXISTS

    http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters
    """

    def _translate_graph(node):
        if not isinstance(node, CompValue):
            return
        if node.name not in ('Builtin_EXISTS', 'Builtin_NOTEXISTS'):
            return
        node.graph = translateGroupGraphPattern(node.graph)
        if node.graph.name == 'Filter':
            # filters inside (NOT) EXISTS can see vars bound outside
            node.graph.no_isolated_scope = True

    return traverse(e, visitPost=_translate_graph)
def collectAndRemoveFilters(parts):
    """
    FILTER expressions apply to the whole group graph pattern in which
    they appear.

    http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters

    ``Filter`` entries are removed from ``parts`` in place. Their
    expressions are combined with ``and_`` and returned; None is returned
    when no filter was found.
    """
    collected = []
    idx = 0
    while idx < len(parts):
        node = parts[idx]
        if node.name != 'Filter':
            idx += 1
            continue
        collected.append(translateExists(node.expr))
        # the next part slides into position idx, so do not advance
        parts.pop(idx)

    if not collected:
        return None
    return and_(*collected)
def translateGroupOrUnionGraphPattern(graphPattern):
    """
    Translate each entry of ``graphPattern.graph`` and chain the results
    into left-nested ``Union`` nodes. Returns None for an empty ``graph``.
    """
    combined = None
    for branch in graphPattern.graph:
        translated = translateGroupGraphPattern(branch)
        combined = Union(combined, translated) if combined else translated
    return combined
def translateGraphGraphPattern(graphPattern):
    """Translate the inner pattern and wrap it in a ``Graph`` node."""
    inner = translateGroupGraphPattern(graphPattern.graph)
    return Graph(graphPattern.term, inner)
def translateInlineData(graphPattern):
    """Translate inline data and wrap the result in ``ToMultiSet``."""
    values = translateValues(graphPattern)
    return ToMultiSet(values)
def translateGroupGraphPattern(graphPattern):
    """
    http://www.w3.org/TR/sparql11-query/#convertGraphPattern

    Translate a group graph pattern parse node into an algebra expression.

    A ``SubSelect`` is translated with ``translate`` and wrapped in
    ``ToMultiSet``. Otherwise the filters are pulled out of
    ``graphPattern.part`` (which is modified in place), adjacent
    ``TriplesBlock`` parts are merged into a single BGP, the parts are
    folded left to right into one expression, and the collected filters
    are applied around the result.

    Raises ``Exception`` for a part whose name is not handled.
    """

    if graphPattern.name == 'SubSelect':
        return ToMultiSet(translate(graphPattern)[0])

    if not graphPattern.part:
        graphPattern.part = []  # empty { }

    filters = collectAndRemoveFilters(graphPattern.part)

    g = []
    for p in graphPattern.part:
        if p.name == 'TriplesBlock':
            # merge adjacent TripleBlocks
            if not (g and g[-1].name == 'BGP'):
                g.append(BGP())
            g[-1]["triples"] += triples(p.triples)
        else:
            g.append(p)

    G = BGP()
    for p in g:
        if p.name == 'OptionalGraphPattern':
            A = translateGroupGraphPattern(p.graph)
            if A.name == 'Filter':
                # a filter directly inside OPTIONAL becomes the join condition
                G = LeftJoin(G, A.p, A.expr)
            else:
                G = LeftJoin(G, A, TrueFilter)
        elif p.name == 'MinusGraphPattern':
            G = Minus(p1=G, p2=translateGroupGraphPattern(p.graph))
        elif p.name == 'GroupOrUnionGraphPattern':
            G = Join(p1=G, p2=translateGroupOrUnionGraphPattern(p))
        elif p.name == 'GraphGraphPattern':
            G = Join(p1=G, p2=translateGraphGraphPattern(p))
        elif p.name == 'InlineData':
            G = Join(p1=G, p2=translateInlineData(p))
        elif p.name == 'ServiceGraphPattern':
            G = Join(p1=G, p2=p)
        elif p.name in ('BGP', 'Extend'):
            G = Join(p1=G, p2=p)
        elif p.name == 'Bind':
            G = Extend(G, p.expr, p.var)
        else:
            raise Exception('Unknown part in GroupGraphPattern: %s - %s' %
                            (type(p), p.name))

    # collectAndRemoveFilters signals "no filters" with None; testing plain
    # truthiness would also discard a filter expression that happens to be
    # falsy in a boolean context.
    if filters is not None:
        G = Filter(expr=filters, p=G)

    return G
class StopTraversal(Exception):
    """Raised by a visit function to end a traversal early with a result."""

    def __init__(self, rv):
        super(StopTraversal, self).__init__(rv)
        # value that ``traverse`` hands back to its caller
        self.rv = rv
def _traverse(e, visitPre=lambda n: None, visitPost=lambda n: None):
"""
Traverse a parse-tree, visit each node
if visit functions return a value, replace current node
"""
_e = visitPre(e)
if _e is not None:
return _e
if e is None:
return None
if isinstance(e, (list, ParseResults)):
return [_traverse(x, visitPre, visitPost) for x in e]
elif isinstance(e, tuple):
return tuple([_traverse(x, visitPre, visitPost) for x in e])
elif isinstance(e, CompValue):
for k, val in e.iteritems():
e[k] = _traverse(val, visitPre, visitPost)
_e = visitPost(e)
if _e is not None:
return _e
return e
def _traverseAgg(e, visitor=lambda n, v: None):
    """
    Traverse a parse-tree bottom-up and aggregate a result

    The children of ``e`` are processed first: the elements of a list,
    ParseResults or tuple, or the non-None values of a CompValue. Then
    ``visitor(e, child_results)`` is called and its return value is
    returned. Nodes without children get an empty list.
    """
    res = []

    if isinstance(e, (list, ParseResults, tuple)):
        res = [_traverseAgg(x, visitor) for x in e]
    elif isinstance(e, CompValue):
        # .items() exists on Python 2 and 3, .iteritems() only on Python 2
        for k, val in e.items():
            if val is not None:
                res.append(_traverseAgg(val, visitor))

    return visitor(e, res)
def traverse(
        tree, visitPre=lambda n: None,
        visitPost=lambda n: None, complete=None):
    """
    Traverse tree, visit each node with visit function
    visit function may raise StopTraversal to stop traversal
    if complete!=None, it is returned on complete traversal,
    otherwise the transformed tree is returned

    When a visit function raises StopTraversal, the exception's ``rv``
    value is returned instead.
    """
    try:
        r = _traverse(tree, visitPre, visitPost)
        if complete is not None:
            return complete
        return r
    except StopTraversal as st:  # "as" form is valid on Python 2.6+ and 3
        return st.rv
def _hasAggregate(x):
    """
    Traverse parse(sub)Tree
    return true if any aggregates are used

    Used as a visit function: signals a hit by raising
    ``StopTraversal(True)`` and otherwise returns None.
    """
    if not isinstance(x, CompValue):
        return
    if x.name.startswith('Aggregate_'):
        raise StopTraversal(True)
def _aggs(e, A):
    """
    Collect Aggregates in A
    replaces aggregates with variable references

    The aggregate node is appended to ``A``, tagged under ``"res"`` with
    a fresh ``__agg_N__`` variable (N being the new length of ``A``), and
    that variable is returned. Other nodes yield None.
    """
    # TODO: nested Aggregates?
    if not isinstance(e, CompValue):
        return None
    if not e.name.startswith('Aggregate_'):
        return None

    A.append(e)
    placeholder = Variable('__agg_%d__' % len(A))
    e["res"] = placeholder
    return placeholder
def _findVars(x, res):
    """
    Find all variables in a tree

    Variables are added to the set ``res``. For ``Bind`` and ``SubSelect``
    nodes the node itself is returned, which stops the traversal from
    descending into them.
    """
    if isinstance(x, Variable):
        res.add(x)

    if not isinstance(x, CompValue):
        return None

    if x.name == "Bind":
        res.add(x.var)
        return x  # stop recursion and finding vars in the expr

    if x.name == 'SubSelect':
        if x.projection:
            for item in x.projection:
                res.add(item.var or item.evar)
        return x

    return None
def _addVars(x, children):
    """
    find which variables may be bound by this part of the query

    Visitor for ``_traverseAgg``: ``children`` holds the sets already
    computed for the children of ``x``. For a CompValue the computed set
    is also stored on the node under ``"_vars"``. A ``SubSelect`` returns
    only its projected variables to its parent.
    """
    if isinstance(x, Variable):
        return set([x])
    elif isinstance(x, CompValue):
        # functools.reduce works on Python 2.6+ and 3; the bare ``reduce``
        # builtin exists on Python 2 only.
        if x.name == "RelationalExpression":
            x["_vars"] = set()
        elif x.name == "Extend":
            # vars only used in the expr for a bind should not be included
            x["_vars"] = functools.reduce(
                operator.or_,
                [child for child, part in zip(children, x)
                 if part != 'expr'],
                set())
        else:
            x["_vars"] = set(functools.reduce(operator.or_, children, set()))

        if x.name == 'SubSelect':
            if x.projection:
                s = set(v.var or v.evar for v in x.projection)
            else:
                s = set()

            return s

        return x["_vars"]

    return functools.reduce(operator.or_, children, set())
def _sample(e, v=None):
    """
    For each unaggregated variable V in expr
    Replace V with Sample(V)

    Aggregate nodes are returned unchanged, so the traversal does not
    descend into them. A variable equal to ``v`` is left alone.
    """
    if isinstance(e, CompValue):
        if e.name.startswith("Aggregate_"):
            return e  # do not replace vars in aggregates
        return None

    if not isinstance(e, Variable):
        return None
    if v != e:
        return CompValue('Aggregate_Sample', vars=e)
    return None
def _simplifyFilters(e):
    """Visit function: run ``simplifyFilters`` on Expr nodes, else None."""
    return simplifyFilters(e) if isinstance(e, Expr) else None
def translateAggregates(q, M):
    """
    Pull the aggregates out of the projection, HAVING and ORDER BY parts
    of ``q``.

    Returns ``(AggregateJoin node over M, E)`` where ``E`` is a list of
    ``(aggregate result variable, projected variable)`` pairs for the
    plainly projected variables.
    """
    extends = []
    aggregates = []
    collect = functools.partial(_aggs, A=aggregates)

    # collect/replace aggs in :
    #    select expr as ?var
    for item in q.projection or ():
        if item.evar:
            item.expr = traverse(
                item.expr, functools.partial(_sample, v=item.evar))
            item.expr = traverse(item.expr, collect)

    # having clause
    if traverse(q.having, _hasAggregate, complete=False):
        q.having = traverse(q.having, _sample)
        traverse(q.having, collect)

    # order by
    if traverse(q.orderby, _hasAggregate, complete=False):
        q.orderby = traverse(q.orderby, _sample)
        traverse(q.orderby, collect)

    # sample all other select vars
    # TODO: only allowed for vars in group-by?
    for item in q.projection or ():
        if item.var:
            result_var = Variable('__agg_%d__' % (len(aggregates) + 1))
            aggregates.append(
                CompValue('Aggregate_Sample', vars=item.var, res=result_var))
            extends.append((result_var, item.var))

    return CompValue('AggregateJoin', A=aggregates, p=M), extends
def translateValues(v):
    """
    Turn a values clause into a ``values`` node holding one dict per row.

    When ``v.var`` or ``v.value`` is empty, a plain empty list is
    returned rather than a node.
    """
    # if len(v.var)!=len(v.value):
    #    raise Exception("Unmatched vars and values in ValueClause: "+str(v))

    if not v.var or not v.value:
        return []

    if isinstance(v.value[0], list):
        # several variables: each row is a list of values
        rows = [dict(zip(v.var, vals)) for vals in v.value]
    else:
        # one variable: each row is a single value
        rows = [{v.var[0]: val} for val in v.value]

    return Values(rows)
def translate(q):
    """
    http://www.w3.org/TR/sparql11-query/#convertSolMod

    Translate a query parse node into an algebra expression.

    Returns ``(M, PV)``: the algebra expression and the list of projected
    variables. ``q`` is modified in place (filters simplified, paths
    translated, aggregates replaced).

    Raises ``Exception`` when a projection element has neither ``var``
    nor ``evar``.
    """

    _traverse(q, _simplifyFilters)

    q.where = traverse(q.where, visitPost=translatePath)

    # TODO: Var scope test
    VS = set()
    traverse(q.where, functools.partial(_findVars, res=VS))

    # all query types have a where part
    M = translateGroupGraphPattern(q.where)

    aggregate = False
    if q.groupby:
        conditions = []
        # convert "GROUP BY (?expr as ?var)" to an Extend
        for c in q.groupby.condition:
            if isinstance(c, CompValue) and c.name == 'GroupAs':
                M = Extend(M, c.expr, c.var)
                c = c.var
            conditions.append(c)

        M = Group(p=M, expr=conditions)
        aggregate = True
    elif traverse(q.having, _hasAggregate, complete=False) or \
            traverse(q.orderby, _hasAggregate, complete=False) or \
            any(traverse(x.expr, _hasAggregate, complete=False)
                for x in q.projection or [] if x.evar):
        # if any aggregate is used, implicit group by
        M = Group(p=M)
        aggregate = True

    if aggregate:
        M, E = translateAggregates(q, M)
    else:
        E = []

    # HAVING
    if q.having:
        M = Filter(expr=and_(*q.having.condition), p=M)

    # VALUES
    if q.valuesClause:
        M = Join(p1=M, p2=ToMultiSet(translateValues(q.valuesClause)))

    if not q.projection:
        # select *
        PV = list(VS)
    else:
        PV = list()
        for v in q.projection:
            # PV holds variables, so the duplicate check has to look at the
            # variable itself and not at the projection element wrapping it
            if v.var:
                if v.var not in PV:
                    PV.append(v.var)
            elif v.evar:
                if v.evar not in PV:
                    PV.append(v.evar)

                E.append((v.expr, v.evar))
            else:
                raise Exception("I expected a var or evar here!")

    for e, v in E:
        M = Extend(M, e, v)

    # ORDER BY
    if q.orderby:
        M = OrderBy(M, [CompValue('OrderCondition', expr=c.expr,
                                  order=c.order) for c in q.orderby.condition])

    # PROJECT
    M = Project(M, PV)

    if q.modifier:
        if q.modifier == 'DISTINCT':
            M = CompValue('Distinct', p=M)
        elif q.modifier == 'REDUCED':
            M = CompValue('Reduced', p=M)

    if q.limitoffset:
        offset = 0
        if q.limitoffset.offset is not None:
            offset = q.limitoffset.offset.toPython()

        if q.limitoffset.limit is not None:
            M = CompValue('Slice', p=M, start=offset,
                          length=q.limitoffset.limit.toPython())
        else:
            # offset only: no length is given
            M = CompValue('Slice', p=M, start=offset)

    return M, PV
def simplify(n):
    """
    Remove joins to empty BGPs

    Visit function: a ``Join`` with an empty BGP on one side is replaced
    by its other side, and the triples of a ``BGP`` are reordered in
    place. Returns None when the node is left unchanged.
    """
    if not isinstance(n, CompValue):
        return None

    if n.name == 'Join':
        left, right = n.p1, n.p2
        if left.name == 'BGP' and len(left.triples) == 0:
            return right
        if right.name == 'BGP' and len(right.triples) == 0:
            return left
        return None

    if n.name == 'BGP':
        n["triples"] = reorderTriples(n.triples)
        return n

    return None
def analyse(n, children):
    """
    Some things can be lazily joined.
    This propagates whether they can up the tree
    and sets lazy flags for all joins

    Visitor for ``_traverseAgg``: ``children`` holds the flags computed
    for the children of ``n``.
    """
    if not isinstance(n, CompValue):
        return True

    if n.name == 'Join':
        n["lazy"] = all(children)
        return False

    if n.name in ('Slice', 'Distinct'):
        return False

    return all(children)
def translatePrologue(p, base, initNs=None, prologue=None):
    """
    Build or extend a prologue from parsed BASE / PREFIX declarations.

    ``p`` is an iterable of declaration nodes. ``base``, when truthy, is
    set as the base before the declarations are applied. ``initNs`` is an
    optional mapping of prefix to namespace that is bound first. An
    existing ``prologue`` may be passed to be extended; otherwise a new
    ``Prologue`` with an empty base is created. The prologue is returned.
    """
    if prologue is None:
        prologue = Prologue()
        prologue.base = ""
    if base:
        prologue.base = base
    if initNs:
        # .items() exists on Python 2 and 3, .iteritems() only on Python 2
        for k, v in initNs.items():
            prologue.bind(k, v)

    for x in p:
        if x.name == 'Base':
            prologue.base = x.iri
        elif x.name == 'PrefixDecl':
            # the IRI is resolved against the base in effect at this point
            prologue.bind(x.prefix, prologue.absolutize(x.iri))

    return prologue
def translateQuads(quads):
    """
    Split a quads node into its plain triples and its per-term triples.

    Returns ``(alltriples, allquads)``: a list of triples and a
    ``defaultdict(list)`` keyed by each ``quadsNotTriples`` entry's
    ``term``.
    """
    alltriples = triples(quads.triples) if quads.triples else []

    allquads = collections.defaultdict(list)
    for q in quads.quadsNotTriples or ():
        if q.triples:
            allquads[q.term] += triples(q.triples)

    return alltriples, allquads
def translateUpdate1(u, prologue):
    """
    Translate a single update operation node in place.

    The node is returned with ``prologue`` attached as ``u.prologue``.
    Raises ``Exception`` for an operation name that is not handled.
    """
    kind = u.name

    if kind in ('Load', 'Clear', 'Drop', 'Create', 'Add', 'Move', 'Copy'):
        pass  # no translation needed
    elif kind in ('InsertData', 'DeleteData', 'DeleteWhere'):
        found_triples, found_quads = translateQuads(u.quads)
        u["quads"] = found_quads
        u["triples"] = found_triples
        # TODO: check for bnodes in triples (DeleteWhere, DeleteData)
    elif kind == 'Modify':
        for clause in (u.delete, u.insert):
            if clause:
                clause["triples"], clause["quads"] = translateQuads(
                    clause.quads)
        u["where"] = translateGroupGraphPattern(u.where)
    else:
        raise Exception('Unknown type of update operation: %s' % u)

    u.prologue = prologue
    return u
def translateUpdate(q, base=None, initNs=None):
    """
    Returns a list of SPARQL Update Algebra expressions

    The prologue is carried over from one request to the next, so
    declarations accumulate across the requests.
    """
    if not q.request:
        return []

    translated = []
    prologue = None
    for decls, update in zip(q.prologue, q.request):
        prologue = translatePrologue(decls, base, initNs, prologue)

        # absolutize/resolve prefixes
        resolve = functools.partial(translatePName, prologue=prologue)
        update = traverse(update, visitPost=resolve)
        update = _traverse(update, _simplifyFilters)
        update = traverse(update, visitPost=translatePath)

        translated.append(translateUpdate1(update, prologue))

    return translated
def translateQuery(q, base=None, initNs=None):
    """
    Translate a query-parsetree to a SPARQL Algebra Expression

    Return a rdflib.plugins.sparql.sparql.Query object
    """
    # We get in: (prologue, query)
    prologue = translatePrologue(q[0], base, initNs)

    # absolutize/resolve prefixes; the resolved tree is stored back into q
    resolve = functools.partial(translatePName, prologue=prologue)
    query = traverse(q[1], visitPost=resolve)
    q[1] = query

    P, PV = translate(query)
    datasetClause = query.datasetClause

    if query.name == 'ConstructQuery':
        template = triples(query.template) if query.template else None
        res = CompValue(query.name, p=P,
                        template=template,
                        datasetClause=datasetClause)
    else:
        res = CompValue(query.name, p=P,
                        datasetClause=datasetClause, PV=PV)

    res = traverse(res, visitPost=simplify)
    _traverseAgg(res, visitor=analyse)
    _traverseAgg(res, _addVars)

    return Query(prologue, res)
def pprintAlgebra(q):
    """
    Print an algebra expression to stdout, one field per line.

    ``q`` is either an object with an ``algebra`` attribute or, when that
    attribute is missing, an iterable of expressions (an update). Nothing
    is returned.
    """
    import sys  # local import: only this debugging helper needs it

    def pp(p, ind=" "):
        if not isinstance(p, CompValue):
            print(p)
            return
        print("%s(" % (p.name, ))
        for k in p:
            # no newline here: the value follows on the same line
            sys.stdout.write("%s%s = " % (ind, k,))
            pp(p[k], ind + " ")
        print("%s)" % ind)

    try:
        pp(q.algebra)
    except AttributeError:
        # it's update, just a list
        for x in q:
            pp(x)
if __name__ == '__main__':
    # Command-line helper: parse the query given as the first argument
    # (either a path to a file or the query text itself) and print the
    # parse tree followed by the translated algebra.
    import sys
    from rdflib.plugins.sparql import parser
    import os.path

    if os.path.exists(sys.argv[1]):
        # read the query text and close the file again
        with open(sys.argv[1]) as f:
            q = f.read()
    else:
        q = sys.argv[1]

    pq = parser.parseQuery(q)
    print(pq)
    tq = translateQuery(pq)
    # pprintAlgebra prints by itself and returns None, so do not print
    # its return value
    pprintAlgebra(tq)