Aggregation at Scale

In a higher-level idiom, without constraints on scale, we could easily sum up the per-year counts for each word with a map or dictionary. For example, in more verbose Scala we could do something like this:

 val data: Seq[NGramData] = ???
 val m = mutable.Map[String, Int]()
 for (d <- data) {
   if (m.contains(d.word)) {
     m(d.word) += d.count
   } else {
     m(d.word) = d.count
   }
 }

Or in an even more functional style, admittedly at the cost of further efficiency, we could do this:

 val m = data.groupBy(_.word).map { case (word, items) => (word, items.map(_.count).sum) }

However, when scale is a concern, planning this sort of bulk data-processing job can be a genuinely hard problem, and Scala frameworks like Spark[18] often do a great job of it. A common technique for large-scale accumulation operations like this is to presort the data first. If the data is already ordered by whatever property one wishes to group by, we can entirely avoid repeated searches of associative data structures, and instead simply buffer a single item in memory at a time.
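To make the idea concrete, here's a minimal high-level sketch, not the chapter's Scala Native code, assuming the rows arrive as a Seq of plain NGramData values already sorted by word (NGramData is treated as an ordinary case class here, purely for illustration). Because every entry for a given word is adjacent, a single accumulator variable replaces the whole map:

 // Plain-Scala illustration of aggregating presorted data with a single
 // buffered item, instead of repeated lookups in an associative structure.
 case class NGramData(word: String, count: Int, year: Int, docCount: Int)

 def sumSorted(data: Seq[NGramData]): Seq[(String, Int)] = {
   val results = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]
   var currentWord: String = null
   var runningTotal = 0
   for (d <- data) {
     if (d.word == currentWord) {
       runningTotal += d.count // same word as the previous row: accumulate
     } else {
       // the word changed: emit the finished total and start a new one
       if (currentWord != null) results += ((currentWord, runningTotal))
       currentWord = d.word
       runningTotal = d.count
     }
   }
   if (currentWord != null) results += ((currentWord, runningTotal)) // flush the last word
   results.toSeq
 }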

In our case, we can do even better: we don't even need a sorting pass. Upon close inspection, it turns out that the NGrams data files are already sorted in this fashion. Although every word-year pair has a separate entry in the file, all of the entries for a given word occur one right after another, in ascending chronological order, like so:

 again_ADV 1505 3 1
 again_ADV 1507 20 1
 ...[lines omitted]...
 again_ADV 2006 4833584 139355
 again_ADV 2007 5463040 147042
 again_ADV 2008 7763389 198434

This fortunate property of the data allows us to further optimize our code, and substantially reduce overhead. Instead of reading each line of the file into a separate NGramData instance, we can maintain a single instance of NGramData at a time, and sum all the counts into it until we encounter a new word.

Based on what we’ve learned so far, how would we plan to cut down our memory overhead, given these constraints? We could go about it in a lot of ways, but what I’ve chosen is to have parseLine use two pointer variables, called current and next, that each point at an NGramData instance. Most of the time, they will point at different positions in our array: current will point at the last fully initialized word that we read, whereas next will point at the immediately following item in the array. Here’s the trick: we can reuse the same values of current and next over and over, because it’s perfectly safe to read into next with sscanf as many times as we want. Combined with a stack-allocated buffer for the temporary word, this strategy should help us cut down memory usage by several orders of magnitude!
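The listings that follow also rely on a few definitions from earlier in the chapter that aren’t repeated here: the NGramData struct itself, plus the 1024-byte line_buffer and temp_word_buffer scratch buffers. For reference, here is the struct layout as I’d reconstruct it purely from how the fields are used below; treat it as an assumption rather than a restatement of the earlier listing:

 import scala.scalanative.unsafe._

 // Assumed layout, inferred from the listings below:
 // _1 = word (C string), _2 = count, _3 = year, _4 = document count.
 type NGramData = CStruct4[CString, CInt, CInt, CInt]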

Finally, we’ll need to use the standard library function strcmp, with the following signature, to check for string equality:

 def strcmp(left:CString, right:CString):Int

Just like our own comparators, strcmp returns a negative number, zero, or a positive number based on the relative ordering of the two strings; a return value of 0 means the strings are equal. Now let’s write our modified parseLine, which we’ll call parseAndCompare. It has to do a lot more work, but we can break it down into smaller helper functions for the following tasks:

  1. Scan the input line into a temporary NGramData instance.
  2. Compare the temporary data to the top of the buffer.
  3. Add the temporary data to the buffer as a new item.
  4. Accumulate the count from the temporary data into the current top of the buffer.

With the logic broken down into these small steps, the main body of parseAndCompare stays clear:

MemoryManagement/aggregate_and_sort/main.scala
 def parseAndCompare(line: CString, array: WrappedArray[NGramData]): Unit = {
   val temp_item = stackalloc[NGramData]
   temp_item._1 = temp_word_buffer
   val next_item = array.data + array.used
   scan_item(line, temp_item)
   if (array.used == 0) {
     add_new_item(temp_item, next_item)
     array.used += 1
   } else {
     val prev_item = array.data + (array.used - 1)
     if (is_item_new(temp_item, prev_item) != 0) {
       add_new_item(temp_item, next_item)
       array.used += 1
     } else {
       accumulate_item(temp_item, prev_item)
     }
   }
 }

The key change we’re making here is passing the WrappedArray itself into parseAndCompare. With this extra metadata, our code can determine for itself when to increment array.used; however, we won’t actually resize the underlying array until later.

Likewise, each of the individual helper functions is straightforward, some of them even one-liners:

MemoryManagement/aggregate_and_sort/main.scala
 def scan_item(line: CString, temp_item: Ptr[NGramData]): Boolean = {
   val temp_word = temp_item._1
   val temp_count = temp_item.at2
   val temp_year = temp_item.at3
   val temp_doc_count = temp_item.at4

   val sscanf_result = stdio.sscanf(line, c"%1023s %d %d %d\n",
     temp_word, temp_year, temp_count, temp_doc_count)
   if (sscanf_result < 4) {
     throw new Exception("input error")
   }

   return true
 }

 def is_item_new(temp_item: Ptr[NGramData], prev_item: Ptr[NGramData]): Int = {
   strcmp(temp_item._1, prev_item._1)
 }

 def add_new_item(temp_item: Ptr[NGramData],
     next_item: Ptr[NGramData]): Unit = {
   val temp_word = temp_item._1
   val new_word_length = strlen(temp_word)
   val new_word_buffer = malloc(new_word_length + 1)

   strncpy(new_word_buffer, temp_word, new_word_length)
   new_word_buffer(new_word_length) = 0

   next_item._1 = new_word_buffer
   next_item._2 = temp_item._2
   next_item._3 = temp_item._3
   next_item._4 = temp_item._4
 }

 def accumulate_item(temp_item: Ptr[NGramData],
     prev_item: Ptr[NGramData]): Unit = {
   prev_item._2 = prev_item._2 + temp_item._2
 }

 @extern
 object qsort {
   def qsort(data: Ptr[Byte],
       num: Int,
       size: Long,
       comparator: CFuncPtr2[Ptr[Byte], Ptr[Byte], Int]): Unit = extern
 }
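The main function below also passes a by_count comparator to qsort. Like our other comparators, it was written earlier in the chapter and isn’t repeated in this excerpt; the following is a plausible reconstruction, offered as a sketch rather than the original listing. It casts qsort’s untyped pointers back to Ptr[NGramData] and orders by descending count, so the most frequent words end up at the front of the array:

 // Hypothetical sketch of by_count -- not the chapter's exact definition.
 val by_count = CFuncPtr2.fromScalaFunction[Ptr[Byte], Ptr[Byte], Int] {
   (p1: Ptr[Byte], p2: Ptr[Byte]) =>
     val left = p1.asInstanceOf[Ptr[NGramData]]
     val right = p2.asInstanceOf[Ptr[NGramData]]
     // _2 holds the count; return a negative number to sort larger counts first
     if (left._2 > right._2) -1
     else if (left._2 < right._2) 1
     else 0
 }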

But we still need to handle file I/O in an outer loop, as well as determine when to grow the array itself. Since parseAndCompare() will only ever use one new array cell, all we have to do is make sure there’s room for at least one more item every time we read a new line, like this:

MemoryManagement/aggregate_and_sort/main.scala
 def readAllLines(fd: Ptr[stdio.FILE], array: WrappedArray[NGramData]): Long = {
   var lines_read = 0L
   while (stdio.fgets(line_buffer, 1024, fd) != null) {
     if (array.used >= (array.capacity - 1)) {
       growWrappedArray(array, block_size)
     }
     parseAndCompare(line_buffer, array)
     lines_read += 1
     if (lines_read % 10000000 == 0) {
       stdio.printf(c"read %d lines, %d unique words so far\n", lines_read,
         array.used)
     }
   }
   return lines_read
 }

And once that’s done, the main function is even simpler:

MemoryManagement/aggregate_and_sort/main.scala
 def main(args: Array[String]): Unit = {
   val array = makeWrappedArray(block_size)

   val read_start = System.currentTimeMillis()
   val lines_read = readAllLines(stdin, array)
   val read_elapsed = System.currentTimeMillis() - read_start
   println(s"done. read $lines_read lines, ${array.used} unique words. " +
     s"$read_elapsed ms")

   val sort_start = System.currentTimeMillis()
   qsort.qsort(array.data.asInstanceOf[Ptr[Byte]], array.used,
     sizeof[NGramData], by_count)
   val sort_elapsed = System.currentTimeMillis() - sort_start
   stdio.printf(c"sorting done in %d ms\n", sort_elapsed)

   val to_show = if (array.used <= 20) array.used else 20
   for (i <- 0 until to_show) {
     stdio.printf(c"word n: %s %d\n", (array.data + i)._1, (array.data + i)._2)
   }

   println("done.")
 }

Whew! It might have taken a bit of work to get here, but we’ll see the benefits of this approach when we run and time the code:

 read 10000000 lines, 165976 unique words so far
 read 20000000 lines, 332408 unique words so far
 ...
 read 80000000 lines, 1330247 unique words so far
 done. read 86618505 lines, 1440378 unique words. 29741 ms
 sorting done in 102 ms
 word n: are 1826889845
 word n: are_VERB 1826888554
 word n: at 1562321315
 word n: at_ADP 1560015959
 word n: an 1266190915
 ...
 done.

Not only do we read in the data significantly faster than in our previous sorting example—in about 30 seconds, down from 45—but our sort time has improved by two orders of magnitude, from 7 seconds down to about 100 milliseconds. And that’s just the improvement over our previous Scala Native code. In standard JVM Scala, a Map[String,Int]-based approach couldn’t handle the entire file on my machine, and even an ArrayBuffer-based approach takes more than 2 minutes to read in the data and around 4 seconds to sort it. (If you want to look at that code, it’s provided in the book’s source code available for download on the book’s website.[19])

That, in a nutshell, is Scala Native’s power: bare-metal performance, at the cost of more complexity for certain common tasks. It might not be the right tool for every problem, but where it fits, the impact is enormous.

All that being said, the code and the techniques that we’ve explored in this chapter are the hardest in this book, and it isn’t necessary to have total fluency or mastery over them to proceed into other topics. Especially if you’re encountering these concepts for the first time, the patterns and idioms that may seem opaque right now will make a lot more sense as we see them repeated in the chapters to come.