Chapter 6. Beyond MapReduce

Overall, the notion of an Enterprise data workflow spans well beyond Hadoop, integrating many different kinds of frameworks and processes. Consider the architecture in Figure 6-1 as a strawman that shows where a typical Enterprise data workflow runs.

In the center there is a workflow consuming from some unstructured data—most likely some kind of machine data, such as log files—plus some other, more structured data from another framework, such as customer profiles. That workflow runs on an Apache Hadoop cluster, and possibly on other topologies, such as in-memory data grids (IMDGs).

Some of the results go directly to a frontend use case, such as getting pushed into Memcached, which is backing a customer API. Line of business use cases are what drive most of the need for Big Data apps.

Some of the results also go to the back office. Enterprise organizations almost always have made substantial investments in data infrastructure for the back office, in the process used to integrate systems and coordinate different departments, and in the people trained in that process. Workflow results such as data cubes get pushed from the Hadoop cluster out to an analytics framework. In turn, those data cubes get consumed for reporting needs, data science work, customer support, etc.

We can also view this strawman workflow from a functional perspective at Enterprise scale, shown in Figure 6-2.

Different departments are typically responsible for specific units of work. Multiple data sources get loaded and merged through the ETL process. Many organizations perform that ETL within a data warehouse, such as Teradata, where the unit of work is defined in ANSI SQL queries. Most use cases require additional data preparation, applying business logic that is specific to an organization—for example, cleaning the data and producing sample training sets. Apache Pig is a popular tool for that kind of data preparation on a Hadoop cluster. Those data sets may get used to create and score predictive models: classifiers, recommenders, forecasts, etc. Many organizations perform their modeling within an analytics framework, such as SAS. Application developers translate specifications from the analysts’ models into another programming language, such as Java, to run at scale. Then the data products from those apps must be integrated into end use cases.

At this point, the business process for our strawman app has probably crossed through four or five departments: Ops, Analytics, plus a few different applications teams. And while some organizations have adopted more of Data Science practice—introducing multidisciplinary teams that can handle all of those functions—in reality, most Enterprise firms will have these responsibilities split across different departments, probably with each team leveraging different frameworks. That creates an issue of operational complexity, because the business process for the workflow is defined and executed in pieces. Ops ultimately has responsibility for keeping the entire workflow running smoothly, even though it crosses several boundaries; part of it gets defined in ANSI SQL, part in Pig, part in SAS, part in Java, etc.

This kind of problem is why Enterprise organizations leverage Cascading. The entire business process can be defined in one app—one JAR file—that integrates each of the respective units of work, as shown in Figure 6-3. Rather than translating requirements from each department into Java, Clojure, Scala, etc., most of the work can be integrated directly.

To support this, Cascading includes two components, Lingual for ANSI SQL and Pattern for PMML, which are effectively DSLs. These allow for all of the following:

In other words, the different departments have a way to collaborate on a combined app that ties together and optimizes business processes across the organization.

Lingual is an extension to Cascading that executes ANSI SQL queries as Cascading apps. This open source project is a collaboration between Cascading and Optiq—an ANSI-compliant SQL parser/optimizer written by Julian Hyde, the author of Mondrian. Julian wrote a good description of the project.

It is important to note that Lingual itself is not a database. Rather, it leverages the power of SQL to describe the business logic for data workflows—as a kind of functional programming. In that sense Lingual implements a domain-specific language (DSL) where Cascading workflows get defined in SQL. Optiq provides compatibility with a wide range of commercial implementations for ANSI SQL: tested with more than 6,000 complex SQL statements from production queries in DB2, Oracle, Teradata, SQL Server, etc.

Consider that ANSI SQL provides the lingua franca for describing data in Enterprise. People working in data management often think of database, relational, and SQL as all referring to the same thing. In reality, most of the databases in use today are non-relational; e.g., banking makes enormous use of hierarchical databases. Moreover, SQL is not quite the same as the relational model. Edgar Codd, the author of the relational model, spent much of the latter part of his career arguing that point. However, SQL is a language—a declarative language, mostly based on a functional programming paradigm—and it describes workflows that are directed acyclic graphs (DAGs). In that sense, SQL corresponds quite closely to the internals of Cascading.

Another subtle point is that Lingual is not intended for low-latency, ad hoc queries. In that sense it is the opposite of “SQL on Hadoop” platforms such as Apache Hive—where people issue queries and expect rapid responses. Instead, Lingual provides for high-throughput work. Studies based on many years of Enterprise SQL use case analysis have shown a long tail of machine-to-machine batch processing. In other words, a mission-critical business process gets defined in SQL, with queries written by a machine.

ETL is a typical use case for this in Enterprise. On the one hand, large-scale joins, filtering, and aggregation are typically required. On the other hand, the source data is probably not indexed and almost certainly not normalized. The requirements for ETL are nearly the opposite of what a relational database provides. Many of the Enterprise deployments of Cascading apps are ETL—addressing complex data quality problems that are readily handled by the “plumbing” of traps, flows, branches, merges, etc.

In addition to the Lingual library and a JAR file used to build Cascading apps, other components include the following:

To install the SQL command shell, run the following script:

$ curl \
 http://files.concurrentinc.com/lingual/1.0/lingual-client/ \
 install-lingual-client.sh | bash

That will create a ~/.lingual-client/ directory, which needs to be added to your PATH environment variable.

$ export PATH=~/.lingual-client/bin/:$PATH

When using Lingual with Apache Hadoop, the SQL command shell expects certain environment variables to be set. That way the correct Hadoop version and configuration will be included in the CLASSPATH:

HADOOP_HOME
Path to local Hadoop installation
HADOOP_CONF_DIR
Defaults to $HADOOP_HOME/conf
HADOOP_USER_NAME
The username to use when submitting Hadoop jobs

Assuming that you have HADOOP_HOME already set, then:

$ export HADOOP_CONF_DIR=$HADOOP_HOME/conf
$ export HADOOP_USER_NAME=<username>

If you’re working with a remote Elastic MapReduce cluster on Amazon AWS, see the Bash EMR utilities. Specifically, use the emrconf command to fetch remote configuration files.

If you encounter errors executing SQL queries on a remote cluster (Amazon AWS, Windows Azure HDInsight, etc.) try the following workaround:

$ export HADOOP_USER_NAME=hadoop

That should resolve security issues that may be causing failures on the remote cluster.

Now let’s try using the Lingual SQL command shell. The following example is based on data from the MySQL Sample Employee Database:

$ mkdir -p ~/src/lingual
$ cd ~/src/lingual
$ curl http://data.cascading.org/employees.tgz | tar xvz

That creates an employees subdirectory for the table data, which is essentially several large CSV files. Next, load the schema for these tables into Lingual using SQL data definitions:

$ curl http://data.cascading.org/create-employees.sh > create-employees.sh
$ chmod +x ./create-employees.sh
$ ./create-employees.sh local

Now try the SQL command line, querying to show a relational catalog for these tables:

$ lingual shell
0: jdbc:lingual:local> !tables

That lists metadata about the available tables: EMPLOYEE, TITLES, SALARIES. Next, let’s try a simple query:

0: jdbc:lingual:local> SELECT * FROM EMPLOYEES.EMPLOYEES WHERE FIRST_NAME = 'Gina';

The result set should show records for a whole bunch of people named Gina.

An interesting use case for the Lingual SQL command shell is in organizations that use Hadoop for large-scale data products, which are not using SQL already. For example, consider the case where an Engineering team is building machine learning apps in Cascalog…then Customer Support comes along with an interrupt task to pull the data for a particular customer ID. Rather than derail Engineering with Support interrupts, it makes sense to expose a view of the data through standard tools with ANSI SQL and JDBC connections—these will already be familiar to the people working in Support, Finance, Ops, etc.

Connect to a directory on your computer where you have a few gigabytes of available disk space, and then clone the source code repo from GitHub:

$ git clone git://github.com/Cascading/lingual.git

Once that completes, connect into the lingual directory, then into the lingual-local subdirectory. Next build the Lingual JDBC connector to run locally:

$ gradle clean fatjar

Then connect into the ../lingual-examples subdirectory and take a look at the src/main/java/cascading/lingual/examples/foodmart/JdbcExample.java app. Java source used to execute SQL queries through a Lingual JDBC connection is much the same as with any other JDBC driver:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcExample
  {
  public static void main( String[] args ) throws Exception
    {
    new JdbcExample().run();
    }

  public void run() throws ClassNotFoundException, SQLException
    {
    Class.forName( "cascading.lingual.jdbc.Driver" );
    Connection connection = DriverManager.getConnection(
       "jdbc:lingual:local;schemas=src/main/resources/data/example" );
    Statement statement = connection.createStatement();

    ResultSet resultSet = statement.executeQuery(
        "select *\n"
          + "from \"example\".\"sales_fact_1997\" as s\n"
          + "join \"example\".\"employee\" as e\n"
          + "on e.\"EMPID\" = s.\"CUST_ID\"" );

    while( resultSet.next() )
      {
      int n = resultSet.getMetaData().getColumnCount();
      StringBuilder builder = new StringBuilder();

      for( int i = 1; i <= n; i++ )
        {
        builder.append(( i > 1 ? "; " : "" )
            + resultSet.getMetaData().getColumnLabel( i )
            + "="
            + resultSet.getObject( i ) );
        }

     System.out.println( builder );
      }

    resultSet.close();
    statement.close();
    connection.close();
    }
  }

In this example, the table schema gets derived directly from the headers of the CSV files. In other words, point a JDBC connection at a directory of flat files and query them as tables—as if they had already been loaded into a SQL database—without needing the database.

To build and run the JDBC example:

$ gradle clean jar
$ hadoop jar build/libs/lingual-examples–1.0.0-wip-dev.jar

This sample app uses Lingual to open a JDBC connection and run the following SQL query:

SELECT *
  FROM "example"."sales_fact_1997" AS s
  JOIN "example"."employee" AS e
    ON e."EMPID" = s."CUST_ID"
;

Keep in mind that the quote marks are important, and table names are case-sensitive on some operating systems (this is due to Java).

The query runs on example data in the src/main/resources/data/example/ subdirectory in the CSV files there. Query results should look like this:

CUST_ID=100; PROD_ID=10; EMPID=100; NAME=Bill
CUST_ID=150; PROD_ID=20; EMPID=150; NAME=Sebastian

It’s interesting to consider how the code would look in an equivalent Cascading app:

Tap empTap =
 new FileTap(new TextDelimited(true, ",", "\""), "src/test/data/employee.txt");
Tap salesTap =
 new FileTap(new TextDelimited(true, ",", "\""), "src/test/data/salesfact.txt");

Tap resultsTap =
 new FileTap(new TextDelimited(true, ",", "\""), "build/test/output/results.txt",
 SinkMode.REPLACE);

Pipe empPipe = new Pipe("emp");
Pipe salesPipe = new Pipe("sales");

Pipe join =
 new CoGroup(empPipe, new Fields("empid"), salesPipe, new Fields("cust_id"));

FlowDef flowDef = flowDef()
  .setName("flow")
  .addSource(empPipe, empTap)
  .addSource(salesPipe, salesTap)
  .addTailSink(join, resultsTap);

Flow flow = new LocalFlowConnector().connect(flowDef);
flow.start();

TupleEntryIterator iterator = resultTap.openForRead();

Arguably, that code is more compact than the JDBC use case. Even so, Lingual allows for Cascading apps that read SQL queries as flat files, as command-line options—which can leverage a great number of existing ANSI SQL queries.

By virtue of having a JDBC connector into Cascading workflows on Apache Hadoop clusters, we can leverage many existing SQL tools. For example, Toad is a popular tool for interacting with SQL frameworks. RStudio (shown in Figure 6-4) is a popular IDE for statistical computing in R, which can import data through JDBC.

The following example is based on the RJDBC package for R, assuming that the MySQL Sample Employee Database has been downloaded as described previously. This illustrates a common use case for embedded SQL queries leveraging unstructured data, i.e., the long tail of machine-to-machine communications:

# JDBC support in R is provided by the RJDBC package http://www.rforge.net/RJDBC/
# install the RJDBC package; only needed once--uncomment next line the first time
#install.packages("RJDBC", dep=TRUE)

# load the library
library(RJDBC)

# set up the driver
drv <- JDBC("cascading.lingual.jdbc.Driver",
  "~/src/concur/lingual/lingual-local/build/libs/
  lingual-local-1.0.0-wip-dev-jdbc.jar")

# set up a database connection to a local repository
connection <- dbConnect(drv,
  "jdbc:lingual:local;catalog=~/src/concur/lingual/lingual-examples/
  tables;schema=EMPLOYEES")

# query the repository
df <- dbGetQuery(connection,
  "SELECT * FROM EMPLOYEES.EMPLOYEES WHERE FIRST_NAME = 'Gina'")
head(df)

# use R functions to summarize and visualize part of the data
df$hire_age <- as.integer(as.Date(df$HIRE_DATE) - as.Date(df$BIRTH_DATE)) / 365.25
summary(df$hire_age)

# uncomment next line the first time
#install.packages("ggplot2")
library(ggplot2)

m <- ggplot(df, aes(x=hire_age))
m <- m + ggtitle("Age at hire, people named Gina")
m + geom_histogram(binwidth=1, aes(y=..density.., fill=..count..)) + geom_density()

That R script first sets up a JDBC connection in Lingual. Then it runs the same query we used in the SQL command shell to list records for employees named Gina. Next, the script calculates age (in years) at time of hire for employees in the SQL result set. Then it calculates summary statistics and visualizes the age distribution, shown in Figure 6-5:

> summary(df$hire_age)
   Min. 1st Qu.  Median    Mean 3rd Qu.    Max.
  20.86   27.89   31.70   31.61   35.01   43.92

This shows how a very large data set could be queried to produce a sample, then analyzed—all based on R, JDBC, and SQL. Under the hood, Cascading and Apache Hadoop are doing the heavy lifting to run those queries at scale. Meanwhile, the users, analysts, and data scientists work with familiar tools and languages. That’s a subtle yet powerful capability of Lingual.

Pattern is an extension to Cascading that translates Predictive Model Markup Language (PMML) into Cascading apps. This open source project is a collaboration between developers at Cascading and other firms, to get coverage for several popular machine learning algorithms.

PMML is an established XML standard, since 1997, developed by a consortium called Data Modeling Group. Many vendors for analytics frameworks support exporting models as PMML: SAS, IBM SPSS, Microstrategy, Oracle, etc. Also, many popular open source tools support PMML export: R, Weka, KNIME, RapidMiner, etc.

The XML captures the parameters of a model, plus metadata for defining it as a workflow. That’s the point of Pattern: develop models on popular analytics frameworks, then deploy them within Cascading workflows. Benefits include greatly reduced development costs and fewer licensing issues at scale; leveraging the economics of Apache Hadoop clusters, plus the core competencies of analytics staff, plus existing IP in predictive models.

Organizations also like to use PMML for this work because several different models can be trained, and then the resulting PMML gets tagged and archived in version control. Experiments can be evaluated with A/B testing, multi-armed bandit, etc.; however, the source code does not have to change as the models evolve.

Initially, the focus of the Pattern project was entirely on model scoring:

  1. Create a predictive model in an analytics framework.
  2. Export the model as PMML.
  3. Use Pattern to translate the PMML description into a parallelized algorithm, as a Cascading subassembly.
  4. Run the model in parallel at scale on a Hadoop cluster.

More recently the project has begun work on model creation, where models get trained at scale using Hadoop clusters and saved as PMML. Training at scale can leverage other libraries based on Cascading, such as the Matrix API for Scalding. Then the model can be run at scale using the model scoring features.

Of course there are many commercial analytics frameworks used for predictive modeling. Popular tools include SAS, SAP’s Hana, Oracle’s Exalytics, Microstrategy, Microsoft SQL Server, Teradata, plus a variety of offerings from IBM such as SPSS. What these products all share is that they are expensive to license for large-scale apps.

There are Java translators for SAS such as Carolina. Enterprise organizations typically look to migrate analytics workloads off of licensed frameworks and onto Hadoop clusters because of the potential for enormous cost savings. However, that migration implies the cost of rewriting and validating models in Java, Hive, Pig, etc.

In terms of Hadoop specifically, there are very good machine learning libraries available—such as Apache Mahout or the Mallet toolkit from UMass. However, these are tightly coupled to Apache Hadoop. They are not designed to integrate with other data frameworks and topologies, let alone leverage the Cascading flow planner.

Pattern implements large-scale, distributed algorithms in the context of Cascading as a pattern language:

  • In contrast with R, it emphasizes test-driven development (TDD) at scale, with more standardized failure modes.
  • In contrast with SAS, it is open sourced under an Apache ASL 2.0 license, and its algorithms run efficiently in parallel on large-scale clusters.
  • In contrast with Mahout, it implements predictive models that can leverage resources beyond Hadoop while complying with best practices for Enterprise IT.

Pattern comes with a predefined app, which you can use to run PMML models at scale without having to write any code. A conceptual flow diagram for this app is shown in Figure 6-6, based on the Java source in the src/main/java/pattern/pmml/Main.java file.

Let’s create a model in R, then export it as PMML, and run that model on Hadoop. The following example uses a well-known public domain data set called Iris, which is based on a 1936 botanical study of three species of Iris flower. Look in data/iris.rf.tsv for an example of this data:

sepal_length sepal_width petal_length petal_width species predict
5.1 3.5 1.4 0.2 setosa  setosa
4.9 3.0 1.4 0.2 setosa  setosa
5.6 2.5 3.9 1.1 versicolor  versicolor
5.9 3.2 4.8 1.8 versicolor  virginica
6.3 3.3 6.0 2.5 virginica   virginica
4.9 2.5 4.5 1.7 virginica   versicolor

Next, we’ll create a predictive model using a machine learning algorithm called Random Forest (RF). Random Forest is an ensemble learning method based on using a statistical technique called “bagging” with decision trees. The general idea is that one decision tree is probably never enough to capture the possible variations in a large data set. Instead, we create a collection of decision trees to help explain the various edge cases while avoiding overfitting.

In this example, the RF model uses flower measurements such as petal length to predict the iris species. The Iris data set is particularly interesting in statistics because it is provably impossible to predict all the edge cases correctly using simple linear regression methods. That presents an excellent use case for RF. The algorithm gets used widely for this reason in domains that have lots of important edge cases: for example, in finance for anti-fraud detection, and in astrophysics for detecting cosmological anomalies.

Take a look at the source code in examples/r/pmml_models.R, in particular the section that handles RF modeling. Here is an R script for just that model, based on the Random Forest implementation in R:

install.packages("pmml")
install.packages("randomForest")
library(pmml)
library(randomForest)
require(graphics)

## split data into test and train sets
data(iris)
iris_full <- iris
colnames(iris_full) <-
 c("sepal_length", "sepal_width", "petal_length", "petal_width", "species")

idx <- sample(150, 100)
iris_train <- iris_full[idx,]
iris_test <- iris_full[-idx,]

## train a Random Forest model
f <- as.formula("as.factor(species) ~ .")
fit <- randomForest(f, data=iris_train, proximity=TRUE, ntree=50)

## report the measures of model fitness
print(fit$importance)
print(fit)
print(table(iris_test$species, predict(fit, iris_test, type="class")))

## visualize results
plot(fit, log="y", main="Random Forest")
varImpPlot(fit)
MDSplot(fit, iris_full$species)

## export PMML + test data
out <- iris_full
out$predict <- predict(fit, out, type="class")

dat_folder <- './data'
tsv <- paste(dat_folder, "iris.rf.tsv", sep="/")
write.table(out, file=tsv, quote=FALSE, sep="\t", row.names=FALSE)
saveXML(pmml(fit), file=paste(dat_folder, "iris.rf.xml", sep="/"))

The R script loads the required packages, along with the Iris data set. It splits the data set into two parts: iris_train and iris_test. Then it trains a Random Forest model using the iris_train part, using the petal and sepal measures to predict species.

The results of this model creation get evaluated and visualized in a few different ways. First we have a few printed reports about the fitness of the model. One well-known aspect of the Iris data set is that the “setosa” species is relatively easy to predict, whereas the other two species have overlap, which confuses predictive models. We see that in the results, but overall there is an estimated 5% error rate:

        OOB estimate of  error rate: 5%
Confusion matrix:
           setosa versicolor virginica class.error
setosa         32          0         0  0.00000000
versicolor      0         26         2  0.07142857
virginica       0          3        37  0.07500000

The chart in Figure 6-7 shows error rate versus the number of trees. One of the parameters for training an RF model is to select the number of trees in the forest. As that parameter approaches 50 trees, decrease in error levels out.

The chart in Figure 6-8 shows the mean decrease in the Gini ratio for each independent variable used in the model. In this case, petal_width is the best predictor.

The chart in Figure 6-9 shows a multidimensional scaling (MDS) plot for the proximity matrix.

The plot shows the principal components of the distance matrix—points that are close together represent data points that are similar to each other. This is one way of showing outliers that haven’t been handled well by the model. Again, we know that the “setosa” species clusters tightly, whereas “versicolor” and “virginica” tend to overlap.

The remainder of the R script writes the data with a column added to represent the expected results from the model for us to use in regression testing. Then it writes the PMML file to capture the model. Take a look at the resulting XML definitions in the data/iris.rf.xml file:

 <MiningModel
   modelName="randomForest_Model"
   functionName="classification"
   >
  <MiningSchema>
   <MiningField name="species" usageType="predicted"/>
   <MiningField name="sepal_length" usageType="active"/>
   <MiningField name="sepal_width" usageType="active"/>
   <MiningField name="petal_length" usageType="active"/>
   <MiningField name="petal_width" usageType="active"/>
  </MiningSchema>
...

Now that we have a PMML model, let’s use Pattern to run it. We’ll run a regression test to confirm that the results predicted on Hadoop match those predicted in R as a baseline. Then we’ll calculate a confusion matrix to evaluate the error rates in the model. Again, a log of a successful run is given in a GitHub gist to compare:

$ rm -rf out
$ hadoop jar build/libs/pattern-examples-*.jar \
 data/iris.rf.tsv out/classify out/trap \
 --pmml data/iris.rf.xml \
 --assert \
 --measure out/measure

First, we clear the out directory used for output files, because Hadoop will check for it and fail the app rather than overwrite data. We specify the input data source data/iris.rf.tsv, output data sink out/classify/*, and also out/trap as a trap sink. The latter is used for catching bad input data. The --pmml data/iris.rf.xml command-line argument specifies our PMML model.

Note that we add --assert and --measure as optional command-line arguments. For each tuple in the data, a stream assertion tests whether the predicted field matches the score field generated by the model in R. Tuples that fail that assertion get trapped into out/trap/part* for inspection later. Also, a confusion matrix gets written to out/measure/part* output, based on species as the predicted field. We measure the performance of the predictive model, counting how many false positives or false negatives result.

The output shows that model had a 100% success rate for the regression test. If there had been any difference between the Pattern results and the R results, Cascading stream assertions would have rejected those output tuples and shown exceptions in the console log:

$ head out/classify/part-00000
sepal_length  sepal_width   petal_length    petal_width species predict score
5.1 3.5 1.4 0.2 setosa  setosa  setosa
4.9 3   1.4 0.2 setosa  setosa  setosa
4.7 3.2 1.3 0.2 setosa  setosa  setosa
4.6 3.1 1.5 0.2 setosa  setosa  setosa
5   3.6 1.4 0.2 setosa  setosa  setosa
5.4 3.9 1.7 0.4 setosa  setosa  setosa
4.6 3.4 1.4 0.3 setosa  setosa  setosa
5   3.4 1.5 0.2 setosa  setosa  setosa
4.4 2.9 1.4 0.2 setosa  setosa  setosa
$
$ head out/measure/part-00000
species    score      count
setosa     setosa     50
versicolor versicolor 48
versicolor virginica  2
virginica  versicolor 1
virginica  virginica  49

As expected, there is approximately 5% error overall. The setosa species gets predicted correctly, whereas the other two species have some overlap.

Let’s take a look at how to incorporate Pattern into a Cascading app. This requires only two additional lines of source code. The following shows a minimal Cascading app that uses Pattern, starting with the set up for a Main.java class:

public class Main {
  public static void main( String[] args ) {
    String pmmlPath = args[ 0 ];
    String inputPath = args[ 1 ];
    String classifyPath = args[ 2 ];
    String trapPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector =
     new HadoopFlowConnector( properties );

Next, we define three Cascading taps for input, output, and trap:

Tap inputTap =
  new Hfs( new TextDelimited( true, "\t" ), inputPath );
Tap classifyTap =
  new Hfs( new TextDelimited( true, "\t" ), classifyPath );
Tap trapTap =
  new Hfs( new TextDelimited( true, "\t" ), trapPath );

Then we use the PMMLPlanner in Pattern to parse the predictive model and build a SubAssembly. The PMML file is referenced as a command-line argument called pmmlPath in the following code:

PMMLPlanner pmmlPlanner = new PMMLPlanner()
  .setPMMLInput( new File( pmmlPath ) )
  .retainOnlyActiveIncomingFields()
  .setDefaultPredictedField( new Fields( "predict", Double.class ) );

flowDef.addAssemblyPlanner( pmmlPlanner );

Those are the only lines required for Pattern, other than its package import. In Cascalog or Scalding, this would require even less code.

Finally, we call the flow planner to create a physical plan and then submit the job to Hadoop:

Flow classifyFlow = flowConnector.connect( flowDef );
classifyFlow.writeDOT( "dot/classify.dot" );
classifyFlow.complete();

There has been much interest in leveraging Pattern, Cascading, and Apache Hadoop to run customer experiments at scale. The idea is to generate multiple variants of a predictive model, each exported as PMML. Then run these models on a Hadoop cluster with large-scale customer data. Finally, use analysis of the confusion matrix results to measure the relative lift among models.

To show an example, first we need some data to use for an experiment. The code on GitHub includes a Python script to generate sample data sets. Take a look at the examples/py/gen_orders.py file. That script can be used to create a relatively large data set (e.g., terabyte scale) for training and evaluating the PMML models on a Hadoop cluster:

#!/usr/bin/env python
# encoding: utf-8
import random
import sys
import uuid

CUSTOMER_SEGMENTS = (
    [0.2, ["0", random.gauss, 0.25, 0.75, "%0.2f"]],
    [0.8, ["0", random.gauss, 1.5, 0.25, "%0.2f"]],
    [0.9, ["1", random.gauss, 0.6, 0.2, "%0.2f"]],
    [1.0, ["1", random.gauss, 0.75, 0.2, "%0.2f"]]
)

def gen_row (segments, num_col):
    coin_flip = random.random()

    for prob, rand_var in segments:
        if coin_flip <= prob:
            (label, dist, mean, sigma, f) = rand_var
            order_id = str(uuid.uuid1()).split("-")[0]
            d = dist(mean, sigma)
            m = map(lambda x: f % d, range(0, num_col))
            return [label] + m + [order_id]

if __name__ == '__main__':
    num_row = int(sys.argv[1])
    num_col = int(sys.argv[2])

    m = map(lambda x: "v" + str(x), range(0, num_col))
    print "\t".join(["label"] + m + ["order_id"])

    for i in range(0, num_row):
        print "\t".join(gen_row(CUSTOMER_SEGMENTS, num_col))

We run this script with command-line arguments to specify the number of rows and columns. For example, the following creates 1,000 rows with 50 independent variables each:

./examples/py/gen_orders.py 50 1000

A small example is given in the data/sample.tsv file:

label   var0    var1    var2    order_id        predict
1       0       1       0       6f8e1014        1
0       0       0       1       6f8ea22e        0
1       0       1       0       6f8ea435        1
...

Next, we use this data to create a model based on Random Forest—like in the earlier example. The label dependent variable gets predicted based on var0, var1, and var2 as independent variables:

## train a Random Forest model
## example: http://mkseo.pe.kr/stats/?p=220

f <- as.formula("as.factor(label) ~ var0 + var1 + var2")
fit <- randomForest(f, data=data, proximity=TRUE, ntree=25)
print(fit)
saveXML(pmml(fit), file="sample.rf.xml")

Output from R shows an estimated 14% error rate for this model:

        OOB estimate of  error rate: 14%
Confusion matrix:
   0   1 class.error
0 69  16   0.1882353
1 12 103   0.1043478

Next, we use the same data to train a model based on a different algorithm, Logistic Regression. To help illustrate experiment results later, one of the independent variables var1 is omitted from the model:

## train a Logistic Regression model (special case of GLM)
## example: http://www.stat.cmu.edu/~cshalizi/490/clustering/clustering01.r

f <- as.formula("as.factor(label) ~ var0 + var2")
fit <- glm(f, family=binomial, data=data)
print(summary(fit))
saveXML(pmml(fit), file="sample.lr.xml")

Now we can use the predefined app in Pattern to run both models and collect their confusion matrix results:

$ rm -rf out
$ hadoop jar build/libs/pattern-examples-*.jar \
 data/sample.tsv out/classify.rf out/trap \
 --pmml sample.rf.xml --measure out/measure
$ mv out/classify.rf .

$ rm -rf out
$ hadoop jar build/libs/pattern-examples-*.jar \
 data/sample.tsv out/classify.lr out/trap \
 --pmml sample.lr.xml --measure out/measure
$ mv out/classify.lr .

It would be reasonably simple to build a Cascading app to do the comparisons between models, i.e., a framework for customer experiments. That would be especially useful if there were a large number of models to compare. In this case, we can compare results using a spreadsheet as shown in Figure 6-10.

The model based on Logistic Regression has a lower rate (5% versus 11%) for false negatives (FN). However, that model has a much higher rate (52% versus 14%) for false positives (FP).

Let’s put this into terms that decision makers use in business to determine which model is better. For example, in the case of an anti-fraud classifier used in ecommerce, we can assign a cost function to select a winner of the experiment. On one hand, a higher rate of false negatives implies that more fraudulent orders fail to get flagged for review. Ultimately that results in a higher rate of chargeback fines from the bank, and punitive actions by the credit card processor if that rate goes too high for too long. So the FN rate is proportional to chargeback risk in ecommerce. On the other hand, a higher rate of false positives implies that more legitimate orders get flagged for review. Ultimately that results in more complaints from actual customers, and higher costs for customer support. So the FP rate is proportional to support costs in ecommerce.

Evaluating this experiment, the Logistic Regression model—which had a variable omitted to exaggerate the comparison—resulted in approximately half the FN rate, compared with the Random Forest model. However, it also resulted in quadrupled costs for customer support. A decision maker can use those cost trade-offs to select the appropriate model for the business needs.

One important issue to keep in mind about analytics frameworks is that it tends to be expensive or impossible to run models at scale. Running multiple models, such as for extensive customer experiments, compounds that problem. By using Pattern, Cascading, and Apache Hadoop, organizations can now scale their experiments, adding more science to the practice of data-driven business.

As of version 4.01, PMML supports quite a number of different families of predictive models:

The structure of a PMML document is quite interesting in the context of Cascading. The input variables are defined in a metadata section. It’s possible to define some forms of preprocessing and post-processing. Models can be combined into ensembles, such as how Random Forest is an ensemble of decision trees. That has become a powerful strategy in commercial applications of machine learning. Moreover, PMML combines these definitions into an expression of business process for a complex data workflow. Overall, that maps to Cascading quite closely—input and output variables in PMML correspond to tuple flows, with the Cascading flow planners providing parallelization for predictive model algorithms on Hadoop clusters.

Currently there are several companies collaborating on the Pattern project. Besides the Random Forest and Logistic Regression algorithms, other PMML implementations include the following:

Linear regression is probably the most common form of predictive model, such as in Microsoft Excel spreadsheets. K-means is widely used for customer segmentation, document search, and other kinds of predictive models.

Other good PMML resources include the following: