Source code for rdflib.plugins.sparql.algebra


"""
Converting the 'parse-tree' output of pyparsing to a SPARQL Algebra expression

http://www.w3.org/TR/sparql11-query/#sparqlQuery

"""

from __future__ import print_function

import functools
import operator
import collections

from functools import reduce

from rdflib import Literal, Variable, URIRef, BNode

from rdflib.plugins.sparql.sparql import Prologue, Query
from rdflib.plugins.sparql.parserutils import CompValue, Expr
from rdflib.plugins.sparql.operators import (
    and_, TrueFilter, simplify as simplifyFilters)
from rdflib.paths import (
    InvPath, AlternativePath, SequencePath, MulPath, NegatedPath)

from pyparsing import ParseResults


# ---------------------------
# Some convenience methods
[docs]def OrderBy(p, expr): return CompValue('OrderBy', p=p, expr=expr)
[docs]def ToMultiSet(p): return CompValue('ToMultiSet', p=p)
[docs]def Union(p1, p2): return CompValue('Union', p1=p1, p2=p2)
[docs]def Join(p1, p2): return CompValue('Join', p1=p1, p2=p2)
[docs]def Minus(p1, p2): return CompValue('Minus', p1=p1, p2=p2)
[docs]def Graph(term, graph): return CompValue('Graph', term=term, p=graph)
[docs]def BGP(triples=None): return CompValue('BGP', triples=triples or [])
[docs]def LeftJoin(p1, p2, expr): return CompValue('LeftJoin', p1=p1, p2=p2, expr=expr)
[docs]def Filter(expr, p): return CompValue('Filter', expr=expr, p=p)
[docs]def Extend(p, expr, var): return CompValue('Extend', p=p, expr=expr, var=var)
[docs]def Values(res): return CompValue('values', res=res)
[docs]def Project(p, PV): return CompValue('Project', p=p, PV=PV)
[docs]def Group(p, expr=None): return CompValue('Group', p=p, expr=expr)
def _knownTerms(triple, varsknown, varscount): return (len([x for x in triple if x not in varsknown and isinstance(x, (Variable, BNode))]), -sum(varscount.get(x, 0) for x in triple), not isinstance(triple[2], Literal), )
[docs]def reorderTriples(l): """ Reorder triple patterns so that we execute the ones with most bindings first """ def _addvar(term, varsknown): if isinstance(term, (Variable, BNode)): varsknown.add(term) l = [(None, x) for x in l] varsknown = set() varscount = collections.defaultdict(int) for t in l: for c in t[1]: if isinstance(c, (Variable, BNode)): varscount[c] += 1 i = 0 # Done in steps, sort by number of bound terms # the top block of patterns with the most bound terms is kept # the rest is resorted based on the vars bound after the first # block is evaluated # we sort by decorate/undecorate, since we need the value of the sort keys while i < len(l): l[i:] = sorted((_knownTerms(x[ 1], varsknown, varscount), x[1]) for x in l[i:]) t = l[i][0][0] # top block has this many terms bound j = 0 while i + j < len(l) and l[i + j][0][0] == t: for c in l[i + j][1]: _addvar(c, varsknown) j += 1 i += 1 return [x[1] for x in l]
[docs]def triples(l): l = reduce(lambda x, y: x + y, l) if (len(l) % 3) != 0: raise Exception('these aint triples') return reorderTriples((l[x], l[x + 1], l[x + 2]) for x in range(0, len(l), 3))
[docs]def translatePName(p, prologue): """ Expand prefixed/relative URIs """ if isinstance(p, CompValue): if p.name == 'pname': return prologue.absolutize(p) if p.name == 'literal': return Literal(p.string, lang=p.lang, datatype=prologue.absolutize(p.datatype)) elif isinstance(p, URIRef): return prologue.absolutize(p)
[docs]def translatePath(p): """ Translate PropertyPath expressions """ if isinstance(p, CompValue): if p.name == 'PathAlternative': if len(p.part) == 1: return p.part[0] else: return AlternativePath(*p.part) elif p.name == 'PathSequence': if len(p.part) == 1: return p.part[0] else: return SequencePath(*p.part) elif p.name == 'PathElt': if not p.mod: return p.part else: if isinstance(p.part, list): if len(p.part) != 1: raise Exception('Denkfehler!') return MulPath(p.part[0], p.mod) else: return MulPath(p.part, p.mod) elif p.name == 'PathEltOrInverse': if isinstance(p.part, list): if len(p.part) != 1: raise Exception('Denkfehler!') return InvPath(p.part[0]) else: return InvPath(p.part) elif p.name == 'PathNegatedPropertySet': if isinstance(p.part, list): return NegatedPath(AlternativePath(*p.part)) else: return NegatedPath(p.part)
[docs]def translateExists(e): """ Translate the graph pattern used by EXISTS and NOT EXISTS http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters """ def _c(n): if isinstance(n, CompValue): if n.name in ('Builtin_EXISTS', 'Builtin_NOTEXISTS'): n.graph = translateGroupGraphPattern(n.graph) if n.graph.name == 'Filter': # filters inside (NOT) EXISTS can see vars bound outside n.graph.no_isolated_scope = True e = traverse(e, visitPost=_c) return e
[docs]def collectAndRemoveFilters(parts): """ FILTER expressions apply to the whole group graph pattern in which they appear. http://www.w3.org/TR/sparql11-query/#sparqlCollectFilters """ filters = [] i = 0 while i < len(parts): p = parts[i] if p.name == 'Filter': filters.append(translateExists(p.expr)) parts.pop(i) else: i += 1 if filters: return and_(*filters) return None
[docs]def translateGroupOrUnionGraphPattern(graphPattern): A = None for g in graphPattern.graph: g = translateGroupGraphPattern(g) if not A: A = g else: A = Union(A, g) return A
[docs]def translateGraphGraphPattern(graphPattern): return Graph(graphPattern.term, translateGroupGraphPattern(graphPattern.graph))
[docs]def translateInlineData(graphPattern): return ToMultiSet(translateValues(graphPattern))
[docs]def translateGroupGraphPattern(graphPattern): """ http://www.w3.org/TR/sparql11-query/#convertGraphPattern """ if graphPattern.name == 'SubSelect': return ToMultiSet(translate(graphPattern)[0]) if not graphPattern.part: graphPattern.part = [] # empty { } filters = collectAndRemoveFilters(graphPattern.part) g = [] for p in graphPattern.part: if p.name == 'TriplesBlock': # merge adjacent TripleBlocks if not (g and g[-1].name == 'BGP'): g.append(BGP()) g[-1]["triples"] += triples(p.triples) else: g.append(p) G = BGP() for p in g: if p.name == 'OptionalGraphPattern': A = translateGroupGraphPattern(p.graph) if A.name == 'Filter': G = LeftJoin(G, A.p, A.expr) else: G = LeftJoin(G, A, TrueFilter) elif p.name == 'MinusGraphPattern': G = Minus(p1=G, p2=translateGroupGraphPattern(p.graph)) elif p.name == 'GroupOrUnionGraphPattern': G = Join(p1=G, p2=translateGroupOrUnionGraphPattern(p)) elif p.name == 'GraphGraphPattern': G = Join(p1=G, p2=translateGraphGraphPattern(p)) elif p.name == 'InlineData': G = Join(p1=G, p2=translateInlineData(p)) elif p.name == 'ServiceGraphPattern': G = Join(p1=G, p2=p) elif p.name in ('BGP', 'Extend'): G = Join(p1=G, p2=p) elif p.name == 'Bind': G = Extend(G, p.expr, p.var) else: raise Exception('Unknown part in GroupGraphPattern: %s - %s' % (type(p), p.name)) if filters: G = Filter(expr=filters, p=G) return G
[docs]class StopTraversal(Exception):
[docs] def __init__(self, rv): self.rv = rv
def _traverse(e, visitPre=lambda n: None, visitPost=lambda n: None): """ Traverse a parse-tree, visit each node if visit functions return a value, replace current node """ _e = visitPre(e) if _e is not None: return _e if e is None: return None if isinstance(e, (list, ParseResults)): return [_traverse(x, visitPre, visitPost) for x in e] elif isinstance(e, tuple): return tuple([_traverse(x, visitPre, visitPost) for x in e]) elif isinstance(e, CompValue): for k, val in e.items(): e[k] = _traverse(val, visitPre, visitPost) _e = visitPost(e) if _e is not None: return _e return e def _traverseAgg(e, visitor=lambda n, v: None): """ Traverse a parse-tree, visit each node if visit functions return a value, replace current node """ res = [] if isinstance(e, (list, ParseResults, tuple)): res = [_traverseAgg(x, visitor) for x in e] elif isinstance(e, CompValue): for k, val in e.items(): if val is not None: res.append(_traverseAgg(val, visitor)) return visitor(e, res)
[docs]def traverse( tree, visitPre=lambda n: None, visitPost=lambda n: None, complete=None): """ Traverse tree, visit each node with visit function visit function may raise StopTraversal to stop traversal if complete!=None, it is returned on complete traversal, otherwise the transformed tree is returned """ try: r = _traverse(tree, visitPre, visitPost) if complete is not None: return complete return r except StopTraversal as st: return st.rv
def _hasAggregate(x): """ Traverse parse(sub)Tree return true if any aggregates are used """ if isinstance(x, CompValue): if x.name.startswith('Aggregate_'): raise StopTraversal(True) def _aggs(e, A): """ Collect Aggregates in A replaces aggregates with variable references """ # TODO: nested Aggregates? if isinstance(e, CompValue) and e.name.startswith('Aggregate_'): A.append(e) aggvar = Variable('__agg_%d__' % len(A)) e["res"] = aggvar return aggvar def _findVars(x, res): """ Find all variables in a tree """ if isinstance(x, Variable): res.add(x) if isinstance(x, CompValue): if x.name == "Bind": res.add(x.var) return x # stop recursion and finding vars in the expr elif x.name == 'SubSelect': if x.projection: res.update(v.var or v.evar for v in x.projection) return x def _addVars(x, children): """ find which variables may be bound by this part of the query """ if isinstance(x, Variable): return set([x]) elif isinstance(x, CompValue): if x.name == "RelationalExpression": x["_vars"] = set() elif x.name == "Extend": # vars only used in the expr for a bind should not be included x["_vars"] = reduce(operator.or_, [child for child, part in zip(children, x) if part != 'expr'], set()) else: x["_vars"] = set(reduce(operator.or_, children, set())) if x.name == 'SubSelect': if x.projection: s = set(v.var or v.evar for v in x.projection) else: s = set() return s return x["_vars"] return reduce(operator.or_, children, set()) def _sample(e, v=None): """ For each unaggregated variable V in expr Replace V with Sample(V) """ if isinstance(e, CompValue) and e.name.startswith("Aggregate_"): return e # do not replace vars in aggregates if isinstance(e, Variable) and v != e: return CompValue('Aggregate_Sample', vars=e) def _simplifyFilters(e): if isinstance(e, Expr): return simplifyFilters(e)
[docs]def translateAggregates(q, M): E = [] A = [] # collect/replace aggs in : # select expr as ?var if q.projection: for v in q.projection: if v.evar: v.expr = traverse(v.expr, functools.partial(_sample, v=v.evar)) v.expr = traverse(v.expr, functools.partial(_aggs, A=A)) # having clause if traverse(q.having, _hasAggregate, complete=False): q.having = traverse(q.having, _sample) traverse(q.having, functools.partial(_aggs, A=A)) # order by if traverse(q.orderby, _hasAggregate, complete=False): q.orderby = traverse(q.orderby, _sample) traverse(q.orderby, functools.partial(_aggs, A=A)) # sample all other select vars # TODO: only allowed for vars in group-by? if q.projection: for v in q.projection: if v.var: rv = Variable('__agg_%d__' % (len(A) + 1)) A.append(CompValue('Aggregate_Sample', vars=v.var, res=rv)) E.append((rv, v.var)) return CompValue('AggregateJoin', A=A, p=M), E
[docs]def translateValues(v): # if len(v.var)!=len(v.value): # raise Exception("Unmatched vars and values in ValueClause: "+str(v)) res = [] if not v.var: return res if not v.value: return res if not isinstance(v.value[0], list): for val in v.value: res.append({v.var[0]: val}) else: for vals in v.value: res.append(dict(zip(v.var, vals))) return Values(res)
[docs]def translate(q): """ http://www.w3.org/TR/sparql11-query/#convertSolMod """ _traverse(q, _simplifyFilters) q.where = traverse(q.where, visitPost=translatePath) # TODO: Var scope test VS = set() traverse(q.where, functools.partial(_findVars, res=VS)) # all query types have a where part M = translateGroupGraphPattern(q.where) aggregate = False if q.groupby: conditions = [] # convert "GROUP BY (?expr as ?var)" to an Extend for c in q.groupby.condition: if isinstance(c, CompValue) and c.name == 'GroupAs': M = Extend(M, c.expr, c.var) c = c.var conditions.append(c) M = Group(p=M, expr=conditions) aggregate = True elif traverse(q.having, _hasAggregate, complete=False) or \ traverse(q.orderby, _hasAggregate, complete=False) or \ any(traverse(x.expr, _hasAggregate, complete=False) for x in q.projection or [] if x.evar): # if any aggregate is used, implicit group by M = Group(p=M) aggregate = True if aggregate: M, E = translateAggregates(q, M) else: E = [] # HAVING if q.having: M = Filter(expr=and_(*q.having.condition), p=M) # VALUES if q.valuesClause: M = Join(p1=M, p2=ToMultiSet(translateValues(q.valuesClause))) if not q.projection: # select * PV = list(VS) else: PV = list() for v in q.projection: if v.var: if v not in PV: PV.append(v.var) elif v.evar: if v not in PV: PV.append(v.evar) E.append((v.expr, v.evar)) else: raise Exception("I expected a var or evar here!") for e, v in E: M = Extend(M, e, v) # ORDER BY if q.orderby: M = OrderBy(M, [CompValue('OrderCondition', expr=c.expr, order=c.order) for c in q.orderby.condition]) # PROJECT M = Project(M, PV) if q.modifier: if q.modifier == 'DISTINCT': M = CompValue('Distinct', p=M) elif q.modifier == 'REDUCED': M = CompValue('Reduced', p=M) if q.limitoffset: offset = 0 if q.limitoffset.offset is not None: offset = q.limitoffset.offset.toPython() if q.limitoffset.limit is not None: M = CompValue('Slice', p=M, start=offset, length=q.limitoffset.limit.toPython()) else: M = CompValue('Slice', p=M, start=offset) return M, PV
[docs]def simplify(n): """Remove joins to empty BGPs""" if isinstance(n, CompValue): if n.name == 'Join': if n.p1.name == 'BGP' and len(n.p1.triples) == 0: return n.p2 if n.p2.name == 'BGP' and len(n.p2.triples) == 0: return n.p1 elif n.name == 'BGP': n["triples"] = reorderTriples(n.triples) return n
[docs]def analyse(n, children): """ Some things can be lazily joined. This propegates whether they can up the tree and sets lazy flags for all joins """ if isinstance(n, CompValue): if n.name == 'Join': n["lazy"] = all(children) return False elif n.name in ('Slice', 'Distinct'): return False else: return all(children) else: return True
[docs]def translatePrologue(p, base, initNs=None, prologue=None): if prologue is None: prologue = Prologue() prologue.base = "" if base: prologue.base = base if initNs: for k, v in initNs.items(): prologue.bind(k, v) for x in p: if x.name == 'Base': prologue.base = x.iri elif x.name == 'PrefixDecl': prologue.bind(x.prefix, prologue.absolutize(x.iri)) return prologue
[docs]def translateQuads(quads): if quads.triples: alltriples = triples(quads.triples) else: alltriples = [] allquads = collections.defaultdict(list) if quads.quadsNotTriples: for q in quads.quadsNotTriples: if q.triples: allquads[q.term] += triples(q.triples) return alltriples, allquads
[docs]def translateUpdate1(u, prologue): if u.name in ('Load', 'Clear', 'Drop', 'Create'): pass # no translation needed elif u.name in ('Add', 'Move', 'Copy'): pass elif u.name in ('InsertData', 'DeleteData', 'DeleteWhere'): t, q = translateQuads(u.quads) u["quads"] = q u["triples"] = t if u.name in ('DeleteWhere', 'DeleteData'): pass # TODO: check for bnodes in triples elif u.name == 'Modify': if u.delete: u.delete["triples"], u.delete[ "quads"] = translateQuads(u.delete.quads) if u.insert: u.insert["triples"], u.insert[ "quads"] = translateQuads(u.insert.quads) u["where"] = translateGroupGraphPattern(u.where) else: raise Exception('Unknown type of update operation: %s' % u) u.prologue = prologue return u
[docs]def translateUpdate(q, base=None, initNs=None): """ Returns a list of SPARQL Update Algebra expressions """ res = [] prologue = None if not q.request: return res for p, u in zip(q.prologue, q.request): prologue = translatePrologue(p, base, initNs, prologue) # absolutize/resolve prefixes u = traverse( u, visitPost=functools.partial(translatePName, prologue=prologue)) u = _traverse(u, _simplifyFilters) u = traverse(u, visitPost=translatePath) res.append(translateUpdate1(u, prologue)) return res
[docs]def translateQuery(q, base=None, initNs=None): """ Translate a query-parsetree to a SPARQL Algebra Expression Return a rdflib.plugins.sparql.sparql.Query object """ # We get in: (prologue, query) prologue = translatePrologue(q[0], base, initNs) # absolutize/resolve prefixes q[1] = traverse( q[1], visitPost=functools.partial(translatePName, prologue=prologue)) P, PV = translate(q[1]) datasetClause = q[1].datasetClause if q[1].name == 'ConstructQuery': template = triples(q[1].template) if q[1].template else None res = CompValue(q[1].name, p=P, template=template, datasetClause=datasetClause) else: res = CompValue(q[1].name, p=P, datasetClause=datasetClause, PV=PV) res = traverse(res, visitPost=simplify) _traverseAgg(res, visitor=analyse) _traverseAgg(res, _addVars) return Query(prologue, res)
[docs]def pprintAlgebra(q): def pp(p, ind=" "): # if isinstance(p, list): # print "[ " # for x in p: pp(x,ind) # print "%s ]"%ind # return if not isinstance(p, CompValue): print(p) return print("%s(" % (p.name, )) for k in p: print("%s%s =" % (ind, k,), end=' ') pp(p[k], ind + " ") print("%s)" % ind) try: pp(q.algebra) except AttributeError: # it's update, just a list for x in q: pp(x)
if __name__ == '__main__': import sys from rdflib.plugins.sparql import parser import os.path if os.path.exists(sys.argv[1]): q = file(sys.argv[1]) else: q = sys.argv[1] pq = parser.parseQuery(q) print(pq) tq = translateQuery(pq) pprintAlgebra(tq)