The following code examples show how to write apps in Cascading. The apps are intended to run on a laptop using Apache Hadoop in standalone mode, on a laptop running Linux or Unix (including Mac OS X). If you are using a Windows-based laptop, then many of these examples will not work, and generally speaking Hadoop does not behave well under Cygwin. However, you could run Linux, etc., in a virtual machine. Also, these examples are not intended to show how to set up and run a Hadoop cluster. There are other good resources about that—see Hadoop: The Definitive Guide by Tom White (O’Reilly).
First, you will need to have a few platforms and tools installed:
Our use of Gradle and Git implies that these commands will be downloading JARs, checking code repos, etc., so you will need an Internet connection for most of the examples in this book.
Next, set up your command-line environment. You will need to have the following environment variables set properly, according to the installation instructions for each project and depending on your operating system:
JAVA_HOME
HADOOP_HOME
GRADLE_HOME
Assuming that the installers for both Java and Git have placed binaries in the appropriate directories,
now extend your PATH
definition for the other tools that depend on Java:
$
export
PATH
=
$PATH
:$HADOOP_HOME
/bin:$GRADLE_HOME
/bin
OK, now for some tests. Try the following command lines to verify that your installations worked:
$
java -version$
hadoop -version$
gradle --version$
git --version
Each command should print its version information. If there are problems, most likely you’ll get errors at this stage. Don’t worry if you see a warning like the following—that is a known behavior in Apache Hadoop:
Warning: $HADOOP_HOME
is deprecated.
It’s a great idea to create an account on GitHub, too. An account is not required to run the sample apps in this book. However, it will help you follow project updates for the example code, participate within the developer community, ask questions, etc.
Also note that you do not need to install Cascading. Certainly you can, but the Gradle build scripts used in these examples will pull the appropriate version of Cascading from the Conjars Maven repo automatically. Conjars has lots of interesting JARs for related projects—take a peek sometime.
OK, now you are ready to download source code. Connect to a directory on your computer where you have a few gigabytes of available disk space, and then clone the whole source code repo for this multipart series:
$
git clone git://github.com/Cascading/Impatient.git
Once that completes, connect into the part1 subdirectory. You’re ready to begin programming in Cascading.
The first item on our agenda is how to write a simple Cascading app. The goal is clear and concise: create the simplest possible app in Cascading while following best practices. This app will copy a file, potentially a very large file, in parallel—in other words, it performs a distributed copy. No bangs, no whistles, just good solid code.
First, we create a source tap to specify the input data. That data happens to be formatted as tab-separated values (TSV) with a header row, which the TextDelimited data scheme handles.
String
inPath
=
args
[
0
];
Tap
inTap
=
new
Hfs
(
new
TextDelimited
(
true
,
"\t"
),
inPath
);
Next we create a sink tap to specify the output data, which will also be in TSV format:
String
outPath
=
args
[
1
];
Tap
outTap
=
new
Hfs
(
new
TextDelimited
(
true
,
"\t"
),
outPath
);
Then we create a pipe to connect the taps:
Pipe
copyPipe
=
new
Pipe
(
"copy"
);
Here comes the fun part. Get your tool belt ready, because we need to do a little plumbing. Connect the taps and the pipe to create a flow:
FlowDef
flowDef
=
FlowDef
.
flowDef
()
.
addSource
(
copyPipe
,
inTap
)
.
addTailSink
(
copyPipe
,
outTap
);
The notion of a workflow lives at the heart of Cascading. Instead of thinking in terms of map and reduce phases in a Hadoop job step, Cascading developers define workflows and business processes as if they were doing plumbing work.
Enterprise data workflows tend to use lots of job steps. Those job steps are connected and have dependencies, specified as a directed acyclic graph (DAG). Cascading uses FlowDef objects to define how a flow—that is to say, a portion of the DAG—must be connected. A pipe must connect to both a source and a sink. Done and done. That defines the simplest flow possible.
Now that we have a flow defined, one last line of code invokes the planner on it. Planning a flow is akin to the physical plan for a query in SQL. The planner verifies that the correct fields are available for each operation, that the sequence of operations makes sense, and that all of the pipes and taps are connected in some meaningful way. If the planner detects any problems, it will throw exceptions long before the app gets submitted to the Hadoop cluster.
flowConnector
.
connect
(
flowDef
).
complete
();
Generally, these Cascading source lines go into a static main
method in a Main
class.
Look in the part1/src/main/java/impatient/ directory, in the Main.java file, where this is already done.
You should be good to go.
Each different kind of computing framework is called a topology,
and each must have its own planner class.
This example code uses the HadoopFlowConnector
class to invoke the flow planner,
which generates the Hadoop job steps needed to implement the flow.
Cascading performs that work on the client side, and then submits those jobs to the Hadoop cluster and tracks their status.
If you want to read in more detail about the classes in the Cascading API that were used, see the Cascading User Guide and JavaDoc.
Cascading uses Gradle to build the JAR for an app. The build script for is in build.gradle:
apply
plugin:
'java'
apply
plugin:
'idea'
apply
plugin:
'eclipse'
archivesBaseName
=
'impatient'
repositories
{
mavenLocal
()
mavenCentral
()
mavenRepo
name:
'conjars'
,
url:
'http://conjars.org/repo/'
}
ext
.
cascadingVersion
=
'2.1.0'
dependencies
{
compile
(
group:
'cascading'
,
name:
'cascading-core'
,
version:
cascadingVersion
)
compile
(
group:
'cascading'
,
name:
'cascading-hadoop'
,
version:
cascadingVersion
)
}
jar
{
description
=
"Assembles a Hadoop ready jar file"
doFirst
{
into
(
'lib'
)
{
from
configurations
.
compile
}
}
manifest
{
attributes
(
"Main-Class"
:
"impatient/Main"
)
}
}
Notice the reference to a Maven repo called http://conjars.org/repo/
in the build script.
That is how Gradle accesses the appropriate version of Cascading, pulling from the open source project’s Conjars public Maven repo.
To build this sample app from a command line, run Gradle:
$
gradle clean jar
Note that each Cascading app gets compiled into a single JAR file. That is to say, it includes all of the app’s business logic, system integrations, unit tests, assertions, exception handling, etc. The principle is “Same JAR, any scale.” After building a Cascading app as a JAR, a developer typically runs it on a laptop for unit tests and other validation using relatively small-scale data. Once those tests are confirmed, the JAR typically moves into continuous integration (CI) on a staging cluster using moderate-scale data. After passing CI, Enterprise IT environments generally place a tested JAR into a Maven repository as a new version of the app that Ops will schedule for production use with the full data sets.
What you should have at this point is a JAR file that is ready to run. Before running it, be sure to clear the output directory. Apache Hadoop insists on this when you’re running in standalone mode. To be clear, these examples are working with input and output paths that are in the local filesystem, not HDFS.
Now run the app:
$
rm -rf output$
hadoop jar ./build/libs/impatient.jar data/rain.txt output/rain
Notice how those command-line arguments (actual parameters) align with the args[]
array (formal parameters) in the source.
In the first argument, the source tap loads from the input file data/rain.txt, which contains text from search results about “rain shadow.”
Each line is supposed to represent a different document.
The first two lines look like this:
doc_id text doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
Input tuples get copied, TSV row by TSV row, to the sink tap.
The second argument specifies that the sink tap be written to the output/rain
output, which is organized as a partition file.
You can verify that those lines got copied by viewing the text output, for example:
$
head -2 output/rain/part-00000
doc_id text
doc01 A rain shadow is a dry area on the lee back side of a mountainous area.
For quick reference, the source code, input data, and a log for this example are listed in a GitHub gist.
If the log of your run looks terribly different, something is probably not set up correctly.
There are multiple ways to interact with the Cascading developer community.
You can post a note on the cascading-user
email forum. Plenty of experienced Cascading users are discussing taps and pipes and flows there, and they are eager to help.
Or you can visit one of the Cascading user group meetings.
Conceptually, a “flow diagram” for this first example is shown in Figure 1-1. Our simplest app possible copies lines of text from file “A” to file “B.” The “M” and “R” labels represent the map and reduce phases, respectively. As the flow diagram shows, it uses one job step in Apache Hadoop: only one map and no reduce needed. The implementation is a brief Java program, 10 lines long.
Wait—10 lines of code to copy a file?
That seems excessive; certainly this same work could be performed in much quicker ways, such as using the cp
command on Linux.
However, keep in mind that Cascading is about the “plumbing” required to make Enterprise apps robust.
There is some overhead in the setup, but those lines of code won’t change much as an app’s complexity grows.
That overhead helps provide for the principle of “Same JAR, any scale.”
Let’s take a look at the components of a Cascading app. Figure 1-2 shows a taxonomy that starts with apps at the top level. An app has a unique signature and is versioned, and it includes one or more flows. Optionally, those flows may be organized into cascades, which are collections of flows without dependencies on one another, so that they may be run in parallel.
Each flow represents a physical plan, based on the planner for a specific topology such as Apache Hadoop. The physical plan provides a deterministic strategy for a query. Developers talk about a principle of “Fail the same way twice.” In other words, when we need to debug an issue, it’s quite important that Cascading flows behave deterministically. Otherwise, the process of troubleshooting edge cases on a large cluster and with a large data set can become enormous. Again, that addresses a more “conservatism” aspect of Cascading.
We’ve already introduced the use of pipes. Each assembly of pipes has a head and a tail. We bind taps to pipes to create a flow; so source taps get bound to the heads of pipes for input data, and sink taps to the tails of pipes for output data. That is the functional graph. Any unconnected pipes and taps will cause the planner to throw exceptions.
The physical plan of a flow results in a dependency graph of one or more steps. Formally speaking, that is a directed acyclic graph (DAG). At runtime, data flows through the DAG as streams of key/value tuples.
The steps created by a Hadoop flow planner, for example, correspond to the job steps that run on the Hadoop cluster. Within each step there may be multiple phases, e.g., the map phase or reduce phase in Hadoop. Also, each step is composed of slices. These are the most granular “unit of work” in a Cascading app, such that collections of slices can be parallelized. In Hadoop these slices correspond to the tasks executing in task slots.
That’s it in a nutshell, how the proverbial neck bone gets connected to the collarbone in Cascading.
The first example showed how to do a file copy in Cascading. Let’s take that code and stretch it a bit further. Undoubtedly you’ve seen Word Count before. We’d feel remiss if we did not provide an example.
Word Count serves as a “Hello World” for Hadoop apps. In other words, this simple program provides a great test case for parallel processing:
When a distributed computing framework can run Word Count in parallel at scale, it can handle much larger and more interesting algorithms. Along the way, we’ll show how to use a few more Cascading operations, plus show how to generate a flow diagram as a visualization.
Starting from the source code directory that you cloned in Git, connect into the part2 subdirectory. For quick reference, the source code and a log for this example are listed in a GitHub gist. Input data remains the same as in the earlier code.
Note that the names of the taps have changed.
Instead of inTap
and outTap
, we’re using docTap
and wcTap
now.
We’ll be adding more taps, so this will help us have more descriptive names.
This makes it simpler to follow all the plumbing.
Previously we defined a simple pipe to connect the taps. This example shows a more complex pipe assembly. We use a generator inside an Each to split the document text into a token stream. The generator uses a regex to split the input text on word boundaries:
Fields
token
=
new
Fields
(
"token"
);
Fields
text
=
new
Fields
(
"text"
);
RegexSplitGenerator
splitter
=
new
RegexSplitGenerator
(
token
,
"[ \\[\\]\\(\\),.]"
);
// returns only "token"
Pipe
docPipe
=
new
Each
(
"token"
,
text
,
splitter
,
Fields
.
RESULTS
);
Out of that pipe, we get a tuple stream of token
values.
One benefit of using a regex is that it’s simple to change. We can handle more complex cases of splitting tokens
without having to rewrite the generator.
Next, we use a GroupBy to count the occurrences of each token:
Pipe
wcPipe
=
new
Pipe
(
"wc"
,
docPipe
);
wcPipe
=
new
GroupBy
(
wcPipe
,
token
);
wcPipe
=
new
Every
(
wcPipe
,
Fields
.
ALL
,
new
Count
(),
Fields
.
ALL
);
Notice that we’ve used Each
and Every
to perform operations within the pipe assembly.
The difference between these two is that an Each
operates on individual tuples so that it takes Function
operations.
An Every
operates on groups of tuples so that it takes Aggregator
or Buffer
operations—in this case the GroupBy
performed an aggregation.
The different ways of inserting operations serve to categorize the different built-in operations in Cascading.
They also illustrate how the pattern language syntax guides the development of workflows.
From that wcPipe
we get a resulting tuple stream of token
and count
for the output.
Again, we connect the plumbing with a
FlowDef:
FlowDef
flowDef
=
FlowDef
.
flowDef
()
.
setName
(
"wc"
)
.
addSource
(
docPipe
,
docTap
)
.
addTailSink
(
wcPipe
,
wcTap
);
Finally, we generate a DOT file to depict the Cascading flow graphically. You can load the DOT file into OmniGraffle or Visio. Those diagrams are really helpful for troubleshooting workflows in Cascading:
Flow
wcFlow
=
flowConnector
.
connect
(
flowDef
);
wcFlow
.
writeDOT
(
"dot/wc.dot"
);
wcFlow
.
complete
();
This code is already in the part2/src/main/java/impatient/ directory, in the Main.java file. To build it:
$
gradle clean jar
Then to run it:
$
rm -rf output$
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc
This second example uses the same input from the first example, but we expect different output. The sink tap writes to the partition file output/wc, and the first 10 lines (including a header) should look like this:
$ head output/wc/part-00000 token count 9 A 3 Australia 1 Broken 1 California's 1 DVD 1 Death 1 Land 1 Secrets 1
Again, a GitHub gist shows building and running the sample app. If your run looks terribly different, something is probably not set up correctly. Ask the Cascading developer community how to troubleshoot for your environment.
So that’s our Word Count
example.
Eighteen lines of yummy goodness.
Conceptually, we can examine a workflow as a stylized flow diagram. This helps visualize the “plumbing” metaphor by using a design that removes low-level details. Figure 1-3 shows one of these for . Formally speaking, this diagram represents a DAG.
Meanwhile the Cascading code in writes a flow diagram called dot/wc.dot to depict the flow graphically. Figure 1-4 shows a version that has been annotated to indicate where the map and reduce phases run. As mentioned before, those diagrams come in handy when troubleshooting Cascading workflows. If you ask other Cascading developers for help debugging an issue, don’t be surprised when their first request is to see your app’s flow diagram.
From a high-level perspective, differs from in two ways:
Although several lines of Java source code changed, in pattern language terms we can express the difference between the apps simply as those two points. That’s a powerful benefit of using the “plumbing” metaphor.
First let’s consider how the source and sink taps were redefined to be more specific.
Instead of simply describing a generic “Source” or “File A,” now we’ve defined the source tap as a collection of text documents.
Instead of “Sink” or “File B,” now we’ve defined the sink tap to produce word count tuples—the desired end result.
Those changes in the taps began to reference fields in the tuple stream.
The source tap in both examples was based on TextDelimited
with parameters so that it reads a TSV file and uses the header line to assign field names.
ignored the fields by simply copying data tuple by tuple.
begins to reference fields by name, which introduces the notion of scheme—imposing some expectation of structure on otherwise unstructured data.
The change in taps also added semantics to the workflow, specifying requirements for added operations needed to reach the desired results.
Let’s consider the new Cascading operations that were added to the pipe assembly in :
Tokenize
, GroupBy
, and Count
.
The first one, Tokenize
, transforms the input data tuples, splitting lines of text into a stream of tokens.
That transform represents the “T” in ETL.
The second operation, GroupBy
, performs an aggregation.
In terms of Hadoop, this causes a reduce with token
as a key.
The third operation, Count
, gets applied to each aggregation—counting the values for each token
key, i.e., the number of instances of each token in the stream.
The deltas between and illustrate important aspects of Cascading.
Consider how data tuples flow through a pipe assembly, getting routed through familiar data operators such as GroupBy
, Count
, etc.
Each flow must be connected to a source of data as its input and a sink as its output.
The sink tap for one flow may in turn become a source tap for another flow.
Each flow defines a DAG that Cascading uses to infer schema from unstructured data.
Enterprise data workflows are complex applications, and managing that complexity is the purpose for Cascading. Enterprise apps based on Apache Hadoop typically involve more than just one Hadoop job step. Some apps are known to include hundreds of job steps, with complex dependencies between them. Cascading leverages this concept of a DAG to represent the business process of an app. The DAG, in turn, declares the requirements for the job steps that are needed to complete the app’s data flow. Consequently, a flow planner has sufficient information about the workflow so that it can leverage the DAG in several ways:
Those capabilities address important concerns in Enterprise IT and stand as key points by which Cascading differentiates itself from other Hadoop abstraction layers.
Another subtle point concerns the use of taps. On one hand, data taps are available for integrating Cascading with several other popular data frameworks, including Memcached, HBase, Cassandra, etc. Several popular data serialization systems are supported, such as Apache Thrift, Avro, Kyro, etc. Looking at the conceptual flow diagram, our workflow could be using any of a variety of different data frameworks and serialization systems. That could apply equally well to SQL query result sets via JDBC or to data coming from Cassandra via Thrift. It wouldn’t be difficult to modify the code in to set those details based on configuration parameters. To wit, the taps generalize many physical aspects of the data so that we can leverage patterns.
On the other hand, taps also help manage complexity at scale. Our code in could be run on a laptop in Hadoop’s “standalone” mode to process a small file such as rain.txt, which is a mere 510 bytes. The same code could be run on a 1,000-node Hadoop cluster to process several petabytes of the Internet Archives’ Wayback Machine.
Taps are agnostic about scale, because the underlying topology (Hadoop) uses parallelism to handle very large data. Generally speaking, Cascading apps handle scale-out into larger and larger data sets by changing the parameters used to define taps. Taps themselves are formal parameters that specify placeholders for input and output data. When a Cascading app runs, its actual parameters specify the actual data to be used—whether those are HDFS partition files, HBase data objects, Memcached key/values, etc. We call these tap identifiers. They are effectively uniform resource identifiers (URIs) for connecting through protocols such as HDFS, JDBC, etc. A dependency graph of tap identifiers and the history of app instances that produced or consumed them is analogous to a catalog in relational databases.
The code in showed how to move data from point A to point B. That was simply a distributed file copy—loading data via distributed tasks, or the “L” in ETL.
A copy example may seem trivial, and it may seem like Cascading is overkill for that. However, moving important data from point A to point B reliably can be a crucial job to perform. This helps illustrate one of the key reasons to use Cascading.
Consider an analogy of building a small Ferris wheel. With a little bit of imagination and some background in welding, a person could cobble one together using old bicycle parts. In fact, those DIY Ferris wheels show up at events such as Maker Faire. Starting out, a person might construct a little Ferris wheel, just for demo. It might not hold anything larger than hamsters, but it’s not a hard problem. With a bit more skill, a person could probably build a somewhat larger instance, one that’s big enough for small children to ride.
Ask yourself this: how robust would a DIY Ferris wheel need to be before you let your kids ride on it? That’s precisely part of the challenge at an event like Maker Faire. Makers must be able to build a device such as a Ferris wheel out of spare bicycle parts that is robust enough that strangers will let their kids ride. Let’s hope those welds were made using best practices and good materials, to avoid catastrophes.
That’s a key reason why Cascading was created. When you need to move a few gigabytes from point A to point B, it’s probably simple enough to write a Bash script, or just use a single command-line copy. When your work requires some reshaping of the data, then a few lines of Python will probably work fine. Run that Python code from your Bash script and you’re done.
That’s a great approach, when it fits the use case requirements. However, suppose you’re not moving just gigabytes. Suppose you’re moving terabytes, or petabytes. Bash scripts won’t get you very far. Also think about this: suppose an app not only needs to move data from point A to point B, but it must follow the required best practices of an Enterprise IT shop. Millions of dollars and potentially even some jobs ride on the fact that the app performs correctly. Day in and day out. That’s not unlike trusting a Ferris wheel made by strangers; the users want to make sure it wasn’t just built out of spare bicycle parts by some amateur welder. Robustness is key.
Or, taking this analogy a few steps in another interesting direction, perhaps you’re not only moving data and reshaping it a little, but you’re applying some interesting machine learning algorithms, some natural language processing, gene sequencing…who knows? Those imply lots of resource use, lots of potential expense in case of failures. Or lots of customer exposure. You’ll want to use an application framework that is significantly more robust than a bunch of scripts cobbled together.
With Cascading, you can package your entire MapReduce application, including its orchestration and testing, within a single JAR file. You define all of that within the context of one programming language—whether that language may be Java, Scala, Clojure, Python, Ruby, etc. That way your tests are included within a single program, not spread across several scripts written in different languages. Having a single JAR file define an app helps for following the best practices required in Enterprise IT: unit tests, stream assertions, revision control, continuous integration, Maven repos, role-based configuration management, advanced schedulers, monitoring and notifications, data provenance, etc. Those are key reasons why we make Cascading, and why people use it for robust apps that run at scale.