mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1588565 - in /mahout/trunk: core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/ integration/src/main/java/org/apache/mahout/utils/ integration/src/main/java/org/apache/mahout/utils/vectors/ integration/src/test/java/org/apach...
Date Fri, 18 Apr 2014 21:03:09 GMT
Author: ssc
Date: Fri Apr 18 21:03:08 2014
New Revision: 1588565

URL: http://svn.apache.org/r1588565
Log:
MAHOUT-1427 - Convert old .mapred API to new .mapreduce

Modified:
    mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapperTest.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/utils/SplitInputTest.java

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapperTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapperTest.java?rev=1588565&r1=1588564&r2=1588565&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapperTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/vectorizer/collocations/llr/CollocMapperTest.java Fri Apr 18 21:03:08 2014
@@ -19,7 +19,7 @@ package org.apache.mahout.vectorizer.col
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.mahout.common.MahoutTestCase;

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java?rev=1588565&r1=1588564&r2=1588565&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java Fri Apr 18 21:03:08 2014
@@ -31,9 +31,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.apache.mahout.math.list.IntArrayList;
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
@@ -65,7 +65,7 @@ public final class SequenceFileDumper ex
     Path input = getInputPath();
     FileSystem fs = input.getFileSystem(conf);
     if (fs.getFileStatus(input).isDir()) {
-      pathArr = FileUtil.stat2Paths(fs.listStatus(input, new OutputFilesFilter()));
+      pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter()));
     } else {
       pathArr = new Path[1];
       pathArr[0] = input;

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java?rev=1588565&r1=1588564&r2=1588565&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java Fri Apr 18 21:03:08 2014
@@ -27,15 +27,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.lib.MultipleOutputs;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.RandomUtils;
@@ -43,7 +41,6 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
 
-@SuppressWarnings("deprecation")
 /**
  * Class which implements a map reduce version of SplitInput.
  * This class takes a SequenceFile input, e.g. a set of training data
@@ -52,15 +49,12 @@ import org.apache.mahout.common.iterator
  */
 public final class SplitInputJob {
 
-  private static final String DOWNSAMPLING_FACTOR =
-      "SplitInputJob.downsamplingFactor";
-  private static final String RANDOM_SELECTION_PCT =
-      "SplitInputJob.randomSelectionPct";
+  private static final String DOWNSAMPLING_FACTOR = "SplitInputJob.downsamplingFactor";
+  private static final String RANDOM_SELECTION_PCT = "SplitInputJob.randomSelectionPct";
   private static final String TRAINING_TAG = "training";
   private static final String TEST_TAG = "test";
 
-  private SplitInputJob() {
-  }
+  private SplitInputJob() {}
 
   /**
    * Run job to downsample, randomly permute and split data into test and
@@ -104,17 +98,11 @@ public final class SplitInputJob {
     } else {
       throw new IllegalStateException("Couldn't determine class of the input values");
     }
-    // Use old API for multiple outputs
-    JobConf oldApiJob = new JobConf(initialConf);
-    MultipleOutputs.addNamedOutput(oldApiJob, TRAINING_TAG,
-        org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-        keyClass, valueClass);
-    MultipleOutputs.addNamedOutput(oldApiJob, TEST_TAG,
-        org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-        keyClass, valueClass);
 
-    // Setup job with new API
-    Job job = new Job(oldApiJob);
+    Job job = new Job(new Configuration(initialConf));
+
+    MultipleOutputs.addNamedOutput(job, TRAINING_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
+    MultipleOutputs.addNamedOutput(job, TEST_TAG, SequenceFileOutputFormat.class, keyClass, valueClass);
     job.setJarByClass(SplitInputJob.class);
     FileInputFormat.addInputPath(job, inputPath);
     FileOutputFormat.setOutputPath(job, outputPath);
@@ -133,23 +121,18 @@ public final class SplitInputJob {
     }
   }
 
-  /**
-   * Mapper which downsamples the input by downsamplingFactor
-   */
+  /** Mapper which downsamples the input by downsamplingFactor */
   public static class SplitInputMapper extends
       Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
 
     private int downsamplingFactor;
 
     @Override
-    public void setup(Context context) {
-      downsamplingFactor =
-          context.getConfiguration().getInt(DOWNSAMPLING_FACTOR, 1);
+    public void setup(Context ctx) {
+      downsamplingFactor = ctx.getConfiguration().getInt(DOWNSAMPLING_FACTOR, 1);
     }
 
-    /**
-     * Only run map() for one out of every downsampleFactor inputs
-     */
+    /** Only run map() for one out of every downsampleFactor inputs */
     @Override
     public void run(Context context) throws IOException, InterruptedException {
       setup(context);
@@ -165,28 +148,18 @@ public final class SplitInputJob {
 
   }
 
-  /**
-   * Reducer which uses MultipleOutputs to randomly allocate key value pairs
-   * between test and training outputs
-   */
+  /** Reducer which uses MultipleOutputs to randomly allocate key value pairs between test and training outputs */
   public static class SplitInputReducer extends
       Reducer<WritableComparable<?>, Writable, WritableComparable<?>, Writable> {
 
     private MultipleOutputs multipleOutputs;
-    private OutputCollector<WritableComparable<?>, Writable> trainingCollector = null;
-    private OutputCollector<WritableComparable<?>, Writable> testCollector = null;
     private final Random rnd = RandomUtils.getRandom();
     private float randomSelectionPercent;
 
-    @SuppressWarnings("unchecked")
     @Override
-    protected void setup(Context context) throws IOException {
-      randomSelectionPercent =
-          context.getConfiguration().getFloat(RANDOM_SELECTION_PCT, 0);
-      multipleOutputs =
-          new MultipleOutputs(new JobConf(context.getConfiguration()));
-      trainingCollector = multipleOutputs.getCollector(TRAINING_TAG, null);
-      testCollector = multipleOutputs.getCollector(TEST_TAG, null);
+    protected void setup(Context ctx) throws IOException {
+      randomSelectionPercent = ctx.getConfiguration().getFloat(RANDOM_SELECTION_PCT, 0);
+      multipleOutputs = new MultipleOutputs(ctx);
     }
 
     /**
@@ -198,9 +171,9 @@ public final class SplitInputJob {
         Context context) throws IOException, InterruptedException {
       for (Writable value : values) {
         if (rnd.nextInt(100) < randomSelectionPercent) {
-          testCollector.collect(key, value);
+          multipleOutputs.write(TEST_TAG, key, value);
         } else {
-          trainingCollector.collect(key, value);
+          multipleOutputs.write(TRAINING_TAG, key, value);
         }
       }
 
@@ -208,14 +181,16 @@ public final class SplitInputJob {
 
     @Override
     protected void cleanup(Context context) throws IOException {
-      multipleOutputs.close();
+      try {
+        multipleOutputs.close();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
     }
 
   }
 
-  /**
-   * Randomly permute key value pairs
-   */
+  /** Randomly permute key value pairs */
   public static class SplitInputComparator extends WritableComparator implements Serializable {
 
     private final Random rnd = RandomUtils.getRandom();

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java?rev=1588565&r1=1588564&r2=1588565&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java Fri Apr 18 21:03:08 2014
@@ -34,11 +34,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.math.NamedVector;
 import org.apache.mahout.math.Vector;
@@ -98,7 +98,7 @@ public final class VectorDumper extends 
     Path input = getInputPath();
     FileStatus fileStatus = fs.getFileStatus(input);
     if (fileStatus.isDir()) {
-      pathArr = FileUtil.stat2Paths(fs.listStatus(input, new OutputFilesFilter()));
+      pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter()));
     } else {
       FileStatus[] inputPaths = fs.globStatus(input);
       pathArr = new Path[inputPaths.length];

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/utils/SplitInputTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/utils/SplitInputTest.java?rev=1588565&r1=1588564&r2=1588565&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/utils/SplitInputTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/utils/SplitInputTest.java Fri Apr 18 21:03:08 2014
@@ -200,9 +200,7 @@ public final class SplitInputTest extend
     VectorWritable value = new VectorWritable();
     SequenceFile.Writer writer = null;
     try {
-      writer =
-          SequenceFile.createWriter(fs, conf, tempSequenceFile,
-              IntWritable.class, VectorWritable.class);
+      writer = SequenceFile.createWriter(fs, conf, tempSequenceFile, IntWritable.class, VectorWritable.class);
       for (int i = 0; i < testPoints; i++) {
         key.set(i);
         Vector v = new SequentialAccessSparseVector(4);
@@ -216,13 +214,11 @@ public final class SplitInputTest extend
   }
 
   /**
-   * Create a Sequencefile for testing consisting of IntWritable
-   * keys and Text values
+   * Create a Sequencefile for testing consisting of IntWritable keys and Text values
    * @param path path for test SequenceFile
    * @param testPoints number of records in test SequenceFile
    */
-  private void writeTextSequenceFile(Path path, int testPoints)
-      throws IOException {
+  private void writeTextSequenceFile(Path path, int testPoints) throws IOException {
     Path tempSequenceFile = new Path(path, "part-00000");
     Configuration conf = getConfiguration();
     Text key = new Text();
@@ -275,10 +271,7 @@ public final class SplitInputTest extend
     testSplitInputMapReduce(1000);
   }
 
-  /**
-   * Test map reduce version of split input with Text, Text key value
-   * pairs in input called from command line
-   */
+  /** Test map reduce version of split input with Text, Text key value pairs in input called from command line */
   @Test
   public void testSplitInputMapReduceTextCli() throws Exception {
     writeTextSequenceFile(tempSequenceDirectory, 1000);



Mime
View raw message