test_splits takes allowed/target codecs as arguments, no longer relies on get_codec.
[audiomangler.git] / audiomangler / expression.py
blob97f49de04a875573da9b9913b468ec9b3437512f
1 # -*- coding: utf-8 -*-
2 ###########################################################################
3 # Copyright (C) 2008 by Andrew Mahone
4 # <andrew.mahone@gmail.com>
6 # Copyright: See COPYING file that comes with this distribution
8 ###########################################################################
9 import os.path
10 import re
11 import codecs
12 from RestrictedPython.RCompile import RExpression
13 from RestrictedPython.MutatingWalker import walk
14 from RestrictedPython.Guards import safe_builtins as eval_builtins
15 from string import maketrans
16 from compiler import ast
18 _breakre = re.compile("\(|\)|\$\$|\$(?P<nosan>/)?(?P<label>[a-z]+)?(?P<paren>\()?|(?P<raw>[rR])?(?P<quote>'|\")|\\\\.")
19 pathseptrans = unicode(maketrans('/', '_')[:48])
20 pathtrans = unicode(maketrans(r'/\[]?=+<>;",*|', os.path.sep + '_' * 13)[:125])
22 eval_builtins = eval_builtins.copy()
23 eval_builtins.update(filter=filter, map=map, max=max, min=min, reduce=reduce, reversed=reversed, slice=slice, sorted=sorted)
24 del eval_builtins['delattr']
25 del eval_builtins['setattr']
26 eval_globals = {'__builtins__':eval_builtins, '_getattr_':getattr, '_getitem_': lambda x, y: x[y]}
28 def underscorereplace_errors(e):
29 return (u'_' * (e.end - e.start), e.end)
31 codecs.register_error('underscorereplace', underscorereplace_errors)
33 def evaluate(item, cdict):
34 if isinstance(item, Expr):
35 return item.evaluate(cdict)
36 else:
37 return item
39 class InlineFuncsVisitor:
40 def __init__(self, filename, baseexpr):
41 self.filename = filename
42 self.baseexpr = baseexpr
43 def visitCallFunc(self, node, *args):
44 if not hasattr(node, 'node'):
45 return node
46 if not isinstance(node.node, ast.Name):
47 return node
48 handler = getattr(self, '_' + node.node.name, None)
49 if handler:
50 return handler(node, *args)
51 else:
52 return node
53 def _first(self, node, *args):
54 clocals = ast.Const(locals)
55 clocals.lineno = node.lineno
56 clocals = ast.CallFunc(clocals, [], None, None)
57 clocals.lineno = node.lineno
58 exp = ast.Or([])
59 exp.lineno = node.lineno
60 for item in node.args:
61 if not isinstance(item, ast.Const) or isinstance(item.value, basestring):
62 if isinstance(item, ast.Const):
63 item = item.value
64 item = self.baseexpr(item, self.filename)
65 item = ast.Const(item.evaluate)
66 item.lineno = node.lineno
67 item = ast.CallFunc(item, [clocals])
68 item.lineno = node.lineno
69 exp.nodes.append(item)
70 return exp
72 class Expr(RExpression, object):
73 _globals = eval_globals
74 _cache = {}
76 def __new__(cls, source, filename="", baseexpr=None):
77 key = (cls, source, filename, baseexpr)
78 if isinstance(source, basestring):
79 if key not in cls._cache:
80 cls._cache[key] = object.__new__(cls)
81 return cls._cache[key]
82 elif isinstance(source, ast.Node):
83 return object.__new__(cls)
84 elif isinstance(source, cls):
85 return source
87 def __init__(self, source, filename="", baseexpr=None):
88 if hasattr(self, '_compiled'):
89 return
90 self._source = source
91 self._baseexpr = baseexpr or getattr(self.__class__, '_baseexpr', None) or self.__class__
92 self._filename = filename
93 if not isinstance(source, ast.Node):
94 RExpression.__init__(self, source, filename)
95 source = self._get_tree()
96 else:
97 if not (isinstance(source, ast.Expression)):
98 source = ast.Expression(source)
99 source.filename = filename
100 walk(source, InlineFuncsVisitor(self._filename, self._baseexpr))
101 gen = self.CodeGeneratorClass(source)
102 self._compiled = gen.getCode()
104 def __hash__(self):
105 return hash(self._compiled)
107 def _get_tree(self):
108 tree = RExpression._get_tree(self)
109 walk(tree, InlineFuncsVisitor(self.filename, self._baseexpr))
110 return tree
112 def evaluate(self, cdict):
113 try:
114 return eval(self._compiled, self._globals, cdict)
115 except NameError:
116 return None
118 class StringExpr(Expr):
119 def evaluate(self, cdict):
120 ret = super(self.__class__, self).evaluate(cdict)
121 if ret is not None:
122 ret = unicode(ret)
123 return ret
125 class SanitizedExpr(Expr):
126 def evaluate(self, cdict):
127 ret = super(self.__class__, self).evaluate(cdict)
128 if ret is not None:
129 ret = unicode(ret).translate(pathseptrans)
130 return ret
132 class Format(Expr):
133 _sanitize = False
135 def _get_tree(self):
136 clocals = ast.Const(locals)
137 clocals.lineno = 1
138 clocals = ast.CallFunc(clocals, [], None, None)
139 clocals.lineno = 1
140 items = self._parse()
141 te = ast.Tuple([])
142 te.lineno = 1
143 ta = ast.Tuple([])
144 ta.lineno = 1
145 for item in items:
146 if isinstance(item, Expr):
147 item = ast.Const(item.evaluate)
148 item.lineno = 1
149 item = ast.CallFunc(item, [clocals])
150 item.lineno = 1
151 ta.nodes.append(item)
152 te.nodes.append(item)
153 else:
154 item = ast.Const(item)
155 item.lineno = 1
156 ta.nodes.append(item)
157 result = ast.Const(''.join)
158 result.lineno = 1
159 result = ast.CallFunc(result, [ta], None, None)
160 if te.nodes:
161 none = ast.Name('None')
162 none.lineno = 1
163 test = ast.Compare(none, [('in', te)])
164 test.lineno = 1
165 result = ast.IfExp(test, none, result)
166 result.lineno = 1
167 result = ast.Expression(result)
168 result.lineno = 1
169 result.filename = self._filename
170 return result
172 def _parse(self):
173 state = []
174 result = []
175 cur = []
176 prevend = 0
177 for m in _breakre.finditer(self._source):
178 # import pdb; pdb.set_trace()
179 mt = m.group(0)
180 mg = m.groupdict()
181 if m.start() > prevend:
182 cur.append(self._source[prevend:m.start()])
183 prevend = m.end()
184 if not state:
185 if mt == '$$':
186 cur.append('$')
187 elif mt.startswith('$'):
188 if not (mg['label'] or mg['paren']):
189 cur.append(mt)
190 continue
191 if any(cur):
192 result.append(''.join(cur))
193 cur = []
194 if not mg['paren']:
195 if mg['nosan'] or not self._sanitize:
196 result.append(StringExpr(mg['label'], self._filename, self._baseexpr))
197 else:
198 result.append(SanitizedExpr(mg['label'], self._filename, self._baseexpr))
199 else:
200 if mg['nosan'] or not self._sanitize:
201 cur.append(StringExpr)
202 else:
203 cur.append(SanitizedExpr)
204 if mg['label']:
205 cur.append(mg['label'])
206 cur.append('(')
207 state.append('(')
208 else:
209 cur.append(mt)
210 else:
211 cur.append(mt)
212 if state[-1] == '(':
213 if mt == ')':
214 state.pop()
215 elif mg['quote']:
216 state.append(mg['quote'])
217 elif mt.endswith('('):
218 state.append('(')
219 else:
220 if mg['quote'] == state[-1]:
221 state.pop()
222 if not state:
223 result.append(cur[0](''.join(cur[1:]), self._filename, self._baseexpr))
224 cur = []
225 cur.append(self._source[prevend:])
226 if state:
227 raise SyntaxError('unexpected EOF while parsing', (self._filename, 1, len(self._source), self._source))
228 if any(cur):
229 result.append(''.join(cur))
230 return result
232 class SanitizedFormat(Format):
233 _sanitize = True
235 class FileFormat(SanitizedFormat):
236 _baseexpr = SanitizedFormat
237 def evaluate(self, cdict):
238 ret = super(self.__class__, self).evaluate(cdict)
239 if ret is not None:
240 ret = ret.translate(pathtrans)
241 return ret
243 #class Format(Expr):
245 def unique(testset, expr, evalexpr): pass
247 __all__ = ['Format', 'FileFormat', 'Expr', 'evaluate']