mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject svn commit: r1094154 - /mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
Date Sun, 17 Apr 2011 15:09:47 GMT
Author: srowen
Date: Sun Apr 17 15:09:46 2011
New Revision: 1094154

URL: http://svn.apache.org/viewvc?rev=1094154&view=rev
Log:
Re-do some recent iterator-related changes that may have been clobbered in a merge. Fix an
apparent typo bug in buildClusters() that would pass output as both input and output?

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1094154&r1=1094153&r2=1094154&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
Sun Apr 17 15:09:46 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -47,7 +46,6 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
 import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,20 +89,20 @@ public class CanopyDriver extends Abstra
     double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
     double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
     double t3 = t1;
-    if (hasOption(DefaultOptionCreator.T3_OPTION))
+    if (hasOption(DefaultOptionCreator.T3_OPTION)) {
       t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION));
+    }
     double t4 = t2;
-    if (hasOption(DefaultOptionCreator.T4_OPTION))
+    if (hasOption(DefaultOptionCreator.T4_OPTION)) {
       t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
+    }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
-        .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
-        DistanceMeasure.class).newInstance();
+    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
 
-    run(conf, input, output, measure, t1, t2, t3, t4, runClustering,
-        runSequential);
+    run(conf, input, output, measure, t1, t2, t3, t4, runClustering, runSequential);
     return 0;
   }
 
@@ -132,34 +130,37 @@ public class CanopyDriver extends Abstra
    *          cluster the input vectors if true
    * @param runSequential
    *          execute sequentially if true
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
    */
-  public static void run(Configuration conf, Path input, Path output,
-      DistanceMeasure measure, double t1, double t2, double t3, double t4,
-      boolean runClustering, boolean runSequential) throws IOException,
-      InterruptedException, ClassNotFoundException, InstantiationException,
-      IllegalAccessException {
-    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
-        t4, runSequential);
+  public static void run(Configuration conf,
+                         Path input,
+                         Path output,
+                         DistanceMeasure measure,
+                         double t1,
+                         double t2,
+                         double t3,
+                         double t4,
+                         boolean runClustering,
+                         boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
IllegalAccessException {
+    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, t4, runSequential);
     if (runClustering) {
-      clusterData(conf, input, clustersOut, output, measure, t1, t2,
-          runSequential);
+      clusterData(conf, input, clustersOut, output, measure, t1, t2, runSequential);
     }
   }
 
   /**
    * Convenience method to provide backward compatibility
    */
-  public static void run(Configuration conf, Path input, Path output,
-      DistanceMeasure measure, double t1, double t2, boolean runClustering,
-      boolean runSequential) throws IOException, InterruptedException,
-      ClassNotFoundException, InstantiationException, IllegalAccessException {
-    run(conf, input, output, measure, t1, t2, t1, t2, runClustering,
-        runSequential);
+  public static void run(Configuration conf,
+                         Path input,
+                         Path output,
+                         DistanceMeasure measure,
+                         double t1,
+                         double t2,
+                         boolean runClustering,
+                         boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
IllegalAccessException {
+    run(conf, input, output, measure, t1, t2, t1, t2, runClustering, runSequential);
   }
 
   /**
@@ -180,23 +181,29 @@ public class CanopyDriver extends Abstra
    * @param runSequential
    *          execute sequentially if true
    */
-  public static void run(Path input, Path output, DistanceMeasure measure,
-      double t1, double t2, boolean runClustering, boolean runSequential)
-      throws IOException, InterruptedException, ClassNotFoundException,
-      InstantiationException, IllegalAccessException {
-    run(new Configuration(), input, output, measure, t1, t2, runClustering,
-        runSequential);
+  public static void run(Path input,
+                         Path output,
+                         DistanceMeasure measure,
+                         double t1,
+                         double t2,
+                         boolean runClustering,
+                         boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
IllegalAccessException {
+    run(new Configuration(), input, output, measure, t1, t2, runClustering, runSequential);
   }
 
   /**
    * Convenience method for backwards compatibility
    */
-  public static Path buildClusters(Configuration conf, Path input, Path output,
-      DistanceMeasure measure, double t1, double t2, boolean runSequential)
-      throws InstantiationException, IllegalAccessException, IOException,
-      InterruptedException, ClassNotFoundException {
-    return buildClusters(conf, output, output, measure, t1, t2, t1, t2,
-        runSequential);
+  public static Path buildClusters(Configuration conf,
+                                   Path input,
+                                   Path output,
+                                   DistanceMeasure measure,
+                                   double t1,
+                                   double t2,
+                                   boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    return buildClusters(conf, input, output, measure, t1, t2, t1, t2, runSequential);
   }
 
   /**
@@ -223,13 +230,18 @@ public class CanopyDriver extends Abstra
    *          a boolean indicates to run the sequential (reference) algorithm
    * @return the canopy output directory Path
    */
-  public static Path buildClusters(Configuration conf, Path input, Path output,
-      DistanceMeasure measure, double t1, double t2, double t3, double t4,
-      boolean runSequential) throws InstantiationException,
-      IllegalAccessException, IOException, InterruptedException,
-      ClassNotFoundException {
-    log.info("Build Clusters Input: {} Out: {} " + "Measure: {} t1: {} t2: {}",
-        new Object[] { input, output, measure, t1, t2 });
+  public static Path buildClusters(Configuration conf,
+                                   Path input,
+                                   Path output,
+                                   DistanceMeasure measure,
+                                   double t1,
+                                   double t2,
+                                   double t3,
+                                   double t4,
+                                   boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
+             new Object[] {input, output, measure, t1, t2});
     if (runSequential) {
       return buildClustersSeq(input, output, measure, t1, t2);
     } else {
@@ -253,42 +265,34 @@ public class CanopyDriver extends Abstra
    *          the double T2 distance metric
    * @return the canopy output directory Path
    */
-  private static Path buildClustersSeq(Path input, Path output,
-      DistanceMeasure measure, double t1, double t2)
-      throws InstantiationException, IllegalAccessException, IOException {
+  private static Path buildClustersSeq(Path input,
+                                       Path output,
+                                       DistanceMeasure measure,
+                                       double t1,
+                                       double t2) throws IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
     Collection<Canopy> canopies = new ArrayList<Canopy>();
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(input.toUri(), conf);
-    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
-    for (FileStatus s : status) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
-          conf);
-      try {
-        Writable key = reader.getKeyClass().asSubclass(Writable.class)
-            .newInstance();
-        VectorWritable vw = reader.getValueClass().asSubclass(
-            VectorWritable.class).newInstance();
-        while (reader.next(key, vw)) {
-          clusterer.addPointToCanopies(vw.get(), canopies);
-          vw = reader.getValueClass().asSubclass(VectorWritable.class)
-              .newInstance();
-        }
-      } finally {
-        reader.close();
-      }
+
+    for (VectorWritable vw
+         : new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST, PathFilters.logsCRCFilter(),
conf)) {
+      clusterer.addPointToCanopies(vw.get(), canopies);
     }
+
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     Path path = new Path(canopyOutputDir, "part-r-00000");
-    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
-        Text.class, Canopy.class);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Canopy.class);
     try {
       for (Canopy canopy : canopies) {
         canopy.computeParameters();
-        log.debug("Writing Canopy:" + canopy.getIdentifier() + " center:"
-            + AbstractCluster.formatVector(canopy.getCenter(), null)
-            + " numPoints:" + canopy.getNumPoints() + " radius:"
-            + AbstractCluster.formatVector(canopy.getRadius(), null));
+        log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
+                  new Object[] {
+                      canopy.getIdentifier(),
+                      AbstractCluster.formatVector(canopy.getCenter(), null),
+                      canopy.getNumPoints(),
+                      AbstractCluster.formatVector(canopy.getRadius(), null)
+                  });
         writer.append(new Text(canopy.getIdentifier()), canopy);
       }
     } finally {
@@ -319,19 +323,22 @@ public class CanopyDriver extends Abstra
    * 
    * @return the canopy output directory Path
    */
-  private static Path buildClustersMR(Configuration conf, Path input,
-      Path output, DistanceMeasure measure, double t1, double t2, double t3,
-      double t4) throws IOException, InterruptedException,
-      ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
-        .getName());
+  private static Path buildClustersMR(Configuration conf,
+                                      Path input,
+                                      Path output,
+                                      DistanceMeasure measure,
+                                      double t1,
+                                      double t2,
+                                      double t3,
+                                      double t4)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
     conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
 
-    Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
-        + input);
+    Job job = new Job(conf, "Canopy Driver running buildClusters over input: " + input);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(CanopyMapper.class);
@@ -347,17 +354,20 @@ public class CanopyDriver extends Abstra
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     FileOutputFormat.setOutputPath(job, canopyOutputDir);
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Job failed processing "
-          + input.toString());
+      throw new InterruptedException("Canopy Job failed processing " + input);
     }
     return canopyOutputDir;
   }
 
-  public static void clusterData(Configuration conf, Path points,
-      Path canopies, Path output, DistanceMeasure measure, double t1,
-      double t2, boolean runSequential) throws InstantiationException,
-      IllegalAccessException, IOException, InterruptedException,
-      ClassNotFoundException {
+  public static void clusterData(Configuration conf,
+                                 Path points,
+                                 Path canopies,
+                                 Path output,
+                                 DistanceMeasure measure,
+                                 double t1,
+                                 double t2,
+                                 boolean runSequential)
+    throws InstantiationException, IllegalAccessException, IOException, InterruptedException,
ClassNotFoundException {
     if (runSequential) {
       clusterDataSeq(points, canopies, output, measure, t1, t2);
     } else {
@@ -365,35 +375,27 @@ public class CanopyDriver extends Abstra
     }
   }
 
-  private static void clusterDataSeq(Path points, Path canopies, Path output,
-      DistanceMeasure measure, double t1, double t2)
-      throws InstantiationException, IllegalAccessException, IOException {
+  private static void clusterDataSeq(Path points,
+                                     Path canopies,
+                                     Path output,
+                                     DistanceMeasure measure,
+                                     double t1,
+                                     double t2)
+    throws InstantiationException, IllegalAccessException, IOException {
     CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
 
     Collection<Canopy> clusters = new ArrayList<Canopy>();
     Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(canopies.toUri(), conf);
-    FileStatus[] status = fs.listStatus(canopies, new OutputLogFilter());
-    for (FileStatus s : status) {
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
-          conf);
-      try {
-        Writable key = reader.getKeyClass().asSubclass(Writable.class)
-            .newInstance();
-        Canopy value = reader.getValueClass().asSubclass(Canopy.class)
-            .newInstance();
-        while (reader.next(key, value)) {
-          clusters.add(value);
-          value = reader.getValueClass().asSubclass(Canopy.class).newInstance();
-        }
-      } finally {
-        reader.close();
-      }
+
+    for (Canopy value
+         : new SequenceFileDirValueIterable<Canopy>(canopies, PathType.LIST, PathFilters.logsCRCFilter(),
conf)) {
+      clusters.add(value);
     }
+
     // iterate over all points, assigning each to the closest canopy and
     // outputing that clustering
-    fs = FileSystem.get(points.toUri(), conf);
-    status = fs.listStatus(points, new OutputLogFilter());
+    FileSystem fs = FileSystem.get(points.toUri(), conf);
+    FileStatus[] status = fs.listStatus(points, PathFilters.logsCRCFilter());
     Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
     int part = 0;
     for (FileStatus s : status) {
@@ -421,17 +423,20 @@ public class CanopyDriver extends Abstra
     }
   }
 
-  private static void clusterDataMR(Configuration conf, Path points,
-      Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
-        .getName());
+  private static void clusterDataMR(Configuration conf,
+                                    Path points,
+                                    Path canopies,
+                                    Path output,
+                                    DistanceMeasure measure,
+                                    double t1,
+                                    double t2)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
     conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
 
-    Job job = new Job(conf, "Canopy Driver running clusterData over input: "
-        + points);
+    Job job = new Job(conf, "Canopy Driver running clusterData over input: " + points);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setMapperClass(ClusterMapper.class);
@@ -446,8 +451,7 @@ public class CanopyDriver extends Abstra
     HadoopUtil.delete(conf, outPath);
 
     if (!job.waitForCompletion(true)) {
-      throw new InterruptedException("Canopy Clustering failed processing "
-          + canopies.toString());
+      throw new InterruptedException("Canopy Clustering failed processing " + canopies);
     }
   }
 



Mime
View raw message