In the previous example, we looked at extending pipe assemblies in Cascading workflows.
Functionally, Word Count is only a few changes away from implementing an algorithm called term frequency–inverse document frequency (TF-IDF).
This is the basis for many search indexing metrics, such as those used in the popular open source search engine Apache Lucene.
See the Similarity class in Lucene for a great discussion of the algorithm and its use.
For this example, let’s show how to implement TF-IDF in Cascading—which is a useful subassembly to reuse in a variety of apps. Figure 3-1 shows a conceptual diagram for this. Based on having a more complex app to work with, we’ll begin to examine Cascading features for testing at scale.
Starting from the source code directory that you cloned in Git, connect into the part5 subdirectory. First let’s add another sink tap to write the TF-IDF weights:
String tfidfPath = args[ 3 ];
Tap tfidfTap = new Hfs( new TextDelimited( true, "\t" ), tfidfPath );
Next we’ll modify the existing pipe assemblies for Word Count, beginning immediately after the join used as a “stop words” filter.
We add the following line to retain only the doc_id and token fields in the output tuple stream, based on the fieldSelector parameter:
tokenPipe = new Retain( tokenPipe, fieldSelector );
Now let’s step back and consider the desired end result, a TF-IDF metric. The “TF” and “IDF” parts of TF-IDF can be calculated from a few underlying metrics: term counts per document, document frequency per token, and the total number of documents.
Slight modifications to Word Count produce both term count and document frequency, while the remaining components get calculated almost as by-products.
At this point, we need to use the tuple stream in multiple ways—effectively splitting the intermediate results from tokenPipe three ways.
Note that there are three basic patterns for separating or combining tuple streams: a merge combines two or more streams that share the same fields, a join combines streams based on common field values, and a split sends one stream down two or more branches.
We’ve already seen a join; now we’ll introduce a split. This is also a good point to talk about the names of branches in pipe assemblies. Note that pipes always have names. When we connect pipes into pipe assemblies, the name gets inherited downstream—unless it gets changed through the API or due to a structural difference such as a join or a merge. Branch names are important for troubleshooting and instrumentation of workflows. In this case where we’re using a split, Cascading requires each branch to have a different name.
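As a quick illustration of a split (using hypothetical branch names, not the ones in this example app), two branches can simply be constructed from the same upstream pipe, each with its own name:

// a minimal sketch of a split -- the branch names here are hypothetical
Pipe lhsBranch = new Pipe( "LHS_BRANCH", tokenPipe );
Pipe rhsBranch = new Pipe( "RHS_BRANCH", tokenPipe );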
The first branch after tokenPipe calculates term counts, as shown in Figure 3-2. We’ll call that pipe assembly tfPipe, with a branch name TF:
// one branch of the flow tallies the token counts for term frequency (TF)
Pipe tfPipe = new Pipe( "TF", tokenPipe );
Fields tf_count = new Fields( "tf_count" );
tfPipe = new CountBy( tfPipe, new Fields( "doc_id", "token" ), tf_count );

Fields tf_token = new Fields( "tf_token" );
tfPipe = new Rename( tfPipe, token, tf_token );
This uses a built-in partial aggregate operation called CountBy, which counts duplicates in a tuple stream. Partial aggregates are quite useful for parallelizing algorithms efficiently. Portions of an aggregation—for example, a summation—can be performed in different tasks.
We also rename token to tf_token so that it won’t conflict with other tuple streams in a subsequent join. At this point, we have the term counts.
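For comparison, here is a rough sketch (not in the example app) of what CountBy replaces: a full GroupBy followed by a Count aggregator, which would perform all the counting during the grouped aggregation instead of combining partial counts map-side:

// roughly equivalent, but without the map-side partial aggregation
tfPipe = new GroupBy( tfPipe, new Fields( "doc_id", "token" ) );
tfPipe = new Every( tfPipe, Fields.ALL, new Count( tf_count ), Fields.ALL );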
The next branch may seem less than intuitive…and it is a bit odd, but efficient.
We need to calculate the total number of documents, in a way that can be consumed later in a join.
So we’ll produce total document count as a field, in each tuple for the RHS of the join.
That keeps our workflow parallel, allowing the calculations to scale out horizontally.
We’ll call that pipe assembly dPipe, with a branch name D, as shown in Figure 3-3.
Alternatively, we could calculate the total number of documents outside of this workflow and pass it along as a parameter, or use a distributed counter.
Fields doc_id = new Fields( "doc_id" );
Fields tally = new Fields( "tally" );
Fields rhs_join = new Fields( "rhs_join" );
Fields n_docs = new Fields( "n_docs" );
Pipe dPipe = new Unique( "D", tokenPipe, doc_id );
dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL );
dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL );
dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class );
This filters for the unique doc_id values and then uses another built-in partial aggregate operation called SumBy, which sums values associated with duplicate keys in a tuple stream.
Great, now we’ve got the document count.
Notice that the results are named rhs_join, preparing for the subsequent join.
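As with CountBy, a rough (and less efficient) equivalent sketch would be a GroupBy on rhs_join followed by a Sum aggregator; SumBy simply performs part of that summation map-side:

// sketch only -- the SumBy above already does this, with partial aggregation
dPipe = new GroupBy( dPipe, rhs_join );
dPipe = new Every( dPipe, tally, new Sum( n_docs, long.class ), Fields.ALL );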
The third branch calculates document frequency for each token.
We’ll call that pipe assembly dfPipe, with a branch name DF, as shown in Figure 3-4:
// one branch tallies the token counts for document frequency (DF)
Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL );
Fields df_count = new Fields( "df_count" );
dfPipe = new CountBy( dfPipe, token, df_count );

Fields df_token = new Fields( "df_token" );
Fields lhs_join = new Fields( "lhs_join" );
dfPipe = new Rename( dfPipe, token, df_token );
dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL );
Notice that the results are named lhs_join, again preparing for the subsequent join.
Now we have all the components needed to calculate TF-IDF weights.
To finish the calculations in parallel, we’ll use two different kinds of joins in Cascading—a HashJoin followed by a CoGroup. Figure 3-5 shows how these joins merge the three branches together:
// join to bring together all the components for calculating TF-IDF
// the D side of the join is smaller, so it goes on the RHS
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );

// the IDF side of the join is smaller, so it goes on the RHS
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token );
We used HashJoin previously for a replicated join. In this case we know that document count will not be a large amount of data, so it works for the RHS.
The other join, CoGroup, handles a more general case where the RHS cannot be kept entirely in memory. In those cases a threshold can be adjusted for “spill,” where RHS tuples get moved to disk.
Then we calculate TF-IDF weights using an ExpressionFunction in Cascading:
// calculate the TF-IDF weights, per token, per document
Fields tfidf = new Fields( "tfidf" );
String expression =
  "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfExpression =
  new ExpressionFunction( tfidf, expression, Double.class );
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );

fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
tfidfPipe = new Retain( tfidfPipe, fieldSelector );
tfidfPipe = new Rename( tfidfPipe, tf_token, token );
Now we can get back to the rest of the workflow.
Let’s keep the Word Count metrics, because those become useful when testing.
This branch reuses the term counts from the TF branch, summing them with a SumBy partial aggregate rather than recounting the raw token stream:
// keep track of the word counts, which are useful for QA
Pipe wcPipe = new Pipe( "wc", tfPipe );
Fields count = new Fields( "count" );
wcPipe = new SumBy( wcPipe, tf_token, tf_count, count, long.class );
wcPipe = new Rename( wcPipe, tf_token, token );

// additionally, sort by count
wcPipe = new GroupBy( wcPipe, count, count );
Last, we’ll add another sink tap to the FlowDef, for the TF-IDF output data:
// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "tfidf" )
  .addSource( docPipe, docTap )
  .addSource( stopPipe, stopTap )
  .addTailSink( tfidfPipe, tfidfTap )
  .addTailSink( wcPipe, wcTap );
We’ll also change the name of the resulting Flow, to distinguish this from previous examples:
// write a DOT file and run the flow
Flow tfidfFlow = flowConnector.connect( flowDef );
tfidfFlow.writeDOT( "dot/tfidf.dot" );
tfidfFlow.complete();
Modify the Main method for these changes.
This code is already in the part5/src/main/java/impatient/ directory, in the Main.java file.
You should be good to go.
For those keeping score, the physical plan now uses 11 maps and 9 reduces.
That count jumped by about 5x since our previous example.
If you want to read in more detail about the classes in the Cascading API that were used, see the Cascading User Guide and JavaDoc.
To build:
$ gradle clean jar
To run:
$ rm -rf output
$ hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop \
    output/tfidf
Output text gets stored in partition files under output/tfidf, and you can verify the first 10 lines (including the header) by using the following:
$ head output/tfidf/part-00000
doc_id  tfidf                 token
doc02   0.9162907318741551    air
doc01   0.44628710262841953   area
doc03   0.22314355131420976   area
doc02   0.22314355131420976   area
doc05   0.9162907318741551    australia
doc05   0.9162907318741551    broken
doc04   0.9162907318741551    california's
doc04   0.9162907318741551    cause
doc02   0.9162907318741551    cloudcover
A gist on GitHub shows a log of building and running this example. If your run looks terribly different, something is probably not set up correctly. Ask the developer community for troubleshooting advice.
By the way, did you notice what the TF-IDF weights for the tokens rain and shadow were?
Those represent what these documents all have in common.
How do those compare with weights for the other tokens?
Conversely, consider the weight for australia (high weight) or area (different weights).
TF-IDF calculates a metric for each token, which indicates how “important” that token is to a document within the context of a collection of documents. The metric is calculated based on relative frequencies. On one hand, tokens that appear in most documents tend to have very low TF-IDF weights. On the other hand, tokens that are less common but appear multiple times in a few documents tend to have very high TF-IDF weights.
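To make that concrete, here is a small plain-Java sketch (outside Cascading) that reproduces two of the weights visible in the sample output, using the same expression and the document count of 5 from this data set:

// n_docs for this data set, as seen in the checkpointed output later on
double nDocs = 5.0;

// "australia" appears once in doc05 and in only 1 of the 5 documents:
double australiaWeight = 1 * Math.log( nDocs / ( 1.0 + 1 ) );  // ~0.9163

// "area" appears once in doc03 but in 3 of the 5 documents, so it scores lower:
double areaWeight = 1 * Math.log( nDocs / ( 1.0 + 3 ) );       // ~0.2231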
Note that information retrieval papers use token and term almost interchangeably in some cases.
More advanced text analytics might calculate metrics for phrases, in which case a term becomes a more complex structure.
However, we’re only looking at single words.
Now that we have a more complex workflow for TF-IDF, let’s consider best practices for test-driven development (TDD) at scale. We’ll add unit tests into the build, then show how to leverage TDD features that are unique to Cascading: checkpoints, traps, assertions, etc. Figure 3-6 shows a conceptual diagram for this app.
Generally speaking, TDD starts off with a failing test, and then you code until the test passes. We’ll start with a working app, with tests that pass—followed by discussion of how to use assertions for the test/code cycle.
Starting from the source code directory that you cloned in Git, connect into the part6 subdirectory.
As a first step toward better testing, let’s add a unit test and show how it fits into this example.
We need to add support for testing into our build.
In the Gradle build script build.gradle we need to modify the compile task to include JUnit and other testing dependencies:
dependencies {
  compile( 'cascading:cascading-core:2.1.+' ) { exclude group: 'log4j' }
  compile( 'cascading:cascading-hadoop:2.1.+' ) { transitive = true }
  testCompile( 'cascading:cascading-test:2.1.+' )
  testCompile( 'org.apache.hadoop:hadoop-test:1.0.+' )
  testCompile( 'junit:junit:4.8.+' )
}
Then we’ll add a new test task to the build:
test {
  include 'impatient/**'

  // makes standard streams (err, out) visible at console when running tests
  testLogging.showStandardStreams = true

  // listening to test execution events
  beforeTest { descriptor ->
    logger.lifecycle( "Running test: " + descriptor )
  }

  onOutput { descriptor, event ->
    logger.lifecycle( "Test: " + descriptor + " produced standard out/err: " + event.message )
  }
}
A little restructuring of the source directories is required—see this GitHub code repo where that is already set up properly.
The custom function ScrubFunction used to scrub tokens in Example 3: Customized Operations had an additional method, to make unit testing simpler.
We add a unit test in a new class called ScrubTest.java, which extends CascadingTestCase:
public class ScrubTest extends CascadingTestCase
  {
  @Test
  public void testScrub()
    {
    Fields fieldDeclaration = new Fields( "doc_id", "token" );
    Function scrub = new ScrubFunction( fieldDeclaration );

    Tuple[] arguments = new Tuple[]{
      new Tuple( "doc_1", "FoO" ),
      new Tuple( "doc_1", " BAR " ),
      new Tuple( "doc_1", " " ) // will be scrubbed
      };

    ArrayList<Tuple> expectResults = new ArrayList<Tuple>();
    expectResults.add( new Tuple( "doc_1", "foo" ) );
    expectResults.add( new Tuple( "doc_1", "bar" ) );

    TupleListCollector collector = invokeFunction( scrub, arguments, Fields.ALL );
    Iterator<Tuple> it = collector.iterator();

    ArrayList<Tuple> results = new ArrayList<Tuple>();

    while( it.hasNext() )
      results.add( it.next() );

    assertEquals( "Scrubbed result is not expected", expectResults, results );
    }
  }
Again, this is a particularly good place for a unit test. Scrubbing tokens is a likely point where edge cases will get encountered at scale. In practice, you’d want to define even more unit tests.
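For instance, here is a sketch of one more test along the same lines; the input value is hypothetical, but the expected result exercises only the behavior already shown above (lowercasing and trimming):

@Test
public void testScrubMixedCase()
  {
  Fields fieldDeclaration = new Fields( "doc_id", "token" );
  Function scrub = new ScrubFunction( fieldDeclaration );

  // hypothetical input; expected output assumes the same lowercase/trim behavior
  Tuple[] arguments = new Tuple[]{ new Tuple( "doc_2", "  RaIn  " ) };

  TupleListCollector collector = invokeFunction( scrub, arguments, Fields.ALL );
  Iterator<Tuple> it = collector.iterator();

  assertEquals( new Tuple( "doc_2", "rain" ), it.next() );
  }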
Going back to the Main.java module, let’s see how to handle other kinds of unexpected issues with data at scale. We’ll add both a trap and a checkpoint as taps:
String trapPath = args[ 4 ];
String checkPath = args[ 5 ];

Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
Tap checkTap = new Hfs( new TextDelimited( true, "\t" ), checkPath );
Next we’ll modify the head of the pipe assembly for documents to incorporate a stream assertion, as Figure 3-7 shows. This uses an AssertMatches to define the expected pattern for data in the input tuple stream. There could be quite a large number of documents, so it stands to reason that some data may become corrupted. In our case, another line has been added to the example input data/rain.txt to exercise the assertion and trap.
Notice in Figure 3-7 how the trap will apply to the entire branch that includes the stream assertion. Then we apply AssertionLevel.STRICT to force validation of the data:
// use a stream assertion to validate the input data
Pipe docPipe = new Pipe( "token" );
AssertMatches assertMatches = new AssertMatches( "doc\\d+\\s.*" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );
Sometimes, when working with complex workflows, we just need to see what the tuple stream looks like. To show this feature, we’ll insert a Debug operation on the DF branch and use DebugLevel.VERBOSE to trace the tuple values in the flow there:
// example use of a debug, to observe tuple stream; turn off below
dfPipe = new Each( dfPipe, DebugLevel.VERBOSE, new Debug( true ) );
This prints the tuple values at that point to the log file. Fortunately, it can be disabled with a single line—in practice, you’d probably use a command-line argument to control that.
Next let’s show how to use a Checkpoint that forces the tuple stream to be persisted to HDFS. Figure 3-8 shows this inserted after the join of the DF and D branches.
// create a checkpoint, to observe the intermediate data in DF stream
Checkpoint idfCheck = new Checkpoint( "checkpoint", idfPipe );
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );
Checkpoints help especially when there is an expensive unit of work—such as a lengthy calculation. On one hand, if a calculation fits into a single map and several branches consume from it, then a checkpoint avoids having to redo the calculation for each branch. On the other hand, if a Hadoop job step fails, for whatever reason, then the Cascading app can be restarted from the last successful checkpoint.
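If your Cascading version supports restartable checkpointed flows, the flow can also be given a stable run ID so that a rerun skips the job steps that completed before the last successful checkpoint. A minimal sketch, assuming the FlowDef.setRunID method is available in your version; it would be applied to the FlowDef created below:

// sketch only -- the run ID value is hypothetical; reusing the same ID on a
// rerun lets the planner resume from the persisted checkpoint data
flowDef.setRunID( "tfidf-run-001" );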
At this point we have a relatively more complex set of taps to connect in the FlowDef, to include the new output data for test-related features:
// connect the taps, pipes, traps, checkpoints, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "tfidf" )
  .addSource( docPipe, docTap )
  .addSource( stopPipe, stopTap )
  .addTailSink( tfidfPipe, tfidfTap )
  .addTailSink( wcPipe, wcTap )
  .addTrap( docPipe, trapTap )
  .addCheckpoint( idfCheck, checkTap );
Last, we’ll specify the verbosity level for the debug trace and the strictness level for the stream assertion:
// set to DebugLevel.VERBOSE for trace,
// or DebugLevel.NONE in production
flowDef.setDebugLevel( DebugLevel.VERBOSE );

// set to AssertionLevel.STRICT for all assertions,
// or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );
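In practice, both levels could be driven from the command line rather than hardcoded; a sketch, assuming a hypothetical extra argument position:

// sketch only: args[ 6 ] is a hypothetical extra command-line argument
boolean verbose = args.length > 6 && "verbose".equals( args[ 6 ] );
flowDef.setDebugLevel( verbose ? DebugLevel.VERBOSE : DebugLevel.NONE );
flowDef.setAssertionLevel( verbose ? AssertionLevel.STRICT : AssertionLevel.NONE );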
Modify the Main method for those changes.
This code is already in the part6/src/main/java/impatient/ directory, in the Main.java file.
You should be good to go.
For those keeping score, the physical plan now uses 12 maps and 9 reduces. In other words, we added one map as the overhead for gaining lots of test features.
To build:
$ gradle clean jar
To run:
$ rm -rf output
$ hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop \
    output/tfidf output/trap output/check
Remember that data/rain.txt has another row, intended to cause a trap. The output log should include a warning based on the stream assertion, which looks like this:
12/08/06 14:15:07 WARN stream.TrapHandler: exception trap on branch: 'token',
for fields: [{2}:'doc_id', 'text'] tuple: ['zoink', 'null']
cascading.operation.AssertionException: argument tuple: ['zoink', 'null']
did not match: doc\d+\s.*
    at cascading.operation.assertion.BaseAssertion.throwFail(BaseAssertion.java:107)
    at cascading.operation.assertion.AssertMatches.doAssert(AssertMatches.java:84)
That is expected behavior. We asked Cascading to show warnings when stream assertions failed. The data that caused this warning gets trapped.
Not too far after that point in the log, there should be some other debug output that looks like the following:
12/08/06 14:15:46 INFO hadoop.FlowReducer: sinking to:
TempHfs["SequenceFile[['df_count', 'df_token', 'lhs_join']]"][DF/93669/]
['df_count', 'df_token', 'lhs_join']
['1', 'air', '1']
['3', 'area', '1']
['1', 'australia', '1']
['1', 'broken', '1']
Plus several more similar lines. That is the result of our debug trace.
Output text gets stored in the partition file output/tfidf as before. We also have the checkpointed data now:
$ head output/check/part-00000
df_count  df_token      lhs_join  rhs_join  n_docs
1         air           1         1         5
3         area          1         1         5
1         australia     1         1         5
1         broken        1         1         5
1         california's  1         1         5
1         cause         1         1         5
1         cloudcover    1         1         5
1         death         1         1         5
1         deserts       1         1         5
Also notice the data tuple trapped in output/trap:
$ cat output/trap/part-m-00001-00000
zoink   null
That tuple does not match the regex doc\\d+\\s.* that was specified by the stream assertion.
Great, we caught it before it blew up something downstream.
A gist on GitHub shows a log of building and running this example. If your run looks terribly different, something is probably not set up correctly. Ask the developer community for troubleshooting advice.
At first glance, the notion of TDD might seem a bit antithetical in the context of Big Data. After all, TDD is supposed to be about short development cycles, writing automated test cases that are intended to fail, and lots of refactoring. Those descriptions don’t seem to fit with batch jobs that involve terabytes of data run on huge Hadoop clusters for days before they complete.
Stated in a somewhat different way, according to Kent Beck, TDD “encourages simple designs and inspires confidence.”
That statement fits quite well with the philosophy of Cascading.
The Cascading API is intended to provide a pattern language for working with large-scale data—GroupBy, Join, Count, Regex, Filter—so that the need for writing custom functions becomes relatively rare.
That speaks to “encouraging simple designs” directly.
The practice in Cascading of modeling business process and orchestrating Apache Hadoop workflows speaks to “inspiring confidence” in a big way.
So now we’ll let the cat out of the bag for a little secret…working with unstructured data at scale has been shown to be quite valuable by the likes of Google, Amazon, eBay, Facebook, LinkedIn, Twitter, etc. However, most of the “heavy lifting” that we perform in MapReduce workflows is essentially cleaning up data. DJ Patil, formerly Chief Scientist at LinkedIn, explains this point quite eloquently in the mini-book Data Jujitsu:
It’s impossible to overstress this: 80% of the work in any data project is in cleaning the data… Work done up front in getting clean data will be amply repaid over the course of the project.
— DJ Patil, Data Jujitsu (2012)
Cleaning up unstructured data allows for subsequent use of sampling techniques, dimensional reduction, and other practices that help alleviate some of the bottlenecks that might otherwise be encountered in Enterprise data workflows. Thinking about this in another way, we have need for API features that demonstrate how “dirty” data has been cleaned up. Cascading provides those features, which turn out to be quite valuable in practice.
Common practices for test-driven development include writing unit tests or mocks. How does one write a quick unit test for a Godzilla-sized data set?
The short answer is: you don’t. However, you can greatly reduce the need for writing unit test coverage by limiting the amount of custom code required. Hopefully we’ve shown that aspect of Cascading by now. Beyond that, you can use sampling techniques to quantify confidence that an app has run correctly. You can also define system tests at scale in relatively simple ways. Furthermore, you can define contingencies for what to do when assumptions fail…as they inevitably do, at scale.
Let’s discuss sampling. Generally speaking, large MapReduce workflows tend to be relatively opaque processes that are difficult to observe. Cascading, however, provides two techniques for observing portions of a workflow. One very simple approach is to insert a Debug into a pipe to see the tuple values passing through a particular part of a workflow. Debug output goes to the log instead of a file, but it can be turned off, e.g., with a command-line option. If the data is large, one can use a Sample filter to sample the tuple values that get written to the log.
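For example, a Sample filter (from cascading.operation.filter) could be chained in front of a Debug so that only a fraction of tuples reach the log. A sketch, not part of the example app; the pipe name here is a placeholder:

// keep roughly 1% of tuples, then trace them in the log
somePipe = new Each( somePipe, new Sample( 0.01 ) );
somePipe = new Each( somePipe, DebugLevel.VERBOSE, new Debug( true ) );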
Another approach is to use a Checkpoint, which forces intermediate data to be written out to HDFS. This may also become important for performance reasons, i.e., forcing results to disk to avoid recomputing—e.g., when there are multiple uses for the output of a pipe downstream such as with the RHS of a HashJoin. Sampling may be performed either before (with Debug) or after (with Checkpoint) the data gets persisted to HDFS. Checkpoints can also be used to restart partially failed workflows, to recover some costs.
Next, let’s talk about system tests.
Cascading includes support for stream assertions.
These provide mechanisms for asserting that the values in a tuple stream meet certain criteria—similar to the assert keyword in Java, or an assertNotNull in a JUnit test.
We can assert patterns strictly as unit tests during development and then run testing against regression data.
For performance reasons, we might use command-line options to turn off assertions in production—or keep them (fail-fast mode) if a use case requires that level of guarantee.
Lastly, what should you do when assumptions fail? One lesson of working with data at scale is that the best assumptions will inevitably fail. Unexpected things happen, and 80% of the work will be cleaning up problems.
Cascading defines failure traps, which capture data that would otherwise cause an operation to fail, e.g., by throwing an exception. For example, perhaps 99% of the cases in your log files can be rolled up into a set of standard reports…but 1% requires manual review. Great, process the 99% that work and shunt the 1% failure cases into a special file marked “For manual review.” That can be turned into a report for the customer support department. Keep in mind, however, that traps are intended for handling exceptional cases. If you know in advance how to categorize good versus bad data, then a best practice is to use a filter instead of a trap.
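For instance, if we knew in advance that every valid record must have a doc_id matching doc\d+, a filter could drop the bad records explicitly instead of trapping them. A sketch, not part of the example app:

// RegexFilter (cascading.operation.regex) keeps tuples whose argument matches
docPipe = new Each( docPipe, new Fields( "doc_id" ), new RegexFilter( "doc\\d+" ) );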