from __future__ import annotations
from collections import OrderedDict
from types import MethodType
from typing import (
TYPE_CHECKING,
Any,
Callable,
List,
Mapping,
Optional,
Tuple,
TypeVar,
Union,
)
from pyparsing import ParserElement, ParseResults, TokenConverter, originalTextFor
from rdflib.term import BNode, Identifier, Variable
if TYPE_CHECKING:
from rdflib.plugins.sparql.sparql import FrozenBindings
"""
NOTE: PyParsing setResultName/__call__ provides a very similar solution to this
I didn't realise at the time of writing and I will remove a
lot of this code at some point
Utility classes for creating an abstract-syntax tree out with pyparsing actions
Lets you label and group parts of parser production rules
For example:
# [5] BaseDecl ::= 'BASE' IRIREF
BaseDecl = Comp('Base', Keyword('BASE') + Param('iri',IRIREF))
After parsing, this gives you back an CompValue object,
which is a dict/object with the parameters specified.
So you can access the parameters are attributes or as keys:
baseDecl.iri
Comp lets you set an evalFn that is bound to the eval method of
the resulting CompValue
"""
# This is an alternative
# Comp('Sum')( Param('x')(Number) + '+' + Param('y')(Number) )
[docs]def value(
ctx: "FrozenBindings",
val: Any,
variables: bool = False,
errors: bool = False,
) -> Any:
"""
utility function for evaluating something...
Variables will be looked up in the context
Normally, non-bound vars is an error,
set variables=True to return unbound vars
Normally, an error raises the error,
set errors=True to return error
"""
if isinstance(val, Expr):
return val.eval(ctx) # recurse?
elif isinstance(val, CompValue):
raise Exception("What do I do with this CompValue? %s" % val)
elif isinstance(val, list):
return [value(ctx, x, variables, errors) for x in val]
elif isinstance(val, (BNode, Variable)):
r = ctx.get(val)
if isinstance(r, SPARQLError) and not errors:
raise r
if r is not None:
return r
# not bound
if variables:
return val
else:
raise NotBoundError
elif isinstance(val, ParseResults) and len(val) == 1:
return value(ctx, val[0], variables, errors)
else:
return val
[docs]class ParamValue:
"""
The result of parsing a Param
This just keeps the name/value
All cleverness is in the CompValue
"""
[docs] def __init__(
self, name: str, tokenList: Union[List[Any], ParseResults], isList: bool
):
self.isList = isList
self.name = name
if isinstance(tokenList, (list, ParseResults)) and len(tokenList) == 1:
tokenList = tokenList[0]
self.tokenList = tokenList
[docs] def __str__(self) -> str:
return "Param(%s, %s)" % (self.name, self.tokenList)
[docs]class Param(TokenConverter):
"""
A pyparsing token for labelling a part of the parse-tree
if isList is true repeat occurrences of ParamList have
their values merged in a list
"""
[docs] def __init__(self, name: str, expr, isList: bool = False):
self.isList = isList
TokenConverter.__init__(self, expr)
self.setName(name)
self.addParseAction(self.postParse2)
[docs] def postParse2(self, tokenList: Union[List[Any], ParseResults]) -> ParamValue:
return ParamValue(self.name, tokenList, self.isList)
[docs]class ParamList(Param):
"""
A shortcut for a Param with isList=True
"""
[docs] def __init__(self, name: str, expr):
Param.__init__(self, name, expr, True)
_ValT = TypeVar("_ValT")
[docs]class CompValue(OrderedDict):
"""
The result of parsing a Comp
Any included Params are available as Dict keys
or as attributes
"""
[docs] def __init__(self, name: str, **values):
OrderedDict.__init__(self)
self.name = name
self.update(values)
[docs] def clone(self) -> CompValue:
return CompValue(self.name, **self)
[docs] def __str__(self) -> str:
return self.name + "_" + OrderedDict.__str__(self)
[docs] def __repr__(self) -> str:
return self.name + "_" + dict.__repr__(self)
def _value(
self, val: _ValT, variables: bool = False, errors: bool = False
) -> Union[_ValT, Any]:
if self.ctx is not None:
return value(self.ctx, val, variables)
else:
return val
[docs] def __getitem__(self, a):
return self._value(OrderedDict.__getitem__(self, a))
# type error: Signature of "get" incompatible with supertype "dict"
# type error: Signature of "get" incompatible with supertype "Mapping" [override]
[docs] def get(self, a, variables: bool = False, errors: bool = False): # type: ignore[override]
return self._value(OrderedDict.get(self, a, a), variables, errors)
[docs] def __getattr__(self, a: str) -> Any:
# Hack hack: OrderedDict relies on this
if a in ("_OrderedDict__root", "_OrderedDict__end"):
raise AttributeError()
try:
return self[a]
except KeyError:
# raise AttributeError('no such attribute '+a)
return None
if TYPE_CHECKING:
# this is here because properties are dynamically set on CompValue
def __setattr__(self, __name: str, __value: Any) -> None:
...
[docs]class Expr(CompValue):
"""
A CompValue that is evaluatable
"""
[docs] def __init__(
self,
name: str,
evalfn: Optional[Callable[[Any, Any], Any]] = None,
**values,
):
super(Expr, self).__init__(name, **values)
self._evalfn = None
if evalfn:
self._evalfn = MethodType(evalfn, self)
[docs] def eval(self, ctx: Any = {}) -> Union[SPARQLError, Any]:
try:
self.ctx: Optional[Union[Mapping, FrozenBindings]] = ctx
# type error: "None" not callable
return self._evalfn(ctx) # type: ignore[misc]
except SPARQLError as e:
return e
finally:
self.ctx = None
[docs]class Comp(TokenConverter):
"""
A pyparsing token for grouping together things with a label
Any sub-tokens that are not Params will be ignored.
Returns CompValue / Expr objects - depending on whether evalFn is set.
"""
[docs] def __init__(self, name: str, expr: ParserElement):
self.expr = expr
TokenConverter.__init__(self, expr)
self.setName(name)
self.evalfn: Optional[Callable[[Any, Any], Any]] = None
[docs] def postParse(
self, instring: str, loc: int, tokenList: ParseResults
) -> Union[Expr, CompValue]:
res: Union[Expr, CompValue]
if self.evalfn:
res = Expr(self.name)
res._evalfn = MethodType(self.evalfn, res)
else:
res = CompValue(self.name)
if self.name == "ServiceGraphPattern":
# Then this must be a service graph pattern and have
# already matched.
# lets assume there is one, for now, then test for two later.
sgp = originalTextFor(self.expr)
service_string = sgp.searchString(instring)[0][0]
res["service_string"] = service_string
for t in tokenList:
if isinstance(t, ParamValue):
if t.isList:
if t.name not in res:
res[t.name] = []
res[t.name].append(t.tokenList)
else:
res[t.name] = t.tokenList
# res.append(t.tokenList)
# if isinstance(t,CompValue):
# res.update(t)
return res
[docs] def setEvalFn(self, evalfn: Callable[[Any, Any], Any]) -> Comp:
self.evalfn = evalfn
return self
[docs]def prettify_parsetree(t: ParseResults, indent: str = "", depth: int = 0) -> str:
out: List[str] = []
for e in t.asList():
out.append(_prettify_sub_parsetree(e, indent, depth + 1))
for k, v in sorted(t.items()):
out.append("%s%s- %s:\n" % (indent, " " * depth, k))
out.append(_prettify_sub_parsetree(v, indent, depth + 1))
return "".join(out)
def _prettify_sub_parsetree(
t: Union[Identifier, CompValue, set, list, dict, Tuple, bool, None],
indent: str = "",
depth: int = 0,
) -> str:
out: List[str] = []
if isinstance(t, CompValue):
out.append("%s%s> %s:\n" % (indent, " " * depth, t.name))
for k, v in t.items():
out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k))
out.append(_prettify_sub_parsetree(v, indent, depth + 2))
elif isinstance(t, dict):
for k, v in t.items():
out.append("%s%s- %s:\n" % (indent, " " * (depth + 1), k))
out.append(_prettify_sub_parsetree(v, indent, depth + 2))
elif isinstance(t, list):
for e in t:
out.append(_prettify_sub_parsetree(e, indent, depth + 1))
else:
out.append("%s%s- %r\n" % (indent, " " * depth, t))
return "".join(out)
# hurrah for circular imports
from rdflib.plugins.sparql.sparql import NotBoundError, SPARQLError # noqa: E402