甚好!现在我们知道了如何获取堆栈信息和 Python 中每一个操作对应的帧信息。上面结果所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。
增加 Python 封装
正如您所见到的,所有的底层接口都是好用的。我们最后要做的一件事是让 op_target 更加方便使用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。
首先我们来看一下帧的参数所能提供的信息,如下所示:
f_code当前帧将执行的 code object f_lasti当前的操作(code object 中的字节码字符串的索引)经过我们的处理我们可以得知 DEBUG_OP 之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。
新建一个用于追踪函数内部机制的类:
改变函数自身的 co_code 设置回调函数作为 op_debug 的目标函数一旦我们知道下一个操作,我们就可以分析它并修改它的参数。举例来说我们可以增加一个 auto-follow-called-functions 的特性。
| def op_target(l, f, exc=None): if op_target.callback is not None: op_target.callback(l, f, exc) class Trace: def __init__(self, func): self.func = func def call(self, *args, **kwargs): self.add_func_to_trace(self.func) # Activate Trace callback for the func call op_target.callback = self.callback try: res = self.func(*args, **kwargs) except Exception as e: res = e op_target.callback = None return res def add_func_to_trace(self, f): # Is it code? is it already transformed? if not hasattr(f ,"op_debug") and hasattr(f, "__code__"): f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK) f.__globals__['op_target'] = op_target f.op_debug = True def do_auto_follow(self, stack, frame): # Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction next_instr = FrameAnalyser(frame).next_instr() if "CALL" in next_instr.opname: arg = next_instr.arg f_index = (arg & 0xff) + (2 * (arg >> 8)) called_func = stack[f_index] # If call target is not traced yet: do it if not hasattr(called_func, "op_debug"): self.add_func_to_trace(called_func) |
现在我们实现一个 Trace 的子类,在这个子类中增加 callback 和 doreport 这两个方法。callback 方法将在每一个操作之后被调用。doreport 方法将我们收集到的信息打印出来。
这是一个伪函数追踪器实现:
| class DummyTrace(Trace): def __init__(self, func): self.func = func self.data = collections.OrderedDict() self.last_frame = None self.known_frame = [] self.report = [] def callback(self, stack, frame, exc): if frame not in self.known_frame: self.known_frame.append(frame) self.report.append(" === Entering New Frame {0} ({1}) ===".format(frame.f_code.co_name, id(frame))) self.last_frame = frame if frame != self.last_frame: self.report.append(" === Returning to Frame {0} {1}===".format(frame.f_code.co_name, id(frame))) self.last_frame = frame self.report.append(str(stack)) instr = FrameAnalyser(frame).next_instr() offset = str(instr.offset).rjust(8) opname = str(instr.opname).ljust(20) arg = str(instr.arg).ljust(10) self.report.append("{0} {1} {2} {3}".format(offset, opname, arg, instr.argval)) self.do_auto_follow(stack, frame) def do_report(self): print("n".join(self.report)) |










