Now that we know how to structure our data, we can revisit the signature of qsort:
| def qsort(data:Ptr[Byte], |
|           num:Int, |
|           size:Int, |
|           comparator:CFuncPtr2[Ptr[Byte], Ptr[Byte], Int]):Unit |
We’ll pass in our array of NGramData cast to Ptr[Byte]; we can get the value of num from the used field of the WrappedArray, and the size of each item from sizeof[NGramData]. But for qsort to work, we also have to supply the comparator, and comparator has a type we haven’t seen before: a CFuncPtr, or function pointer.
In a typical systems programming course, abstracting from functions to function pointers is one of the hardest conceptual leaps, and C’s syntax certainly doesn’t make it any easier. In Scala, though, the situation is much simpler: Scala developers are used to treating named or anonymous functions as first-class values, and Scala’s type system represents the parameter and result types of a function in a straightforward way. However, C functions have some restrictions that Scala functions don’t; in particular, for the Scala Native compiler to pass a Scala function into a C function, the Scala function must not close over any local variables or state. Fortunately, the clean semantics of these comparator functions make that an easy constraint to obey.
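For instance, here’s a hypothetical (and deliberately broken) comparator sketch that illustrates the restriction: because it reads the local variable ascending, there is nowhere to store that captured state in a bare C function pointer, and Scala Native can’t convert it:
| // Hypothetical sketch: this will NOT convert to a C function pointer, |
| // because the function body captures the local variable `ascending`. |
| // var ascending = true |
| // val bad = new CFuncPtr2[Ptr[Byte], Ptr[Byte], Int] { |
| //   def apply(p1:Ptr[Byte], p2:Ptr[Byte]):Int = |
| //     if (ascending) 1 else -1 |
| // } |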
Much like the compare method of the standard Scala trait Ordered, the comparator function takes two arguments, a and b (here as Ptr[Byte]s), and returns a negative, zero, or positive Int depending on whether a should come before, at the same position as, or after b in the desired sequencing.
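To make that contract concrete, here’s a minimal sketch of the same convention in plain Scala, with no pointers involved:
| // The qsort comparator convention, illustrated on plain Ints: |
| // negative means a sorts first, zero means a tie, |
| // and positive means b sorts first. |
| def compareInts(a:Int, b:Int):Int = |
|   if (a < b) -1 |
|   else if (a == b) 0 |
|   else 1 |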
The comparator function is also responsible for casting each input Ptr[Byte] to a pointer of the correct type, and for implementing the comparison as efficiently as possible. For example, we could implement a descending sort by count like this:
| val by_count_naive = new CFuncPtr2[Ptr[Byte], Ptr[Byte], Int] { |
|   def apply(p1:Ptr[Byte], p2:Ptr[Byte]):Int = { |
|     // Recover typed pointers from the untyped Ptr[Byte] arguments. |
|     val ngram_ptr_1 = p1.asInstanceOf[Ptr[NGramData]] |
|     val ngram_ptr_2 = p2.asInstanceOf[Ptr[NGramData]] |
|     // Field _2 of NGramData holds the occurrence count. |
|     val count_1 = ngram_ptr_1._2 |
|     val count_2 = ngram_ptr_2._2 |
|     // Descending: the item with the larger count sorts first. |
|     if (count_1 > count_2) -1 |
|     else if (count_1 == count_2) 0 |
|     else 1 |
|   } |
| } |
It’s entirely correct, but a bit inefficient: all that branching has a cost, and since this code will run tens of millions of times, we want it to be as cheap as possible. We can remove the conditional logic entirely with a clever optimization:
| val by_count = new CFuncPtr2[Ptr[Byte], Ptr[Byte], Int] { |
|   def apply(p1:Ptr[Byte], p2:Ptr[Byte]):Int = { |
|     val ngram_ptr_1 = p1.asInstanceOf[Ptr[NGramData]] |
|     val ngram_ptr_2 = p2.asInstanceOf[Ptr[NGramData]] |
|     val count_1 = ngram_ptr_1._2 |
|     val count_2 = ngram_ptr_2._2 |
|     // The sign of the difference encodes the comparison result: |
|     // negative when count_1 is larger, zero on a tie, positive otherwise. |
|     count_2 - count_1 |
|   } |
| } |
This may seem a bit unintuitive, but the subtraction produces exactly the sign we need: if count_1 is larger, count_2 - count_1 is negative; if the counts are equal, it’s zero; otherwise it’s positive. (The trick is safe here because our counts are nonnegative and far below Int.MaxValue, so the subtraction can’t overflow.) Every little bit helps when we’re working deep inside an inner loop.
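As an aside, the same trick gives us an ascending sort just by flipping the operands; here’s a hypothetical variant (not used in this chapter):
| // Hypothetical ascending comparator: smaller counts sort first. |
| val by_count_ascending = new CFuncPtr2[Ptr[Byte], Ptr[Byte], Int] { |
|   def apply(p1:Ptr[Byte], p2:Ptr[Byte]):Int = { |
|     val count_1 = p1.asInstanceOf[Ptr[NGramData]]._2 |
|     val count_2 = p2.asInstanceOf[Ptr[NGramData]]._2 |
|     count_1 - count_2 |
|   } |
| } |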
We’re now (finally) ready to sort some NGrams! This doesn’t quite fulfill the whole exercise that we set out at the start of the chapter, but it’s a good intermediate step because it integrates all the techniques we’ve learned so far. With our resizable array, our comparator, and our parseLine function in hand, the main program loop is quite simple:
1. Initialize the array.
2. While there is input on stdin, resize the array if it’s full, then parse the new line of input.
3. Sort the array.
4. Print the top twenty items.
The only thing left to figure out is our strategy for growing the array. realloc is expensive, which pushes us toward infrequent block allocation, but too large a block size risks over-allocating and consuming unnecessary amounts of memory. In our case, however, we can expect this program to use most (or all) of the available memory on a given system, certainly in the multi-gigabyte range; and from examining the file, we know we have to hold around 80 million items in our array at once!
Based on these factors, I’ve set a block size of 2^20, or 1,048,576 items, which will be both the initial size of the array and the amount we grow by when it reaches capacity. Since each NGramData element is exactly 20 bytes long, this means we’ll be allocating about 20 megabytes at a time; that’s a lot, but even in the worst case we can’t over-allocate by more than 2% of the total data size. In a more complex program, sizing like this would be a bit aggressive, but it’s a good fit for these conditions.
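The growth itself is just a realloc call. As a refresher, here’s a minimal sketch of what a helper like growWrappedArray can look like, assuming the WrappedArray(data, used, capacity) structure from earlier in the chapter; the actual definition there may differ in its details:
| // Sketch: grow the backing buffer by `size` additional items. |
| def growWrappedArray(array:WrappedArray[NGramData], size:Int):Unit = { |
|   val new_capacity = array.capacity + size |
|   // realloc preserves the existing contents, but may move the buffer. |
|   val new_data = stdlib.realloc(array.data.asInstanceOf[Ptr[Byte]], |
|                                 new_capacity * sizeof[NGramData]) |
|   array.data = new_data.asInstanceOf[Ptr[NGramData]] |
|   array.capacity = new_capacity |
| } |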
Here’s the code:
| def main(args:Array[String]):Unit = { |
|   val block_size = 65536 * 16  // 2^20 items per allocation block |
|   val line_buffer = malloc(1024) |
| |
|   var array = makeWrappedArray(block_size) |
| |
|   // Read stdin line by line, growing the array whenever it fills up. |
|   val read_start = System.currentTimeMillis() |
|   while (stdio.fgets(line_buffer, 1023, stdin) != null) { |
|     if (array.used == array.capacity) { |
|       growWrappedArray(array, block_size) |
|     } |
|     parseLine(line_buffer, array.data + array.used) |
|     array.used += 1 |
|   } |
| |
|   val read_elapsed = System.currentTimeMillis() - read_start |
|   stdio.fprintf(stdio.stderr, c"reached EOF after %d lines in %ld ms\n", |
|                 array.used, read_elapsed) |
| |
|   // Sort the whole array in place, descending by count. |
|   val sort_start = System.currentTimeMillis() |
|   qsort.qsort(array.data.asInstanceOf[Ptr[Byte]], array.used, |
|               sizeof[NGramData], by_count) |
|   val sort_elapsed = System.currentTimeMillis() - sort_start |
|   stdio.printf(c"sorting done in %ld ms\n", sort_elapsed) |
| |
|   // Print the twenty most frequent words (or fewer, if we have fewer). |
|   val to_show = if (array.used <= 20) array.used else 20 |
|   for (i <- 0 until to_show) { |
|     stdio.printf(c"word %d: %s %d\n", i, (array.data + i)._1, |
|                  (array.data + i)._2) |
|   } |
|   stdio.printf(c"done.\n") |
| } |
As you can see, the main function mostly just knits together the pieces we’ve already built, while keeping control over the sizing and reallocation of the main data array. It runs fast, too: on my laptop it sorted the full data file in under three minutes, using about 7GB of memory, whereas a JVM Scala implementation couldn’t sort the complete file at all!
| $ ./target/scala-2.11/sort_by_count-out |
| < ../../googlebooks-eng-all-1gram-20120701-a |
| reached EOF after 86618505 lines in 38379 ms |
| sorting done in 6726 ms |
| word 0: and 470825580 |
| word 1: and_CONJ 470334485 |
| word 2: and 381273613 |
| word 3: and_CONJ 380846175 |
| word 4: and 358027403 |
| word 5: and_CONJ 357625732 |
| word 6: and 341461347 |
| word 7: and_CONJ 341045795 |
| word 8: and 334803358 |
| word 9: and_CONJ 334407859 |
| word 10: and 313209351 |
| word 11: and_CONJ 312823075 |
| word 12: a 303316362 |
| word 13: a_DET 302961892 |
| word 14: and 285501930 |
| word 15: and_CONJ 285145298 |
| word 16: and 259728252 |
| word 17: and_CONJ 259398607 |
| word 18: and 255989520 |
| word 19: and_CONJ 255677047 |
| done. |
Now, we’re ready to move on to our final task: implementing aggregation so that we can combine occurrences of the same word across multiple years.