mahout-commits mailing list archives

Subject [CONF] Apache Mahout > Mahout on Elastic MapReduce
Date Tue, 08 Mar 2011 05:41:00 GMT
Space: Apache Mahout (
Page: Mahout on Elastic MapReduce (

Change Comment:
Fixed a link to EMR doco

Edited by Mat Kelcey:
h1. Introduction

This page details the set of steps that was necessary to get an example of k-Means clustering
running on Amazon's [Elastic MapReduce|] (EMR). 

Note: Some of this work was made possible by credits donated by the Amazon Web Services Apache
Projects Testing Program.

h1. Getting Started

   * Get yourself an EMR account.  If you're already using EC2, then you can do this from
[Amazon's AWS Management Console|], which has a tab for running EMR.
   * Get the [ElasticFox|] and [S3Fox|]
Firefox extensions.  These will make it easy to monitor running EMR instances, upload code
and data, and download results.
   * Download the [Ruby command line client for EMR|].
 You can do things from the GUI, but when you're in the midst of trying to get something running,
the CLI client will make life a lot easier.
   * Have a look at [Common Problems Running Job Flows|]
and [Developing and Debugging Job Flows|]
in the EMR forum at Amazon.  They were tremendously useful.
   * Make sure that you're up to date with the Mahout source.  The fix for [Issue 118|]
is required to get things running when you're sending output to an S3 bucket.
   * Build the Mahout core and examples.

Note that the Hadoop running on EMR is a version of Hadoop 0.20.0.  The EMR GUI in the
AWS Management Console provides a number of examples of using EMR, and you might want to try
running one of these to get started.

One big gotcha that I discovered is that the S3N file system for Hadoop has a couple of weird
cases that boil down to the following advice:
* If you're naming a directory in an s3n URI, make sure that it ends in a slash.
* Don't use a top-level S3 bucket name as the place where your Mahout output will go; always
include a subdirectory.
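
The two rules above are easy to check mechanically before submitting a job. The following is only a sketch: {{normalize_s3n_dir}} is a hypothetical helper name, not part of Hadoop, Mahout or the EMR client, and it only inspects the text of the URI.

```shell
# Hypothetical helper (not part of any AWS or Mahout tool): checks an s3n
# output directory URI against the two rules above and prints a normalized
# form that ends in a slash. Returns non-zero if the URI names only a bucket.
normalize_s3n_dir() (
  uri=$1
  case $uri in
    s3n://*) ;;
    *) echo "not an s3n URI: $uri" >&2; exit 1 ;;
  esac
  rest=${uri#s3n://}            # bucket[/path...]
  bucket=${rest%%/*}
  case $rest in
    */*) path=${rest#*/} ;;
    *)   path= ;;
  esac
  # Drop trailing slashes to see whether a subdirectory is actually named.
  while [ "${path%/}" != "$path" ]; do path=${path%/}; done
  if [ -z "$path" ]; then
    echo "output must include a subdirectory, not just a bucket: $uri" >&2
    exit 1
  fi
  printf '%s\n' "s3n://$bucket/$path/"
)

normalize_s3n_dir s3n://news-vecs/out-9-11
```

The call at the end prints the URI with the trailing slash added; passing just {{s3n://news-vecs}} or {{s3n://news-vecs/}} is rejected.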

h1. Uploading Code and Data

I decided that I would use separate S3 buckets for the Mahout code, the input for the clustering
(I used the synthetic control data, you can find it easily from the [Quickstart] page), and
the output of the clustering.  

You will need to upload:
# The Mahout Job jar.  For the example here, we are using {{mahout-core-0.4-SNAPSHOT.job}}
# The data.  In this example, we uploaded two files: dictionary.txt and part-out.vec.  The
latter is the main vector file and the former is the dictionary that maps words to columns.
 It was created by converting a Lucene index to Mahout vectors.

h1. Running k-means Clustering

EMR offers two modes for running MapReduce jobs.  The first is a "streaming" mode where you
provide the source for single-step mapper and reducer functions (you can use languages other
than Java for this).  The second mode is called "Custom Jar" and it gives you full control
over the job steps that will run.  This is the mode that we need to use to run Mahout.  

In order to run in Custom Jar mode, you need to look at the example that you want to run and
figure out the arguments that you need to provide to the job.  Essentially, you need to know
the command line that you would give to bin/hadoop in order to run the job, including whatever
parameters the job needs to run.  

h2. Using the GUI

The EMR GUI is an easy way to start up a Custom Jar run, but it doesn't have the full functionality
of the CLI.  Basically, you tell the GUI where in S3 the jar file is using a Hadoop s3n URI
like {{s3n://PATH/mahout-core-0.4-SNAPSHOT.job}}.  The GUI will check and make sure that the
given file exists, which is a nice sanity check.  You can then provide the arguments for the
job just as you would on the command line.  The arguments for the k-means job were as follows:

org.apache.mahout.clustering.kmeans.KMeansDriver --input s3n://news-vecs/part-out.vec --clusters
s3n://news-vecs/kmeans/clusters-9-11/ -k 10 --output s3n://news-vecs/out-9-11/ --distanceMeasure
org.apache.mahout.common.distance.CosineDistanceMeasure --convergenceDelta 0.001 --overwrite
--maxIter 50 --clustering

TODO: Screenshot

The main failing with the GUI mode is that you can only specify a single job to run, and you
can't run another job in the same set of instances.  Recall that on AWS you pay for partial
hours at the hourly rate, so if your job fails in the first 10 seconds, you pay for the full
hour and if you try again, you're going to be paying for another hour.

Because of this, using a command line interface (CLI) is strongly recommended.
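
To make the cost argument concrete, here is a small arithmetic sketch. It assumes the billing model described above, where every partial hour of a launch is charged as a full hour per instance; {{billed_hours}} is a made-up helper name.

```shell
# Whole hours billed (per instance) for one launch that ran for the given
# number of seconds, assuming each partial hour is rounded up.
billed_hours() {
  echo $(( ($1 + 3599) / 3600 ))
}

# Two separate launches that each fail after 10 seconds...
separate=$(( $(billed_hours 10) + $(billed_hours 10) ))
# ...versus both attempts running inside one long-lived set of instances.
shared=$(billed_hours 20)

echo "separate launches: $separate h, shared instances: $shared h"
```

Under that assumption the two failed launches cost two instance hours each time they are repeated, while retrying inside the same set of instances stays within the first hour.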

h2. Using the CLI

If you're in development mode, and trying things out, EMR allows you to set up a set of instances
and leave them running.  Once you've done this, you can add job steps to the set of instances
as you like.  This solves the "10 second failure" problem that I described above and lets
you get full value for your EMR dollar.  Amazon has pretty good [documentation for the CLI|],
which you'll need to read to figure out how to do things like set up your AWS credentials
for the EMR CLI.

You can start up a job flow that will keep running using an invocation like the following:

./elastic-mapreduce --create --alive \
   --log-uri s3n://PATH_FOR_LOGS/ --key-pair YOUR_KEY \
   --num-instances 2 --name NAME_HERE

Fill in the name, key pair and path for logs as appropriate. This call returns the ID of
the job flow, and you'll need that for subsequent calls to add steps to the job flow. You
can, however, retrieve it at any time by calling:
./elastic-mapreduce --list

Let's list our job flows:

[stgreen@dhcp-ubur02-74-153 14:16:15 emr]$ ./elastic-mapreduce --list
j-3JB4UF7CQQ025     WAITING    kmeans

At this point, everything's started up, and it's waiting for us to add a step to the job.
 When we started the job flow, we specified a key pair that we created earlier so that we
can log into the master while the job flow is running:

 elastic-mapreduce --ssh -j j-3JB4UF7CQQ025

Let's add a step to run a job:

 elastic-mapreduce -j j-3JB4UF7CQQ025 \
   --jar s3n://PATH/mahout-core-0.4-SNAPSHOT.job \
   --main-class org.apache.mahout.clustering.kmeans.KMeansDriver \
   --arg --input --arg s3n://PATH/part-out.vec \
   --arg --clusters --arg s3n://PATH/kmeans/clusters/ \
   --arg -k --arg 10 \
   --arg --output --arg s3n://PATH/out-9-11/ \
   --arg --distanceMeasure --arg org.apache.mahout.common.distance.CosineDistanceMeasure \
   --arg --convergenceDelta --arg 0.001 \
   --arg --overwrite \
   --arg --maxIter --arg 50 \
   --arg --clustering
When you do this, the job flow goes into the {{RUNNING}} state for a while and then returns
to {{WAITING}} once the step has finished.  You can use the CLI or the GUI to monitor the
step while it runs.  Once you've finished with your job flow, you can shut it down with the following command

./elastic-mapreduce -j j-3JB4UF7CQQ025 --terminate

and go look in your S3 buckets to find your output and logs.
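
When adding a step, every token of the usual bin/hadoop-style argument list has to be preceded by its own {{--arg}}, which is tedious to type and easy to get wrong. A small wrapper can do the conversion. This is a sketch only: {{emr_args}} is a hypothetical name, and it assumes that none of the arguments contain whitespace.

```shell
# Hypothetical helper: prints its arguments with "--arg" in front of each one,
# in the form used when adding a job flow step.
# Assumes no argument contains whitespace.
emr_args() (
  out=
  for a in "$@"; do
    out="$out --arg $a"
  done
  printf '%s\n' "${out# }"
)

emr_args --input s3n://PATH/part-out.vec -k 10 --overwrite

# Possible use when adding a step (left unquoted on purpose so the shell
# splits the result back into separate words):
#   elastic-mapreduce -j JOB_ID --jar s3n://PATH/mahout-core-0.4-SNAPSHOT.job \
#     --main-class org.apache.mahout.clustering.kmeans.KMeansDriver \
#     $(emr_args --input s3n://PATH/part-out.vec -k 10 --overwrite)
```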

h1. Troubleshooting

The primary means for understanding what went wrong is via the logs and stderr/stdout.  When
running on EMR, stderr and stdout are captured to files in your log directories.  Additionally,
logging is set up to write out to a file called syslog.  To view these in the AWS Console,
go to your logs directory, then the folder with the same JobFlow id as above (j-3JB4UF7CQQ025),
then the steps folder and then the appropriate step number (usually 1 for this case).

That is, go to the folder s3n://PATH_TO_LOGS/j-3JB4UF7CQQ025/steps/1.  In this directory,
you will find stdout, stderr, syslog and potentially a few other logs. 
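
The directory layout just described can be turned into full log locations with a few lines of shell. This sketch assumes exactly the layout given above (log root, job flow ID, {{steps}}, step number); {{step_log_uris}} is a hypothetical helper name.

```shell
# Hypothetical helper: prints the expected locations of stdout, stderr and
# syslog for one step, following the layout described above.
#   $1 = log root URI, $2 = job flow ID, $3 = step number
step_log_uris() (
  base=${1%/}                   # tolerate a trailing slash on the log root
  for f in stdout stderr syslog; do
    printf '%s\n' "$base/$2/steps/$3/$f"
  done
)

step_log_uris s3n://PATH_TO_LOGS/ j-3JB4UF7CQQ025 1
```

Note that tools such as s3cmd address the same objects with an {{s3://}} prefix rather than {{s3n://}}, so the scheme may need to be swapped before fetching the files that way.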

See [resulting thread|]
for some early user experience with Mahout on EMR.

h2. Building Vectors for Large Document Sets

Use the following steps as a guide to using Elastic MapReduce (EMR) to create sparse vectors
needed for running Mahout clustering algorithms on large document sets. This section evolved
from benchmarking Mahout's clustering algorithms using a large document set. Specifically,
we used the ASF mail archives that have been parsed and converted to the Hadoop SequenceFile
format (block-compressed) and saved to a public S3 folder: {{s3://asf-mail-archives/mahout-0.4/sequence-files}}.
Overall, there are 6,094,444 key-value pairs in 283 files taking around 5.7GB of disk.

h4. 1. Set up elastic-mapreduce-ruby

As discussed previously, make sure you install the *elastic-mapreduce-ruby* tool. On Debian-based
Linux like Ubuntu, use the following commands to install elastic-mapreduce-ruby's dependencies:

apt-get install ruby1.8
apt-get install libopenssl-ruby1.8
apt-get install libruby1.8-extras

Once these dependencies are installed, download and extract the elastic-mapreduce-ruby application.
We use {{/mnt/dev}} as the base working directory because this process was originally conducted
on an EC2 instance; be sure to replace this path with the correct path for your environment
as you work through these steps.

mkdir -p /mnt/dev/elastic-mapreduce /mnt/dev/downloads
cd /mnt/dev/downloads
cd /mnt/dev/elastic-mapreduce
unzip /mnt/dev/downloads/

Please refer to [Amazon Elastic MapReduce Ruby Client|]
for a detailed explanation, but to get running quickly, all you need to do is create a file
named {{credentials.json}} in the elastic-mapreduce directory, such as {{/mnt/dev/elastic-mapreduce/credentials.json}}.
The credentials.json should contain the following information (change to match your environment):

{
  "access-id": "YOUR_ACCESS_KEY",
  "private-key": "YOUR_SECRET_KEY",
  "key-pair": "gsg-keypair",
  "key-pair-file": "/mnt/dev/aws/gsg-keypair.pem",
  "region": "us-east-1",
  "log-uri": "s3n://BUCKET/asf-mail-archives/logs/"
}

If you are confused about any of these parameters, please read: [Understanding Access Credentials
for AWS/EC2|]. Also, it's a good idea to add the
elastic-mapreduce directory to your PATH. To verify it is working correctly, simply do:

elastic-mapreduce --list
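
Since later steps need the job flow ID, it can be handy to pull it out of the listing in a script. The sketch below assumes the three-column output shown earlier on this page (ID, state, name) and a job flow name without spaces; real listings may contain extra lines, so treat it as a starting point. {{jobflow_id}} is a hypothetical helper name.

```shell
# Hypothetical helper: reads a job flow listing on stdin and prints the ID of
# the first job flow with the given name and state.
# Assumes whitespace-separated columns: ID, state, name.
#   $1 = job flow name, $2 = state
jobflow_id() {
  awk -v name="$1" -v state="$2" \
    '$2 == state && $3 == name { print $1; exit }'
}

# Demonstration on the sample line from the listing shown earlier:
printf '%s\n' 'j-3JB4UF7CQQ025     WAITING    kmeans' | jobflow_id kmeans WAITING

# Possible use against the real client:
#   JOB_ID=$(elastic-mapreduce --list | jobflow_id kmeans WAITING)
```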

h4. 2. Set up s3cmd and Create a Bucket

It's also beneficial when working with EMR and S3 to install [s3cmd|],
which helps you interact with S3 using easy to understand command-line options. To install
on Ubuntu, simply do:

sudo apt-get install s3cmd

Once installed, configure s3cmd by doing:

s3cmd --configure

If you don't have an S3 bucket to work with, then please create one using:

s3cmd mb s3://BUCKET

Substitute your bucket name wherever you see {{s3://BUCKET}} in the remaining steps.

h4. 3. Launch EMR Cluster

Once elastic-mapreduce is installed, start a cluster with no jobflow steps:

elastic-mapreduce --create --alive \
  --log-uri s3n://BUCKET/emr/logs/ \
  --key-pair gsg-keypair \
  --slave-instance-type m1.xlarge \
  --master-instance-type m1.xlarge \
  --num-instances NUM_INSTANCES \
  --name mahout-0.4-vectorize

This will create an EMR Job Flow named "mahout-0.4-vectorize" in the US-East region using
EC2 xlarge instances. Take note of the Job ID returned as you will need it to add the "seq2sparse"
step to the Job Flow. It can take a few minutes for the cluster to start; the job flow enters
a "waiting" status when it is ready. We launch the EMR instances in the *us-east-1* region
so that we don't incur data transfer charges to/from US-Standard S3 buckets (credentials.json
=> "region":"us-east-1").

When vectorizing large document sets, you need to distribute processing across as many reducers
as possible. This also helps keep the size of the vector files more manageable. I'll leave
it to you to decide how many instances to allocate, but keep in mind that one will be dedicated
as the master (Hadoop NameNode). Also, it took about 75 minutes to run the seq2sparse job
on 19 xlarge instances when using {{maxNGramSize=2}} (~190 normalized instance hours – not
cheap). I think you'll be safe to use about 10-13 instances and still finish in under 2 hours.
Also, if you are not creating bi-grams, then you won't need as much horse-power; a four node
cluster with 3 reducers per node is sufficient for generating vectors with {{maxNGramSize
= 1}} in less than 30 minutes.
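
The "~190 normalized instance hours" figure can be reproduced with simple arithmetic. This sketch assumes a normalization factor of 8 per xlarge instance hour and uses the actual run time rather than rounding up to whole hours; both are assumptions on my part, so check them against your own bill. {{normalized_hours}} is a hypothetical helper name.

```shell
# Rough estimate of normalized instance hours (integer arithmetic).
#   $1 = number of instances, $2 = run time in minutes,
#   $3 = normalization factor per instance hour (assumed: 8 for xlarge)
normalized_hours() {
  echo $(( $1 * $3 * $2 / 60 ))
}

normalized_hours 19 75 8
```

With 19 instances, 75 minutes and a factor of 8 this comes out at 190, which matches the number quoted above.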

_Tip: Amazon provides a bootstrap action to configure the cluster for running memory intensive
jobs. For more information about this, see: []_

h4. 4. Copy Mahout JAR to S3

The Mahout 0.4 JAR containing a custom Lucene Analyzer ({{org.apache.mahout.text.MailArchivesClusteringAnalyzer}})
is available at:


The source code is available at [MAHOUT-588|].

If you need to use your own Mahout JAR, use s3cmd to copy it to your S3 bucket:

s3cmd put JAR_FILE s3://BUCKET/

h4. 5. Vectorize

Schedule a jobflow step to vectorize (1-grams only) using Mahout's seq2sparse job:

elastic-mapreduce --jar s3://asf-mail-archives/mahout-0.4/mahout-examples-0.4-job-ext.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg seq2sparse \
  --arg -i --arg s3n://asf-mail-archives/mahout-0.4/sequence-files/ \
  --arg -o --arg /asf-mail-archives/mahout-0.4/vectors/ \
  --arg --weight --arg tfidf \
  --arg --minSupport --arg 500 \
  --arg --maxDFPercent --arg 70 \
  --arg --norm --arg 2 \
  --arg --numReducers --arg NUM_REDUCERS \
  --arg --analyzerName --arg org.apache.mahout.text.MailArchivesClusteringAnalyzer \
  --arg --maxNGramSize --arg 1 \
  -j JOB_ID

You need to determine the correct number of reducers based on the EC2 instance type and size
of your cluster. For xlarge nodes, set the number of reducers to 3 x N (where N is the size
of your EMR cluster not counting the master node). For large instances, 2 reducers per node
is probably safe unless your job is extremely CPU intensive, in which case use only 1 reducer
per node.
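
The rule of thumb above is easy to script. This is just a sketch of that rule (3 reducers per xlarge worker, 2 per large worker, master excluded); {{num_reducers}} is a hypothetical helper name, and for extremely CPU-intensive jobs on large instances you would drop to 1 per node by hand.

```shell
# Hypothetical helper: suggested --numReducers value from the rule of thumb.
#   $1 = total number of instances (including the master)
#   $2 = instance size: xlarge or large
num_reducers() (
  workers=$(( $1 - 1 ))         # one instance is dedicated to the master
  case $2 in
    xlarge) per_node=3 ;;
    large)  per_node=2 ;;
    *) echo "unknown instance size: $2" >&2; exit 1 ;;
  esac
  echo $(( workers * per_node ))
)

# A 4+1 node cluster of xlarge instances:
num_reducers 5 xlarge
```

For the 4+1 xlarge cluster this prints 12.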

Be sure to use Hadoop's *s3n* protocol for the input parameter ({{-i s3n://asf-mail-archives/mahout-0.4/sequence-files/}})
so that Mahout/Hadoop can find the SequenceFiles in S3. Also, notice that we've configured
the job to send output to HDFS instead of S3. This is needed to work around an issue with
multi-step jobs and EMR (see [MAHOUT-598|]).
Once the job completes, you can copy the results to S3 from the EMR cluster's HDFS using distcp.

The job shown above created 6,076,937 vectors with 20,444 dimensions in around 28 minutes
on a 4+1 node cluster of EC2 xlarge instances. Depending on the number of unique terms, setting
maxNGramSize greater than 1 has a major impact on the execution time of the seq2sparse job.
For example, the same job with maxNGramSize=2 can take up to 2 hours with the bulk of the
time spent creating collocations, see [Collocations|].

To monitor the status of the job, use:

elastic-mapreduce --logs -j JOB_ID

h4. 6. Copy output from HDFS to S3 (optional)

It's a good idea to save the vectors for running future jobs. Of course, if you don't save
the vectors to S3, then they will be lost when you terminate the EMR cluster. There are two
approaches to moving data out of HDFS to S3:

# SSH into the master node to run distcp, or
# Add a jobflow step to run distcp

To login to the master node, use:

elastic-mapreduce --ssh -j JOB_ID

Once logged in, do:

hadoop distcp /asf-mail-archives/mahout-0.4/vectors/ s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/

Or, you can just add another job flow step to do it:

elastic-mapreduce --jar s3://elasticmapreduce/samples/distcp/distcp.jar \
  --arg hdfs:///asf-mail-archives/mahout-0.4/vectors/ \
  --arg s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/ \
  -j JOB_ID

_Note: You will need all the output from the vectorize step in order to run Mahout's clusterdump._

Once copied, if you would like to share your results with the Mahout community, make the vectors
public in S3 using the Amazon console or s3cmd:

s3cmd setacl --acl-public --recursive s3://BUCKET/asf-mail-archives/mahout-0.4/vectors/

Dump out the size of the vectors:

bin/mahout vectordump \
  --seqFile s3n://ACCESS_KEY:SECRET_KEY@BUCKET/asf-mail-archives/mahout-0.4/vectors/tfidf-vectors/part-r-00000 \
  --sizeOnly | more

h4. 7. k-Means Clustering

Now that you have vectors, you can do some clustering! The following command will create a
new jobflow step to run the k-Means job using the TFIDF vectors produced by seq2sparse:

elastic-mapreduce --jar s3://asf-mail-archives/mahout-0.4/mahout-examples-0.4-job-ext.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg kmeans \
  --arg -i --arg /asf-mail-archives/mahout-0.4/vectors/tfidf-vectors/ \
  --arg -c --arg /asf-mail-archives/mahout-0.4/initial-clusters/ \
  --arg -o --arg /asf-mail-archives/mahout-0.4/kmeans-clusters \
  --arg -x --arg 10 \
  --arg -cd --arg 0.01 \
  --arg -k --arg 60 \
  --arg --distanceMeasure --arg org.apache.mahout.common.distance.CosineDistanceMeasure \
  -j JOB_ID

Depending on the EC2 instance type and size of your cluster, the k-Means job can take a couple
of hours to complete. The input is the HDFS location of the vectors created by the seq2sparse
job. If you copied the vectors to S3, then you could also use the s3n protocol. However, since
I'm using the same EMR job flow, the vectors are already in HDFS, so there is no need to pull
them from S3.

_Tip: use a convergenceDelta of 0.01 to ensure the clustering job performs more than one iteration._

h4. 8. Shut down your cluster

elastic-mapreduce --terminate -j JOB_ID

Verify the cluster is terminated in your Amazon console.
