driller
1. driller介绍
driller在afl的基础上开发的crash模糊测试工具。Driller在AFL的基础上加入了动态符号执行引擎,当模糊测试发生stuck时,使用动态符号执行去突破这些限制,生成满足fuzz需求的新输入,使得fuzz能够继续执行。
总体上说,Driller结合了AFL的高效、低消耗、快速的优点和动态符号执行探索能力强的优点,又避免了AFL较难突破特殊的边界和动态符号执行路径爆炸的问题。
Driller为CTF团队开发,结合了AFL和该团队的符号执行工具Angr,具体原理可参考Driller论文driller augmenting fuzzing through selective symbolic execution,angr论文The Art of War: Offensive Techniques in binary Analysis, AFL官网和本人的另一篇文章。
2. 基本原理
我们使用AFL模糊测试器作为符号辅助Fuzzing的基础,并使用angr作为它的符号执行工具。通过监测AFL的执行,我们能够决定什么时候开始符号执行AFL产生的输入。为了做出这个决定,我们按照模糊器发现新状态转换的速率采取行动。如果AFL报告说,在执行了一轮输入变化后没有发现新的状态转换,我们假设模糊器难以取得进展,并且在AFL被认为是唯一的所有路径上引用angr(即,任何路径有一个跳转,由源地址和目标地址的元组标识,没有其他路径),寻找AFL无法找到输入的转换。
Driller的符号组件是通过使用angr的符号执行引擎来实现的,以便根据AFL提供的具体输入来符号性地追踪路径。这避免了符号执行固有的路径爆炸问题,因为每个具体输入对应于单个(追踪)路径,并且这些输入经AFL严格过滤以确保仅追踪有希望的输入。每个具体输入对应于PathGroup中的单个路径。 在PathGroup的每一步中,检查每个分支以确保最新的跳转指令引入先前AFL未知的路径。 当发现这样的跳转时,SMT求解器被查询以创建一个输入来驱动执行到新的跳转。这个输入反馈给AFL,AFL在未来的模糊步骤中进行变异。 这个反馈循环使我们能够将昂贵的符号执行时间与廉价的模糊时间进行平衡,并且减轻了模糊对程序操作的低语义洞察力。
3. 安装使用
driller工具是由Python语言写的,主要依赖于angr,afl等2个工具。我们可以通过shellphfuzz工具来使用driller,安装步骤可以参考博客driller安装教程
使用方式:
官方推荐的driller的使用方法是通过shellphuzz工具来使用,使用方式如下,“-i”选项指定afl-fuzz的线程数,“-d”选项指定driller(即符号执行工具)的线程数,如果不使用-d或者-d 0,则不使用符号执行。
# fuzz with 4 AFL cores
shellphuzz -i -c 4 /path/to/binary
# perform symbolic-assisted fuzzing with 4 AFL cores and 2 symbolic tracing (drilling) cores.
shellphuzz -i -c 4 -d 2 /path/to/binary
4. 源码分析
这里主要分析shellphuzz、driller、afl部分的代码,主要解释3个工具如何一起工作,符号执行的时机是什么时候等。
4.1 shellphuzz
shellphuzz工具是AFL工具的一层Python接口,可以看做afl的python封装。shellphuzz支持启动afl、添加slave worker、注入或删除测试case,检查性能数据,使用符号执行等。
shellphuzz启动流程:
if args.driller_workers:
print "[*] Drilling..."
drill_extension = driller.localCallback(num_workers=args.driller_workers)
stuck_callback = (
(lambda f: (grease_extension(f), drill_extension(f))) if drill_extension and grease_extension
else drill_extension or grease_extension
)
print "[*] Creating fuzzer..."
fuzzer = fuzzer.Fuzzer(
args.binary, args.work_dir, afl_count=args.afl_cores, force_Interval=args.force_interval,
create_dictionary=not args.no_dictionary, stuck_callback=stuck_callback, time_limit=args.timeout
)
# start it!
print "[*] Starting fuzzer..."
fuzzer.start()
start_time = time.time()
shellphuzz开始运行后,如果加了-d选项,会注册一个指向driller模块的callback;然后实例化一个Fuzzer类的对象,然后启动fuzzer。
def start(self):
'''
start fuzzing
'''
# spin up the AFL workers
self._start_afl() # 根据参数启动多个afl线程
# start the callback timer
self._timer.start() # 启动一个InfiniteTimer类的对象,会周期性的调用stuck_callback,即driller的callback。
4.2 driller
- driller_callback:
如上所述,shellphuzz通过计时器,周期性的调用driller的callback方法。该方法的代码如下:
def driller_callback(self, fuzz):
l.warning("Driller stuck callback triggered!")
# remove any workers that aren't running
self._running_workers = [x for x in self._running_workers if x.is_alive()]
# get the files in queue
queue = self._queue_files(fuzz)
#for i in range(1, fuzz.fuzz_id):
# fname = "fuzzer-%d" % i
# queue.extend(self.queue_files(fname))
# start drilling
not_drilled = set(queue) - self._already_drilled_inputs
if len(not_drilled) == 0:
l.warning("no inputs left to drill")
while len(self._running_workers) < self._num_workers and len(not_drilled) > 0:
to_drill_path = list(not_drilled)[0]
not_drilled.remove(to_drill_path)
self._already_drilled_inputs.add(to_drill_path)
proc = multiprocessing.Process(target=_run_drill, args=(self, fuzz, to_drill_path))
proc.start()
self._running_workers.APPend(proc)
__call__ = driller_callback
对队列queue中没有经过符号执行的输入文件进行符号执行,同一时间在运行的符号执行器的个数不超过-d选项指定的个数。
该方法最后调用_run_driller方法来启动driller。
- _run_driller:
_run_driller方法提取了driller所需要的参数,并调用main来真正启动driller
def _run_drill(drill, fuzz, _path_to_input_to_drill):
_binary_path = fuzz.binary_path
_fuzzer_out_dir = fuzz.out_dir
_bitmap_path = os.path.join(_fuzzer_out_dir, 'fuzzer-master', "fuzz_bitmap")
_timeout = drill._worker_timeout
l.warning("starting drilling of %s, %s", os.path.basename(_binary_path), os.path.basename(_path_to_input_to_drill))
args = (
"timeout", "-k", str(_timeout+10), str(_timeout),
sys.executable, os.path.abspath(__file__),
_binary_path, _fuzzer_out_dir, _bitmap_path, _path_to_input_to_drill
)
p = subprocess.Popen(args, stdout=subprocess.PIPE)
print p.communicate()
- main:
main的基本流程为:
1. 设置参数,打开文件
2. 初始化driller对象
3. 通过driller的drill_generator方法进行符号执行,并生成满足afl需求的新输入文件
4. 保存新的输入文件
if __name__ == "__main__":
if len(sys.argv) != 5:
l.ERROR("INTERNAL USE ONLY -- expecting 5 arguments for driller runner, got %d", len(sys.argv))
binary_path, fuzzer_out_dir, bitmap_path, path_to_input_to_drill = sys.argv[1:5]
fuzzer_bitmap = open(bitmap_path, "r").read()
# create a folder
driller_dir = os.path.join(fuzzer_out_dir, "driller")
driller_queue_dir = os.path.join(driller_dir, "queue")
try: os.mkdir(driller_dir)
except OSError: pass
try: os.mkdir(driller_queue_dir)
except OSError: pass
# get the input
input_to_drill = open(path_to_input_to_drill, "r").read()
d = driller.Driller(binary_path, input_to_drill, fuzzer_bitmap)
count = 0
for new_input in d.drill_generator():
id_num = len(os.listdir(driller_queue_dir))
fuzzer_from = path_to_input_to_drill.split("sync/")[1].split("/")[0] + path_to_input_to_drill.split("id:")[1].split(",")[0]
filepath = "id:" + ("%d" % id_num).rjust(6, "0") + ",from:" + fuzzer_from
filepath = os.path.join(driller_queue_dir, filepath)
with open(filepath, "wb") as f:
f.write(new_input[1])
count += 1
l.warning("found %d new inputs", count)
- drill_generator:
生成满足需求的文件的driller接口。最终调用_drill_input这个方法真正的去实现符号执行。
- _drill_input:
沿着一个指定的trace流一步步进行符号执行(angr引擎,采用driller_core技术),如果发现新的路径这记录下来。
def _drill_input(self):
"""
Symbolically step down a path with a tracer, trying to concretize inputs for unencountered
state transitions.
"""
# initialize the tracer
r = tracer.qemu_runner.QEMURunner(self.binary, self.input, argv=self.argv)
p = angr.Project(self.binary)
for addr, proc in self._hooks.items():
p.hook(addr, proc)
l.debug("Hooking %#x -> %s...", addr, proc.display_name)
if p.loader.main_object.os == 'cgc':
p.simos.syscall_library.update(angr.SIM_LIBRARIES['cgcabi_tracer'])
s = p.factory.entry_state(stdin=angr.storage.file.Simfilestream, flag_page=r.magic)
else:
s = p.factory.full_init_state(stdin=angr.storage.file.SimFileStream)
s.preconstrainer.preconstrain_file(self.input, s.posix.stdin, True)
simgr = p.factory.simgr(s, save_unsat=True, hierarchy=False, save_unconstrained=r.crash_mode)
t = angr.exploration_techniques.Tracer(trace=r.trace)
c = angr.exploration_techniques.Crashmonitor(trace=r.trace, crash_addr=r.crash_addr)
self._core = angr.exploration_techniques.DrillerCore(trace=r.trace)
if r.crash_mode:
simgr.use_technique(c)
simgr.use_technique(t)
simgr.use_technique(angr.exploration_techniques.Oppologist())
simgr.use_technique(self._core)
self._set_concretizations(simgr.one_active)
l.debug("Drilling into %r.", self.input)
l.debug("Input is %r.", self.input)
while simgr.active and simgr.one_active.globals['bb_cnt'] < len(r.trace):
simgr.step()
# Check here to see if a crash has been found.
if self.redis and self.redis.sismember(self.identifier + '-finished', True):
return
if 'perted' not in simgr.stashes:
continue
while simgr.perted:
state = simgr.perted.pop(0)
l.debug("Found a perted state, exploring to some extent.")
w = self._writeout(state.history.bbl_addrs[-1], state)
if w is not None:
yield w
for i in self._symbolic_explorer_stub(state):
yield i