File Fuzzer

File format vulnerabilities are fast becoming the vector of choice for client-side attacks, so naturally we should be interested in finding bugs in file format parsers. We want to be able to generically mutate all kinds of different formats to get the biggest bang for our buck, whether we're targeting antivirus products or document readers. We will also bundle in some debugging functionality so that we can catch crash information and determine whether we have found an exploitable condition. To top it off, we'll incorporate some emailing capabilities to notify you whenever a crash occurs and send the crash information. This can be useful if you have a bank of fuzzers hitting multiple targets and you want to know when to investigate a crash.

The first step is to create the class skeleton and a simple file selector that will take care of opening a random example file for mutation. Open a new Python file, name it file_fuzzer.py, and enter the following code.

from pydbg import *
from pydbg.defines import *

import utils
import random
import sys
import struct
import threading
import os
import shutil
import time
import getopt
import smtplib

class file_fuzzer:

    def __init__(self, exe_path, ext, notify):

        self.exe_path       = exe_path
        self.ext            = ext
        self.notify_crash   = notify
        self.orig_file      = None
        self.mutated_file   = None
        self.iteration      = 0
        self.crash          = None
        self.send_notify    = False
        self.pid            = None
        self.in_accessv_handler = False
        self.dbg            = None
        self.running        = False
        self.ready          = False

        # Optional
        self.smtpserver = 'mail.nostarch.com'
        self.recipients = ['jms@bughunter.ca',]
        self.sender     = 'jms@bughunter.ca'

        self.test_cases = [ "%s%n%s%n%s%n", "\xff", "\x00", "A" ]

    def file_picker( self ):

        file_list = os.listdir("examples")

        # Pick one sample at random and copy it to the working file
        file_name = random.choice(file_list)
        shutil.copy(os.path.join("examples", file_name), "test.%s" % self.ext)

        return file_name

The class skeleton for our file fuzzer defines some instance variables for tracking basic information about our test iterations, as well as the test cases that will be applied as mutations to the sample files. The file_picker function simply uses some built-in functions from Python to list the files in a directory and randomly pick one for mutation.

Now we have to do some threading work to get the target application loaded, track it for crashes, and terminate it when the document parsing is finished. The first stage is to get the target application loaded inside a debugger thread and install the custom access violation handler. We then spawn a second thread to monitor the debugger thread so that it can kill the target after a reasonable amount of time. We'll also throw in the email notification routine. Let's incorporate these features by creating some new class functions.

...
    def fuzz( self ):

        while 1:

            if not self.running:

                # We first snag a file for mutation
                self.test_file = self.file_picker()
                self.mutate_file()

                # Start up the debugger thread
                pydbg_thread = threading.Thread(target=self.start_debugger)
                pydbg_thread.setDaemon(0)
                pydbg_thread.start()

                # Wait for the debugger thread to register the PID
                while self.pid == None:
                    time.sleep(1)

                # Start up the monitoring thread
                monitor_thread = threading.Thread(target=self.monitor_debugger)
                monitor_thread.setDaemon(0)
                monitor_thread.start()

                self.iteration += 1

            else:
                time.sleep(1)

    # Our primary debugger thread that the application
    # runs under
    def start_debugger(self):

        print "[*] Starting debugger for iteration: %d" % self.iteration
        self.running = True
        self.dbg = pydbg()

        self.dbg.set_callback(EXCEPTION_ACCESS_VIOLATION,self.check_accessv)
        self.dbg.load(self.exe_path,"test.%s" % self.ext)

        self.pid = self.dbg.pid
        self.dbg.run()

    # Our access violation handler that traps the crash
    # information and stores it
    def check_accessv(self,dbg):

        if dbg.dbg.u.Exception.dwFirstChance:

            return DBG_CONTINUE

        print "[*] Woot! Handling an access violation!"
        self.in_accessv_handler = True
        crash_bin = utils.crash_binning.crash_binning()
        crash_bin.record_crash(dbg)
        self.crash = crash_bin.crash_synopsis()

        # Write out the crash information
        crash_fd = open("crashes\\crash-%d" % self.iteration,"w")
        crash_fd.write(self.crash)
        crash_fd.close()

        # Now back up the mutated file and the original sample
        shutil.copy("test.%s" % self.ext,
                    "crashes\\%d.%s" % (self.iteration,self.ext))
        shutil.copy("examples\\%s" % self.test_file,
                    "crashes\\%d_orig.%s" % (self.iteration,self.ext))

        self.dbg.terminate_process()
        self.in_accessv_handler = False
        self.running = False

        return DBG_EXCEPTION_NOT_HANDLED

    # This is our monitoring function that allows the application
    # to run for a few seconds and then terminates it
    def monitor_debugger(self):

        counter = 0
        print "[*] Monitor thread for pid: %d waiting." % self.pid,
        while counter < 3:
            time.sleep(1)
            print counter,
            counter += 1

        if self.in_accessv_handler != True:
            time.sleep(1)
            self.dbg.terminate_process()
            self.pid = None
            self.running = False
        else:
            print "[*] The access violation handler is doing its business. Waiting."

            while self.running:
                time.sleep(1)

    # Our emailing routine to ship out crash information
    def notify(self):

        crash_message = ("From:%s\r\n\r\nTo:\r\n\r\nIteration: %d\n\nOutput:\n\n %s" %
                         (self.sender, self.iteration, self.crash))

        session = smtplib.SMTP(self.smtpserver)
        session.sendmail(self.sender, self.recipients, crash_message)
        session.quit()

        return

We now have the main logic for controlling the application being fuzzed, so let's walk through the fuzz function briefly. The first step is to check that a fuzzing iteration isn't already running. The self.running flag also stays set while the access violation handler is busy compiling a crash report. Once we have selected a document to mutate, we pass it off to our simple mutation function, which we will be writing shortly.
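The self.running handshake can be tried without pydbg or a real target. The following is a minimal sketch, not part of file_fuzzer.py: the IterationGuard class and its method names are hypothetical, a worker thread that blocks on an event stands in for the debugged application, and a monitor thread plays the role of monitor_debugger by releasing the worker after a timeout and clearing the flag. Unlike the fuzzer, the sketch sets the flag in the main loop rather than in the worker thread, which is a simplification.

```python
import threading
import time

class IterationGuard(object):
    """Hypothetical stand-in for the fuzzer's running flag (illustration only)."""

    def __init__(self, timeout):
        self.timeout = timeout      # seconds each fake target may run
        self.running = False
        self.iterations = 0

    def target(self, stop):
        # Plays the role of the debugged application: blocks until killed.
        stop.wait()

    def monitor(self, stop):
        # Plays the role of monitor_debugger: wait, kill, clear the flag.
        time.sleep(self.timeout)
        stop.set()
        self.running = False

    def run(self, count):
        threads = []
        while self.iterations < count:
            if not self.running:
                # No iteration in flight, so start the next one.
                self.running = True
                self.iterations += 1
                stop = threading.Event()
                for func in (self.target, self.monitor):
                    thread = threading.Thread(target=func, args=(stop,))
                    thread.start()
                    threads.append(thread)
            else:
                # An iteration is still running; poll again shortly.
                time.sleep(0.01)
        for thread in threads:
            thread.join()

guard = IterationGuard(timeout=0.05)
guard.run(3)
```

The main loop never starts a new iteration until the monitor has cleared the flag, which is the same property the fuzz function relies on.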

Once the file mutator is finished, we start our debugger thread, which merely fires up the document-parsing application and passes in the mutated document as a command-line argument. We then wait in a polling loop for the debugger thread to register the PID of the target application. Once we have the PID, we spawn the monitoring thread, whose job is to make sure that we kill the application after a reasonable amount of time. Once the monitoring thread has started, we increment the iteration count and reenter our main loop until it's time to pick a new file and fuzz again! Now let's add our simple mutation function into the mix.
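The fuzz function calls self.mutate_file(), but its body does not appear in this section. As a rough idea of what such a mutator might do, here is a minimal sketch under stated assumptions; it is not the author's implementation. The function name mutate_bytes, the TEST_CASES constant, and the max_repeat cap of 1000 are all hypothetical. The sketch picks one of the four test cases from the class skeleton, repeats it a random number of times, and splices it into the data at a random offset.

```python
import random

# The same four payloads as self.test_cases, written as byte strings.
TEST_CASES = [b"%s%n%s%n%s%n", b"\xff", b"\x00", b"A"]

def mutate_bytes(data, test_cases=TEST_CASES, max_repeat=1000, rng=random):
    """Return a copy of data with one repeated test case spliced in.

    Hypothetical helper for illustration; max_repeat is an assumed cap.
    """
    # Choose a payload and repeat it between 1 and max_repeat times.
    payload = rng.choice(test_cases) * rng.randint(1, max_repeat)
    # Any position is allowed, including the very end of the data.
    offset = rng.randint(0, len(data))
    return data[:offset] + payload + data[offset:]

sample = b"hello world"
mutated = mutate_bytes(sample, rng=random.Random(7))
```

A mutate_file method built on this idea could read test.<extension> in binary mode, pass the contents through mutate_bytes, and write the result back to the same file before the debugger thread starts.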

...
def print_usage():

    print "[*]"
    print "[*] file_fuzzer.py -e <Executable Path> -x <File Extension>"
    print "[*]"

    sys.exit(0)

if __name__ == "__main__":

    print "[*] Generic File Fuzzer."

    # This is the path to the document parser
    # and the filename extension to use
    try:
        opts, args = getopt.getopt(sys.argv[1:],"e:x:n")
    except getopt.GetoptError:
        print_usage()

    exe_path = None
    ext      = None
    notify   = False

    for o,a in opts:
        if o == "-e":
            exe_path = a
        elif o == "-x":
            ext = a.lstrip(".")  # accept "txt" or ".txt"; the file is named test.<ext>
        elif o == "-n":
            notify = True

    if exe_path is not None and ext is not None:
        fuzzer = file_fuzzer( exe_path, ext, notify )
        fuzzer.fuzz()
    else:
        print_usage()

We now allow the file_fuzzer.py script to receive some command-line options. The -e flag is the path to the target application's executable. The -x option is the filename extension we are testing; for instance, .txt would be the file extension we could enter if that's the type of file we are fuzzing. The optional -n parameter tells the fuzzer whether we want notifications enabled or not. Now let's take it for a quick test drive.
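Before the test drive, the option string "e:x:n" can be checked in isolation. In getopt's short-option syntax, a letter followed by a colon takes a value and a bare letter is a switch. The snippet below is a small sketch that feeds a hand-built argument list to getopt.getopt instead of sys.argv; the argv list and the options dictionary are illustrative names only.

```python
import getopt

# "e:" and "x:" each take a value; "n" is a bare switch.
argv = ["-e", "C:\\WINDOWS\\system32\\notepad.exe", "-x", ".txt", "-n"]
opts, args = getopt.getopt(argv, "e:x:n")

# opts is a list of (flag, value) pairs; switches carry an empty value.
options = dict(opts)
exe_path = options.get("-e")
ext = options.get("-x")
notify = "-n" in options
```

Leaving out a required value, for example passing only -e, raises getopt.GetoptError, which is the case the script handles by printing the usage message.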

The best way that I have found to test whether my file fuzzer is working is to watch the results of my mutations in action while testing the target application. There is no better way to fuzz text files than to use Windows Notepad as the test application. This way you can actually see the text change in each iteration, as opposed to using a hex editor or binary diffing tool. Before you get started, create an examples directory and a crashes directory in the same directory from which you run the file_fuzzer.py script. Once you have added the directories, create a couple of dummy text files and place them in the examples directory. To fire up the fuzzer, use the following command line:

python file_fuzzer.py -e C:\WINDOWS\system32\notepad.exe -x .txt
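The setup that this command depends on (an examples directory holding a few dummy files, plus an empty crashes directory) can also be scripted. The following is a minimal sketch; the prepare_workspace function and the sample file names are hypothetical, and the demonstration writes into a temporary directory so it can be tried without touching a real working directory.

```python
import os
import tempfile

def prepare_workspace(base_dir, samples):
    """Create examples/ and crashes/ under base_dir and write sample files.

    Hypothetical helper; samples maps file names to their text contents.
    """
    examples = os.path.join(base_dir, "examples")
    crashes = os.path.join(base_dir, "crashes")
    for path in (examples, crashes):
        if not os.path.isdir(path):
            os.makedirs(path)
    for name, text in samples.items():
        with open(os.path.join(examples, name), "w") as handle:
            handle.write(text)
    return examples, crashes

# Try it in a scratch directory first.
demo_dir = tempfile.mkdtemp()
examples, crashes = prepare_workspace(
    demo_dir,
    {"a.txt": "hello fuzzer\n", "b.txt": "another dummy file\n"},
)
```

To prepare a real run, pass the directory that contains file_fuzzer.py as base_dir instead of the temporary one.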

You should see Notepad get spawned, and you can watch your test files get mutated. Once you are satisfied that you are mutating the test files appropriately, you can take this file fuzzer and run it against any target application. Let's wrap up with some future considerations for this fuzzer.