Source code for rdflib.plugins.sparql.parserutils


NOTE: PyParsing setResultName/__call__ provides a very similar solution to this
I didn't realise at the time of writing and I will remove a
lot of this code at some point

Utility classes for creating an abstract-syntax tree out with pyparsing actions

Lets you label and group parts of parser production rules

For example:

# [5] BaseDecl ::= 'BASE' IRIREF
BaseDecl = Comp('Base', Keyword('BASE') + Param('iri',IRIREF))

After parsing, this gives you back an CompValue object,
which is a dict/object with the parameters specified.
So you can access the parameters are attributes or as keys:


Comp lets you set an evalFn that is bound to the eval method of
the resulting CompValue


from __future__ import annotations

from collections import OrderedDict
from types import MethodType
from typing import (

from pyparsing import ParserElement, ParseResults, TokenConverter, originalTextFor

from rdflib.term import BNode, Identifier, Variable

    from rdflib.plugins.sparql.sparql import FrozenBindings

# This is an alternative

# Comp('Sum')( Param('x')(Number) + '+' + Param('y')(Number) )

[docs]def value( ctx: FrozenBindings, val: Any, variables: bool = False, errors: bool = False, ) -> Any: """ utility function for evaluating something... Variables will be looked up in the context Normally, non-bound vars is an error, set variables=True to return unbound vars Normally, an error raises the error, set errors=True to return error """ if isinstance(val, Expr): return val.eval(ctx) # recurse? elif isinstance(val, CompValue): raise Exception("What do I do with this CompValue? %s" % val) elif isinstance(val, list): return [value(ctx, x, variables, errors) for x in val] elif isinstance(val, (BNode, Variable)): r = ctx.get(val) if isinstance(r, SPARQLError) and not errors: raise r if r is not None: return r # not bound if variables: return val else: raise NotBoundError elif isinstance(val, ParseResults) and len(val) == 1: return value(ctx, val[0], variables, errors) else: return val
[docs]class ParamValue: """ The result of parsing a Param This just keeps the name/value All cleverness is in the CompValue """
[docs] def __init__( self, name: str, tokenList: Union[List[Any], ParseResults], isList: bool ): self.isList = isList = name if isinstance(tokenList, (list, ParseResults)) and len(tokenList) == 1: tokenList = tokenList[0] self.tokenList = tokenList
[docs] def __str__(self) -> str: return "Param(%s, %s)" % (, self.tokenList)
[docs]class Param(TokenConverter): """ A pyparsing token for labelling a part of the parse-tree if isList is true repeat occurrences of ParamList have their values merged in a list """
[docs] def __init__(self, name: str, expr, isList: bool = False): self.isList = isList TokenConverter.__init__(self, expr) self.setName(name) self.addParseAction(self.postParse2)
[docs] def postParse2(self, tokenList: Union[List[Any], ParseResults]) -> ParamValue: return ParamValue(, tokenList, self.isList)
[docs]class ParamList(Param): """ A shortcut for a Param with isList=True """
[docs] def __init__(self, name: str, expr): Param.__init__(self, name, expr, True)
_ValT = TypeVar("_ValT")
[docs]class CompValue(OrderedDict): """ The result of parsing a Comp Any included Params are available as Dict keys or as attributes """
[docs] def __init__(self, name: str, **values): OrderedDict.__init__(self) = name self.update(values)
[docs] def clone(self) -> CompValue: return CompValue(, **self)
[docs] def __str__(self) -> str: return + "_" + OrderedDict.__str__(self)
[docs] def __repr__(self) -> str: return + "_" + dict.__repr__(self)
def _value( self, val: _ValT, variables: bool = False, errors: bool = False ) -> Union[_ValT, Any]: if self.ctx is not None: return value(self.ctx, val, variables) else: return val
[docs] def __getitem__(self, a): return self._value(OrderedDict.__getitem__(self, a))
# type error: Signature of "get" incompatible with supertype "dict" # type error: Signature of "get" incompatible with supertype "Mapping" [override]
[docs] def get(self, a, variables: bool = False, errors: bool = False): # type: ignore[override] return self._value(OrderedDict.get(self, a, a), variables, errors)
[docs] def __getattr__(self, a: str) -> Any: # Hack hack: OrderedDict relies on this if a in ("_OrderedDict__root", "_OrderedDict__end"): raise AttributeError() try: return self[a] except KeyError: # raise AttributeError('no such attribute '+a) return None
if TYPE_CHECKING: # this is here because properties are dynamically set on CompValue def __setattr__(self, __name: str, __value: Any) -> None: ...
[docs]class Expr(CompValue): """ A CompValue that is evaluatable """
[docs] def __init__( self, name: str, evalfn: Optional[Callable[[Any, Any], Any]] = None, **values, ): super(Expr, self).__init__(name, **values) self._evalfn = None if evalfn: self._evalfn = MethodType(evalfn, self)
[docs] def eval(self, ctx: Any = {}) -> Union[SPARQLError, Any]: try: self.ctx: Optional[Union[Mapping, FrozenBindings]] = ctx # type error: "None" not callable return self._evalfn(ctx) # type: ignore[misc] except SPARQLError as e: return e finally: self.ctx = None
[docs]class Comp(TokenConverter): """ A pyparsing token for grouping together things with a label Any sub-tokens that are not Params will be ignored. Returns CompValue / Expr objects - depending on whether evalFn is set. """
[docs] def __init__(self, name: str, expr: ParserElement): self.expr = expr TokenConverter.__init__(self, expr) self.setName(name) self.evalfn: Optional[Callable[[Any, Any], Any]] = None
[docs] def postParse( self, instring: str, loc: int, tokenList: ParseResults ) -> Union[Expr, CompValue]: res: Union[Expr, CompValue] if self.evalfn: res = Expr( res._evalfn = MethodType(self.evalfn, res) else: res = CompValue( if == "ServiceGraphPattern": # Then this must be a service graph pattern and have # already matched. # lets assume there is one, for now, then test for two later. sgp = originalTextFor(self.expr) service_string = sgp.searchString(instring)[0][0] res["service_string"] = service_string for t in tokenList: if isinstance(t, ParamValue): if t.isList: if not in res: res[] = [] res[].append(t.tokenList) else: res[] = t.tokenList # res.append(t.tokenList) # if isinstance(t,CompValue): # res.update(t) return res
[docs] def setEvalFn(self, evalfn: Callable[[Any, Any], Any]) -> Comp: self.evalfn = evalfn return self
[docs]def prettify_parsetree(t: ParseResults, indent: str = "", depth: int = 0) -> str: out: List[str] = [] for e in t.asList(): out.append(_prettify_sub_parsetree(e, indent, depth + 1)) for k, v in sorted(t.items()): out.append("%s%s- %s:\n" % (indent, " " * depth, k)) out.append(_prettify_sub_parsetree(v, indent, depth + 1)) return "".join(out)
def _prettify_sub_parsetree( t: Union[Identifier, CompValue, set, list, dict, Tuple, bool, None], indent: str = "", depth: int = 0, ) -> str: out: List[str] = [] if isinstance(t, CompValue): out.append("%s%s> %s:\n" % (indent, " " * depth, for k, v in t.items(): out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k)) out.append(_prettify_sub_parsetree(v, indent, depth + 2)) elif isinstance(t, dict): for k, v in t.items(): out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k)) out.append(_prettify_sub_parsetree(v, indent, depth + 2)) elif isinstance(t, list): for e in t: out.append(_prettify_sub_parsetree(e, indent, depth + 1)) else: out.append("%s%s- %r\n" % (indent, " " * depth, t)) return "".join(out) # hurrah for circular imports from rdflib.plugins.sparql.sparql import NotBoundError, SPARQLError # noqa: E402