mahout-commits mailing list archives

From jeast...@apache.org
Subject svn commit: r1149369 - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/meanshift/ core/src/test/java/org/apache/mahout/clustering/meanshift/ examples/src/main/java/org/apache/mahout/clustering/display/
Date Thu, 21 Jul 2011 21:13:04 GMT
Author: jeastman
Date: Thu Jul 21 21:13:02 2011
New Revision: 1149369

URL: http://svn.apache.org/viewvc?rev=1149369&view=rev
Log:
MAHOUT-749: Implemented the multiple-reducer approach from the Jira patch, plus a scalability enhancement that avoids accumulating merged clusterIds when the -cl option is not present. The defaults preserve the previous behavior. All tests pass, though this needs more testing to see how well it really scales.
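
As a minimal standalone sketch of the scalability enhancement (a hypothetical class, not code from this commit): when the -cl clustering step will not be run, a merged canopy only needs to carry a count of merged points ("mass") rather than the full list of bound point ids, so per-canopy memory stays constant instead of growing with every merge. The real merge() change in MeanShiftCanopy.java below has the same shape.

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch, not part of this commit.
    class CanopySketch {
      private final List<Integer> boundPoints = new ArrayList<Integer>(); // grows with merged points
      private int mass = 1;                                               // constant-size summary

      void merge(CanopySketch other, boolean accumulateBoundPoints) {
        if (accumulateBoundPoints) {            // only needed when -cl will run
          boundPoints.addAll(other.boundPoints);
        }
        mass += other.mass;                     // always cheap
      }
    }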

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java Thu Jul 21 21:13:02 2011
@@ -34,16 +34,27 @@ import org.apache.mahout.math.list.IntAr
  * centroid when needed.
  */
 public class MeanShiftCanopy extends Cluster {
-  
+
   // TODO: this is still problematic from a scalability perspective, but how
   // else to encode membership?
   private IntArrayList boundPoints = new IntArrayList();
-  
+
+  private int mass = 0;
+
+  public int getMass() {
+    return mass;
+  }
+
+  void setMass(int num) {
+    mass = num;
+  }
+
   /**
    * Used for Writable
    */
-  public MeanShiftCanopy() {}
-  
+  public MeanShiftCanopy() {
+  }
+
   /**
    * Create a new Canopy containing the given point
    * 
@@ -57,8 +68,9 @@ public class MeanShiftCanopy extends Clu
   public MeanShiftCanopy(Vector point, int id, DistanceMeasure measure) {
     super(point, id, measure);
     boundPoints.add(id);
+    mass = 1;
   }
-  
+
   /**
    * Create an initial Canopy, retaining the original type of the given point
    * (e.g. NamedVector)
@@ -78,43 +90,26 @@ public class MeanShiftCanopy extends Clu
     result.setCenter(point);
     return result;
   }
-  
-  /**
-   * Create a new Canopy containing the given point, id and bound points
-   * 
-   * @param point
-   *          a Vector
-   * @param id
-   *          an int identifying the canopy local to this process only
-   * @param boundPoints
-   *          a IntArrayList containing points ids bound to the canopy
-   * @param converged
-   *          true if the canopy has converged
-   */
-  MeanShiftCanopy(Vector point, int id, IntArrayList boundPoints,
-      boolean converged) {
-    this.setId(id);
-    this.setCenter(point);
-    this.setRadius(point.like());
-    this.setNumPoints(1);
-    this.boundPoints = boundPoints;
-    setConverged(converged);
-  }
-  
+
   public IntArrayList getBoundPoints() {
     return boundPoints;
   }
-  
+
   /**
    * The receiver overlaps the given canopy. Add my bound points to it.
    * 
    * @param canopy
    *          an existing MeanShiftCanopy
+   * @param accumulateBoundPoints
+   *          true to accumulate bound points from the canopy
    */
-  void merge(MeanShiftCanopy canopy) {
-    boundPoints.addAllOf(canopy.boundPoints);
+  void merge(MeanShiftCanopy canopy, boolean accumulateBoundPoints) {
+    if (accumulateBoundPoints) {
+      boundPoints.addAllOf(canopy.boundPoints);
+    }
+    mass += canopy.mass;
   }
-  
+
   /**
    * The receiver touches the given canopy. Add respective centers with the
    * given weights.
@@ -125,29 +120,32 @@ public class MeanShiftCanopy extends Clu
    *          double weight of the touching
    */
   void touch(MeanShiftCanopy canopy, double weight) {
-    canopy.observe(getCenter(), weight * boundPoints.size());
-    observe(canopy.getCenter(), weight * canopy.boundPoints.size());
+    canopy.observe(getCenter(), weight * mass);
+    observe(canopy.getCenter(), weight * canopy.mass);
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
+    this.mass = in.readInt();
     int numpoints = in.readInt();
     this.boundPoints = new IntArrayList();
     for (int i = 0; i < numpoints; i++) {
       this.boundPoints.add(in.readInt());
     }
   }
-  
+
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
+    out.writeInt(mass);
     out.writeInt(boundPoints.size());
     for (int v : boundPoints.elements()) {
       out.writeInt(v);
     }
   }
-  
+
   public MeanShiftCanopy shallowCopy() {
     MeanShiftCanopy result = new MeanShiftCanopy();
     result.setMeasure(this.getMeasure());
@@ -156,28 +154,29 @@ public class MeanShiftCanopy extends Clu
     result.setRadius(this.getRadius());
     result.setNumPoints(this.getNumPoints());
     result.setBoundPoints(boundPoints);
+    result.setMass(mass);
     return result;
   }
-  
+
   @Override
   public String asFormatString() {
     return toString();
   }
-  
+
   public void setBoundPoints(IntArrayList boundPoints) {
     this.boundPoints = boundPoints;
   }
-  
+
   @Override
   public String getIdentifier() {
     return (isConverged() ? "MSV-" : "MSC-") + getId();
   }
-  
+
   @Override
   public double pdf(VectorWritable vw) {
     // MSCanopy membership is explicit via membership in boundPoints. Can't
     // compute pdf for Arbitrary point
     throw new UnsupportedOperationException();
   }
-  
+
 }
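
A note on why mass can replace boundPoints.size() in touch(): the shifted center is a weighted mean of observed centers, and only the number of points enters the weights, never the point ids themselves. A self-contained sketch of that weighted-mean bookkeeping (hypothetical names; the real observe()/computeParameters() live in the Cluster hierarchy and are more general):

    // Hypothetical sketch, not part of this commit.
    class WeightedCenterSketch {
      private double totalWeight;
      private final double[] weightedSum;

      WeightedCenterSketch(int dimension) {
        weightedSum = new double[dimension];
      }

      // accumulate another canopy's center, weighted by kernel value * its mass
      void observe(double[] center, double weight) {
        for (int i = 0; i < center.length; i++) {
          weightedSum[i] += center[i] * weight;
        }
        totalWeight += weight;
      }

      // the shifted center: a mass-weighted mean of everything observed
      double[] shiftedCenter() {
        double[] center = new double[weightedSum.length];
        for (int i = 0; i < center.length; i++) {
          center[i] = weightedSum[i] / totalWeight;
        }
        return center;
      }
    }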

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java Thu Jul 21 21:13:02 2011
@@ -30,30 +30,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MeanShiftCanopyClusterer {
-  
+
   private static final Logger log = LoggerFactory
       .getLogger(MeanShiftCanopyClusterer.class);
-  
+
   private final double convergenceDelta;
-  
+
   // the T1 distance threshold
   private final double t1;
-  
+
   // the T2 distance threshold
   private final double t2;
-  
+
   // the distance measure
   private final DistanceMeasure measure;
-  
+
   private final IKernelProfile kernelProfile;
-  
+
+  // if true, accumulate bound points during merge so clustered points can be produced later
+  private final boolean runClustering;
+
   public MeanShiftCanopyClusterer(Configuration configuration) {
     try {
-      measure = Class
-          .forName(
-              configuration.get(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY))
+      measure = Class.forName(
+          configuration.get(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY))
           .asSubclass(DistanceMeasure.class).newInstance();
       measure.configure(configuration);
+      runClustering = configuration.getBoolean(
+          MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, true);
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
     } catch (IllegalAccessException e) {
@@ -62,9 +66,8 @@ public class MeanShiftCanopyClusterer {
       throw new IllegalStateException(e);
     }
     try {
-      kernelProfile = Class
-          .forName(
-              configuration.get(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY))
+      kernelProfile = Class.forName(
+          configuration.get(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY))
           .asSubclass(IKernelProfile.class).newInstance();
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -81,26 +84,27 @@ public class MeanShiftCanopyClusterer {
     convergenceDelta = Double.parseDouble(configuration
         .get(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY));
   }
-  
+
   public MeanShiftCanopyClusterer(DistanceMeasure aMeasure,
       IKernelProfile aKernelProfileDerivative, double aT1, double aT2,
-      double aDelta) {
+      double aDelta, boolean runClustering) {
     // nextCanopyId = 100; // so canopyIds will sort properly // never read?
     measure = aMeasure;
     t1 = aT1;
     t2 = aT2;
     convergenceDelta = aDelta;
     kernelProfile = aKernelProfileDerivative;
+    this.runClustering = runClustering;
   }
-  
+
   public double getT1() {
     return t1;
   }
-  
+
   public double getT2() {
     return t2;
   }
-  
+
   /**
    * Merge the given canopy into the canopies list. If it touches any existing
    * canopy (norm<T1) then add the center of each to the other. If it covers any
@@ -131,10 +135,10 @@ public class MeanShiftCanopyClusterer {
     if (closestCoveringCanopy == null) {
       canopies.add(aCanopy);
     } else {
-      closestCoveringCanopy.merge(aCanopy);
+      closestCoveringCanopy.merge(aCanopy, runClustering);
     }
   }
-  
+
   /**
    * Shift the center to the new centroid of the cluster
    * 
@@ -143,12 +147,12 @@ public class MeanShiftCanopyClusterer {
    * @return if the cluster is converged
    */
   public boolean shiftToMean(MeanShiftCanopy canopy) {
-    canopy.observe(canopy.getCenter(), canopy.getBoundPoints().size());
+    canopy.observe(canopy.getCenter(), canopy.getMass());
     canopy.computeConvergence(measure, convergenceDelta);
     canopy.computeParameters();
     return canopy.isConverged();
   }
-  
+
   /**
    * Return if the point is covered by this canopy
    * 
@@ -161,7 +165,7 @@ public class MeanShiftCanopyClusterer {
   boolean covers(MeanShiftCanopy canopy, Vector point) {
     return measure.distance(canopy.getCenter(), point) < t1;
   }
-  
+
   /**
    * Return if the point is closely covered by the canopy
    * 
@@ -174,7 +178,7 @@ public class MeanShiftCanopyClusterer {
   public boolean closelyBound(MeanShiftCanopy canopy, Vector point) {
     return measure.distance(canopy.getCenter(), point) < t2;
   }
-  
+
   /**
    * This is the reference mean-shift implementation. Given its inputs it
    * iterates over the points and clusters until their centers converge or until
@@ -191,22 +195,22 @@ public class MeanShiftCanopyClusterer {
       DistanceMeasure measure, IKernelProfile aKernelProfileDerivative,
       double convergenceThreshold, double t1, double t2, int numIter) {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure,
-        aKernelProfileDerivative, t1, t2, convergenceThreshold);
+        aKernelProfileDerivative, t1, t2, convergenceThreshold, true);
     int nextCanopyId = 0;
-    
+
     List<MeanShiftCanopy> canopies = Lists.newArrayList();
     for (Vector point : points) {
       clusterer.mergeCanopy(
           new MeanShiftCanopy(point, nextCanopyId++, measure), canopies);
     }
     List<MeanShiftCanopy> newCanopies = canopies;
-    boolean[] converged = {false};
+    boolean[] converged = { false };
     for (int iter = 0; !converged[0] && iter < numIter; iter++) {
       newCanopies = clusterer.iterate(newCanopies, converged);
     }
     return newCanopies;
   }
-  
+
   protected List<MeanShiftCanopy> iterate(Iterable<MeanShiftCanopy> canopies,
       boolean[] converged) {
     converged[0] = true;
@@ -217,22 +221,7 @@ public class MeanShiftCanopyClusterer {
     }
     return migratedCanopies;
   }
-  
-  protected static void verifyNonOverlap(Iterable<MeanShiftCanopy> canopies) {
-    Collection<Integer> coveredPoints = new HashSet<Integer>();
-    // verify no overlap
-    for (MeanShiftCanopy canopy : canopies) {
-      for (int v : canopy.getBoundPoints().toList()) {
-        if (coveredPoints.contains(v)) {
-          log.info("Duplicate bound point: {} in Canopy: {}", v,
-              canopy.asFormatString(null));
-        } else {
-          coveredPoints.add(v);
-        }
-      }
-    }
-  }
-  
+
   protected static MeanShiftCanopy findCoveringCanopy(MeanShiftCanopy canopy,
       Iterable<MeanShiftCanopy> clusters) {
     // canopies use canopyIds assigned when input vectors are processed as
@@ -247,5 +236,5 @@ public class MeanShiftCanopyClusterer {
     }
     return null;
   }
-  
+
 }
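
For reference, the covering logic mergeCanopy() applies (per its javadoc above): canopies within T1 "touch" and pull on each other's centers, and a canopy within T2 of an existing one is merged into the closest such canopy, otherwise it starts a new one. A distilled, runnable 1-D sketch (hypothetical classes; the T1 touch weighting is omitted for brevity):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical 1-D stand-in for MeanShiftCanopy: just a center and a mass.
    class Canopy1D {
      double center;
      int mass = 1;
      Canopy1D(double center) { this.center = center; }
    }

    public class MergeCanopySketch {
      static final double T2 = 1.0; // "closely covers" threshold

      static void mergeCanopy(Canopy1D aCanopy, List<Canopy1D> canopies) {
        Canopy1D closestCovering = null;
        double closestNorm = Double.MAX_VALUE;
        for (Canopy1D c : canopies) {
          double norm = Math.abs(c.center - aCanopy.center);
          if (norm < T2 && norm < closestNorm) {
            closestNorm = norm;
            closestCovering = c;
          }
        }
        if (closestCovering == null) {
          canopies.add(aCanopy);                // nothing covers it: new canopy
        } else {
          closestCovering.mass += aCanopy.mass; // i.e. merge(aCanopy, runClustering)
        }
      }

      public static void main(String[] args) {
        List<Canopy1D> canopies = new ArrayList<Canopy1D>();
        for (double p : new double[] {0.0, 0.4, 5.0}) {
          mergeCanopy(new Canopy1D(p), canopies);
        }
        System.out.println(canopies.size()); // 2: {0.0, mass 2} and {5.0}
      }
    }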

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java Thu Jul 21 21:13:02 2011
@@ -18,13 +18,14 @@
 package org.apache.mahout.clustering.meanshift;
 
 public interface MeanShiftCanopyConfigKeys {
-  
-  // keys used by Driver, Mapper, Combiner & Reducer
-  String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
-  String KERNEL_PROFILE_KEY = "org.apache.mahout.clustering.canopy.kernelprofile";
-  String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
-  String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
-  String CONTROL_PATH_KEY = "org.apache.mahout.clustering.control.path";
-  String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.canopy.convergence";
-  
+
+  // keys used by Driver, Mapper, Combiner & Reducer
+  String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
+  String KERNEL_PROFILE_KEY = "org.apache.mahout.clustering.canopy.kernelprofile";
+  String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+  String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+  String CONTROL_PATH_KEY = "org.apache.mahout.clustering.control.path";
+  String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.canopy.convergence";
+  String CLUSTER_POINTS_KEY = "org.apache.mahout.clustering.meanshift.clusterPointsKey";
+
 }
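
How the new key travels from driver to tasks (see the driver and clusterer changes in this commit): the driver serializes the boolean into the job Configuration, and each task reads it back with a default of true, so jobs that never set the key keep the old accumulate-everything behavior. A minimal sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.clustering.meanshift.MeanShiftCanopyConfigKeys;

    // Hypothetical demo class, not part of this commit.
    public class ClusterPointsKeyDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // driver side: set only when the -cl clustering output is not wanted
        conf.set(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, String.valueOf(false));
        // task side: a missing key defaults to true, preserving prior behavior
        boolean runClustering =
            conf.getBoolean(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, true);
        System.out.println(runClustering); // false
      }
    }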

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java Thu Jul 21 21:13:02 2011
@@ -57,19 +57,27 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Closeables;
 
+/**
+ * This class implements the driver for Mean Shift Canopy clustering.
+ */
 public class MeanShiftCanopyDriver extends AbstractJob {
-  
+
+  public static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
+
   private static final Logger log = LoggerFactory
       .getLogger(MeanShiftCanopyDriver.class);
-  
+
   public static final String INPUT_IS_CANOPIES_OPTION = "inputIsCanopies";
+
   public static final String STATE_IN_KEY = "org.apache.mahout.clustering.meanshift.stateInKey";
+
   private static final String CONTROL_CONVERGED = "control/converged";
-  
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
   }
-  
+
   @Override
   public int run(String[] args) throws Exception {
     addInputOption();
@@ -84,11 +92,11 @@ public class MeanShiftCanopyDriver exten
     addOption(DefaultOptionCreator.t2Option().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(DefaultOptionCreator.methodOption().create());
-    
+
     if (parseArguments(args) == null) {
       return -1;
     }
-    
+
     Path input = getInputPath();
     Path output = getOutputPath();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -107,17 +115,17 @@ public class MeanShiftCanopyDriver exten
     boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
         .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
-    DistanceMeasure measure = ccl.loadClass(measureClass)
-        .asSubclass(DistanceMeasure.class).newInstance();
+    DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
+        DistanceMeasure.class).newInstance();
     IKernelProfile kernelProfile = ccl.loadClass(kernelProfileClass)
         .asSubclass(IKernelProfile.class).newInstance();
     run(getConf(), input, output, measure, kernelProfile, t1, t2,
         convergenceDelta, maxIterations, inputIsCanopies, runClustering,
         runSequential);
-    
+
     return 0;
   }
-  
+
   /**
    * Run the job where the input format can be either Vectors or Canopies. If
    * requested, cluster the input data using the computed Canopies
@@ -160,16 +168,17 @@ public class MeanShiftCanopyDriver exten
     } else {
       createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);
     }
-    
+
     Path clustersOut = buildClusters(conf, clustersIn, output, measure,
-        kernelProfile, t1, t2, convergenceDelta, maxIterations, runSequential);
+        kernelProfile, t1, t2, convergenceDelta, maxIterations, runSequential,
+        runClustering);
     if (runClustering) {
       clusterData(inputIsCanopies ? input : new Path(output,
           Cluster.INITIAL_CLUSTERS_DIR), clustersOut, new Path(output,
           Cluster.CLUSTERED_POINTS_DIR), runSequential);
     }
   }
-  
+
   /**
    * Convert input vectors to MeanShiftCanopies for further processing
    */
@@ -182,7 +191,7 @@ public class MeanShiftCanopyDriver exten
       createCanopyFromVectorsMR(conf, input, output, measure);
     }
   }
-  
+
   /**
    * Convert vectors to MeanShiftCanopies sequentially
    * 
@@ -206,15 +215,15 @@ public class MeanShiftCanopyDriver exten
       try {
         for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(
             s.getPath(), conf)) {
-          writer.append(new Text(),
-              MeanShiftCanopy.initialCanopy(value.get(), id++, measure));
+          writer.append(new Text(), MeanShiftCanopy.initialCanopy(value.get(),
+              id++, measure));
         }
       } finally {
         Closeables.closeQuietly(writer);
       }
     }
   }
-  
+
   /**
    * Convert vectors to MeanShiftCanopies using Hadoop
    */
@@ -231,16 +240,16 @@ public class MeanShiftCanopyDriver exten
     job.setNumReduceTasks(0);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
-    
+
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
-    
+
     if (!job.waitForCompletion(true)) {
       throw new InterruptedException(
           "Mean Shift createCanopyFromVectorsMR failed on input " + input);
     }
   }
-  
+
   /**
    * Iterate over the input clusters to produce the next cluster directories for
    * each iteration
@@ -265,30 +274,33 @@ public class MeanShiftCanopyDriver exten
    *          an int number of iterations
    * @param runSequential
    *          if true run in sequential execution mode
+   * @param runClustering
+   *          if true, accumulate bound points of merged canopies for the subsequent clustering step
    */
   public static Path buildClusters(Configuration conf, Path clustersIn,
       Path output, DistanceMeasure measure, IKernelProfile kernelProfile,
       double t1, double t2, double convergenceDelta, int maxIterations,
-      boolean runSequential) throws IOException, InterruptedException,
-      ClassNotFoundException {
+      boolean runSequential, boolean runClustering) throws IOException,
+      InterruptedException, ClassNotFoundException {
     if (runSequential) {
       return buildClustersSeq(clustersIn, output, measure, kernelProfile, t1,
-          t2, convergenceDelta, maxIterations);
+          t2, convergenceDelta, maxIterations, runClustering);
     } else {
       return buildClustersMR(conf, clustersIn, output, measure, kernelProfile,
-          t1, t2, convergenceDelta, maxIterations);
+          t1, t2, convergenceDelta, maxIterations, runClustering);
     }
   }
-  
+
   /**
    * Build new clusters sequentially
    * 
    */
   private static Path buildClustersSeq(Path clustersIn, Path output,
       DistanceMeasure measure, IKernelProfile aKernelProfile, double t1,
-      double t2, double convergenceDelta, int maxIterations) throws IOException {
+      double t2, double convergenceDelta, int maxIterations,
+      boolean runClustering) throws IOException {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure,
-        aKernelProfile, t1, t2, convergenceDelta);
+        aKernelProfile, t1, t2, convergenceDelta, runClustering);
     List<MeanShiftCanopy> clusters = Lists.newArrayList();
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
@@ -296,7 +308,7 @@ public class MeanShiftCanopyDriver exten
         clustersIn, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
       clusterer.mergeCanopy(value, clusters);
     }
-    boolean[] converged = {false};
+    boolean[] converged = { false };
     int iteration = 1;
     while (!converged[0] && iteration <= maxIterations) {
       log.info("Mean Shift Iteration: {}", iteration);
@@ -308,11 +320,11 @@ public class MeanShiftCanopyDriver exten
         for (MeanShiftCanopy cluster : clusters) {
           log.debug(
               "Writing Cluster:{} center:{} numPoints:{} radius:{} to: {}",
-              new Object[] {cluster.getId(),
+              new Object[] { cluster.getId(),
                   AbstractCluster.formatVector(cluster.getCenter(), null),
                   cluster.getNumPoints(),
                   AbstractCluster.formatVector(cluster.getRadius(), null),
-                  clustersOut.getName()});
+                  clustersOut.getName() });
           writer.append(new Text(cluster.getIdentifier()), cluster);
         }
       } finally {
@@ -323,33 +335,43 @@ public class MeanShiftCanopyDriver exten
     }
     return clustersIn;
   }
-  
+
   /**
    * Build new clusters using Hadoop
+   * 
    */
   private static Path buildClustersMR(Configuration conf, Path clustersIn,
       Path output, DistanceMeasure measure, IKernelProfile aKernelProfile,
-      double t1, double t2, double convergenceDelta, int maxIterations)
-      throws IOException, InterruptedException, ClassNotFoundException {
+      double t1, double t2, double convergenceDelta, int maxIterations,
+      boolean runClustering) throws IOException, InterruptedException,
+      ClassNotFoundException {
     // iterate until the clusters converge
     boolean converged = false;
     int iteration = 1;
     while (!converged && iteration <= maxIterations) {
-      log.info("Mean Shift Iteration {}", iteration);
+      int numReducers = Integer.valueOf(conf.get(MAPRED_REDUCE_TASKS, "1"));
+      log.info("Mean Shift Iteration: {}, numReducers {}", new Object[] {
+          iteration, numReducers });
       // point the output to a new directory per iteration
       Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
       Path controlOut = new Path(output, CONTROL_CONVERGED);
       runIterationMR(conf, clustersIn, clustersOut, controlOut, measure
           .getClass().getName(), aKernelProfile.getClass().getName(), t1, t2,
-          convergenceDelta);
+          convergenceDelta, runClustering);
       converged = FileSystem.get(new Configuration()).exists(controlOut);
       // now point the input to the old output directory
       clustersIn = clustersOut;
       iteration++;
+      // decrease the number of reducers if it is > 1 to cross-pollinate
+      // map sets
+      if (numReducers > 1) {
+        numReducers--;
+        conf.set(MAPRED_REDUCE_TASKS, String.valueOf(numReducers));
+      }
     }
     return clustersIn;
   }
-  
+
   /**
    * Run an iteration using Hadoop
    * 
@@ -371,33 +393,35 @@ public class MeanShiftCanopyDriver exten
    *          the T2 distance threshold
    * @param convergenceDelta
    *          the double convergence criteria
+   * @param runClustering
+   *          if true, accumulate bound points of merged canopies for the subsequent clustering step
    */
   private static void runIterationMR(Configuration conf, Path input,
       Path output, Path control, String measureClassName,
       String kernelProfileClassName, double t1, double t2,
-      double convergenceDelta) throws IOException, InterruptedException,
-      ClassNotFoundException {
-    
+      double convergenceDelta, boolean runClustering) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
     conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
     conf.set(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY,
         kernelProfileClassName);
-    conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY,
-        String.valueOf(convergenceDelta));
+    conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, String
+        .valueOf(convergenceDelta));
     conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, control.toString());
-    
+    conf.set(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, String
+        .valueOf(runClustering));
     Job job = new Job(conf,
         "Mean Shift Driver running runIteration over input: " + input);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(MeanShiftCanopy.class);
-    
+
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
-    
+
     job.setMapperClass(MeanShiftCanopyMapper.class);
     job.setReducerClass(MeanShiftCanopyReducer.class);
-    job.setNumReduceTasks(1);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setJarByClass(MeanShiftCanopyDriver.class);
@@ -406,7 +430,7 @@ public class MeanShiftCanopyDriver exten
           + input);
     }
   }
-  
+
   /**
    * Run the job using supplied arguments
    * 
@@ -428,7 +452,7 @@ public class MeanShiftCanopyDriver exten
       clusterDataMR(input, clustersIn, output);
     }
   }
-  
+
   /**
    * Cluster the data sequentially
    */
@@ -450,7 +474,7 @@ public class MeanShiftCanopyDriver exten
           output, "part-m-" + part++), IntWritable.class,
           WeightedVectorWritable.class);
       try {
-        for (Pair<Writable,MeanShiftCanopy> record : new SequenceFileIterable<Writable,MeanShiftCanopy>(
+        for (Pair<Writable, MeanShiftCanopy> record : new SequenceFileIterable<Writable, MeanShiftCanopy>(
             s.getPath(), conf)) {
           MeanShiftCanopy canopy = record.getSecond();
           MeanShiftCanopy closest = MeanShiftCanopyClusterer
@@ -463,7 +487,7 @@ public class MeanShiftCanopyDriver exten
       }
     }
   }
-  
+
   /**
    * Cluster the data using Hadoop
    */
@@ -476,15 +500,15 @@ public class MeanShiftCanopyDriver exten
     job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(WeightedVectorWritable.class);
     job.setMapperClass(MeanShiftCanopyClusterMapper.class);
-    
+
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setNumReduceTasks(0);
     job.setJarByClass(MeanShiftCanopyDriver.class);
-    
+
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
-    
+
     if (!job.waitForCompletion(true)) {
       throw new InterruptedException(
           "Mean Shift Clustering failed on clustersIn " + clustersIn);

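A note on the iteration schedule buildClustersMR now uses: each map/reduce iteration decrements the reducer count (down to a floor of one), so canopies that landed on different reducers early on are progressively cross-pollinated until a single reducer finally sees them all. A trivial sketch of that schedule, assuming mapred.reduce.tasks=4 and six iterations:

    // Hypothetical sketch, not part of this commit.
    public class ReducerDecaySketch {
      public static void main(String[] args) {
        int numReducers = 4; // e.g. launched with -Dmapred.reduce.tasks=4
        for (int iteration = 1; iteration <= 6; iteration++) {
          System.out.println("iteration " + iteration + ": " + numReducers + " reducer(s)");
          if (numReducers > 1) {
            numReducers--; // same decay buildClustersMR applies per iteration
          }
        }
        // prints 4, 3, 2, 1, 1, 1 reducers across the six iterations
      }
    }
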
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java Thu Jul 21 21:13:02 2011
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Collection;
 
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -31,10 +33,14 @@ public class MeanShiftCanopyMapper exten
   
   private MeanShiftCanopyClusterer clusterer;
 
+  private Integer numReducers;
+
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
     super.setup(context);
-    clusterer = new MeanShiftCanopyClusterer(context.getConfiguration());
+    Configuration conf = context.getConfiguration();
+    clusterer = new MeanShiftCanopyClusterer(conf);
+    numReducers = Integer.valueOf(conf.get(MeanShiftCanopyDriver.MAPRED_REDUCE_TASKS, "1"));
   }
 
   @Override
@@ -45,9 +51,14 @@ public class MeanShiftCanopyMapper exten
 
   @Override
   protected void cleanup(Context context) throws IOException, InterruptedException {
+    int reducer = 0;
     for (MeanShiftCanopy canopy : canopies) {
       clusterer.shiftToMean(canopy);
-      context.write(new Text("canopy"), canopy);
+      context.write(new Text(String.valueOf(reducer)), canopy);
+      reducer++;
+      if (reducer >= numReducers) {
+        reducer = 0;
+      }
     }
     super.cleanup(context);
   }
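
The cleanup() change above is the heart of the multiple-reducer approach: instead of emitting every canopy under the single key "canopy", the mapper deals canopies out round-robin across keys "0" through String.valueOf(numReducers - 1), so each reducer merges a roughly equal share. A standalone sketch of the same assignment (hypothetical data):

    // Hypothetical sketch, not part of this commit.
    public class RoundRobinKeySketch {
      public static void main(String[] args) {
        int numReducers = 3;
        int reducer = 0;
        for (String canopy : new String[] {"c0", "c1", "c2", "c3", "c4"}) {
          System.out.println(canopy + " -> reducer key " + reducer);
          reducer = (reducer + 1) % numReducers; // same effect as the if/reset above
        }
        // c0->0, c1->1, c2->2, c3->0, c4->1
      }
    }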

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java Thu Jul 21 21:13:02 2011
@@ -51,14 +51,15 @@ import org.junit.Before;
 import org.junit.Test;
 
 public final class TestMeanShift extends MahoutTestCase {
-  
+
   private Vector[] raw = null;
-  
+
   // DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
-  
+
   private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();
+
   private final IKernelProfile kernelProfile = new TriangularKernelProfile();
-  
+
   /**
    * Print the canopies to the transcript
    * 
@@ -70,7 +71,7 @@ public final class TestMeanShift extends
       System.out.println(canopy.asFormatString(null));
     }
   }
-  
+
   /**
    * Print a graphical representation of the clustered image points as a 10x10
    * character mask
@@ -93,7 +94,7 @@ public final class TestMeanShift extends
       System.out.println(anOut);
     }
   }
-  
+
   private List<MeanShiftCanopy> getInitialCanopies() {
     int nextCanopyId = 0;
     List<MeanShiftCanopy> canopies = Lists.newArrayList();
@@ -103,7 +104,7 @@ public final class TestMeanShift extends
     }
     return canopies;
   }
-  
+
   @Override
   @Before
   public void setUp() throws Exception {
@@ -124,7 +125,7 @@ public final class TestMeanShift extends
       }
     }
   }
-  
+
   /**
    * Story: User can exercise the reference implementation to verify that the
    * test datapoints are clustered in a reasonable manner.
@@ -133,7 +134,7 @@ public final class TestMeanShift extends
   public void testReferenceImplementation() {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
         new EuclideanDistanceMeasure(), new TriangularKernelProfile(), 4.0,
-        1.0, 0.5);
+        1.0, 0.5, true);
     List<MeanShiftCanopy> canopies = Lists.newArrayList();
     // add all points to the canopies
     int nextCanopyId = 0;
@@ -156,7 +157,7 @@ public final class TestMeanShift extends
       System.out.println(iter++);
     }
   }
-  
+
   /**
    * Test the MeanShiftCanopyClusterer's reference implementation. Should
    * produce the same final output as above.
@@ -169,7 +170,7 @@ public final class TestMeanShift extends
     printCanopies(canopies);
     printImage(canopies);
   }
-  
+
   /**
    * Story: User can produce initial canopy centers using a
    * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
@@ -178,7 +179,7 @@ public final class TestMeanShift extends
   @Test
   public void testCanopyMapperEuclidean() throws Exception {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
-        euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5);
+        euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5, true);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
     // build the reference set
@@ -188,7 +189,7 @@ public final class TestMeanShift extends
       clusterer.mergeCanopy(new MeanShiftCanopy(aRaw, nextCanopyId++,
           euclideanDistanceMeasure), refCanopies);
     }
-    
+
     Configuration conf = new Configuration();
     conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
         "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
@@ -197,49 +198,52 @@ public final class TestMeanShift extends
     conf.set(MeanShiftCanopyConfigKeys.T1_KEY, "4");
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
     conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
-    
+
     // map the data
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
-    DummyRecordWriter<Text,MeanShiftCanopy> mapWriter = new DummyRecordWriter<Text,MeanShiftCanopy>();
-    Mapper<WritableComparable<?>,MeanShiftCanopy,Text,MeanShiftCanopy>.Context mapContext = DummyRecordWriter
+    DummyRecordWriter<Text, MeanShiftCanopy> mapWriter = new DummyRecordWriter<Text, MeanShiftCanopy>();
+    Mapper<WritableComparable<?>, MeanShiftCanopy, Text, MeanShiftCanopy>.Context mapContext = DummyRecordWriter
         .build(mapper, conf, mapWriter);
     mapper.setup(mapContext);
     for (MeanShiftCanopy canopy : canopies) {
       mapper.map(new Text(), canopy, mapContext);
     }
     mapper.cleanup(mapContext);
-    
+
     // now verify the output
     assertEquals("Number of map results", 1, mapWriter.getData().size());
-    List<MeanShiftCanopy> data = mapWriter.getValue(new Text("canopy"));
+    List<MeanShiftCanopy> data = mapWriter.getValue(new Text("0"));
     assertEquals("Number of canopies", refCanopies.size(), data.size());
-    
+
     // add all points to the reference canopies
-    Map<String,MeanShiftCanopy> refCanopyMap = Maps.newHashMap();
+    Map<String, MeanShiftCanopy> refCanopyMap = Maps.newHashMap();
     for (MeanShiftCanopy canopy : refCanopies) {
       clusterer.shiftToMean(canopy);
       refCanopyMap.put(canopy.getIdentifier(), canopy);
     }
     // build a map of the combiner output
-    Map<String,MeanShiftCanopy> canopyMap = Maps.newHashMap();
+    Map<String, MeanShiftCanopy> canopyMap = Maps.newHashMap();
     for (MeanShiftCanopy d : data) {
       canopyMap.put(d.getIdentifier(), d);
     }
     // compare the maps
-    for (Map.Entry<String,MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap
+    for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry : refCanopyMap
         .entrySet()) {
       MeanShiftCanopy ref = stringMeanShiftCanopyEntry.getValue();
-      
+
       MeanShiftCanopy canopy = canopyMap.get((ref.isConverged() ? "MSV-"
-          : "MSC-") + ref.getId());
+          : "MSC-")
+          + ref.getId());
       assertEquals("ids", ref.getId(), canopy.getId());
       assertEquals("centers(" + ref.getIdentifier() + ')', ref.getCenter()
           .asFormatString(), canopy.getCenter().asFormatString());
       assertEquals("bound points", ref.getBoundPoints().toList().size(), canopy
           .getBoundPoints().toList().size());
+      assertEquals("num bound points", ref.getNumPoints(), canopy
+          .getNumPoints());
     }
   }
-  
+
   /**
    * Story: User can produce final canopy centers using a
    * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
@@ -248,7 +252,7 @@ public final class TestMeanShift extends
   @Test
   public void testCanopyReducerEuclidean() throws Exception {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
-        euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5);
+        euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5, true);
     // get the initial canopies
     List<MeanShiftCanopy> canopies = getInitialCanopies();
     // build the mapper output reference set
@@ -269,7 +273,7 @@ public final class TestMeanShift extends
     for (MeanShiftCanopy canopy : reducerReference) {
       clusterer.shiftToMean(canopy);
     }
-    
+
     Configuration conf = new Configuration();
     conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
         "org.apache.mahout.common.distance.EuclideanDistanceMeasure");
@@ -279,46 +283,47 @@ public final class TestMeanShift extends
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
     conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
     conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, "output/control");
-    
+
     MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
-    DummyRecordWriter<Text,MeanShiftCanopy> mapWriter = new DummyRecordWriter<Text,MeanShiftCanopy>();
-    Mapper<WritableComparable<?>,MeanShiftCanopy,Text,MeanShiftCanopy>.Context mapContext = DummyRecordWriter
+    DummyRecordWriter<Text, MeanShiftCanopy> mapWriter = new DummyRecordWriter<Text, MeanShiftCanopy>();
+    Mapper<WritableComparable<?>, MeanShiftCanopy, Text, MeanShiftCanopy>.Context mapContext = DummyRecordWriter
         .build(mapper, conf, mapWriter);
     mapper.setup(mapContext);
-    
+
     // map the data
     for (MeanShiftCanopy canopy : canopies) {
       mapper.map(new Text(), canopy, mapContext);
     }
     mapper.cleanup(mapContext);
-    
+
     assertEquals("Number of map results", 1, mapWriter.getData().size());
     // now reduce the mapper output
     MeanShiftCanopyReducer reducer = new MeanShiftCanopyReducer();
-    DummyRecordWriter<Text,MeanShiftCanopy> reduceWriter = new DummyRecordWriter<Text,MeanShiftCanopy>();
-    Reducer<Text,MeanShiftCanopy,Text,MeanShiftCanopy>.Context reduceContext = DummyRecordWriter
+    DummyRecordWriter<Text, MeanShiftCanopy> reduceWriter = new DummyRecordWriter<Text, MeanShiftCanopy>();
+    Reducer<Text, MeanShiftCanopy, Text, MeanShiftCanopy>.Context reduceContext = DummyRecordWriter
         .build(reducer, conf, reduceWriter, Text.class, MeanShiftCanopy.class);
     reducer.setup(reduceContext);
-    reducer.reduce(new Text("canopy"), mapWriter.getValue(new Text("canopy")),
+    reducer.reduce(new Text("0"), mapWriter.getValue(new Text("0")),
         reduceContext);
     reducer.cleanup(reduceContext);
-    
+
     // now verify the output
     assertEquals("Number of canopies", reducerReference.size(), reduceWriter
         .getKeys().size());
-    
+
     // add all points to the reference canopy maps
-    Map<String,MeanShiftCanopy> reducerReferenceMap = Maps.newHashMap();
+    Map<String, MeanShiftCanopy> reducerReferenceMap = Maps.newHashMap();
     for (MeanShiftCanopy canopy : reducerReference) {
       reducerReferenceMap.put(canopy.getIdentifier(), canopy);
     }
     // compare the maps
-    for (Map.Entry<String,MeanShiftCanopy> mapEntry : reducerReferenceMap
+    for (Map.Entry<String, MeanShiftCanopy> mapEntry : reducerReferenceMap
         .entrySet()) {
       MeanShiftCanopy refCanopy = mapEntry.getValue();
-      
+
       List<MeanShiftCanopy> values = reduceWriter.getValue(new Text((refCanopy
-          .isConverged() ? "MSV-" : "MSC-") + refCanopy.getId()));
+          .isConverged() ? "MSV-" : "MSC-")
+          + refCanopy.getId()));
       assertEquals("values", 1, values.size());
       MeanShiftCanopy reducerCanopy = values.get(0);
       assertEquals("ids", refCanopy.getId(), reducerCanopy.getId());
@@ -331,9 +336,11 @@ public final class TestMeanShift extends
           reducerCenter);
       assertEquals("bound points", refCanopy.getBoundPoints().toList().size(),
           reducerCanopy.getBoundPoints().toList().size());
+      assertEquals("num bound points", refCanopy.getNumPoints(), reducerCanopy
+          .getNumPoints());
     }
   }
-  
+
   /**
    * Story: User can produce final point clustering using a Hadoop map/reduce
    * job and a EuclideanDistanceMeasure.
@@ -356,7 +363,7 @@ public final class TestMeanShift extends
     Path output = getTestTempDirPath("output");
     // MeanShiftCanopyDriver.runJob(input, output,
     // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
-    String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION),
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
         getTestTempDirPath("testdata").toString(),
         optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
@@ -368,7 +375,7 @@ public final class TestMeanShift extends
         optKey(DefaultOptionCreator.CLUSTERING_OPTION),
         optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
         optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
-        optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
     ToolRunner.run(conf, new MeanShiftCanopyDriver(), args);
     Path outPart = new Path(output, "clusters-4/part-r-00000");
     long count = HadoopUtil.countRecords(outPart, conf);
@@ -379,11 +386,12 @@ public final class TestMeanShift extends
     // now test the initial clusters to ensure the type of their centers has
     // been retained
     while (iterator.hasNext()) {
-      Cluster canopy = (Cluster) iterator.next();
+      MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
       assertTrue(canopy.getCenter() instanceof DenseVector);
+      assertFalse(canopy.getBoundPoints().isEmpty());
     }
   }
-  
+
   /**
    * Story: User can produce final point clustering using a Hadoop map/reduce
    * job and a EuclideanDistanceMeasure.
@@ -407,7 +415,7 @@ public final class TestMeanShift extends
     System.out.println("Output Path: " + output);
     // MeanShiftCanopyDriver.runJob(input, output,
     // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
-    String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION),
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
         getTestTempDirPath("testdata").toString(),
         optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
         optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
@@ -421,10 +429,107 @@ public final class TestMeanShift extends
         optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
         optKey(DefaultOptionCreator.OVERWRITE_OPTION),
         optKey(DefaultOptionCreator.METHOD_OPTION),
-        DefaultOptionCreator.SEQUENTIAL_METHOD};
+        DefaultOptionCreator.SEQUENTIAL_METHOD };
+    ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
+    Path outPart = new Path(output, "clusters-7/part-r-00000");
+    long count = HadoopUtil.countRecords(outPart, conf);
+    assertEquals("count", 3, count);
+  }
+
+  /**
+   * Story: User can produce final canopy centers, without clustering the
+   * points, using a Hadoop map/reduce job and a EuclideanDistanceMeasure.
+   */
+  @Test
+  public void testCanopyEuclideanMRJobNoClustering() throws Exception {
+    Path input = getTestTempDirPath("testdata");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    Collection<VectorWritable> points = Lists.newArrayList();
+    for (Vector v : raw) {
+      points.add(new VectorWritable(v));
+    }
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job using the run() command. Other tests can continue to use
+    // runJob().
+    Path output = getTestTempDirPath("output");
+    // MeanShiftCanopyDriver.runJob(input, output,
+    // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.KERNEL_PROFILE_OPTION),
+        TriangularKernelProfile.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "4",
+        optKey(DefaultOptionCreator.T2_OPTION), "1",
+        optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
+        optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+    ToolRunner.run(conf, new MeanShiftCanopyDriver(), args);
+    Path outPart = new Path(output, "clusters-3/part-r-00000");
+    long count = HadoopUtil.countRecords(outPart, conf);
+    assertEquals("count", 3, count);
+    Iterator<?> iterator = new SequenceFileValueIterator<Writable>(outPart,
+        true, conf);
+    while (iterator.hasNext()) {
+      MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
+      assertTrue(canopy.getCenter() instanceof DenseVector);
+      assertEquals(1, canopy.getBoundPoints().size());
+    }
+  }
+
+  /**
+   * Story: User can produce final canopy centers, without clustering the
+   * points, using sequential execution and a EuclideanDistanceMeasure.
+   */
+  @Test
+  public void testCanopyEuclideanSeqJobNoClustering() throws Exception {
+    Path input = getTestTempDirPath("testdata");
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    Collection<VectorWritable> points = Lists.newArrayList();
+    for (Vector v : raw) {
+      points.add(new VectorWritable(v));
+    }
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points,
+        getTestTempFilePath("testdata/file2"), fs, conf);
+    // now run the Job using the run() command. Other tests can continue to use
+    // runJob().
+    Path output = getTestTempDirPath("output");
+    System.out.println("Output Path: " + output);
+    // MeanShiftCanopyDriver.runJob(input, output,
+    // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
+    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+        getTestTempDirPath("testdata").toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+        EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.KERNEL_PROFILE_OPTION),
+        TriangularKernelProfile.class.getName(),
+        optKey(DefaultOptionCreator.T1_OPTION), "4",
+        optKey(DefaultOptionCreator.T2_OPTION), "1",
+        optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
+        optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+        optKey(DefaultOptionCreator.METHOD_OPTION),
+        DefaultOptionCreator.SEQUENTIAL_METHOD };
     ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
     Path outPart = new Path(output, "clusters-7/part-r-00000");
     long count = HadoopUtil.countRecords(outPart, conf);
     assertEquals("count", 3, count);
+    Iterator<?> iterator = new SequenceFileValueIterator<Writable>(outPart,
+        true, conf);
+    while (iterator.hasNext()) {
+      MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
+      assertTrue(canopy.getCenter() instanceof DenseVector);
+      assertEquals(1, canopy.getBoundPoints().size());
+    }
   }
 }

Modified: mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
URL: http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
--- mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java (original)
+++ mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java Thu Jul 21 21:13:02 2011
@@ -72,7 +72,7 @@ public class DisplayMeanShift extends Di
     int i = 0;
     for (Cluster cluster : CLUSTERS.get(CLUSTERS.size() - 1)) {
       MeanShiftCanopy canopy = (MeanShiftCanopy) cluster;
-      if (canopy.getBoundPoints().toList().size() >= significance
+      if (canopy.getMass() >= significance
           * DisplayClustering.SAMPLE_DATA.size()) {
         g2.setColor(COLORS[Math.min(i++, DisplayClustering.COLORS.length - 1)]);
         int count = 0;
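
The display filter now keys off getMass(), the total number of merged points, instead of the bound-point list size, so significant clusters are still drawn when bound points were not accumulated. A toy check (the significance and sample-size values are assumed for illustration, not taken from the commit):

    // Hypothetical sketch, not part of this commit.
    public class SignificanceSketch {
      public static void main(String[] args) {
        double significance = 0.05; // assumed threshold fraction
        int sampleSize = 1000;      // assumed number of displayed sample points
        int mass = 62;              // a canopy that merged 62 points
        // drawn only if it covers at least 5% of the sample: 62 >= 50 -> true
        System.out.println(mass >= significance * sampleSize);
      }
    }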


