Living in the age of big data we ask: what should we do when we have the good fortune to be presented with a huge amount of supervised training data? Most often at large scale we are presented with the unsupervised problems of characterization and information extraction, but some problem domains offer an almost limitless supply of supervised training data (such as using older data to build models that predict the near future). Having too much training data is a good problem to have, and there are ways to use traditional methods (like logistic regression) at this scale. We present an “out of core” logistic regression implementation and a quick example in Apache Hadoop running on Amazon Elastic MapReduce. This presentation assumes familiarity with Unix-style command lines, Java and Hadoop.
Apache Hadoop already has a machine learning infrastructure named Mahout. While Mahout seems to concentrate more on unsupervised methods (like clustering, nearest neighbor and recommender systems) it does already include a logistic regression package. This package uses a learning method called “Stochastic Gradient Descent”, which is in a sense the perceptron update algorithm updated for the new millennium. This method is fast in most cases but differs from the traditional methods of solving a logistic regression, which are based on Fisher scoring or the Newton-Raphson method (see “Categorical Data Analysis,” Alan Agresti, 1990 and Paul Komarek’s thesis “Logistic Regression for Data Mining and High-Dimensional Classification”). Fisher scoring remains interesting in that it parallelizes in exactly the manner described in “Map-Reduce for Machine Learning on Multicore,” Cheng-Tao Chu, Sang Kyun Kim, Yi-An Lin, YuanYuan Yu, Gary Bradski, Andrew Y. Ng, Kunle Olukotun, NIPS 2006.
Stochastic gradient descent is in fact an appropriate method for big data. For example, if our model complexity is held constant and our data set size is allowed to grow, then stochastic gradient descent will achieve its convergence condition before it even completes a single random order traversal of the data. However, stochastic gradient descent has a control called the learning rate, and one can easily imagine a series of problems that require the learning rate to be set arbitrarily slow. For example, consider a data set formed as the union of very many “typical” examples, where a given variable is independent of the outcome, and a small minority of “special” examples, where the same variable helps influence the outcome. Training on the “typical” examples causes the stochastic gradient descent method to perform a random walk on the given variable’s coefficient. So the learning rate must be slow enough that the expected drift does not swamp out the rare contributions from the “special” examples (meaning the learning rate must slow roughly proportionally to the square root of the ratio of the typical to special examples).
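To make the role of the learning rate concrete, here is a minimal Java sketch of a single-example stochastic gradient update for logistic regression. It is not Mahout's implementation; the class name SgdSketch, the fixed learning rate of 0.05 and the synthetic data are all assumptions made for illustration.

```java
import java.util.Random;

/** Hypothetical illustration: one-example-at-a-time logistic regression updates. */
final class SgdSketch {
    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** Applies one stochastic gradient step for a single example (x, y), y in {0, 1}. */
    static void step(double[] w, double[] x, int y, double learningRate) {
        double z = 0.0;
        for (int i = 0; i < w.length; i++) {
            z += w[i] * x[i];
        }
        double err = y - sigmoid(z); // gradient of the log likelihood is err * x
        for (int i = 0; i < w.length; i++) {
            w[i] += learningRate * err * x[i];
        }
    }

    public static void main(String[] args) {
        Random rng = new Random(42);
        double[] w = new double[2]; // intercept and one slope
        for (int n = 0; n < 20000; n++) {
            double x = rng.nextGaussian();
            // Synthetic (made-up) truth: P(y = 1) = sigmoid(-1 + 2x).
            int y = rng.nextDouble() < sigmoid(-1.0 + 2.0 * x) ? 1 : 0;
            step(w, new double[] {1.0, x}, y, 0.05);
        }
        System.out.println("intercept=" + w[0] + " slope=" + w[1]);
    }
}
```

Every example nudges every coefficient by an amount proportional to the learning rate, which is why a long run of uninformative examples produces the random walk described above.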
Not too much must be made of artificial problems designed to slow stochastic gradient descent. The traditional Fisher scoring (or the Newton-Raphson method) can simply be killed by specifying a problem with a great number of levels for categorical variables. In this case traditional methods have to solve a linear system that can in fact be much larger than the entire data set (causing representation, work and numeric stability problems). So it takes little imagination to design problems that kill the traditional methods. Other intermediate complexity methods (like conjugate gradient) avoid the storage size problem, but can require many more passes through the training data.
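A small piece of arithmetic shows how quickly the linear system grows. The sketch below assumes one coefficient per level of each categorical variable plus an intercept and a dense Hessian; the level counts are made up for illustration and the class name HessianSizeSketch is hypothetical.

```java
/** Hypothetical arithmetic: size of the dense linear system Fisher scoring must solve. */
final class HessianSizeSketch {
    /** Assumes one coefficient per level of each categorical variable, plus an intercept. */
    static long coefficientCount(long[] levelsPerVariable) {
        long n = 1; // intercept
        for (long levels : levelsPerVariable) {
            n += levels;
        }
        return n;
    }

    /** A dense Hessian stores one entry per ordered pair of coefficients. */
    static long hessianEntries(long coefficients) {
        return coefficients * coefficients;
    }

    public static void main(String[] args) {
        long small = coefficientCount(new long[] {4, 4, 3});
        long large = coefficientCount(new long[] {4, 4, 1_000_000});
        System.out.println(small + " coefficients -> " + hessianEntries(small) + " entries");
        System.out.println(large + " coefficients -> " + hessianEntries(large) + " entries");
    }
}
```

Under these assumptions a single variable with a million levels pushes the Hessian to roughly 10^12 entries, regardless of how few data rows there are.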
There is a common situation where Fisher scoring makes good sense: you are trying to fit a relatively simple model to an enormous amount of data (often to predict a rare event). One could sub-sample the training data to shrink the scale of the problem, but this is a case of the analyst being forced to accede to poor tools. What one would naturally want is a training method that can fit reasonable sized models (that is, models with a reasonable number of variables and levels) onto enormous data sets. The software package R can work with fairly large data sets (in the gigabytes range) and has some parallel flavors, but R is mostly an in-memory system. It is appropriate to want a direct method that “works out of core” (i.e. in the terabytes and petabytes ranges), parallelizes to hundreds of machines (using current typical infrastructure, like a Hadoop cluster) and is exact (without additional parameters like learning rate).
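As a rough illustration of such a direct method, the following Java sketch performs Fisher scoring (Newton's method) for logistic regression. Each pass touches every row once and keeps only a gradient vector and a Hessian matrix in memory. It is a minimal sketch, not the code in the jar: the class name FisherScoringSketch and the tiny data set are made up, the rows sit in an array where a real out-of-core version would stream them from disk, and regularization, step-size control and singular-matrix checks are omitted.

```java
/** Hypothetical sketch of Fisher scoring (Newton's method) for logistic regression. */
final class FisherScoringSketch {
    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** One pass over the rows: accumulate gradient and Hessian, then take a Newton step. */
    static double[] newtonStep(double[][] rows, int[] y, double[] w) {
        int p = w.length;
        double[] grad = new double[p];
        double[][] hess = new double[p][p];
        for (int r = 0; r < rows.length; r++) { // rows could instead be streamed from disk
            double[] x = rows[r];
            double z = 0.0;
            for (int i = 0; i < p; i++) z += w[i] * x[i];
            double prob = sigmoid(z);
            for (int i = 0; i < p; i++) {
                grad[i] += (y[r] - prob) * x[i];
                for (int j = 0; j < p; j++) hess[i][j] += prob * (1.0 - prob) * x[i] * x[j];
            }
        }
        double[] delta = solve(hess, grad);
        double[] next = new double[p];
        for (int i = 0; i < p; i++) next[i] = w[i] + delta[i];
        return next;
    }

    /** Solves a*x = b by Gaussian elimination with partial pivoting (overwrites a and b). */
    static double[] solve(double[][] a, double[] b) {
        int n = b.length;
        for (int c = 0; c < n; c++) {
            int pivot = c;
            for (int r = c + 1; r < n; r++) {
                if (Math.abs(a[r][c]) > Math.abs(a[pivot][c])) pivot = r;
            }
            double[] tmpRow = a[c]; a[c] = a[pivot]; a[pivot] = tmpRow;
            double tmp = b[c]; b[c] = b[pivot]; b[pivot] = tmp;
            for (int r = c + 1; r < n; r++) {
                double f = a[r][c] / a[c][c];
                b[r] -= f * b[c];
                for (int k = c; k < n; k++) a[r][k] -= f * a[c][k];
            }
        }
        double[] x = new double[n];
        for (int r = n - 1; r >= 0; r--) {
            double s = b[r];
            for (int k = r + 1; k < n; k++) s -= a[r][k] * x[k];
            x[r] = s / a[r][r];
        }
        return x;
    }

    /** Runs a fixed number of passes starting from all-zero coefficients. */
    static double[] fit(double[][] rows, int[] y, int passes) {
        double[] w = new double[rows[0].length];
        for (int pass = 0; pass < passes; pass++) w = newtonStep(rows, y, w);
        return w;
    }

    public static void main(String[] args) {
        // Tiny made-up data set: intercept column plus one numeric feature.
        double[][] rows = { {1, 0}, {1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5} };
        int[] y = { 0, 0, 1, 0, 1, 1 };
        double[] w = fit(rows, y, 10);
        System.out.println("intercept=" + w[0] + " slope=" + w[1]);
    }
}
```

Note that there is no learning rate anywhere in the sketch: the step is determined entirely by the accumulated gradient and Hessian.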
We demonstrate here an example implementation in Java for both single machine “out of core” training (allowing filesystem sized datasets) and MapReduce style parallelism (allowing even larger scale). The method also includes the problem regularization steps discussed in our recent logistic regression article. The code (packaged in: WinVectorLogistic.Hadoop0.20.2.jar ) is being distributed under the GNU Affero General Public License version 3. This is an open source license that (roughly) requires (among other things) redistribution of source code of systems linked against the licensed project to anyone receiving a compiled version or using the system as a network service. The license also promises no warranty or implied fitness. The distribution is a standalone runnable Jar (source code and license inside the jar) and is the minimal object required to run on Hadoop (which is itself a Java project). More advanced versions of the library (with better linear algebra libraries, better problem slice control, unit tests, JDBC bindings and with different license arrangements) can be arranged from the code owners: Win-Vector LLC. This jar was built for Apache Hadoop version 0.20.2 (the latest version Amazon Elastic MapReduce runs at this time) and we use as many of the newer interfaces as possible. The code will run against the current Hadoop 0.21.0 if re-built against Hadoop 0.21.0; the jar cannot switch versions without being re-built, due to how Hadoop calls methods.
For our example we will work on a small data set. The code is designed to pass through data directly from disk, storing only the Fisher structures, which require storage proportional to the square of the number of variables and levels but independent of the number of data rows. The data format is what we call “naive TSV” or “naive tab separated values.” This is a file where each line has exactly the same number of values (separated by tabs) and the first line of the file is the header line naming each column. This is compatible with Microsoft Excel and R with the proviso that this file format does not allow any sort of escapes, quoting or multiple line fields. Our data set is taken from the UCI machine learning database ( data, description ) and converted into the naive TSV format (split into training and testing subsets: uciCarTrain.tsv, uciCarTest.tsv).
The first few lines of the training file are given here:
buying	maintenance	doors	persons	lug_boot	safety	rating
vhigh	vhigh	2	2	small	med	FALSE
vhigh	vhigh	2	2	med	low	FALSE
vhigh	vhigh	2	2	med	med	FALSE
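Because the naive TSV format has no quoting or escapes, reading it takes very little code. The sketch below is a hypothetical reader (the class NaiveTsvSketch is not part of the distributed jar) that splits each line on tabs and rejects any line whose field count differs from the header.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reader for "naive TSV": header line first, no quoting or escapes. */
final class NaiveTsvSketch {
    /** Returns one column-name-to-value map per data line. */
    static List<Map<String, String>> parse(List<String> lines) {
        String[] header = lines.get(0).split("\t", -1);
        List<Map<String, String>> rows = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            String[] fields = line.split("\t", -1); // -1 keeps trailing empty fields
            if (fields.length != header.length) {
                throw new IllegalArgumentException(
                        "expected " + header.length + " fields but got " + fields.length + ": " + line);
            }
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < header.length; i++) {
                row.put(header[i], fields[i]);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "buying\tmaintenance\tdoors\tpersons\tlug_boot\tsafety\trating",
                "vhigh\tvhigh\t2\t2\tsmall\tmed\tFALSE");
        for (Map<String, String> row : parse(lines)) {
            System.out.println(row);
        }
    }
}
```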
The first experiment is to use the Java program standalone (without Hadoop) to train a model. The method used is Fisher scoring by multiple passes over the data file. Only the Fisher structures are stored in memory, so in principle the data set could be arbitrarily large. To run the logistic training program, download the files WinVectorLogistic.Hadoop0.20.2.jar and uciCarTrain.tsv . You will also need some libraries ( commons-logging-*.jar and commons-logging-api-*.jar , and sometimes hadoop-*-core.jar and log4j-*.jar ) from the appropriate Hadoop distribution. Before running the code you can examine the source (and re-build the project using an IDE like Eclipse) by extracting the code in an empty directory using the Java jar command:
jar xvf WinVectorLogistic.Hadoop0.20.2.jar
To run the code type at the command line (all in a single line, we have inserted line breaks for clarity, we are also assuming you are using a Unix style shell on Linux, OSX or Cygwin on Windows):
java -cp WinVectorLogistic.Hadoop0.20.2.jar:commons-logging-1.0.4.jar:commons-logging-api-1.0.4.jar com.winvector.logistic.demo.LogisticTrain file:uciCarTrain.tsv "rating ~ buying + maintenance + doors + persons + lug_boot + safety" model.ser
The portion of interest is the last three arguments:
- file:uciCarTrain.tsv : The URI pointing to the file containing the training data.
- “rating ~ buying + maintenance + doors + persons + lug_boot + safety” : The formula specifying that rating will be predicted as a function of buying, maintenance, doors, persons, lug_boot and safety.
- model.ser : Where to write the Java Serialized model result.
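The formula argument has a simple shape: an outcome name, a tilde, then variable names joined by plus signs. As a hedged illustration only, the Java sketch below splits such a string into its parts. The class FormulaSketch is hypothetical; the parser in the jar may accept a richer syntax and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for formulas of the form "outcome ~ var1 + var2 + ...". */
final class FormulaSketch {
    final String outcome;
    final List<String> variables;

    FormulaSketch(String outcome, List<String> variables) {
        this.outcome = outcome;
        this.variables = variables;
    }

    static FormulaSketch parse(String formula) {
        String[] sides = formula.split("~");
        if (sides.length != 2 || sides[0].trim().isEmpty()) {
            throw new IllegalArgumentException("expected 'outcome ~ terms': " + formula);
        }
        List<String> vars = new ArrayList<>();
        for (String term : sides[1].split("\\+")) {
            String name = term.trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("empty term in: " + formula);
            }
            vars.add(name);
        }
        return new FormulaSketch(sides[0].trim(), vars);
    }

    public static void main(String[] args) {
        FormulaSketch f = parse("rating ~ buying + maintenance + doors + persons + lug_boot + safety");
        System.out.println(f.outcome + " predicted from " + f.variables);
    }
}
```

Each name on the right-hand side must match a column name in the header line of the training file.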
After that we can run the scoring procedure on the held-out test data:
java -cp WinVectorLogistic.Hadoop0.20.2.jar:commons-logging-1.0.4.jar:commons-logging-api-1.0.4.jar com.winvector.logistic.demo.LogisticScore model.ser file:uciCarTest.tsv scored.tsv
In this case the last three arguments are:
- model.ser : Where to read the Java Serialized model from.
- file:uciCarTest.tsv : The URI pointing to the file to make predictions for.
- scored.tsv : Where to write the predictions to.
The first few lines of the result file are:
predict.rating.FALSE	predict.rating.TRUE	buying	maintenance	doors	persons	lug_boot	safety	rating
0.9999999999999392	6.091299561082107E-14	vhigh	vhigh	2	2	small	low	FALSE
0.9999999824028766	1.759712345446162E-8	vhigh	vhigh	2	2	small	high	FALSE
These lines are just lines from the file uciCarTest.tsv (same format as uciCarTrain.tsv) copied over with the addition of the first two columns, which show the modeled probabilities of the rating being FALSE or TRUE. The accuracy of the prediction is computed and written into the run log if the data had the rating outcomes in it (else we just get a file of predictions, which is the usual application of machine learning).
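For readers who want to check accuracy themselves from the scored file, here is a minimal Java sketch. It assumes a prediction counts as TRUE when predict.rating.TRUE is at least 0.5; that threshold and the class name AccuracySketch are assumptions, and the run log may compute its figure differently.

```java
/** Hypothetical accuracy check over scored rows. */
final class AccuracySketch {
    /** Fraction of rows whose thresholded prediction matches the recorded outcome. */
    static double accuracy(double[] probTrue, boolean[] actual) {
        int correct = 0;
        for (int i = 0; i < probTrue.length; i++) {
            boolean predicted = probTrue[i] >= 0.5; // assumed decision threshold
            if (predicted == actual[i]) {
                correct++;
            }
        }
        return correct / (double) probTrue.length;
    }

    public static void main(String[] args) {
        // The two predict.rating.TRUE values and outcomes from the sample rows above.
        double[] probTrue = {6.091299561082107E-14, 1.759712345446162E-8};
        boolean[] actual = {false, false};
        System.out.println("accuracy=" + accuracy(probTrue, actual));
    }
}
```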
The details of running the Hadoop versions of the same process depend on the configuration of your Hadoop environment. Just unpacking the 0.20.2 version of Hadoop will let you try the single-machine version of the MapReduce Logistic Regression process (which will be much slower than the standalone Java version). To run the training step the Hadoop command line is as follows (notice this time we do not have to specify the logging jars as they are part of the Hadoop environment):
hadoop-0.20.2/bin/hadoop jar WinVectorLogistic.Hadoop0.20.2.jar logistictrain uciCarTrain.tsv "rating ~ buying + maintenance + doors + persons + lug_boot + safety" model.ser
And the scoring procedure is below:
hadoop-0.20.2/bin/hadoop jar WinVectorLogistic.Hadoop0.20.2.jar logisticscore model.ser uciCarTest.tsv scoredDir
The only operational differences are that the results are written into the file scoredDir/part-r-00000 (as is Hadoop convention) instead of scored.tsv (and an extra “offset” column is also included) and data is handled in Files (to allow Hadoop Paths to be formed) instead of URIs. The Hadoop training and test steps are able to run in this manner because we have constructed WinVectorLogistic.Hadoop0.20.2.jar as an executable jar file with the class com.winvector.logistic.demo.DemoDriver as the class to execute. This class uses the standard org.apache.hadoop.util.ProgramDriver pattern to run our jobs under the org.apache.hadoop.util.Tool interface. This means that the standard Hadoop generic flags for specifying cluster configuration will be respected.
The big benefit of all of this packaging is: if this command is run on a large Hadoop cluster (instead of on a single machine) then the input file could be split up and processed in parallel on many machines. The easiest way to do this is to use Amazon.com’s Amazon Elastic MapReduce. This service (used in conjunction with S3 storage and EC2 virtual machines) allows the immediate remote provisioning and execution on a version 0.20.* Hadoop cluster. To demonstrate this service we created a new S3 Bucket named wvlogistic. Into wvlogistic we copied our jar of our code compiled against Hadoop 0.20.2 APIs ( WinVectorLogistic.Hadoop0.20.2.jar ) and a moderate sized synthetic training data set ( bigProb.tsv, created by running: java -cp WinVectorLogistic.Hadoop0.20.2.jar com.winvector.logistic.demo.BigExample bigProb.tsv ). Once this has been set up (and you have signed up for the Amazon Elastic MapReduce credentials) you can run the training procedure from the Amazon web UI. In five steps (following the directions found in Tutorial: How to Create and Debug an Amazon Elastic MapReduce Job Flow ) the job can be configured and launched.
First: press “Create New Job Flow” and choose a job name, check “Run your own application” and select “Custom Jar”.
Second: specify the location of the jar in your Bucket and give the command line arguments (prepending S3 paths with “s3n://”).
Third: select the type and number of machine instances you want, run without an EC2 key pair, enable logging and send the log back to your S3 bucket.
Fourth: add the default bootstrap action of configuring the Hadoop cluster.
Fifth: confirm and launch the job.
When the job completes, transfer the result ( bigModel.ser ) back to your local system and you have your new MapReduce-produced logistic model. We can confirm and use the model locally with a Java command similar to our earlier examples:
java -cp WinVectorLogistic.Hadoop0.20.2.jar:commons-logging-1.0.4.jar:commons-logging-api-1.0.4.jar:hadoop-0.20.2-core.jar:log4j-1.2.15.jar com.winvector.logistic.demo.LogisticScore bigModel.ser bigProb.tsv bigScored.tsv
Be aware that at this tens of megabytes scale there is no advantage in running on a Hadoop cluster (versus using the stand-alone program). At moderate scale parallelism may not even be attempted (due to block size) and the costs of data motion can overcome the benefit of parallel scans. The biggest gain is being able to train many models from many gigabytes of data on a single machine without sub-sampling. While we have the ability to build a logistic model at “web scale” (terabytes or petabytes of data) you would not want to use the MapReduce calling pattern until you had a web-scale amount of training data.
The point of this exercise was to take a solid implementation of regularized logistic regression (from our recent article) and use the decomposition into the “Statistical Query Model” (as suggested in the NIPS paper “Map-Reduce for Machine Learning on Multicore”) to quickly get an intermediate sophistication machine learning method (more sophisticated than Naive Bayes, less sophisticated than Kernelized Support Vector Machines) working at large (beyond RAM) scale. Briefly: most of the technique is in an interface that considers the mis-fit, gradient of mis-fit and Hessian of mis-fit as a linear (summable) function over the data. Or in the “book’s worth of preparation so we can write the result in one line” paradigm: all of the machinery we have been discussing is support so that the following summable interface (part of the source code we are distributing) can be used to do all of the work:
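The interface listing itself does not appear in this text. As a stand-in, the Java sketch below shows what a summable set of statistics could look like. It is a hypothetical illustration and not the interface shipped in the jar: the class SummableSketch and its methods observe and merge are invented names. Each example adds its own mis-fit, gradient and Hessian terms, and two partial sums combine by plain addition, which is the property the map and reduce steps rely on.

```java
/** Hypothetical summable statistics for logistic regression; not the jar's interface. */
final class SummableSketch {
    double misfit; // negative log likelihood accumulated so far
    final double[] gradient;
    final double[][] hessian;

    SummableSketch(int dim) {
        gradient = new double[dim];
        hessian = new double[dim][dim];
    }

    /** Adds one example's contribution at coefficients w (the "map" side). */
    void observe(double[] x, int y, double[] w) {
        double z = 0.0;
        for (int i = 0; i < w.length; i++) z += w[i] * x[i];
        double p = 1.0 / (1.0 + Math.exp(-z));
        misfit -= (y == 1) ? Math.log(p) : Math.log(1.0 - p);
        for (int i = 0; i < w.length; i++) {
            gradient[i] += (p - y) * x[i]; // gradient of the mis-fit
            for (int j = 0; j < w.length; j++) hessian[i][j] += p * (1.0 - p) * x[i] * x[j];
        }
    }

    /** Adds another partial sum into this one (the "reduce" side). */
    void merge(SummableSketch other) {
        misfit += other.misfit;
        for (int i = 0; i < gradient.length; i++) {
            gradient[i] += other.gradient[i];
            for (int j = 0; j < gradient.length; j++) hessian[i][j] += other.hessian[i][j];
        }
    }

    public static void main(String[] args) {
        double[][] rows = { {1, 0}, {1, 1}, {1, 2}, {1, 3} }; // made-up data
        int[] y = { 0, 1, 0, 1 };
        double[] w = { 0.1, 0.2 };
        SummableSketch whole = new SummableSketch(2);
        SummableSketch shardA = new SummableSketch(2);
        SummableSketch shardB = new SummableSketch(2);
        for (int r = 0; r < rows.length; r++) {
            whole.observe(rows[r], y[r], w);
            (r < 2 ? shardA : shardB).observe(rows[r], y[r], w);
        }
        shardA.merge(shardB);
        System.out.println("single pass mis-fit: " + whole.misfit);
        System.out.println("merged shards mis-fit: " + shardA.misfit);
    }
}
```

The two printed values should agree up to floating point rounding, since only the order of the additions differs.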
Of course once you have the framework up that makes one non-trivial task easy you have likely made many other non-trivial tasks easy.
We hope this demonstration and examining the source code in our WinVectorLogistic.Hadoop0.20.2.jar will help you find ways to tackle your large data machine learning problems.
Packages com.winvector.*, extra.winvector.*
Code for performing logistic regression on Hadoop.
Copyright (C) Win Vector LLC 2010 (contact: John Mount email@example.com).
Distributed under GNU Affero General Public License version 3 (2007, see http://www.gnu.org/licenses/agpl.html ).
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, only version 3 of the
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see .
(Source code in jar, see also http://www.win-vector.com/blog/2010/12/large-data-logistic-regression-with-example-hadoop-code/ )
Note Dec-15-2011: We have moved the code distribution to github.com/WinVector/Logistic . We have fixed some major bugs in the supplied optimizers and moved com.winvector.logistic.LogisticScore and com.winvector.logistic.LogisticTrain from freeform arguments to Apache CLI. The new command lines need flags as shown below:
usage: com.winvector.logistic.LogisticTrain
 -formula <arg>      formula to fit
 -inmemory           if set data is held in memory during training
 -resultSer <arg>    (optional) file to write seriazlized results to
 -resultTSV <arg>    (optional) file to write TSV results to
 -trainClass <arg>   (optional) alternate class to use for training
 -trainHDL <arg>     XML file to get JDBC connection to training data table
 -trainTBL <arg>     table to use from database for training data
 -trainURI <arg>     URI to get training TSV data from

usage: com.winvector.logistic.LogisticScore
 -dataHDL <arg>      XML file to get JDBC connection to scoring data table
 -dataTBL <arg>      table to use from database for scoring data
 -dataURI <arg>      URI to get scoring data from
 -modelFile <arg>    file to read serialized model from
 -resultFile <arg>   file to write results to
Data Scientist and trainer at Win Vector LLC. One of the authors of Practical Data Science with R.