File format vulnerabilities are fast becoming the vector of choice for client-side attacks, so naturally we should be interested in finding bugs in file format parsers. We want to be able to generically mutate all kinds of different formats to get the biggest bang for our buck, whether we're targeting antivirus products or document readers. We will also make sure to bundle in some debugging functionality so that we can catch crash information to determine whether we have found an exploitable condition or not. To top it off, we'll incorporate some emailing capabilities to notify you whenever a crash occurs and send the crash information. This can be useful if you have a bank of fuzzers hitting multiple targets, and you want to know when to investigate a crash. The first step is to create the class skeleton and a simple file selector that will take care of opening a random example file for mutation. Open a new Python file, name it file_fuzzer.py, and enter the following code.
from pydbg import *
from pydbg.defines import *

import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt

class file_fuzzer:
    def __init__(self, exe_path, ext, notify):

        self.exe_path           = exe_path
        self.ext                = ext
        self.notify_crash       = notify
        self.orig_file          = None
        self.mutated_file       = None
        self.iteration          = 0
        self.crash              = None
        self.send_notify        = False
        self.pid                = None
        self.in_accessv_handler = False
        self.dbg                = None
        self.running            = False
        self.ready              = False

        # Optional
        self.smtpserver = 'mail.nostarch.com'
        self.recipients = ['jms@bughunter.ca',]
        self.sender     = 'jms@bughunter.ca'

        self.test_cases = [ "%s%n%s%n%s%n", "\xff", "\x00", "A" ]

    def file_picker( self ):

        # List the sample files and copy a random one to test.<ext>
        file_list   = os.listdir("examples/")
        list_length = len(file_list)
        file        = file_list[random.randint(0, list_length-1)]
        shutil.copy("examples\\%s" % file,"test.%s" % self.ext)

        return file
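As a side note, the file-picking step can be tried on its own, away from the fuzzer class. The following is a minimal sketch and not part of file_fuzzer.py: pick_sample, examples_dir, dest_path, and rng are hypothetical names chosen for illustration, and it swaps in os.path.join and random.choice so that the same idea also runs outside Windows. It additionally skips subdirectories and raises an error when the directory holds no files, which the listing above does not do.

```python
import os
import random
import shutil

def pick_sample(examples_dir, dest_path, rng=random):
    """Copy one randomly chosen file from examples_dir to dest_path.

    Returns the name of the chosen file. All names here are
    illustrative assumptions, not part of file_fuzzer.py.
    """
    # Keep only regular files; subdirectories are ignored
    candidates = [name for name in sorted(os.listdir(examples_dir))
                  if os.path.isfile(os.path.join(examples_dir, name))]

    if not candidates:
        raise ValueError("no sample files found in %r" % examples_dir)

    # rng only needs a choice() method, so a seeded random.Random works too
    chosen = rng.choice(candidates)
    shutil.copy(os.path.join(examples_dir, chosen), dest_path)

    return chosen
```

Passing a seeded random.Random instance as rng makes the selection repeatable, which can help when trying to reproduce a particular run.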
The class skeleton for our file fuzzer defines some instance variables for tracking basic information about our test iterations, as well as the test cases that will be applied as mutations to the sample files. The file_picker function simply uses some built-in functions from Python to list the files in a directory and randomly pick one for mutation. Now we have to do some threading work to get the target application loaded, track it for crashes, and terminate it when the document parsing is finished. The first stage is to get the target application loaded inside a debugger thread and install the custom access violation handler. We then spawn a second thread to monitor the debugger thread so that it can kill the target application after a reasonable amount of time. We'll also throw in the email notification routine. Let's incorporate these features by creating some new class functions.
...
    def fuzz( self ):

        while 1:

            if not self.running:

                # We first snag a file for mutation
                self.test_file = self.file_picker()
                self.mutate_file()

                # Start up the debugger thread
                pydbg_thread = threading.Thread(target=self.start_debugger)
                pydbg_thread.setDaemon(0)
                pydbg_thread.start()

                while self.pid is None:
                    time.sleep(1)

                # Start up the monitoring thread
                monitor_thread = threading.Thread(target=self.monitor_debugger)
                monitor_thread.setDaemon(0)
                monitor_thread.start()

                self.iteration += 1

            else:
                time.sleep(1)

    # Our primary debugger thread that the application
    # runs under
    def start_debugger(self):

        print "[*] Starting debugger for iteration: %d" % self.iteration
        self.running = True
        self.dbg = pydbg()

        self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION,self.check_accessv)
        pid = self.dbg.load(self.exe_path,"test.%s" % self.ext)

        self.pid = self.dbg.pid
        self.dbg.run()

    # Our access violation handler that traps the crash
    # information and stores it
    def check_accessv(self,dbg):

        if dbg.dbg.u.Exception.dwFirstChance:
            return DBG_CONTINUE

        print "[*] Woot! Handling an access violation!"
        self.in_accessv_handler = True
        crash_bin = utils.crash_binning.crash_binning()
        crash_bin.record_crash(dbg)
        self.crash = crash_bin.crash_synopsis()

        # Write out the crash information
        crash_fd = open("crashes\\crash-%d" % self.iteration,"w")
        crash_fd.write(self.crash)
        crash_fd.close()

        # Now back up the files
        shutil.copy("test.%s" % self.ext,"crashes\\%d.%s" % (self.iteration,self.ext))
        shutil.copy("examples\\%s" % self.test_file,"crashes\\%d_orig.%s" % (self.iteration,self.ext))

        # Ship out the crash report if the -n flag was supplied
        if self.notify_crash:
            self.notify()

        self.dbg.terminate_process()
        self.in_accessv_handler = False
        self.running = False

        return DBG_EXCEPTION_NOT_HANDLED

    # This is our monitoring function that allows the application
    # to run for a few seconds and then it terminates it
    def monitor_debugger(self):

        counter = 0
        print "[*] Monitor thread for pid: %d waiting." % self.pid,

        while counter < 3:
            time.sleep(1)
            print counter,
            counter += 1

        if self.in_accessv_handler != True:
            time.sleep(1)
            self.dbg.terminate_process()
            self.pid = None
            self.running = False
        else:
            print "[*] The access violation handler is doing its business. Waiting."

            while self.running:
                time.sleep(1)

    # Our emailing routine to ship out crash information
    def notify(self):

        # Imported here so the routine works without touching the
        # import block at the top of the file
        import smtplib

        # Headers first, then a blank line, then the message body
        crash_message = "From: %s\r\nTo: %s\r\n\r\nIteration: %d\n\nOutput:\n\n %s" % (
            self.sender, ", ".join(self.recipients), self.iteration, self.crash)

        session = smtplib.SMTP(self.smtpserver)
        session.sendmail(self.sender, self.recipients, crash_message)
        session.quit()

        return
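The monitoring idea in the listing above (let the target run for a few seconds, then kill it if it is still alive) can be tried without pydbg. The sketch below is an illustration under different assumptions, not the fuzzer's own code: it swaps the debugger thread for the standard subprocess module and uses threading.Timer as the watchdog. The names run_with_timeout, cmd, and timeout_seconds are hypothetical.

```python
import subprocess
import sys
import threading

def run_with_timeout(cmd, timeout_seconds):
    """Run cmd and kill it if it is still alive after timeout_seconds.

    Returns (returncode, timed_out). Illustrative names only; this
    stands in for the debugger/monitor thread pair, it does not
    replace pydbg's crash handling.
    """
    proc = subprocess.Popen(cmd)
    timed_out = threading.Event()

    def kill_if_running():
        # Runs on the timer thread once the timeout expires
        if proc.poll() is None:
            timed_out.set()
            proc.kill()

    watchdog = threading.Timer(timeout_seconds, kill_if_running)
    watchdog.start()
    try:
        # Blocks until the process exits on its own or is killed
        proc.wait()
    finally:
        # Harmless if the timer has already fired
        watchdog.cancel()

    return proc.returncode, timed_out.is_set()

if __name__ == "__main__":
    # A child that would sleep for 30 seconds is cut off after 1 second
    print(run_with_timeout(
        [sys.executable, "-c", "import time; time.sleep(30)"], 1))
```

A plain timeout like this only tells us that the target was still running; catching the access violation itself still requires the debugger.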
We now have the main logic for controlling the application being fuzzed, so let's walk through the fuzz function briefly. The first step is to check that a fuzzing iteration isn't already running. The self.running flag will also be set if the access violation handler is busy compiling a crash report. Once we have selected a document to mutate, we pass it off to our simple mutation function, which we will be writing shortly. Once the file mutator is finished, we start our debugger thread, which merely fires up the document-parsing application and passes in the mutated document as a command-line argument. We then wait in a tight loop for the debugger thread to register the PID of the target application. Once we have the PID, we spawn the monitoring thread, whose job is to make sure that we kill the application after a reasonable amount of time. Once the monitoring thread has started, we increment the iteration count and reenter our main loop until it's time to pick a new file and fuzz again! Now let's add our simple mutation function into the mix.
...
    def mutate_file( self ):

        # Pull the contents of the file into a buffer
        fd = open("test.%s" % self.ext, "rb")
        stream = fd.read()
        fd.close()

        # The fuzzing meat and potatoes, really simple
        # Take a random test case and apply it to a random position
        # in the file
        test_case = self.test_cases[random.randint(0,len(self.test_cases)-1)]

        stream_length = len(stream)
        rand_offset   = random.randint(0, stream_length - 1 )
        rand_len      = random.randint(1, 1000)

        # Now take the test case and repeat it
        test_case = test_case * rand_len

        # Apply it to the buffer, we are just
        # splicing in our fuzz data
        fuzz_file  = stream[0:rand_offset]
        fuzz_file += str(test_case)
        fuzz_file += stream[rand_offset:]

        # Write out the file
        fd = open("test.%s" % self.ext, "wb")
        fd.write( fuzz_file )
        fd.close()

        return
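To see the splice in isolation, the same mutation can be written as a small function that works on a byte string in memory instead of a file on disk. This is a sketch under stated assumptions rather than the fuzzer's own code: mutate, rng, and max_repeat are hypothetical names, the test cases are written as byte strings, and the offset is allowed to equal the buffer length so that an empty buffer does not raise an error.

```python
import random

# Same four test cases as the fuzzer, written as byte strings
TEST_CASES = [b"%s%n%s%n%s%n", b"\xff", b"\x00", b"A"]

def mutate(data, rng, max_repeat=1000):
    """Splice a repeated test case into data at a random offset.

    Returns (mutated, offset, payload) so the caller can see exactly
    what was inserted and where. Names are illustrative assumptions.
    """
    test_case = rng.choice(TEST_CASES)

    # Offsets run from 0 to len(data) inclusive, so the payload may
    # also be appended at the very end of the buffer
    offset = rng.randint(0, len(data))
    repeat = rng.randint(1, max_repeat)

    payload = test_case * repeat

    # Nothing is overwritten; the original bytes surround the payload
    mutated = data[:offset] + payload + data[offset:]

    return mutated, offset, payload

if __name__ == "__main__":
    rng = random.Random(1234)   # a fixed seed makes the run repeatable
    mutated, offset, payload = mutate(b"hello world", rng, max_repeat=5)
    print(offset, repr(payload), repr(mutated))
```

Returning the offset and payload alongside the mutated buffer makes it easier to log what each iteration did, which helps when a crash needs to be reproduced later.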
The mutate_file function is about as rudimentary a mutator as you can get. We randomly select a test case from our list of test cases; then we pick a random offset and fuzz data length to apply to the file. Using the offset and length information, we then slice into the file and do the mutation. When we're finished, we write out the file, and the debugger thread will immediately use it to test the application. Now let's wrap up the fuzzer with some command-line parameter parsing, and we're nearly ready to start using it.
...
def print_usage():

    print "[*]"
    print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension> [-n]"
    print "[*]"

    sys.exit(0)

if __name__ == "__main__":

    print "[*] Generic File Fuzzer."

    # This is the path to the document parser
    # and the filename extension to use
    try:
        opts, args = getopt.getopt(sys.argv[1:],"e:x:n")
    except getopt.GetoptError:
        print_usage()

    exe_path = None
    ext      = None
    notify   = False

    for o,a in opts:
        if o == "-e":
            exe_path = a
        elif o == "-x":
            # Accept "txt" or ".txt"; the class adds the dot itself
            # when it builds the test.<ext> filename
            ext = a.lstrip(".")
        elif o == "-n":
            notify = True

    if exe_path is not None and ext is not None:
        fuzzer = file_fuzzer( exe_path, ext, notify )
        fuzzer.fuzz()
    else:
        print_usage()
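For comparison, the same three options could also be handled with the standard argparse module, which generates the usage message for us. This is an alternative sketch, not the method used in file_fuzzer.py: parse_options is a hypothetical helper, and stripping a leading dot from the extension is a convenience added here.

```python
import argparse

def parse_options(argv):
    """Parse the fuzzer's -e, -x, and -n options from a list of arguments.

    Illustrative alternative to the getopt loop; names are assumptions.
    """
    parser = argparse.ArgumentParser(prog="file_fuzzer.py")
    parser.add_argument("-e", dest="exe_path", required=True,
                        help="path to the target application's executable")
    parser.add_argument("-x", dest="ext", required=True,
                        help="filename extension of the sample files")
    parser.add_argument("-n", dest="notify", action="store_true",
                        help="enable crash notifications")

    options = parser.parse_args(argv)

    # Treat "txt" and ".txt" the same way
    options.ext = options.ext.lstrip(".")

    return options

if __name__ == "__main__":
    # Example arguments; a real script would pass sys.argv[1:] instead
    opts = parse_options(["-e", r"C:\WINDOWS\system32\notepad.exe",
                          "-x", ".txt", "-n"])
    print(opts.exe_path, opts.ext, opts.notify)
```

With required=True on -e and -x, argparse prints its own usage text and exits when either option is missing, so a separate print_usage function is not needed in this variant.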
We now allow the file_fuzzer.py script to receive some command-line options. The -e flag is the path to the target application's executable. The -x option is the filename extension we are testing; for instance, .txt would be the file extension we could enter if that's the type of file we are fuzzing. The optional -n parameter tells the fuzzer whether we want notifications enabled or not. Now let's take it for a quick test drive.
The best way that I have found to test whether my file fuzzer is working is by watching the results of my mutation in action while testing the target application. There is no better way to fuzz text files than to use Windows Notepad as the test application. This way you can actually see the text change in each iteration, as opposed to using a hex editor or binary diffing tool. Before you get started, create an examples directory and a crashes directory in the same directory from which you are running the file_fuzzer.py script. Once you have added the directories, create a couple of dummy text files and place them in the examples directory. To fire up the fuzzer, use the following command line:
python file_fuzzer.py -e C:\\WINDOWS\\system32\\notepad.exe -x .txt
You should see Notepad get spawned, and you can watch your test files get mutated. Once you are satisfied that you are mutating the test files appropriately, you can take this file fuzzer and run it against any target application. Let's wrap up with some future considerations for this fuzzer.