from ast import ( # Import,; ImportFrom,; Expr,; alias,
Add,
And,
Assign,
Attribute,
BinOp,
BoolOp,
Call,
Compare,
Constant,
Eq,
FunctionDef,
GeneratorExp,
Gt,
GtE,
If,
Import,
Is,
IsNot,
Lambda,
List,
Load,
Lt,
LtE,
Module,
Name,
Not,
NotEq,
Num,
Return,
Store,
Str,
Tuple,
UnaryOp,
While,
alias,
arg,
arguments,
comprehension,
fix_missing_locations,
stmt,
)
from enum import Enum
OPERATORS = {
"<": Lt,
"<=": LtE,
"=": Eq,
"!=": NotEq,
">": Gt,
">=": GtE,
}
_extensions = {}
[docs]class Parser(object):
def __init__(self, function):
expression = function(None)
if getattr(expression, "copy", None):
expression = expression.copy()
func_code = getattr(function, "__code__", None) or function.func_code
self.file = {
"lineno": func_code.co_firstlineno,
"filename": func_code.co_filename,
"col_offset": 0,
"ctx": Load(),
}
if getattr(func_code, 'co_lines', None):
self.file['end_lineno'] = max(x[2] for x in func_code.co_lines())
self.file_store = dict(self.file, ctx=Store())
self.expression = expression
self.imports = set()
body = self.build_expression(expression)
if isinstance(body, list):
pass
elif not isinstance(body, stmt):
body = [Return(value=body, **self.file)]
else:
body = [body]
self.ast = Module(
# body=self.imports + [
body=[
FunctionDef(
name=func_code.co_name,
args=arguments(
args=[arg(arg="self", annotation=None)], # function.func_code.co_varnames
defaults=[],
vararg=None,
kwarg=None,
kwonlyargs=[],
kw_defaults=[],
posonlyargs=[],
),
kwarg=[],
kw_defaults=[],
vararg=[],
kwonlyargs=[],
body=[
Import(names=[alias(name=imp, **self.file)], **self.file)
for imp in self.imports
] + body,
decorator_list=[],
),
],
type_ignores=[],
**self.file,
)
fix_missing_locations(self.ast)
self.code = compile(self.ast, mode="exec", filename=self.file["filename"])
[docs] def build_expression(self, expression):
if expression is None:
return Constant(value=None, **self.file)
handler_name = "handle_{}".format(expression.__class__.__name__.lower())
if handler_name in _extensions:
return _extensions[handler_name](self, expression)
return getattr(self, handler_name)(expression)
[docs] def handle_case(self, case):
# We can have N When expressions, and then (optionally) one that is the default.
return self.handle_when(*case.get_source_expressions())
[docs] def handle_when(self, when, *others):
test, body = when.get_source_expressions()
if others:
if others[0].__class__.__name__ == "When":
orelse = [self.handle_when(*others)]
else:
# Can we ever have other expressions after the default?
orelse = [Return(value=self.build_expression(others[0]), **self.file)]
else:
orelse = []
return If(
test=self.build_expression(test),
body=[Return(value=self.build_expression(body), **self.file)],
orelse=orelse,
**self.file,
)
[docs] def handle_bool(self, boolean):
return Constant(value=boolean, **self.file)
[docs] def handle_q(self, q):
if not q.children:
expr = Name(id="True", **self.file)
elif len(q.children) == 1:
expr = self._attr_lookup(*q.children[0])
elif q.connector == "AND":
expr = Call(
func=Name(id="all", **self.file),
args=[List(elts=[self._attr_lookup(k, v) for k, v in q.children], **self.file)],
keywords=[],
kwonlyargs=[],
**self.file,
)
else: # q.connector == 'OR'
expr = Call(
func=Name(id="any", **self.file),
args=[List(elts=[self._attr_lookup(k, v) for k, v in q.children], **self.file)],
keywords=[],
kwonlyargs=[],
**self.file,
)
if q.negated:
return UnaryOp(op=Not(), operand=expr, **self.file)
return expr
[docs] def handle_concat(self, concat):
return self.build_expression(*concat.get_source_expressions())
[docs] def handle_concatpair(self, pair):
a, b = pair.get_source_expressions()
return BinOp(left=self.build_expression(a), op=Add(), right=self.build_expression(b), **self.file)
[docs] def handle_f(self, f):
# Do we need to use .attname here?
# What about transforms/lookups?
f.contains_aggregate = False
if "__" in f.name:
self.imports.add('functools')
return Call(
func=Attribute(value=Name(id='functools', **self.file), attr='reduce', **self.file),
args=[
Lambda(
args=arguments(
posonlyargs=[],
args=[arg(arg='x', **self.file), arg(arg='y', **self.file)],
kwonlyargs=[],
kw_defaults=[],
defaults=[]
),
body=Call(
func=Name(id='getattr', **self.file),
args=[
Name(id='x', **self.file),
Name(id='y', **self.file),
Constant(value=None, **self.file),
],
keywords=[],
**self.file
),
**self.file
),
List(
elts=[
Constant(value=x, **self.file)
for x in f.name.split('__')
],
**self.file
),
Name(id='self', **self.file),
],
keywords=[],
**self.file,
)
left, *parts = f.name.split('__')
expression = Attribute(value=Name(id='self', **self.file), attr=left, **self.file)
while parts:
left, *parts = parts
expression = Attribute(value=expression, attr=left, **self.file)
return expression
# We need to chain a bunch of attr lookups, returning None
# if any of them give us a None, we should be returning a None.
#
# . while x is not None and parts:
# . x = getattr(x, parts.pop(0), None)
# return x
return [
Assign(
targets=[Name(id="source", **self.file_store)], value=Name(id="self", **self.file), **self.file
),
Assign(
targets=[Name(id="parts", **self.file_store)],
value=Call(
func=Attribute(value=Constant(value=f.name, **self.file), attr="split", **self.file),
args=[Constant(value="__", **self.file)],
keywords=[],
kwonlyargs=[],
**self.file,
),
**self.file,
),
While(
test=BoolOp(
op=And(),
values=[
Compare(
left=Name(id="source", **self.file),
ops=[IsNot()],
comparators=[Constant(value=None, **self.file)],
**self.file,
),
Name(id="parts", **self.file),
],
**self.file,
),
body=[
Assign(
targets=[Name(id="source", **self.file_store)],
value=Call(
func=Name(id="getattr", **self.file),
args=[
Name(id="source", **self.file),
Call(
func=Attribute(value=Name(id="parts", **self.file), attr="pop", **self.file),
args=[Constant(value=0, **self.file)],
keywords=[],
kwonlyargs=[],
**self.file,
),
Constant(value=None, **self.file),
],
keywords=[],
kwonlyargs=[],
**self.file,
),
**self.file,
),
],
orelse=[],
**self.file,
),
Return(value=Name(id="source", **self.file), **self.file),
]
return Attribute(value=Name(id="self", **self.file), attr=f.name, **self.file)
[docs] def handle_value(self, value):
if isinstance(value.value, Enum):
return Attribute(
value=Name(id=value.value.__class__.__name__, **self.file),
attr=value.value.name,
**self.file,
)
if isinstance(value.value, str):
return Str(s=value.value, **self.file)
if value.value is None:
return Constant(value=None, **self.file)
if isinstance(value.value, int):
return Num(n=value.value, **self.file)
if isinstance(value.value, bool):
return Constant(value=value.value, **self.file)
raise ValueError("Unhandled Value")
def _handle_call_factory(func):
def handle_call(self, expression):
return Call(
func=Attribute(
value=self.build_expression(*expression.get_source_expressions()), attr=func, **self.file
),
args=[],
keywords=[],
kwonlyargs=[],
**self.file,
)
return handle_call
handle_upper = _handle_call_factory("upper")
handle_lower = _handle_call_factory("lower")
[docs] def handle_expressionwrapper(self, expression):
return self.build_expression(*expression.get_source_expressions())
[docs] def handle_combinedexpression(self, expression):
return Compare(
left=self.build_expression(expression.lhs),
ops=[OPERATORS[expression.connector](**self.file)],
comparators=[self.build_expression(expression.rhs)],
**self.file,
)
# This is commented out because it only supports one function.
# def handle_func(self, expression):
# func = expression.extra.get('function') or expression.function
# if func in {'current_timestamp', 'now'}:
# self.imports.append(Import(names=[alias(name='django.utils')]))
# return Call(
# func=Attribute(
# value=Attribute(
# value=Attribute(
# value=Name('django', **self.file),
# attr='utils',
# **self.file
# ),
# attr='timezone',
# **self.file
# ),
# attr='now',
# **self.file
# ),
# args=[],
# keywords=[],
# **self.file,
# )
# return Call(func=Name(id='utcnow', **self.file), args=[], keywords=[], **self.file)
[docs] def handle_coalesce(self, expression):
"""
next(
itertools.chain(
(x for x in expression.get_source_expressions() where x is not None),
(None,)
)
)
"""
expressions = List(
elts=[self.build_expression(exp) for exp in expression.get_source_expressions()],
**self.file,
)
return Call(
func=Name(id="next", **self.file),
args=[
GeneratorExp(
elt=Name(id="x", **self.file),
generators=[
comprehension(
target=Name(id="x", **(dict(self.file, ctx=Store()))),
iter=expressions,
ifs=[
Compare(
left=Name(id="x", **self.file),
ops=[IsNot()],
comparators=[Constant(value=None, **self.file)],
),
],
is_async=False,
**self.file,
)
],
**self.file,
),
Tuple(elts=[Constant(value=None, **self.file)], **self.file),
],
keywords=[],
**self.file,
)
[docs] def handle_exact(self, exact):
left, right = exact.get_source_expressions()
return Compare(
left=self.build_expression(left),
ops=[Eq(**self.file)],
comparators=[self.build_expression(right)],
**self.file
)
def _attr_lookup(self, attr, value):
if "__" not in attr:
return Compare(
left=Attribute(value=Name(id="self", **self.file), attr=attr, **self.file),
ops=[Eq()],
comparators=[self.build_expression(value)],
**self.file,
)
attr, lookup = attr.split("__", 1)
if lookup == "isnull":
return Compare(
left=Attribute(value=Name(id="self", **self.file), attr=attr, **self.file),
ops=[Is() if value else IsNot()],
comparators=[Constant(value=None, **self.file)],
**self.file,
)
if lookup == "exact":
return self._attr_lookup(attr, value)
raise ValueError("Unhandled attr lookup")
[docs]class register:
def __init__(self, func):
if isinstance(func, str):
self.name = f'handle_{func}'
else:
name = func.__code__.co_name
if not name.startswith('handle_'):
name = f'handle_{name}'
_extensions[name] = func
def __call__(self, func):
_extensions[self.name] = func