In a higher-level idiom, without the constraints of scale, we could easily sum up the counts for each word with a map or dictionary. For example, in more verbose Scala we could do something like this:
| import scala.collection.mutable |
| |
| val data:Seq[NGramData] = ??? |
| val m = mutable.Map[String,Int]() |
| for (d <- data) { |
|   if (m.contains(d.word)) { |
|     m(d.word) += d.count |
|   } else { |
|     m(d.word) = d.count |
|   } |
| } |
Or in an even more functional style, admittedly at the cost of further efficiency, we could do this:
| val g = data |
|   .groupBy(_.word) |
|   .map { case (word, items) => (word, items.map(_.count).sum) } |
However, when scale is a concern, planning this sort of bulk data-processing job can be a genuinely hard problem, and Scala frameworks like Spark[18] often do a great job of it. A common technique for large-scale accumulation operations like this is to presort the data first. If the data is already ordered by whatever property one wishes to group by, we can entirely avoid repeated searches of associative data structures, and instead simply buffer a single item in memory at a time.
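To make the single-buffer pattern concrete before we drop down to native code, here is a minimal JVM-Scala sketch. The helper name sumPresorted and the case-class shape are purely for illustration; it walks data that is already sorted by word and never holds more than one accumulator in memory at a time:
| // Sketch only: accumulate counts over data already sorted by word, |
| // keeping a single accumulator in memory at a time. |
| case class NGramData(word: String, count: Int) |
| |
| def sumPresorted(sorted: Seq[NGramData]): Seq[NGramData] = { |
|   val out = Seq.newBuilder[NGramData] |
|   var acc: Option[NGramData] = None |
|   for (d <- sorted) { |
|     acc match { |
|       case Some(a) if a.word == d.word => |
|         acc = Some(a.copy(count = a.count + d.count)) // same word: keep summing |
|       case Some(a) => |
|         out += a                                      // new word: emit the finished group |
|         acc = Some(d) |
|       case None => |
|         acc = Some(d)                                 // very first item |
|     } |
|   } |
|   acc.foreach(out += _)                               // emit the last group |
|   out.result() |
| } |
Note that no map lookup ever happens here; because the input is sorted, equality against the single buffered item is all we need.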
In our case, we don’t even need a separate sorting pass: upon close inspection, it turns out that the NGrams data files are already sorted in this fashion. Although every word-year pair has a separate entry in the file, all of the entries for a given word occur one right after another, in ascending chronological order, like so:
| again_ADV 1505 3 1 |
| again_ADV 1507 20 1 |
| ...[lines omitted]... |
| again_ADV 2006 4833584 139355 |
| again_ADV 2007 5463040 147042 |
| again_ADV 2008 7763389 198434 |
This fortunate property of the data allows us to further optimize our code, and substantially reduce overhead. Instead of reading each line of the file into a separate NGramData instance, we can maintain a single instance of NGramData at a time, and sum all the counts into it until we encounter a new word.
Based on what we’ve learned so far, how would we plan to cut down our memory overhead, given these constraints? We could go about it in a lot of ways, but the approach I’ve chosen is to have parseLine use two pointer variables that point at NGramData instances, called current and next (they appear as prev_item and next_item in the code that follows). Most of the time, these will point at different positions in our array: current points at the last fully initialized word that we read, whereas next points at the next free item in the array. Here’s the trick: we can reuse the same values of current and next over and over, because it’s perfectly safe to scan into the same memory with sscanf as many times as we want. Combined with a stack-allocated buffer for the temporary word, this strategy should cut our memory usage down by several orders of magnitude!
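The code in the rest of this section also relies on the NGramData struct layout and the scratch buffer temp_word_buffer introduced earlier in the chapter. As a reminder, definitions consistent with how the code below accesses them would look roughly like this. This is a sketch, not the canonical listing: the field order is inferred from scan_item and accumulate_item, and the imports and numeric conversions around malloc may need small adjustments for your Scala Native version.
| import scalanative.unsafe._ |
| import scalanative.libc.stdlib.malloc |
| |
| // One row of NGrams data: a pointer to the word, then count, year, |
| // and document count as integers (order inferred from the code below). |
| type NGramData = CStruct4[CString, Int, Int, Int] |
| |
| // Reusable scratch space for the word being parsed: room for the |
| // %1023s read plus a terminating null byte. |
| val temp_word_buffer: Ptr[Byte] = malloc(1024) |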
Finally, we’ll need to use the standard library function strcmp, with the following signature, to check for string equality:
| def strcmp(left:CString, right:CString):Int |
Just like our own comparators, strcmp returns a negative number, zero, or a positive number based on the relative ordering of the two strings; in particular, it returns 0 when the strings are equal. Now let’s write our modified parseLine, which we’ll call parseAndCompare. It has to do a lot more work, but we can break that work down into small helper functions: one to scan a line into a temporary item, one to check whether the word we just read is new, one to add a new word to the array, and one to accumulate counts into an existing entry.
With the logic broken down into small steps like this, the main loop is clear:
| def parseAndCompare(line:CString, array:WrappedArray[NGramData]):Unit = { |
|   // Scan into a temporary, stack-allocated item first. |
|   val temp_item = stackalloc[NGramData] |
|   temp_item._1 = temp_word_buffer |
|   val next_item = array.data + array.used |
|   scan_item(line, temp_item) |
|   if (array.used == 0) { |
|     // Very first word: always add it. |
|     add_new_item(temp_item, next_item) |
|     array.used += 1 |
|   } else { |
|     val prev_item = array.data + (array.used - 1) |
|     if (is_item_new(temp_item, prev_item) != 0) { |
|       // New word: copy it into the next free cell of the array. |
|       add_new_item(temp_item, next_item) |
|       array.used += 1 |
|     } else { |
|       // Same word as before: just add the counts together. |
|       accumulate_item(temp_item, prev_item) |
|     } |
|   } |
| } |
The key change we’re making here is passing the WrappedArray itself into parseAndCompare. With this extra metadata, our code can determine for itself when to increment array.used; however, we won’t actually resize the underlying array until later.
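For reference, the WrappedArray that parseAndCompare receives was defined earlier in the chapter. A definition consistent with the field accesses above (array.data, array.used, array.capacity) is a small garbage-collected wrapper around an unmanaged buffer, roughly like this sketch, which is not necessarily the exact earlier listing:
| // A tiny managed wrapper around unmanaged memory: the object itself lives |
| // on the GC heap, but `data` points at a buffer we allocate and grow by hand. |
| final case class WrappedArray[T]( |
|   var data: Ptr[T],     // start of the malloc'd buffer of structs |
|   var used: Int,        // number of initialized cells |
|   var capacity: Int     // number of cells the buffer can hold |
| ) |
Because the wrapper is an ordinary Scala object, passing it into parseAndCompare lets the function update used in place, without any extra pointer indirection.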
Likewise, each of the individual helper functions is straightforward; some of them are even one-liners:
| def scan_item(line:CString, temp_item:Ptr[NGramData]):Boolean = { |
|   val temp_word = temp_item._1 |
|   val temp_count = temp_item.at2 |
|   val temp_year = temp_item.at3 |
|   val temp_doc_count = temp_item.at4 |
| |
|   // Parse the word, year, count, and document count into the temporary item. |
|   val sscanf_result = stdio.sscanf(line, c"%1023s %d %d %d\n", |
|     temp_word, temp_year, temp_count, temp_doc_count) |
|   if (sscanf_result < 4) { |
|     throw new Exception("input error") |
|   } |
| |
|   return true |
| } |
| |
| def is_item_new(temp_item:Ptr[NGramData], prev_item:Ptr[NGramData]):Int = { |
|   // Nonzero means the word we just scanned differs from the previous one. |
|   strcmp(temp_item._1, prev_item._1) |
| } |
| |
| def add_new_item(temp_item:Ptr[NGramData], |
|                  next_item:Ptr[NGramData]):Unit = { |
|   // Make a durable, heap-allocated copy of the word... |
|   val temp_word = temp_item._1 |
|   val new_word_length = strlen(temp_word) |
|   val new_word_buffer = malloc(new_word_length + 1) |
| |
|   strncpy(new_word_buffer, temp_word, new_word_length) |
|   new_word_buffer(new_word_length) = 0 |
| |
|   // ...then copy the parsed fields into the next free cell of the array. |
|   next_item._1 = new_word_buffer |
|   next_item._2 = temp_item._2 |
|   next_item._3 = temp_item._3 |
|   next_item._4 = temp_item._4 |
| } |
| |
| def accumulate_item(temp_item:Ptr[NGramData], |
|                     prev_item:Ptr[NGramData]):Unit = { |
|   // Same word as the previous line: just add the counts together. |
|   prev_item._2 = prev_item._2 + temp_item._2 |
| } |
| |
| @extern |
| object qsort { |
|   def qsort(data:Ptr[Byte], |
|             num:Int, |
|             size:Long, |
|             comparator:CFuncPtr2[Ptr[Byte], Ptr[Byte], Int]):Unit = extern |
| } |
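The main method below also passes a comparator named by_count, which was defined in the previous section. For reference, a qsort-compatible comparator that orders NGramData structs by descending count looks roughly like the following; the exact CFuncPtr construction syntax varies between Scala Native versions, so treat this as a sketch rather than the chapter's original listing:
| // Order by the count field (._2), descending, so that the most frequent |
| // words end up at the front of the array after qsort. |
| val by_count = CFuncPtr2.fromScalaFunction[Ptr[Byte], Ptr[Byte], Int] { |
|   (p1: Ptr[Byte], p2: Ptr[Byte]) => |
|     val left = p1.asInstanceOf[Ptr[NGramData]] |
|     val right = p2.asInstanceOf[Ptr[NGramData]] |
|     if (left._2 > right._2) -1 |
|     else if (left._2 == right._2) 0 |
|     else 1 |
| } |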
But we still need to handle file I/O in an outer loop, as well as determine when to grow the array itself. Since parseAndCompare() will only ever use at most one new array cell per line, all we have to do is make sure there is room for one more item every time we read a new line, like this:
| def readAllLines(fd:Ptr[stdio.FILE], array:WrappedArray[NGramData]):Long = { |
|   var lines_read = 0L |
|   while (stdio.fgets(line_buffer, 1024, fd) != null) { |
|     // Make sure there is room for at most one new item before parsing. |
|     if (array.used >= (array.capacity - 1)) { |
|       growWrappedArray(array, block_size) |
|     } |
|     parseAndCompare(line_buffer, array) |
|     lines_read += 1 |
|     if (lines_read % 10000000 == 0) { |
|       stdio.printf(c"read %d lines, %d unique words so far\n", lines_read, |
|         array.used) |
|     } |
|   } |
|   return lines_read |
| } |
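readAllLines leans on growWrappedArray, makeWrappedArray, block_size, and line_buffer, all introduced earlier in the chapter. For context, realloc-based implementations consistent with this usage would look something like the sketch below; the block_size value is an arbitrary assumption, line_buffer is shown only for completeness, and the numeric conversions around sizeof and malloc may need small adjustments for your Scala Native version:
| import scalanative.unsafe._ |
| import scalanative.libc.stdlib |
| |
| // How many NGramData cells to add on each growth step (arbitrary choice). |
| val block_size = 65536 |
| // Scratch buffer for fgets to read each line into. |
| val line_buffer = stdlib.malloc(1024) |
| |
| def makeWrappedArray(size: Int): WrappedArray[NGramData] = { |
|   val data = stdlib.malloc(size * sizeof[NGramData]).asInstanceOf[Ptr[NGramData]] |
|   WrappedArray[NGramData](data, 0, size) |
| } |
| |
| def growWrappedArray(array: WrappedArray[NGramData], increment: Int): Unit = { |
|   val new_capacity = array.capacity + increment |
|   val new_size = new_capacity * sizeof[NGramData] |
|   // realloc preserves the existing contents, but may move the buffer. |
|   array.data = stdlib.realloc(array.data.asInstanceOf[Ptr[Byte]], new_size) |
|     .asInstanceOf[Ptr[NGramData]] |
|   array.capacity = new_capacity |
| } |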
And once that’s done, the main method is even simpler:
| def main(args:Array[String]):Unit = { |
|   val array = makeWrappedArray(block_size) |
| |
|   val read_start = System.currentTimeMillis() |
|   val lines_read = readAllLines(stdin, array) |
|   val read_elapsed = System.currentTimeMillis() - read_start |
|   println(s"done. read $lines_read lines, ${array.used} unique words. $read_elapsed ms") |
| |
|   val sort_start = System.currentTimeMillis() |
|   qsort.qsort(array.data.asInstanceOf[Ptr[Byte]], array.used, |
|     sizeof[NGramData], by_count) |
|   val sort_elapsed = System.currentTimeMillis() - sort_start |
|   stdio.printf(c"sorting done in %d ms\n", sort_elapsed) |
| |
|   // Print the 20 most frequent words (or all of them, if there are fewer). |
|   val to_show = if (array.used <= 20) array.used else 20 |
|   for (i <- 0 until to_show) { |
|     stdio.printf(c"word n: %s %d\n", (array.data + i)._1, (array.data + i)._2) |
|   } |
| |
|   println("done.") |
| } |
Whew! It might have taken a bit of work to get here, but we’ll see the benefits of this approach when we run and time the code:
| read 10000000 lines, 165976 unique words so far |
| read 20000000 lines, 332408 unique words so far |
| ... |
| read 80000000 lines, 1330247 unique words so far |
| done. read 86618505 lines, 1440378 unique words. 29741 ms |
| sorting done in 102 ms |
| word n: are 1826889845 |
| word n: are_VERB 1826888554 |
| word n: at 1562321315 |
| word n: at_ADP 1560015959 |
| word n: an 1266190915 |
| ... |
| done. |
Not only do we read in the data significantly faster than in our previous sorting example (about 30 seconds, down from 45), but our sort time has improved by almost two orders of magnitude, from 7 seconds down to about 100 milliseconds. And that’s just the improvement over our previous Scala Native code. In standard JVM Scala, a Map[String,Int]-based approach couldn’t handle the entire file on my machine, and even an ArrayBuffer-based approach takes more than 2 minutes to read in the data and around 4 seconds to sort it. (If you want to look at that code, it’s provided in the book’s source code, available for download on the book’s website.[19])
That, in a nutshell, is Scala Native’s power: bare-metal performance, at the cost of more complexity for certain common tasks. It might not be the right tool for every problem, but where it fits, the impact is enormous.
All that being said, the code and the techniques that we’ve explored in this chapter are the hardest in this book, and it isn’t necessary to have total fluency or mastery over them to proceed into other topics. Especially if you’re encountering these concepts for the first time, the patterns and idioms that may seem opaque right now will make a lot more sense as we see them repeated in the chapters to come.