Source code for rdflib.plugins.sparql.parserutils

from collections import OrderedDict
from types import MethodType
from typing import TYPE_CHECKING, Any

from pyparsing import ParseResults, TokenConverter, originalTextFor

from rdflib import BNode, Variable

    from rdflib.plugins.sparql.sparql import FrozenBindings


NOTE: PyParsing setResultName/__call__ provides a very similar solution to this
I didn't realise at the time of writing and I will remove a
lot of this code at some point

Utility classes for creating an abstract-syntax tree out with pyparsing actions

Lets you label and group parts of parser production rules

For example:

# [5] BaseDecl ::= 'BASE' IRIREF
BaseDecl = Comp('Base', Keyword('BASE') + Param('iri',IRIREF))

After parsing, this gives you back an CompValue object,
which is a dict/object with the parameters specified.
So you can access the parameters are attributes or as keys:


Comp lets you set an evalFn that is bound to the eval method of
the resulting CompValue


# This is an alternative

# Comp('Sum')( Param('x')(Number) + '+' + Param('y')(Number) )

[docs]def value( ctx: "FrozenBindings", val: Any, variables: bool = False, errors: bool = False, ): """ utility function for evaluating something... Variables will be looked up in the context Normally, non-bound vars is an error, set variables=True to return unbound vars Normally, an error raises the error, set errors=True to return error """ if isinstance(val, Expr): return val.eval(ctx) # recurse? elif isinstance(val, CompValue): raise Exception("What do I do with this CompValue? %s" % val) elif isinstance(val, list): return [value(ctx, x, variables, errors) for x in val] elif isinstance(val, (BNode, Variable)): r = ctx.get(val) if isinstance(r, SPARQLError) and not errors: raise r if r is not None: return r # not bound if variables: return val else: raise NotBoundError elif isinstance(val, ParseResults) and len(val) == 1: return value(ctx, val[0], variables, errors) else: return val
[docs]class ParamValue(object): """ The result of parsing a Param This just keeps the name/value All cleverness is in the CompValue """
[docs] def __init__(self, name, tokenList, isList): self.isList = isList = name if isinstance(tokenList, (list, ParseResults)) and len(tokenList) == 1: tokenList = tokenList[0] self.tokenList = tokenList
[docs] def __str__(self): return "Param(%s, %s)" % (, self.tokenList)
[docs]class Param(TokenConverter): """ A pyparsing token for labelling a part of the parse-tree if isList is true repeat occurrences of ParamList have their values merged in a list """
[docs] def __init__(self, name, expr, isList=False): self.isList = isList TokenConverter.__init__(self, expr) self.setName(name) self.addParseAction(self.postParse2)
[docs] def postParse2(self, tokenList): return ParamValue(, tokenList, self.isList)
[docs]class ParamList(Param): """ A shortcut for a Param with isList=True """
[docs] def __init__(self, name, expr): Param.__init__(self, name, expr, True)
[docs]class plist(list): """this is just a list, but we want our own type to check for""" pass
[docs]class CompValue(OrderedDict): """ The result of parsing a Comp Any included Params are available as Dict keys or as attributes """
[docs] def __init__(self, name: str, **values): OrderedDict.__init__(self) = name self.update(values)
[docs] def clone(self): return CompValue(, **self)
[docs] def __str__(self): return + "_" + OrderedDict.__str__(self)
[docs] def __repr__(self): return + "_" + dict.__repr__(self)
def _value(self, val, variables=False, errors=False): if self.ctx is not None: return value(self.ctx, val, variables) else: return val
[docs] def __getitem__(self, a): return self._value(OrderedDict.__getitem__(self, a))
[docs] def get(self, a, variables=False, errors=False): return self._value(OrderedDict.get(self, a, a), variables, errors)
[docs] def __getattr__(self, a: str) -> Any: # Hack hack: OrderedDict relies on this if a in ("_OrderedDict__root", "_OrderedDict__end"): raise AttributeError() try: return self[a] except KeyError: # raise AttributeError('no such attribute '+a) return None
if TYPE_CHECKING: # this is here because properties are dynamically set on CompValue def __setattr__(self, __name: str, __value: Any) -> None: ...
[docs]class Expr(CompValue): """ A CompValue that is evaluatable """
[docs] def __init__(self, name, evalfn=None, **values): super(Expr, self).__init__(name, **values) self._evalfn = None if evalfn: self._evalfn = MethodType(evalfn, self)
[docs] def eval(self, ctx={}): try: self.ctx = ctx return self._evalfn(ctx) except SPARQLError as e: return e finally: self.ctx = None
[docs]class Comp(TokenConverter): """ A pyparsing token for grouping together things with a label Any sub-tokens that are not Params will be ignored. Returns CompValue / Expr objects - depending on whether evalFn is set. """
[docs] def __init__(self, name, expr): self.expr = expr TokenConverter.__init__(self, expr) self.setName(name) self.evalfn = None
[docs] def postParse(self, instring, loc, tokenList): if self.evalfn: res = Expr( res._evalfn = MethodType(self.evalfn, res) else: res = CompValue( if == "ServiceGraphPattern": # Then this must be a service graph pattern and have # already matched. # lets assume there is one, for now, then test for two later. sgp = originalTextFor(self.expr) service_string = sgp.searchString(instring)[0][0] res["service_string"] = service_string for t in tokenList: if isinstance(t, ParamValue): if t.isList: if not in res: res[] = plist() res[].append(t.tokenList) else: res[] = t.tokenList # res.append(t.tokenList) # if isinstance(t,CompValue): # res.update(t) return res
[docs] def setEvalFn(self, evalfn): self.evalfn = evalfn return self
[docs]def prettify_parsetree(t, indent="", depth=0): out = [] if isinstance(t, ParseResults): for e in t.asList(): out.append(prettify_parsetree(e, indent, depth + 1)) for k, v in sorted(t.items()): out.append("%s%s- %s:\n" % (indent, " " * depth, k)) out.append(prettify_parsetree(v, indent, depth + 1)) elif isinstance(t, CompValue): out.append("%s%s> %s:\n" % (indent, " " * depth, for k, v in t.items(): out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k)) out.append(prettify_parsetree(v, indent, depth + 2)) elif isinstance(t, dict): for k, v in t.items(): out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k)) out.append(prettify_parsetree(v, indent, depth + 2)) elif isinstance(t, list): for e in t: out.append(prettify_parsetree(e, indent, depth + 1)) else: out.append("%s%s- %r\n" % (indent, " " * depth, t)) return "".join(out)
# hurrah for circular imports from rdflib.plugins.sparql.sparql import NotBoundError, SPARQLError # noqa: E402