Mutexes

There may be occasions when multiple threads each need to access some kind of global resource. This can produce erroneous results because one thread may modify the resource while another thread is partway through using it, so the value that the second thread works with is unpredictable. For a simple example, look at this code:

no_mutex.rb

$i = 0

def addNum(aNum)
    aNum + 1
end

somethreads = (1..3).collect {
    Thread.new {
        1000000.times{ $i = addNum($i)  }
    }
}


somethreads.each{|t| t.join }
puts( $i )

My intention here is to create and run three threads, each of which increments the global variable, $i, 1 million times. I do this by enumerating from 1 to 3 and using the collect method (map is a synonym for collect, so it could also be used) to build an array from the values returned by the block, each of which is a new thread. I then iterate over this array of threads, somethreads, passing each thread, t, into a block that calls join so that the main thread waits for it to finish, as explained earlier. Each thread calls the addNum method to increment the value of $i. The expected value of $i at the end of this would (naturally) be 3 million. But, in fact, when I run this, the end value of $i is 1,068,786 (though you may see a different result).

The explanation of this is that the three threads are, in effect, competing for access to the global variable, $i. This means, at certain times, thread a may get the current value of $i (let’s suppose it happens to be 100), and simultaneously thread b gets the current value of $i (still 100). Now, a increments the value it just got ($i becomes 101), and b increments the value it just got, which was 100 (so $i becomes 101 once again). In other words, when multiple threads simultaneously access a shared resource, some of them may be working with out-of-date values, that is, values that do not take into account any modifications that have been made by other threads. Over time, errors resulting from these operations accumulate until you end up with results that differ substantially from those you might have anticipated.
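The interleaving described above can be replayed by hand, without any real threads, so that the outcome is the same on every run. This is only an illustrative sketch: the method name lost_update_demo and the local variables a_copy and b_copy are hypothetical and do not appear in no_mutex.rb.

```ruby
# Hand-simulated interleaving of two threads, a and b, each trying to
# add 1 to a shared value. No real threads are created here.
def lost_update_demo(start)
  shared = start          # stands in for the global $i

  a_copy = shared         # thread a reads the current value
  b_copy = shared         # thread b reads the same value before a writes
  shared = a_copy + 1     # thread a writes back its incremented copy
  shared = b_copy + 1     # thread b overwrites it using its stale copy

  shared
end

# Two increments were attempted, but only one of them survives.
puts( lost_update_demo(100) )   # prints 101, not 102
```

Each pass through this pattern loses one increment, which is how the shortfall in no_mutex.rb builds up over 3 million attempts.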

To deal with this problem, you need to ensure that when one thread has access to a global resource, it blocks the access of other threads. This is another way of saying that the access to global resources granted to multiple threads should be “mutually exclusive.” You can implement this using Ruby’s Mutex class, which acts as a semaphore indicating whether a resource is currently being accessed and provides the synchronize method, which allows only one thread at a time to run the code inside its block. Note that in Ruby 1.8 you must require 'thread' to use the Mutex class; from Ruby 1.9 onward, Mutex is built in, so the require is harmless but no longer needed. Here is my rewritten code:

mutex.rb

require 'thread'

$i = 0
semaphore = Mutex.new

def addNum(aNum)
    aNum + 1
end

somethreads = (1..3).collect {
    Thread.new {
        semaphore.synchronize{
            1000000.times{ $i = addNum($i)  }
        }
    }
}

somethreads.each{|t| t.join }
puts( $i )

This time, the end result of $i is 3,000,000.
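Because mutex.rb places the whole loop inside synchronize, each thread completes all its increments before the next thread can begin. An alternative, sketched here under my own assumptions, is to lock only the single increment so that the threads can take turns. The method name locked_count and its parameters are hypothetical, and the count is kept small so that the example runs quickly.

```ruby
require 'thread'   # only needed on Ruby 1.8; harmless on later versions

# Hypothetical helper: runs thread_count threads, each adding 1 to a
# shared counter `times` times, taking the lock for one increment at a time.
def locked_count(thread_count, times)
  total = 0
  lock = Mutex.new

  threads = (1..thread_count).collect {
    Thread.new {
      times.times {
        # Only the read-modify-write step is inside the critical section.
        lock.synchronize { total += 1 }
      }
    }
  }

  threads.each { |t| t.join }
  total
end

puts( locked_count(3, 10_000) )   # prints 30000
```

The trade-off is that the lock is now acquired and released once per increment rather than once per thread, which costs more time overall but keeps each critical section short.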

Finally, for a slightly more useful example of using threads, take a look at file_find2.rb. This sample program uses Ruby’s Find module to traverse directories on disk. For a nonthreaded example, see file_find.rb. Compare this with the file_info3.rb program in Sorting by Size, which uses the Dir class.

This program sets two threads running. The first, t1, calls the processFiles method to find and display file information (you will need to edit the call to processFiles to pass to it a directory name on your system). The second thread, t2, simply prints out a message, and this thread runs while t1 is “alive” (that is, running or sleeping):

file_find2.rb

require 'find'
require 'thread'

$totalsize = 0
$dirsize = 0

semaphore = Mutex.new

def processFiles( baseDir )
    Find.find( baseDir ) { |path|
        if (FileTest.directory?(path)) && (path != baseDir ) then   # if a directory
            print( "\n#{path} [#{$dirsize / 1024}K]" )
            $dirsize = 0
        else                    # if a file
            $filesize = File.size(path)
            print( "\n#{path} [#{$filesize} bytes]" )
            $dirsize += $filesize
            $totalsize += $filesize
        end
    }
end

t1 = Thread.new{
    semaphore.synchronize{
        processFiles( '..' ) # you may edit this directory name
    }
}


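t2 = Thread.new{
    # no synchronize here: t2 only polls t1, and waiting for the
    # mutex that t1 holds would keep it from printing until t1 finished
    while t1.alive? do
        print( "\n\t\tProcessing..." )
        Thread.pass
    end
}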

t2.join

printf( "\nTotal: #{$totalsize} bytes, #{$totalsize/1024}K, %0.02fMB\n\n", $totalsize/1048576.0 )
puts( "Total file size: #{$filesize}, Total directory size: #{$dirsize}" )

In a real application, you could adapt this technique to provide user feedback of some kind while some intensive process (such as directory walking) is taking place.
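As a minimal sketch of that idea, the following code runs a slow task in one thread while a second thread prints a dot at intervals for as long as the first is alive. The names slow_sum and sum_with_feedback are hypothetical, and the calls to sleep merely stand in for real work. The sketch assumes that the monitor thread takes no lock: if both threads synchronized on the same mutex, the monitor would have to wait until the worker had released it.

```ruby
# Hypothetical stand-in for an intensive task such as directory walking.
def slow_sum(limit)
  total = 0
  (1..limit).each { |n|
    total += n
    sleep(0.001) if n % 100 == 0   # pretend the work takes a while
  }
  total
end

def sum_with_feedback(limit)
  worker = Thread.new { slow_sum(limit) }

  # The monitor only asks whether the worker is still alive, so it is
  # free to run whenever the worker sleeps or is switched out.
  monitor = Thread.new {
    while worker.alive? do
      print( "." )
      sleep(0.01)
    end
  }

  monitor.join
  worker.value    # waits for the worker and returns its block's result
end

puts( "\nSum: #{sum_with_feedback(1000)}" )
```

The number of dots printed varies from run to run, but the sum does not, since only the worker thread ever touches the running total.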