原文链接: Understanding python execution from inside: A Python assembly tracer
以下为译文
最近我在学习 Python 的运行模型。我对 Python 的一些内部机制很是好奇,比如 Python 是怎么实现类似 YIELDVALUE
、YIELDFROM
这样的操作码的;对于 递推式构造列表(List Comprehensions)、生成器表达式(generator expressions)以及其他一些有趣的 Python 特性是怎么编译的;从字节码的层面来看,当异常抛出的时候都发生了什么事情。翻阅 CPython 的代码对于解答这些问题当然是很有帮助的,但我仍然觉得以这样的方式来做的话对于理解字节码的执行和堆栈的变化还是缺少点什么。GDB 是个好选择,但是我懒,而且只想使用一些比较高阶的接口写点 Python 代码来完成这件事。
所以呢,我的目标就是创建一个字节码级别的追踪 api,类似 sys.setrace 所提供的那样,但相对而言会有更好的粒度。这充分锻炼了我编写 Python 实现的 C 代码的编码能力。我们所需要的有如下几项,在这篇文章中所用的 Python 版本为 3.5。
- 一个新的 Cpython 解释器操作码
- 一种将操作码注入到 Python 字节码的方法
- 一些用于处理操作码的 Python 代码
一个新的 Cpython 操作码
新操作码:DEBUG_OP
这个新的操作码 DEBUG_OP 是我第一次尝试写 CPython 实现的 C 代码,我将尽可能的让它保持简单。 我们想要达成的目的是,当我们的操作码被执行的时候我能有一种方式来调用一些 Python 代码。同时,我们也想能够追踪一些与执行上下文有关的数据。我们的操作码会把这些信息当作参数传递给我们的回调函数。通过操作码能辨识出的有用信息如下:
- 堆栈的内容
- 执行 DEBUG_OP 的帧对象信息
所以呢,我们的操作码需要做的事情是:
- 找到回调函数
- 创建一个包含堆栈内容的列表
- 调用回调函数,并将包含堆栈内容的列表和当前帧作为参数传递给它
听起来挺简单的,现在开始动手吧!声明:下面所有的解释说明和代码是经过了大量段错误调试之后总结得到的结论。首先要做的是给操作码定义一个名字和相应的值,因此我们需要在 Include/opcode.h中添加代码。
#define DEBUG_OP 0
#define POP_TOP 1
#define ROT_TWO 2
#define ROT_THREE 3
这部分工作就完成了,现在我们去编写操作码真正干活的代码。
实现 DEBUG_OP
在考虑如何实现DEBUG_OP之前我们需要了解的是 DEBUG_OP 提供的接口将长什么样。 拥有一个可以调用其他代码的新操作码是相当酷眩的,但是究竟它将调用哪些代码捏?这个操作码如何找到回调函数的捏?我选择了一种最简单的方法:在帧的全局区域写死函数名。那么问题就变成了,我该怎么从字典中找到一个固定的 C 字符串?为了回答这个问题我们来看看在 Python 的 main loop 中使用到的和上下文管理相关的标识符 enter 和 exit。
我们可以看到这两标识符被使用在操作码 SETUP_WITH 中:
TARGET(SETUP_WITH) {
_Py_IDENTIFIER(__exit__);
_Py_IDENTIFIER(__enter__);
PyObject *mgr = TOP();
PyObject *exit = special_lookup(mgr, &PyId___exit__), *enter;
PyObject *res;
现在,看一眼宏 _Py_IDENTIFIER 定义
typedef struct _Py_Identifier {
struct _Py_Identifier *next;
const char* string;
PyObject *object;
} _Py_Identifier;
#define _Py_static_string_init(value) { 0, value, 0 }
#define _Py_static_string(varname, value) static _Py_Identifier varname = _Py_static_string_init(value)
#define _Py_IDENTIFIER(varname) _Py_static_string(PyId_##varname, #varname)
嗯,注释部分已经说明得很清楚了。通过一番查找,我们发现了可以用来从字典找固定字符串的函数 _PyDict_GetItemId,所以我们操作码的查找部分的代码就是长这样滴。
PyObject *target = NULL;
_Py_IDENTIFIER(op_target);
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED()) {
if (!PyErr_ExceptionMatches(PyExc_KeyError))
Goto error;
PyErr_Clear();
DISPATCH();
}
为了方便理解,对这一段代码做一些说明:
- f 是当前的帧,f->f_globals 是它的全局区域
- 如果我们没有找到 op_target,我们将会检查这个异常是不是 KeyError
- goto error; 是一种在 main loop 中抛出异常的方法
- PyErr_Clear() 抑制了当前异常的抛出,而 DISPATCH() 触发了下一个操作码的执行
下一步就是收集我们想要的堆栈信息。
PyObject *value = PyList_New(0);
for (i = 1 ; i <= STACK_LEVEL(); i++) {
tmp = PEEK(i);
if (tmp == NULL) {
tmp = Py_None;
}
PyList_Append(value, tmp);
}
最后一步就是调用我们的回调函数!我们用 call_function 来搞定这件事,我们通过研究操作码 CALL_FUNCTION 的实现来学习怎么使用 call_function 。
TARGET(CALL_FUNCTION) {
PyObject **sp, *res;
sp = stack_pointer;
res = call_function(&sp, oparg);
stack_pointer = sp;
PUSH(res);
if (res == NULL)
goto error;
DISPATCH();
}
有了上面这些信息,我们终于可以捣鼓出一个操作码DEBUG_OP的草稿了:
TARGET(DEBUG_OP) {
PyObject *value = NULL;
PyObject *target = NULL;
PyObject *res = NULL;
PyObject **sp = NULL;
PyObject *tmp;
int i;
_Py_IDENTIFIER(op_target);
target = _PyDict_GetItemId(f->f_globals, &PyId_op_target);
if (target == NULL && _PyErr_OCCURRED()) {
if (!PyErr_ExceptionMatches(PyExc_KeyError))
goto error;
PyErr_Clear();
DISPATCH();
}
value = PyList_New(0);
Py_INCREF(target);
for (i = 1 ; i <= STACK_LEVEL(); i++) {
tmp = PEEK(i);
if (tmp == NULL)
tmp = Py_None;
PyList_Append(value, tmp);
}
PUSH(target);
PUSH(value);
Py_INCREF(f);
PUSH(f);
sp = stack_pointer;
res = call_function(&sp, 2);
stack_pointer = sp;
if (res == NULL)
goto error;
Py_DECREF(res);
DISPATCH();
}
在编写 CPython 实现的 C 代码方面我确实没有什么经验,有可能我漏掉了些细节。如果您有什么建议还请您纠正,我期待您的反馈。
编译它,成了!
一切看起来很顺利,但是当我们尝试去使用我们定义的操作码 DEBUG_OP 的时候却失败了。自从 2008 年之后,Python 使用预先写好的 goto(你也可以从 这里获取更多的讯息)。故,我们需要更新下 goto jump table,我们在 Python/opcode_targets.h 中做如下修改。
static void *opcode_targets[256] = {
//&&_unknown_opcode,
&&TARGET_DEBUG_OP,
&&TARGET_POP_TOP,
这就完事了,我们现在就有了一个可以工作的新操作码。唯一的问题就是这货虽然存在,但是没有被人调用过。接下来,我们将DEBUG_OP注入到函数的字节码中。
在 Python 字节码中注入操作码 DEBUG_OP
有很多方式可以在 Python 字节码中注入新的操作码:
- 使用 peephole optimizer, Quarkslab就是这么干的
- 在生成字节码的代码中动些手脚
- 在运行时直接修改函数的字节码(这就是我们将要干的事儿)
为了创造出一个新操作码,有了上面的那一堆 C 代码就够了。现在让我们回到原点,开始理解奇怪甚至神奇的 Python!
我们将要做的事儿有:
- 得到我们想要追踪函数的 code object
- 重写字节码来注入 DEBUG_OP
- 将新生成的 code object 替换回去
和 code object 有关的小贴士
如果你从没听说过 code object,这里有一个简单的介绍网路上也有一些相关的文档可供查阅,可以直接 Ctrl+F 查找 code object
还有一件事情需要注意的是在这篇文章所指的环境中 code object 是不可变的:
Python 3.4.2 (default, Oct 8 2014, 10:45:20)
[GCC 4.9.1] on linux
Type "help", "copyright", "credits" or "license" for more infORMation.
>>> x = lambda y : 2
>>> x.__code__
<code object <lambda> at 0x7f481fd88390, file "<stdin>", line 1>
>>> x.__code__.co_name
'<lambda>'
>>> x.__code__.co_name = 'truc'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: readonly attribute
>>> x.__code__.co_consts = ('truc',)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: readonly attribute
但是不用担心,我们将会找到方法绕过这个问题的。
使用的工具
为了修改字节码我们需要一些工具:
- dis模块用来反编译和分析字节码
- dis.BytecodePython 3.4新增的一个特性,对于反编译和分析字节码特别有用
- 一个能够简单修改 code object 的方法
用 dis.Bytecode 反编译 code object 能告诉我们一些有关操作码、参数和上下文的信息。
# python3.4
>>> import dis
>>> f = lambda x: x + 3
>>> for i in dis.Bytecode(f.__code__): print (i)
...
Instruction(opname='LOAD_FAST', opcode=124, arg=0, argval='x', argrepr='x', offset=0, starts_line=1, is_jump_target=False)
Instruction(opname='LOAD_CONST', opcode=100, arg=1, argval=3, argrepr='3', offset=3, starts_line=None, is_jump_target=False)
Instruction(opname='BINARY_ADD', opcode=23, arg=None, argval=None, argrepr='', offset=6, starts_line=None, is_jump_target=False)
Instruction(opname='RETURN_VALUE', opcode=83, arg=None, argval=None, argrepr='', offset=7, starts_line=None, is_jump_target=False)
为了能够修改 code object,我定义了一个很小的类用来复制 code object,同时能够按我们的需求修改相应的值,然后重新生成一个新的 code object。
class MutableCodeObject(object):
args_name = ("co_argcount", "co_kwonlyargcount", "co_nlocals", "co_stacksize", "co_flags", "co_code",
"co_consts", "co_names", "co_varnames", "co_filename", "co_name", "co_firstlineno",
"co_lnotab", "co_freevars", "co_cellvars")
def __init__(self, initial_code):
self.initial_code = initial_code
for attr_name in self.args_name:
attr = getattr(self.initial_code, attr_name)
if isinstance(attr, tuple):
attr = list(attr)
setattr(self, attr_name, attr)
def get_code(self):
args = []
for attr_name in self.args_name:
attr = getattr(self, attr_name)
if isinstance(attr, list):
attr = tuple(attr)
args.append(attr)
return self.initial_code.__class__(*args)
这个类用起来很方便,解决了上面提到的 code object 不可变的问题。
>>> x = lambda y : 2
>>> m = MutableCodeObject(x.__code__)
>>> m
<new_code.MutableCodeObject object at 0x7f3f0ea546a0>
>>> m.co_consts
[None, 2]
>>> m.co_consts[1] = '3'
>>> m.co_name = 'truc'
>>> m.get_code()
<code object truc at 0x7f3f0ea2bc90, file "<stdin>", line 1>
测试我们的新操作码
我们现在拥有了注入 DEBUG_OP 的所有工具,让我们来验证下我们的实现是否可用。我们将我们的操作码注入到一个最简单的函数中:
from new_code import MutableCodeObject
def op_target(*args):
print("WOOT")
print("op_target called with args <{0}>".format(args))
def nop():
pass
new_nop_code = MutableCodeObject(nop.__code__)
new_nop_code.co_code = b"\x00" + new_nop_code.co_code[0:3] + b"\x00" + new_nop_code.co_code[-1:]
new_nop_code.co_stacksize += 3
nop.__code__ = new_nop_code.get_code()
import dis
dis.dis(nop)
nop()
# Don't forget that ./python is our custom Python implementing DEBUG_OP
hakril@computer ~/python/CPython3.5 % ./python proof.py
8 0 <0>
1 LOAD_CONST 0 (None)
4 <0>
5 RETURN_VALUE
WOOT
op_target called with args <([], <frame object at 0x7fde9eaebdb0>)>
WOOT
op_target called with args <([None], <frame object at 0x7fde9eaebdb0>)>
看起来它成功了!有一行代码需要说明一下 new_nop_code.co_stacksize += 3
- co_stacksize 表示 code object 所需要的堆栈的大小
- 操作码DEBUG_OP往堆栈中增加了三项,所以我们需要为这些增加的项预留些空间
现在我们可以将我们的操作码注入到每一个 Python 函数中了!
重写字节码
正如我们在上面的例子中所看到的那样,重写 Pyhton 的字节码似乎 so easy。为了在每一个操作码之间注入我们的操作码,我们需要获取每一个操作码的偏移量,然后将我们的操作码注入到这些位置上(把我们操作码注入到参数上是有坏处大大滴)。这些偏移量也很容易获取,使用 dis.Bytecode,就像这样。
def add_debug_op_everywhere(code_obj):
# We get every instruction offset in the code object
offsets = [instr.offset for instr in dis.Bytecode(code_obj)]
# And insert a DEBUG_OP at every offset
return insert_op_debug_list(code_obj, offsets)
def insert_op_debug_list(code, offsets):
# We insert the DEBUG_OP one by one
for nb, off in enumerate(sorted(offsets)):
# Need to ajust the offsets by the number of opcodes already inserted before
# That's why we sort our offsets!
code = insert_op_debug(code, off + nb)
return code
# Last problem: what does insert_op_debug looks like?
基于上面的例子,有人可能会想我们的 insert_op_debug 会在指定的偏移量增加一个"\x00",这尼玛是个坑啊!我们第一个 DEBUG_OP 注入的例子中被注入的函数是没有任何的分支的,为了能够实现完美一个函数注入函数 insert_op_debug 我们需要考虑到存在分支操作码的情况。
Python 的分支一共有两种:
- 绝对分支:看起来是类似这样子的 Instruction_Pointer = argument(instruction)
- 相对分支:看起来是类似这样子的 Instruction_Pointer += argument(instruction)
- 相对分支总是向前的
我们希望这些分支在我们插入操作码之后仍然能够正常工作,为此我们需要修改一些指令参数。以下是其逻辑流程:
- 对于每一个在插入偏移量之前的相对分支而言
- 如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1
- 如果相等,则不需要增加 1 就能够在跳转操作和目标地址之间执行我们的操作码DEBUG_OP
- 如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离
- 对于 code object 中的每一个绝对分支而言
- 如果目标地址是严格大于我们的插入偏移量的话,将指令参数增加 1
- 如果相等,那么不需要任何修改,理由和相对分支部分是一样的
- 如果小于,插入我们的操作码的话并不会影响到跳转操作和目标地址之间的距离
下面是实现:
# Helper
def bytecode_to_string(bytecode):
if bytecode.arg is not None:
return struct.pack("<Bh", bytecode.opcode, bytecode.arg)
return struct.pack("<B", bytecode.opcode)
# Dummy class for bytecode_to_string
class DummyInstr:
def __init__(self, opcode, arg):
self.opcode = opcode
self.arg = arg
def insert_op_debug(code, offset):
opcode_jump_rel = ['FOR_ITER', 'JUMP_FORWARD', 'SETUP_LOOP', 'SETUP_WITH', 'SETUP_EXCEPT', 'SETUP_FINALLY']
opcode_jump_abs = ['POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE', 'JUMP_ABSOLUTE']
res_codestring = b""
inserted = False
for instr in dis.Bytecode(code):
if instr.offset == offset:
res_codestring += b"\x00"
inserted = True
if instr.opname in opcode_jump_rel and not inserted: #relative jump are always forward
if offset < instr.offset + 3 + instr.arg: # inserted beetwen jump and dest: add 1 to dest (3 for size)
#If equal: jump on DEBUG_OP to get info before exec instr
res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
continue
if instr.opname in opcode_jump_abs:
if instr.arg > offset:
res_codestring += bytecode_to_string(DummyInstr(instr.opcode, instr.arg + 1))
continue
res_codestring += bytecode_to_string(instr)
# replace_bytecode just replaces the original code co_code
return replace_bytecode(code, res_codestring)
让我们看一下效果如何:
>>> def lol(x):
... for i in range(10):
... if x == i:
... break
>>> dis.dis(lol)
101 0 SETUP_LOOP 36 (to 39)
3 LOAD_GLOBAL 0 (range)
6 LOAD_CONST 1 (10)
9 CALL_FUNCTION 1 (1 positional, 0 keyWord pair)
12 GET_ITER
>> 13 FOR_ITER 22 (to 38)
16 STORE_FAST 1 (i)
102 19 LOAD_FAST 0 (x)
22 LOAD_FAST 1 (i)
25 COMPARE_OP 2 (==)
28 POP_JUMP_IF_FALSE 13
103 31 BREAK_LOOP
32 JUMP_ABSOLUTE 13
35 JUMP_ABSOLUTE 13
>> 38 POP_BLOCK
>> 39 LOAD_CONST 0 (None)
42 RETURN_VALUE
>>> lol.__code__ = transform_code(lol.__code__, add_debug_op_everywhere, add_stacksize=3)
>>> dis.dis(lol)
101 0 <0>
1 SETUP_LOOP 50 (to 54)
4 <0>
5 LOAD_GLOBAL 0 (range)
8 <0>
9 LOAD_CONST 1 (10)
12 <0>
13 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
16 <0>
17 GET_ITER
>> 18 <0>
102 19 FOR_ITER 30 (to 52)
22 <0>
23 STORE_FAST 1 (i)
26 <0>
27 LOAD_FAST 0 (x)
30 <0>
103 31 LOAD_FAST 1 (i)
34 <0>
35 COMPARE_OP 2 (==)
38 <0>
39 POP_JUMP_IF_FALSE 18
42 <0>
43 BREAK_LOOP
44 <0>
45 JUMP_ABSOLUTE 18
48 <0>
49 JUMP_ABSOLUTE 18
>> 52 <0>
53 POP_BLOCK
>> 54 <0>
55 LOAD_CONST 0 (None)
58 <0>
59 RETURN_VALUE
# Setup the simplest handler EVER
>>> def op_target(stack, frame):
... print (stack)
# GO
>>> lol(2)
[]
[]
[<class 'range'>]
[10, <class 'range'>]
[range(0, 10)]
[<range_iterator object at 0x7f1349afab80>]
[0, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[0, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[1, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[1, 2, <range_iterator object at 0x7f1349afab80>]
[False, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[2, <range_iterator object at 0x7f1349afab80>]
[2, 2, <range_iterator object at 0x7f1349afab80>]
[True, <range_iterator object at 0x7f1349afab80>]
[<range_iterator object at 0x7f1349afab80>]
[]
[None]
甚好!现在我们知道了如何获取堆栈信息和 Python 中每一个操作对应的帧信息。上面结果所展示的结果目前而言并不是很实用。在最后一部分中让我们对注入做进一步的封装。
增加 Python 封装
正如您所见到的,所有的底层接口都是好用的。我们最后要做的一件事是让 op_target 更加方便使用(这部分相对而言比较空泛一些,毕竟在我看来这不是整个项目中最有趣的部分)。
首先我们来看一下帧的参数所能提供的信息,如下所示:
- f_code当前帧将执行的 code object
- f_lasti当前的操作(code object 中的字节码字符串的索引)
经过我们的处理我们可以得知 DEBUG_OP 之后要被执行的操作码,这对我们聚合数据并展示是相当有用的。
新建一个用于追踪函数内部机制的类:
- 改变函数自身的 co_code
- 设置回调函数作为 op_debug 的目标函数
一旦我们知道下一个操作,我们就可以分析它并修改它的参数。举例来说我们可以增加一个 auto-follow-called-functions 的特性。
def op_target(l, f, exc=None):
if op_target.callback is not None:
op_target.callback(l, f, exc)
class Trace:
def __init__(self, func):
self.func = func
def call(self, *args, **kwargs):
self.add_func_to_trace(self.func)
# Activate Trace callback for the func call
op_target.callback = self.callback
try:
res = self.func(*args, **kwargs)
except Exception as e:
res = e
op_target.callback = None
return res
def add_func_to_trace(self, f):
# Is it code? is it already transformed?
if not hasattr(f ,"op_debug") and hasattr(f, "__code__"):
f.__code__ = transform_code(f.__code__, transform=add_everywhere, add_stacksize=ADD_STACK)
f.__globals__['op_target'] = op_target
f.op_debug = True
def do_auto_follow(self, stack, frame):
# Nothing fancy: FrameAnalyser is just the wrapper that gives the next executed instruction
next_instr = FrameAnalyser(frame).next_instr()
if "CALL" in next_instr.opname:
arg = next_instr.arg
f_index = (arg & 0xff) + (2 * (arg >> 8))
called_func = stack[f_index]
# If call target is not traced yet: do it
if not hasattr(called_func, "op_debug"):
self.add_func_to_trace(called_func)
现在我们实现一个 Trace 的子类,在这个子类中增加 callback 和 doreport 这两个方法。callback 方法将在每一个操作之后被调用。doreport 方法将我们收集到的信息打印出来。
这是一个伪函数追踪器实现:
class DummyTrace(Trace):
def __init__(self, func):
self.func = func
self.data = collections.OrderedDict()
self.last_frame = None
self.known_frame = []
self.report = []
def callback(self, stack, frame, exc):
if frame not in self.known_frame:
self.known_frame.append(frame)
self.report.append(" === Entering New Frame {0} ({1}) ===".format(frame.f_code.co_name, id(frame)))
self.last_frame = frame
if frame != self.last_frame:
self.report.append(" === Returning to Frame {0} {1}===".format(frame.f_code.co_name, id(frame)))
self.last_frame = frame
self.report.append(str(stack))
instr = FrameAnalyser(frame).next_instr()
offset = str(instr.offset).rjust(8)
opname = str(instr.opname).ljust(20)
arg = str(instr.arg).ljust(10)
self.report.append("{0} {1} {2} {3}".format(offset, opname, arg, instr.argval))
self.do_auto_follow(stack, frame)
def do_report(self):
print("\n".join(self.report))
这里有一些实现的例子和使用方法。格式有些不方便观看,毕竟我并不擅长于搞这种对用户友好的报告的事儿。
- 例1自动追踪堆栈信息和已经执行的指令
- 例2上下文管理
递推式构造列表(List Comprehensions)的追踪示例。
- 例3伪追踪器的输出
- 例4输出收集的堆栈信息
总结
这个小项目是一个了解 Python 底层的良好途径,包括解释器的 main loop,Python 实现的 C 代码编程、Python 字节码。通过这个小工具我们可以看到 Python 一些有趣构造函数的字节码行为,例如生成器、上下文管理和递推式构造列表。
这里是这个小项目的完整代码。更进一步的,我们还可以做的是修改我们所追踪的函数的堆栈。我虽然不确定这个是否有用,但是可以肯定是这一过程是相当有趣的。
本文作者系OneAPM工程师编译整理。想阅读更多技术文章,请访问OneAPM官方技术博客。
0