mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeast...@apache.org
Subject svn commit: r966816 [1/2] - in /mahout/trunk: core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ core/src/main/java/org/apache/mahout/clustering/kmeans/ core/src/main/java/org/apache/mahout/clustering/meanshift/ core/src/test/java/org/apache/...
Date Thu, 22 Jul 2010 19:25:21 GMT
Author: jeastman
Date: Thu Jul 22 19:25:20 2010
New Revision: 966816

URL: http://svn.apache.org/viewvc?rev=966816&view=rev
Log:
MAHOUT-294:
- added a method=sequential|mapreduce argument to the MeanShift, k-means and fuzzy k-means drivers so that the sequential reference implementations can be run from the command line
- added/updated unit tests that exercise the sequential implementations from the command line
- all tests run

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
    mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/meanshift/Job.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
    mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterer.java Thu Jul 22 19:25:20 2010
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.common.distance.DistanceMeasure;
@@ -49,12 +50,10 @@ public class FuzzyKMeansClusterer {
   /**
     * Init the fuzzy k-means clusterer with the distance measure to use for comparison.
     * 
-    * @param measure
-    *          The distance measure to use for comparing clusters against points.
-    * @param convergenceDelta
-    *          When do we define a cluster to have converged?
-    * 
-    * */
+   * @param measure
+   * @param convergenceDelta
+   * @param m
+   */
   public FuzzyKMeansClusterer(DistanceMeasure measure, double convergenceDelta, double m) {
     this.measure = measure;
     this.convergenceDelta = convergenceDelta;
@@ -66,6 +65,64 @@ public class FuzzyKMeansClusterer {
   }
 
   /**
+   * This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
+   * until their centers converge or until the maximum number of iterations is exceeded.
+   * 
+   * @param points
+   *          the input List<Vector> of points
+   * @param clusters
+   *          the initial List<SoftCluster> of clusters
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param threshold
+   *          the double convergence threshold
+   * @param m
+   *          the double "fuzzyness" argument (>1)
+   * @param numIter
+   *          the maximum number of iterations
+   * @return
+   *          a List<List<SoftCluster>> of clusters produced per iteration
+   */
+  public static List<List<SoftCluster>> clusterPoints(List<Vector> points,
+                                                      List<SoftCluster> clusters,
+                                                      DistanceMeasure measure,
+                                                      double threshold,
+                                                      double m,
+                                                      int numIter) {
+    List<List<SoftCluster>> clustersList = new ArrayList<List<SoftCluster>>();
+    clustersList.add(clusters);
+    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, threshold, m);
+    boolean converged = false;
+    int iteration = 0;
+    for (int iter = 0; !converged && iter < numIter; iter++) {
+      List<SoftCluster> next = new ArrayList<SoftCluster>();
+      List<SoftCluster> cs = clustersList.get(iteration++);
+      for (SoftCluster c : cs) {
+        next.add(new SoftCluster(c.getCenter(), c.getId()));
+      }
+      clustersList.add(next);
+      converged = runFuzzyKMeansIteration(points, clustersList.get(iteration), clusterer);
+    }
+    return clustersList;
+  }
+
+  /**
+   * Perform a single iteration over the points and clusters, assigning points to clusters and returning if
+   * the iterations are completed.
+   * 
+   * @param points
+   *          the List<Vector> having the input points
+   * @param clusterList
+   *          the List<Cluster> clusters
+   */
+  protected static boolean runFuzzyKMeansIteration(List<Vector> points, List<SoftCluster> clusterList, FuzzyKMeansClusterer clusterer) {
+    for (Vector point : points) {
+      clusterer.addPointToClusters(clusterList, point);
+    }
+    return clusterer.testConvergence(clusterList);
+  }
+
+  /**
    * Configure the distance measure from the job
    * 
    * @param job
@@ -103,8 +160,10 @@ public class FuzzyKMeansClusterer {
    *          the Context to emit into
    * @throws InterruptedException 
    */
-  public void emitPointProbToCluster(Vector point, List<SoftCluster> clusters,
-      Mapper<WritableComparable<?>, VectorWritable, Text, FuzzyKMeansInfo>.Context context) throws IOException, InterruptedException {
+  public void emitPointProbToCluster(Vector point,
+                                     List<SoftCluster> clusters,
+                                     Mapper<WritableComparable<?>, VectorWritable, Text, FuzzyKMeansInfo>.Context context)
+      throws IOException, InterruptedException {
 
     List<Double> clusterDistanceList = new ArrayList<Double>();
     for (SoftCluster cluster : clusters) {
@@ -153,9 +212,10 @@ public class FuzzyKMeansClusterer {
     return this.measure;
   }
 
-  public void emitPointToClusters(VectorWritable point, List<SoftCluster> clusters,
-      Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context) throws IOException,
-      InterruptedException {
+  public void emitPointToClusters(VectorWritable point,
+                                  List<SoftCluster> clusters,
+                                  Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context)
+      throws IOException, InterruptedException {
     // calculate point distances for all clusters    
     List<Double> clusterDistanceList = new ArrayList<Double>();
     for (SoftCluster cluster : clusters) {
@@ -177,14 +237,16 @@ public class FuzzyKMeansClusterer {
   /**
    * Emit the point to the cluster with the highest pdf
    */
-  static void emitMostLikelyCluster(Vector point, List<SoftCluster> clusters, Vector clusterPdfList,
-      Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context) throws IOException,
-      InterruptedException {
+  private void emitMostLikelyCluster(Vector point,
+                                     List<SoftCluster> clusters,
+                                     Vector pi,
+                                     Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context)
+      throws IOException, InterruptedException {
     int clusterId = -1;
     double clusterPdf = 0;
     for (int i = 0; i < clusters.size(); i++) {
       // System.out.println("cluster-" + clusters.get(i).getId() + "@ " + ClusterBase.formatVector(center, null));
-      double pdf = clusterPdfList.get(i);
+      double pdf = pi.get(i);
       if (pdf > clusterPdf) {
         clusterId = clusters.get(i).getId();
         clusterPdf = pdf;
@@ -197,9 +259,11 @@ public class FuzzyKMeansClusterer {
   /**
    * Emit the point to all clusters
    */
-  void emitAllClusters(Vector point, List<SoftCluster> clusters, Vector pi,
-      Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context) throws IOException,
-      InterruptedException {
+  private void emitAllClusters(Vector point,
+                               List<SoftCluster> clusters,
+                               Vector pi,
+                               Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context)
+      throws IOException, InterruptedException {
     for (int i = 0; i < clusters.size(); i++) {
       double pdf = pi.get(i);
       if (pdf > threshold) {
@@ -210,77 +274,79 @@ public class FuzzyKMeansClusterer {
   }
 
   /**
-   * This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
-   * until their centers converge or until the maximum number of iterations is exceeded.
-   * 
-   * @param points
-   *          the input List<Vector> of points
-   * @param clusters
-   *          the initial List<SoftCluster> of clusters
-   * @param measure
-   *          the DistanceMeasure to use
-   * @param threshold
-   *          the double convergence threshold
-   * @param m
-   *          the double "fuzzyness" argument (>1)
-   * @param numIter
-   *          the maximum number of iterations
-   * @return
-   *          a List<List<SoftCluster>> of clusters produced per iteration
-   */
-  public static List<List<SoftCluster>> clusterPoints(List<Vector> points, List<SoftCluster> clusters, DistanceMeasure measure,
-      double threshold, double m, int numIter) {
-    List<List<SoftCluster>> clustersList = new ArrayList<List<SoftCluster>>();
-    clustersList.add(clusters);
-    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, threshold, m);
-    boolean converged = false;
-    int iteration = 0;
-    for (int iter = 0; !converged && iter < numIter; iter++) {
-      List<SoftCluster> next = new ArrayList<SoftCluster>();
-      List<SoftCluster> cs = clustersList.get(iteration++);
-      for (SoftCluster c : cs) {
-        next.add(new SoftCluster(c.getCenter(), c.getId()));
-      }
-      clustersList.add(next);
-      converged = runFuzzyKMeansIteration(points, clustersList.get(iteration), clusterer);
-    }
-    return clustersList;
-  }
-
-  /**
-   * Perform a single iteration over the points and clusters, assigning points to clusters and returning if
-   * the iterations are completed.
-   * 
-   * @param points
-   *          the List<Vector> having the input points
    * @param clusterList
-   *          the List<Cluster> clusters
+   * @param point
    */
-  public static boolean runFuzzyKMeansIteration(List<Vector> points, List<SoftCluster> clusterList, FuzzyKMeansClusterer clusterer) {
-    for (Vector point : points) {
-      List<Double> clusterDistanceList = new ArrayList<Double>();
-      for (SoftCluster cluster : clusterList) {
-        clusterDistanceList.add(clusterer.getMeasure().distance(point, cluster.getCenter()));
-      }
+  protected void addPointToClusters(List<SoftCluster> clusterList, Vector point) {
+    List<Double> clusterDistanceList = new ArrayList<Double>();
+    for (SoftCluster cluster : clusterList) {
+      clusterDistanceList.add(getMeasure().distance(point, cluster.getCenter()));
+    }
 
-      for (int i = 0; i < clusterList.size(); i++) {
-        double probWeight = clusterer.computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
-        clusterList.get(i).addPoint(point, Math.pow(probWeight, clusterer.getM()));
-      }
+    for (int i = 0; i < clusterList.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
+      clusterList.get(i).addPoint(point, Math.pow(probWeight, getM()));
     }
+  }
+
+  protected boolean testConvergence(List<SoftCluster> clusters) {
     boolean converged = true;
-    for (SoftCluster cluster : clusterList) {
-      if (!clusterer.computeConvergence(cluster)) {
+    for (SoftCluster cluster : clusters) {
+      if (!computeConvergence(cluster)) {
         converged = false;
       }
     }
     // update the cluster centers
     if (!converged) {
-      for (SoftCluster cluster : clusterList) {
+      for (SoftCluster cluster : clusters) {
         cluster.recomputeCenter();
       }
     }
     return converged;
+  }
 
+  public void emitPointToClusters(VectorWritable point, List<SoftCluster> clusters, Writer writer) throws IOException,
+      InterruptedException {
+    // calculate point distances for all clusters    
+    List<Double> clusterDistanceList = new ArrayList<Double>();
+    for (SoftCluster cluster : clusters) {
+      clusterDistanceList.add(getMeasure().distance(cluster.getCenter(), point.get()));
+    }
+    // calculate point pdf for all clusters
+    Vector pi = new DenseVector(clusters.size());
+    for (int i = 0; i < clusters.size(); i++) {
+      double probWeight = computeProbWeight(clusterDistanceList.get(i), clusterDistanceList);
+      pi.set(i, probWeight);
+    }
+    if (emitMostLikely) {
+      emitMostLikelyCluster(point.get(), clusters, pi, writer);
+    } else {
+      emitAllClusters(point.get(), clusters, pi, writer);
+    }
+  }
+
+  private void emitAllClusters(Vector point, List<SoftCluster> clusters, Vector pi, Writer writer) throws IOException {
+    for (int i = 0; i < clusters.size(); i++) {
+      double pdf = pi.get(i);
+      if (pdf > threshold) {
+        // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
+        writer.append(new IntWritable(i), new WeightedVectorWritable(pdf, new VectorWritable(point)));
+      }
+    }
+  }
+
+  private void emitMostLikelyCluster(Vector point, List<SoftCluster> clusters, Vector pi, Writer writer) throws IOException {
+    int clusterId = -1;
+    double clusterPdf = 0;
+    for (int i = 0; i < clusters.size(); i++) {
+      // System.out.println("cluster-" + clusters.get(i).getId() + "@ " + ClusterBase.formatVector(center, null));
+      double pdf = pi.get(i);
+      if (pdf > clusterPdf) {
+        clusterId = clusters.get(i).getId();
+        clusterPdf = pdf;
+      }
+    }
+    // System.out.println("cluster-" + clusterId + ": " + ClusterBase.formatVector(point, null));
+    writer.append(new IntWritable(clusterId), new WeightedVectorWritable(clusterPdf, new VectorWritable(point)));
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Thu Jul 22 19:25:20 2010
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -37,11 +38,14 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.OutputLogFilter;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,7 +90,10 @@ public class FuzzyKMeansDriver extends A
    *          a boolean if true emit only most likely cluster for each point
    * @param threshold 
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
    * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public static void runJob(Path input,
                             Path clustersIn,
@@ -98,7 +105,9 @@ public class FuzzyKMeansDriver extends A
                             float m,
                             boolean runClustering,
                             boolean emitMostLikely,
-                            double threshold) throws IOException, ClassNotFoundException, InterruptedException {
+                            double threshold,
+                            boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
 
     new FuzzyKMeansDriver().job(input,
                                 clustersIn,
@@ -110,7 +119,8 @@ public class FuzzyKMeansDriver extends A
                                 m,
                                 runClustering,
                                 emitMostLikely,
-                                threshold);
+                                threshold,
+                                runSequential);
   }
 
   @Override
@@ -135,6 +145,7 @@ public class FuzzyKMeansDriver extends A
     addOption(DefaultOptionCreator.clusteringOption().create());
     addOption(DefaultOptionCreator.emitMostLikelyOption().create());
     addOption(DefaultOptionCreator.thresholdOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
 
     if (parseArguments(args) == null) {
       return -1;
@@ -162,6 +173,7 @@ public class FuzzyKMeansDriver extends A
           .get(DefaultOptionCreator.NUM_CLUSTERS_OPTION)));
     }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
+    boolean runSequential = (getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD));
     job(input,
         clusters,
         output,
@@ -172,7 +184,8 @@ public class FuzzyKMeansDriver extends A
         fuzziness,
         runClustering,
         emitMostLikely,
-        threshold);
+        threshold,
+        runSequential);
     return 0;
   }
 
@@ -275,9 +288,12 @@ public class FuzzyKMeansDriver extends A
    *          a boolean if true emit only most likely cluster for each point
    * @param threshold 
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
    * @throws IOException
    * @throws ClassNotFoundException
    * @throws InterruptedException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public void job(Path input,
                   Path clustersIn,
@@ -289,18 +305,32 @@ public class FuzzyKMeansDriver extends A
                   float m,
                   boolean runClustering,
                   boolean emitMostLikely,
-                  double threshold) throws IOException, ClassNotFoundException, InterruptedException {
-    Path clustersOut = buildClusters(input, clustersIn, output, measureClass, convergenceDelta, maxIterations, numReduceTasks, m);
+                  double threshold,
+                  boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+      IllegalAccessException {
+    ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+    Class<?> cl = ccl.loadClass(measureClass);
+    DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
+    Path clustersOut = buildClusters(input,
+                                     clustersIn,
+                                     output,
+                                     measure,
+                                     convergenceDelta,
+                                     maxIterations,
+                                     numReduceTasks,
+                                     m,
+                                     runSequential);
     if (runClustering) {
       log.info("Clustering ");
       clusterData(input,
                   clustersOut,
                   new Path(output, Cluster.CLUSTERED_POINTS_DIR),
-                  measureClass,
+                  measure,
                   convergenceDelta,
                   m,
                   emitMostLikely,
-                  threshold);
+                  threshold,
+                  runSequential);
     }
   }
 
@@ -313,7 +343,7 @@ public class FuzzyKMeansDriver extends A
    *          the directory pathname for initial & computed clusters
    * @param output
    *          the directory pathname for output points
-   * @param measureClass
+   * @param measure
    *          the classname of the DistanceMeasure
    * @param convergenceDelta
    *          the convergence delta value
@@ -324,17 +354,116 @@ public class FuzzyKMeansDriver extends A
    * @param m
    *          the fuzzification factor, see
    *          http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+   * @param runSequential if true run in sequential execution mode
    * @return the Path of the final clusters directory
    * @throws IOException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public Path buildClusters(Path input,
                             Path clustersIn,
                             Path output,
-                            String measureClass,
+                            DistanceMeasure measure,
                             double convergenceDelta,
                             int maxIterations,
                             int numReduceTasks,
-                            float m) throws IOException {
+                            float m,
+                            boolean runSequential) throws IOException, InstantiationException, IllegalAccessException {
+    if (runSequential) {
+      return buildClustersSeq(input, clustersIn, output, measure, convergenceDelta, maxIterations, numReduceTasks, m);
+
+    } else {
+      return buildClustersMR(input, clustersIn, output, measure, convergenceDelta, maxIterations, numReduceTasks, m);
+    }
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param convergenceDelta
+   * @param maxIterations
+   * @param numReduceTasks
+   * @param m
+   * @return
+   * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  private Path buildClustersSeq(Path input,
+                                Path clustersIn,
+                                Path output,
+                                DistanceMeasure measure,
+                                double convergenceDelta,
+                                int maxIterations,
+                                int numReduceTasks,
+                                float m) throws IOException, InstantiationException, IllegalAccessException {
+    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, convergenceDelta, m);
+    List<SoftCluster> clusters = new ArrayList<SoftCluster>();
+
+    FuzzyKMeansUtil.configureWithClusterInfo(clustersIn, clusters);
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("Clusters is empty!");
+    }
+    boolean converged = false;
+    int iteration = 1;
+    while (!converged && iteration <= maxIterations) {
+      Configuration conf = new Configuration();
+      FileSystem fs = FileSystem.get(input.toUri(), conf);
+      FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+      for (FileStatus s : status) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+        try {
+          WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+          VectorWritable vw = (VectorWritable) reader.getValueClass().newInstance();
+          while (reader.next(key, vw)) {
+            clusterer.addPointToClusters(clusters, vw.get());
+            vw = (VectorWritable) reader.getValueClass().newInstance();
+          }
+        } finally {
+          reader.close();
+        }
+      }
+      converged = clusterer.testConvergence(clusters);
+      Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersOut, "part-r-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      try {
+        for (SoftCluster cluster : clusters) {
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        writer.close();
+      }
+      clustersIn = clustersOut;
+    }
+    return clustersIn;
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure.getClass().
+   * @param convergenceDelta
+   * @param maxIterations
+   * @param numReduceTasks
+   * @param m
+   * @return
+   * @throws IOException
+   */
+  private Path buildClustersMR(Path input,
+                               Path clustersIn,
+                               Path output,
+                               DistanceMeasure measure,
+                               double convergenceDelta,
+                               int maxIterations,
+                               int numReduceTasks,
+                               float m) throws IOException {
     boolean converged = false;
     int iteration = 1;
 
@@ -344,7 +473,14 @@ public class FuzzyKMeansDriver extends A
 
       // point the output to a new directory per iteration
       Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
-      converged = runIteration(input, clustersIn, clustersOut, measureClass, convergenceDelta, numReduceTasks, iteration, m);
+      converged = runIteration(input,
+                               clustersIn,
+                               clustersOut,
+                               measure.getClass().getName(),
+                               convergenceDelta,
+                               numReduceTasks,
+                               iteration,
+                               m);
 
       // now point the input to the old output directory
       clustersIn = clustersOut;
@@ -362,7 +498,7 @@ public class FuzzyKMeansDriver extends A
    *          the directory pathname for input clusters
    * @param output
    *          the directory pathname for output points
-   * @param measureClass
+   * @param measure
    *          the classname of the DistanceMeasure
    * @param convergenceDelta
    *          the convergence delta value
@@ -370,20 +506,107 @@ public class FuzzyKMeansDriver extends A
    *          a boolean if true emit only most likely cluster for each point
    * @param threshold 
    *          a double threshold value emits all clusters having greater pdf (emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
    * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public void clusterData(Path input,
                           Path clustersIn,
                           Path output,
-                          String measureClass,
+                          DistanceMeasure measure,
                           double convergenceDelta,
                           float m,
                           boolean emitMostLikely,
-                          double threshold) throws IOException, ClassNotFoundException, InterruptedException {
+                          double threshold,
+                          boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
+    if (runSequential) {
+      clusterDataSeq(input, clustersIn, output, measure, convergenceDelta, m, emitMostLikely, threshold);
+    } else {
+      clusterDataMR(input, clustersIn, output, measure, convergenceDelta, m, emitMostLikely, threshold);
+    }
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param convergenceDelta
+   * @param m
+   * @param emitMostLikely
+   * @param threshold
+   * @throws IOException 
+   * @throws InterruptedException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  private void clusterDataSeq(Path input,
+                              Path clustersIn,
+                              Path output,
+                              DistanceMeasure measure,
+                              double convergenceDelta,
+                              float m,
+                              boolean emitMostLikely,
+                              double threshold) throws IOException, InterruptedException, InstantiationException,
+      IllegalAccessException {
+    FuzzyKMeansClusterer clusterer = new FuzzyKMeansClusterer(measure, convergenceDelta, m);
+    List<SoftCluster> clusters = new ArrayList<SoftCluster>();
+    FuzzyKMeansUtil.configureWithClusterInfo(clustersIn, clusters);
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("Clusters is empty!");
+    }
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+    int part = 0;
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(output, "part-m-" + part),
+                                                           IntWritable.class,
+                                                           WeightedVectorWritable.class);
+      try {
+        WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+        VectorWritable vw = (VectorWritable) reader.getValueClass().newInstance();
+        while (reader.next(key, vw)) {
+          clusterer.emitPointToClusters(vw, clusters, writer);
+          vw = (VectorWritable) reader.getValueClass().newInstance();
+        }
+      } finally {
+        reader.close();
+        writer.close();
+      }
+    }
+
+  }
 
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param convergenceDelta
+   * @param m
+   * @param emitMostLikely
+   * @param threshold
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void clusterDataMR(Path input,
+                             Path clustersIn,
+                             Path output,
+                             DistanceMeasure measure,
+                             double convergenceDelta,
+                             float m,
+                             boolean emitMostLikely,
+                             double threshold) throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
-    conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
     conf.set(FuzzyKMeansConfigKeys.M_KEY, String.valueOf(m));
     conf.set(FuzzyKMeansConfigKeys.EMIT_MOST_LIKELY_KEY, Boolean.toString(emitMostLikely));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterer.java Thu Jul 22 19:25:20 2010
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.mahout.clustering.WeightedVectorWritable;
 import org.apache.mahout.common.distance.DistanceMeasure;
@@ -55,7 +56,7 @@ public class KMeansClusterer {
 
   /**
    * Iterates over all clusters and identifies the one closes to the given point. Distance measure used is
-   * configured at creation time of .
+   * configured at creation time.
    * 
    * @param point
    *          a point to find a cluster for.
@@ -64,8 +65,10 @@ public class KMeansClusterer {
    * @throws InterruptedException 
    * @throws IOException 
    */
-  public void emitPointToNearestCluster(Vector point, List<Cluster> clusters,
-      Mapper<WritableComparable<?>, VectorWritable, Text, KMeansInfo>.Context context) throws IOException, InterruptedException {
+  public void emitPointToNearestCluster(Vector point,
+                                        List<Cluster> clusters,
+                                        Mapper<WritableComparable<?>, VectorWritable, Text, KMeansInfo>.Context context)
+      throws IOException, InterruptedException {
     Cluster nearestCluster = null;
     double nearestDistance = Double.MAX_VALUE;
     for (Cluster cluster : clusters) {
@@ -82,8 +85,52 @@ public class KMeansClusterer {
     context.write(new Text(nearestCluster.getIdentifier()), new KMeansInfo(1, point));
   }
 
-  public void outputPointWithClusterInfo(Vector vector, List<Cluster> clusters,
-      Mapper<WritableComparable<?>,VectorWritable,IntWritable,WeightedVectorWritable>.Context context) throws IOException, InterruptedException {
+  /**
+   * Sequential step: finds the cluster whose center is nearest to the given point, as measured by the
+   * configured DistanceMeasure, and adds the point to that cluster. On ties the earliest cluster in the list wins.
+   *
+   * @param point
+   *          the point to assign
+   * @param clusters
+   *          the candidate clusters; expected to be non-empty (an empty list ends in a NullPointerException)
+   */
+  protected void addPointToNearestCluster(Vector point, List<Cluster> clusters) {
+    Cluster best = null;
+    double bestDistance = Double.MAX_VALUE;
+    for (Cluster candidate : clusters) {
+      double d = measure.distance(candidate.getCenter(), point);
+      boolean first = best == null;
+      if (first || d < bestDistance) {
+        bestDistance = d;
+        best = candidate;
+      }
+    }
+    best.addPoint(point);
+  }
+
+  /**
+   * Sequential step: asks every cluster whether it has converged and, unless all of them have, recomputes the
+   * center of every cluster.
+   *
+   * @param clusters
+   *          the clusters to test and update
+   * @param distanceThreshold
+   *          the threshold handed to each cluster's computeConvergence
+   * @return true if every cluster reported convergence (also true for an empty list)
+   */
+  protected boolean testConvergence(List<Cluster> clusters, double distanceThreshold) {
+    // the non-short-circuit &= guarantees computeConvergence is invoked on every cluster
+    boolean allConverged = true;
+    for (Cluster cluster : clusters) {
+      allConverged &= cluster.computeConvergence(measure, distanceThreshold);
+    }
+    if (allConverged) {
+      return true;
+    }
+    // at least one cluster has not converged: refresh all centers for the next pass
+    for (Cluster cluster : clusters) {
+      cluster.recomputeCenter();
+    }
+    return false;
+  }
+
+  public void outputPointWithClusterInfo(Vector vector,
+                                         List<Cluster> clusters,
+                                         Mapper<WritableComparable<?>, VectorWritable, IntWritable, WeightedVectorWritable>.Context context)
+      throws IOException, InterruptedException {
     Cluster nearestCluster = null;
     double nearestDistance = Double.MAX_VALUE;
     for (Cluster cluster : clusters) {
@@ -98,6 +145,35 @@ public class KMeansClusterer {
   }
 
   /**
+   * Iterates over all clusters and identifies the one closest to the given point. Distance measure used is
+   * configured at creation time.
+   * 
+   * @param point
+   *          a point to find a cluster for.
+   * @param clusters
+   *          a List<Cluster> to test.
+   * @throws InterruptedException 
+   * @throws IOException 
+   */
+  protected void emitPointToNearestCluster(Vector point, List<Cluster> clusters, Writer writer) throws IOException,
+      InterruptedException {
+    // scan every cluster, keeping the first one with the smallest distance to the point
+    Cluster winner = null;
+    double winnerDistance = Double.MAX_VALUE;
+    for (Cluster candidate : clusters) {
+      Vector center = candidate.getCenter();
+      double d = this.measure.distance(center.getLengthSquared(), center, point);
+      if (log.isDebugEnabled()) {
+        log.debug("{} Cluster: {}", d, candidate.getId());
+      }
+      boolean closer = winner == null || d < winnerDistance;
+      if (closer) {
+        winnerDistance = d;
+        winner = candidate;
+      }
+    }
+    // record (cluster id, point with weight 1) in the output SequenceFile
+    IntWritable clusterId = new IntWritable(winner.getId());
+    WeightedVectorWritable weighted = new WeightedVectorWritable(1, new VectorWritable(point));
+    writer.append(clusterId, weighted);
+  }
+
+  /**
    * This is the reference k-means implementation. Given its inputs it iterates over the points and clusters
    * until their centers converge or until the maximum number of iterations is exceeded.
    * 
@@ -110,8 +186,11 @@ public class KMeansClusterer {
    * @param maxIter
    *          the maximum number of iterations
    */
-  public static List<List<Cluster>> clusterPoints(List<Vector> points, List<Cluster> clusters, DistanceMeasure measure,
-      int maxIter, double distanceThreshold) {
+  public static List<List<Cluster>> clusterPoints(List<Vector> points,
+                                                  List<Cluster> clusters,
+                                                  DistanceMeasure measure,
+                                                  int maxIter,
+                                                  double distanceThreshold) {
     List<List<Cluster>> clustersList = new ArrayList<List<Cluster>>();
     clustersList.add(clusters);
 
@@ -140,35 +219,16 @@ public class KMeansClusterer {
    *          a DistanceMeasure to use
    * @return
    */
-  public static boolean runKMeansIteration(List<Vector> points, List<Cluster> clusters, DistanceMeasure measure,
-      double distanceThreshold) {
+  protected static boolean runKMeansIteration(List<Vector> points,
+                                              List<Cluster> clusters,
+                                              DistanceMeasure measure,
+                                              double distanceThreshold) {
+    // a KMeansClusterer instance supplies the per-point assignment and the convergence-test steps
     // iterate through all points, assigning each to the nearest cluster
+    KMeansClusterer clusterer = new KMeansClusterer(measure);
     for (Vector point : points) {
-      Cluster closestCluster = null;
-      double closestDistance = Double.MAX_VALUE;
-      for (Cluster cluster : clusters) {
-        double distance = measure.distance(cluster.getCenter(), point);
-        if (closestCluster == null || closestDistance > distance) {
-          closestCluster = cluster;
-          closestDistance = distance;
-        }
-      }
-      closestCluster.addPoint(point);
+      clusterer.addPointToNearestCluster(point, clusters);
     }
-    // test for convergence
-    boolean converged = true;
-    for (Cluster cluster : clusters) {
-      if (!cluster.computeConvergence(measure, distanceThreshold)) {
-        converged = false;
-      }
-    }
-    // update the cluster centers
-    if (!converged) {
-      for (Cluster cluster : clusters) {
-        cluster.recomputeCenter();
-      }
-    }
-    return converged;
+    // testConvergence also recomputes the cluster centers when the clusters have not all converged
+    return clusterer.testConvergence(clusters, distanceThreshold);
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Thu Jul 22 19:25:20 2010
@@ -17,6 +17,8 @@
 package org.apache.mahout.clustering.kmeans;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -26,6 +28,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -35,6 +38,7 @@ import org.apache.mahout.clustering.Weig
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
 import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
 import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
@@ -70,8 +74,11 @@ public class KMeansDriver extends Abstra
    *          the number of reducers
    * @param runClustering 
    *          true if points are to be clustered after iterations are completed
+   * @param runSequential if true execute sequential algorithm 
    * @throws ClassNotFoundException 
    * @throws InterruptedException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public static void runJob(Path input,
                             Path clustersIn,
@@ -80,8 +87,18 @@ public class KMeansDriver extends Abstra
                             double convergenceDelta,
                             int maxIterations,
                             int numReduceTasks,
-                            boolean runClustering) throws IOException, InterruptedException, ClassNotFoundException {
-    new KMeansDriver().job(input, clustersIn, output, measureClass, convergenceDelta, maxIterations, numReduceTasks, runClustering);
+                            boolean runClustering,
+                            boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    new KMeansDriver().job(input,
+                           clustersIn,
+                           output,
+                           measureClass,
+                           convergenceDelta,
+                           maxIterations,
+                           numReduceTasks,
+                           runClustering,
+                           runSequential);
   }
 
   @Override
@@ -102,6 +119,7 @@ public class KMeansDriver extends Abstra
     addOption(DefaultOptionCreator.overwriteOption().create());
     addOption(DefaultOptionCreator.numReducersOption().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
 
     if (parseArguments(args) == null) {
       return -1;
@@ -125,7 +143,8 @@ public class KMeansDriver extends Abstra
           .parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)));
     }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    job(input, clusters, output, measureClass, convergenceDelta, maxIterations, numReduceTasks, runClustering);
+    boolean runSequential = (getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD));
+    job(input, clusters, output, measureClass, convergenceDelta, maxIterations, numReduceTasks, runClustering, runSequential);
     return 0;
   }
 
@@ -149,18 +168,27 @@ public class KMeansDriver extends Abstra
    *          the number of reducers
    * @param runClustering 
    *          true if points are to be clustered after iterations are completed
+   * @param runSequential if true execute sequential algorithm
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public void job(Path input,
-                   Path clustersIn,
-                   Path output,
-                   String measureClass,
-                   double convergenceDelta,
-                   int maxIterations,
-                   int numReduceTasks,
-                   boolean runClustering) throws IOException, InterruptedException, ClassNotFoundException {
+                  Path clustersIn,
+                  Path output,
+                  String measureClass,
+                  double convergenceDelta,
+                  int maxIterations,
+                  int numReduceTasks,
+                  boolean runClustering,
+                  boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+    ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+    Class<?> cl = ccl.loadClass(measureClass);
+    DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
+
     // iterate until the clusters converge
     String delta = Double.toString(convergenceDelta);
     if (log.isInfoEnabled()) {
@@ -168,10 +196,10 @@ public class KMeansDriver extends Abstra
       log.info("convergence: {} max Iterations: {} num Reduce Tasks: {} Input Vectors: {}", new Object[] { convergenceDelta,
           maxIterations, numReduceTasks, VectorWritable.class.getName() });
     }
-    Path clustersOut = buildClusters(input, clustersIn, output, measureClass, maxIterations, numReduceTasks, delta);
+    Path clustersOut = buildClusters(input, clustersIn, output, measure, maxIterations, numReduceTasks, delta, runSequential);
     if (runClustering) {
       log.info("Clustering data");
-      clusterData(input, clustersOut, new Path(output, Cluster.CLUSTERED_POINTS_DIR), measureClass, delta);
+      clusterData(input, clustersOut, new Path(output, Cluster.CLUSTERED_POINTS_DIR), measure, delta, runSequential);
     }
   }
 
@@ -184,33 +212,132 @@ public class KMeansDriver extends Abstra
    *          the directory pathname for initial & computed clusters
    * @param output
    *          the directory pathname for output points
-   * @param measureClass
+   * @param measure
    *          the classname of the DistanceMeasure
-   * @param convergenceDelta
-   *          the convergence delta value
    * @param maxIterations
    *          the maximum number of iterations
    * @param numReduceTasks
    *          the number of reducers
+   * @param runSequential if true execute sequential algorithm
+   * @param convergenceDelta
+   *          the convergence delta value
    * @return the Path of the final clusters directory
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public Path buildClusters(Path input,
-                             Path clustersIn,
-                             Path output,
-                             String measureClass,
-                             int maxIterations,
-                             int numReduceTasks,
-                             String delta) throws IOException, InterruptedException, ClassNotFoundException {
+                            Path clustersIn,
+                            Path output,
+                            DistanceMeasure measure,
+                            int maxIterations,
+                            int numReduceTasks,
+                            String delta,
+                            boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    // dispatch: in-process reference implementation vs. the runIteration-based MapReduce implementation
+    if (runSequential) {
+      return buildClustersSeq(input, clustersIn, output, measure, maxIterations, numReduceTasks, delta);
+    } else {
+      return buildClustersMR(input, clustersIn, output, measure, maxIterations, numReduceTasks, delta);
+    }
+  }
+
+  /**
+   * Sequential (single-process) reference implementation of the k-means iterations. The initial clusters are
+   * read from clustersIn. Each iteration assigns every input vector to its nearest cluster, tests for convergence
+   * (recomputing the centers when not converged) and writes the resulting clusters to
+   * {@code new Path(output, Cluster.CLUSTERS_DIR + iteration)}. Iteration stops once the clusters converge or
+   * maxIterations iterations have run.
+   *
+   * @param input
+   *          the directory pathname for input points; every file is read as a SequenceFile of VectorWritable values
+   * @param clustersIn
+   *          the directory pathname for the initial clusters
+   * @param output
+   *          the directory pathname under which each iteration's clusters are written
+   * @param measure
+   *          the DistanceMeasure used to assign points to clusters
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param numReduceTasks
+   *          ignored by the sequential implementation
+   * @param delta
+   *          the convergence delta value, as a String parsed with Double.parseDouble
+   * @return the Path of the final clusters directory (clustersIn itself if maxIterations is less than 1)
+   * @throws IllegalAccessException
+   *           if a key or value instance cannot be created reflectively
+   * @throws InstantiationException
+   *           if a key or value instance cannot be created reflectively
+   * @throws IOException
+   *           if reading the input or writing the clusters fails
+   * @throws ClassNotFoundException
+   */
+  private Path buildClustersSeq(Path input,
+                                Path clustersIn,
+                                Path output,
+                                DistanceMeasure measure,
+                                int maxIterations,
+                                int numReduceTasks,
+                                String delta) throws InstantiationException, IllegalAccessException, IOException,
+      ClassNotFoundException {
+    KMeansClusterer clusterer = new KMeansClusterer(measure);
+    List<Cluster> clusters = new ArrayList<Cluster>();
+
+    KMeansUtil.configureWithClusterInfo(clustersIn, clusters);
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("Clusters is empty!");
+    }
+    // none of these change between iterations, so compute them once
+    double convergenceDelta = Double.parseDouble(delta);
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+
+    boolean converged = false;
+    int iteration = 1;
+    while (!converged && iteration <= maxIterations) {
+      log.info("Iteration {}", iteration);
+      // assign every input point to its nearest cluster
+      for (FileStatus s : status) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+        try {
+          WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+          VectorWritable vw = (VectorWritable) reader.getValueClass().newInstance();
+          while (reader.next(key, vw)) {
+            clusterer.addPointToNearestCluster(vw.get(), clusters);
+            // presumably a fresh value so the Vector just handed to a cluster is not refilled by the next read
+            vw = (VectorWritable) reader.getValueClass().newInstance();
+          }
+        } finally {
+          reader.close();
+        }
+      }
+      converged = clusterer.testConvergence(clusters, convergenceDelta);
+      // each iteration's clusters go to their own directory, matching the MapReduce implementation's layout
+      Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersOut, "part-r-00000"),
+                                                           Text.class,
+                                                           Cluster.class);
+      try {
+        for (Cluster cluster : clusters) {
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        writer.close();
+      }
+      clustersIn = clustersOut;
+      // advance; without this the loop only ends on convergence and keeps rewriting the same directory
+      iteration++;
+    }
+    return clustersIn;
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param maxIterations
+   * @param numReduceTasks
+   * @param delta
+   * @return
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private Path buildClustersMR(Path input,
+                               Path clustersIn,
+                               Path output,
+                               DistanceMeasure measure,
+                               int maxIterations,
+                               int numReduceTasks,
+                               String delta) throws IOException, InterruptedException, ClassNotFoundException {
     boolean converged = false;
     int iteration = 1;
     while (!converged && (iteration <= maxIterations)) {
       log.info("Iteration {}", iteration);
       // point the output to a new directory per iteration
       Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
-      converged = runIteration(input, clustersIn, clustersOut, measureClass, delta, numReduceTasks);
+      converged = runIteration(input, clustersIn, clustersOut, measure.getClass().getName(), delta, numReduceTasks);
       // now point the input to the old output directory
       clustersIn = clustersOut;
       iteration++;
@@ -323,23 +450,95 @@ public class KMeansDriver extends Abstra
    *          the directory pathname for input clusters
    * @param output
    *          the directory pathname for output points
-   * @param measureClass
+   * @param measure
    *          the classname of the DistanceMeasure
    * @param convergenceDelta
    *          the convergence delta value
+   * @param runSequential if true execute sequential algorithm
    * @throws ClassNotFoundException 
    * @throws InterruptedException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
-  public void clusterData(Path input, Path clustersIn, Path output, String measureClass, String convergenceDelta)
-      throws IOException, InterruptedException, ClassNotFoundException {
+  public void clusterData(Path input,
+                          Path clustersIn,
+                          Path output,
+                          DistanceMeasure measure,
+                          String convergenceDelta,
+                          boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
     if (log.isInfoEnabled()) {
       log.info("Running Clustering");
-      log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] { input, clustersIn, output, measureClass });
+      log.info("Input: {} Clusters In: {} Out: {} Distance: {}", new Object[] { input, clustersIn, output, measure });
+      // NOTE(review): this logs the DistanceMeasure instance (its toString()) rather than a class name
       log.info("convergence: {} Input Vectors: {}", convergenceDelta, VectorWritable.class.getName());
     }
+    // dispatch to the in-process reference implementation or to the MapReduce job
+    if (runSequential) {
+      clusterDataSeq(input, clustersIn, output, measure, convergenceDelta);
+    } else {
+      clusterDataMR(input, clustersIn, output, measure, convergenceDelta);
+    }
+  }
+
+  /**
+   * Sequential reference implementation of the final clustering step. Loads the clusters from clustersIn. For
+   * every input vector it writes the nearest cluster's id (IntWritable) and the vector wrapped in a
+   * WeightedVectorWritable of weight 1. Each input file produces its own output file,
+   * {@code new Path(output, "part-m-" + n)}, where n counts the input files from 0.
+   *
+   * @param input
+   *          the directory pathname for input points; every file is read as a SequenceFile of VectorWritable values
+   * @param clustersIn
+   *          the directory pathname for the clusters to assign points to
+   * @param output
+   *          the directory pathname for the clustered points
+   * @param measure
+   *          the DistanceMeasure used to find each point's nearest cluster
+   * @param convergenceDelta
+   *          the convergence delta value; not used by the sequential implementation
+   * @throws InterruptedException
+   * @throws IOException
+   *           if reading the input or writing the output fails
+   * @throws IllegalAccessException
+   *           if a key or value instance cannot be created reflectively
+   * @throws InstantiationException
+   *           if a key or value instance cannot be created reflectively
+   */
+  private void clusterDataSeq(Path input, Path clustersIn, Path output, DistanceMeasure measure, String convergenceDelta)
+      throws IOException, InterruptedException, InstantiationException, IllegalAccessException {
+    KMeansClusterer clusterer = new KMeansClusterer(measure);
+    List<Cluster> clusters = new ArrayList<Cluster>();
+    KMeansUtil.configureWithClusterInfo(clustersIn, clusters);
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("Clusters is empty!");
+    }
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+    int part = 0;
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      try {
+        // advance the part number so each input file gets its own output file instead of overwriting part-m-0
+        SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                             conf,
+                                                             new Path(output, "part-m-" + part++),
+                                                             IntWritable.class,
+                                                             WeightedVectorWritable.class);
+        try {
+          WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+          VectorWritable vw = (VectorWritable) reader.getValueClass().newInstance();
+          while (reader.next(key, vw)) {
+            clusterer.emitPointToNearestCluster(vw.get(), clusters, writer);
+            vw = (VectorWritable) reader.getValueClass().newInstance();
+          }
+        } finally {
+          writer.close();
+        }
+      } finally {
+        // runs even when the writer could not be created, so the reader never leaks
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param convergenceDelta
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void clusterDataMR(Path input, Path clustersIn, Path output, DistanceMeasure measure, String convergenceDelta)
+      throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
-    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
+    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
     conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
 
     Job job = new Job(conf);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterMapper.java Thu Jul 22 19:25:20 2010
@@ -66,15 +66,15 @@ public class MeanShiftCanopyClusterMappe
     }
   }
 
-  public static List<MeanShiftCanopy> getCanopies(Configuration configuration) {
-    String statePath = configuration.get(MeanShiftCanopyDriver.STATE_IN_KEY);
+  public static List<MeanShiftCanopy> getCanopies(Configuration conf) {
+    String statePath = conf.get(MeanShiftCanopyDriver.STATE_IN_KEY);
     List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
     try {
       Path path = new Path(statePath);
-      FileSystem fs = FileSystem.get(path.toUri(), configuration);
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
       FileStatus[] status = fs.listStatus(path, new OutputLogFilter());
       for (FileStatus s : status) {
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), configuration);
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
         try {
           Text key = new Text();
           MeanShiftCanopy canopy = new MeanShiftCanopy();

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java Thu Jul 22 19:25:20 2010
@@ -30,8 +30,6 @@ public class MeanShiftCanopyClusterer {
 
   private double convergenceDelta;
 
-  // the next canopyId to be allocated
-  // private int nextCanopyId = 0;
   // the T1 distance threshold
   private double t1;
 
@@ -166,29 +164,37 @@ public class MeanShiftCanopyClusterer {
                                                     double t2,
                                                     int numIter) {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure, t1, t2, convergenceThreshold);
+    int nextCanopyId = 0;
 
     List<MeanShiftCanopy> canopies = new ArrayList<MeanShiftCanopy>();
-    int nextCanopyId = 0;
     for (Vector point : points) {
       clusterer.mergeCanopy(new MeanShiftCanopy(point, nextCanopyId++), canopies);
     }
-
-    boolean converged = false;
-    for (int iter = 0; !converged && iter < numIter; iter++) {
-      converged = true;
-      List<MeanShiftCanopy> migratedCanopies = new ArrayList<MeanShiftCanopy>();
-      for (MeanShiftCanopy canopy : canopies) {
-        converged = clusterer.shiftToMean(canopy) && converged;
-        clusterer.mergeCanopy(canopy, migratedCanopies);
-      }
-      canopies = migratedCanopies;
-
-      //verifyNonOverlap(canopies); useful for debugging
+    List<MeanShiftCanopy> newCanopies = canopies;
+    boolean[] converged = { false };
+    for (int iter = 0; !converged[0] && iter < numIter; iter++) {
+      newCanopies = clusterer.iterate(newCanopies, converged);
     }
+    canopies = newCanopies;
     return canopies;
   }
 
-   @SuppressWarnings("unused")
+  /**
+   * Performs one mean-shift pass. Each canopy is shifted with shiftToMean and then handed to mergeCanopy together
+   * with a fresh result list.
+   *
+   * @param canopies
+   *          the canopies produced by the previous pass
+   * @param converged
+   *          single-element out parameter; on return converged[0] is true only if shiftToMean returned true for
+   *          every canopy (and true when canopies is empty)
+   * @return the list the canopies were merged into, for use in the next pass
+   */
+  protected List<MeanShiftCanopy> iterate(List<MeanShiftCanopy> canopies, boolean[] converged) {
+    List<MeanShiftCanopy> merged = new ArrayList<MeanShiftCanopy>();
+    converged[0] = true;
+    for (MeanShiftCanopy current : canopies) {
+      // shift first so it always runs, then fold its result into the flag
+      boolean stable = shiftToMean(current);
+      converged[0] &= stable;
+      mergeCanopy(current, merged);
+    }
+    return merged;
+  }
+
+  @SuppressWarnings("unused")
   private static void verifyNonOverlap(List<MeanShiftCanopy> canopies) {
     Set<Integer> coveredPoints = new HashSet<Integer>();
     // verify no overlap
@@ -203,4 +209,22 @@ public class MeanShiftCanopyClusterer {
     }
   }
 
+  /**
+   * Returns the first canopy in the list whose bound points include the id of the given canopy.
+   *
+   * @param canopy
+   *          the canopy whose id is looked up; canopy ids assigned while processing input vectors double as vector ids
+   * @param clusters
+   *          the canopies to search, in order
+   * @return the first canopy binding that id, or null if there is none
+   */
+  protected MeanShiftCanopy findCoveringCanopy(MeanShiftCanopy canopy, List<MeanShiftCanopy> clusters) {
+    int targetId = canopy.getId();
+    for (MeanShiftCanopy candidate : clusters) {
+      if (bindsPoint(candidate, targetId)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /** Returns true if the given canopy's bound points contain pointId. */
+  private static boolean bindsPoint(MeanShiftCanopy candidate, int pointId) {
+    for (int boundId : candidate.getBoundPoints().toList()) {
+      if (boundId == pointId) {
+        return true;
+      }
+    }
+    return false;
+  }
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java Thu Jul 22 19:25:20 2010
@@ -18,12 +18,17 @@
 package org.apache.mahout.clustering.meanshift;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -31,9 +36,13 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.WeightedVectorWritable;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.kmeans.OutputLogFilter;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.VectorWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,8 +84,11 @@ public class MeanShiftCanopyDriver exten
               true if the input path already contains MeanShiftCanopies and does not need to be converted from Vectors
    * @param runClustering 
    *          true if the input points are to be clustered once the iterations complete
+   * @param runSequential if true run in sequential execution mode
    * @throws ClassNotFoundException 
    * @throws InterruptedException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public static void runJob(Path input,
                             Path output,
@@ -86,7 +98,9 @@ public class MeanShiftCanopyDriver exten
                             double convergenceDelta,
                             int maxIterations,
                             boolean inputIsCanopies,
-                            boolean runClustering) throws IOException, InterruptedException, ClassNotFoundException {
+                            boolean runClustering,
+                            boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
     new MeanShiftCanopyDriver().job(input,
                                     output,
                                     measureClassName,
@@ -95,7 +109,8 @@ public class MeanShiftCanopyDriver exten
                                     convergenceDelta,
                                     maxIterations,
                                     inputIsCanopies,
-                                    runClustering);
+                                    runClustering,
+                                    runSequential);
   }
 
   /* (non-Javadoc)
@@ -115,6 +130,7 @@ public class MeanShiftCanopyDriver exten
     addOption(DefaultOptionCreator.t1Option().create());
     addOption(DefaultOptionCreator.t2Option().create());
     addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
 
     if (parseArguments(args) == null) {
       return -1;
@@ -132,8 +148,9 @@ public class MeanShiftCanopyDriver exten
     double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
     int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
     boolean inputIsCanopies = hasOption(INPUT_IS_CANOPIES_OPTION);
+    boolean runSequential = (getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD));
 
-    job(input, output, measureClass, t1, t2, convergenceDelta, maxIterations, inputIsCanopies, runClustering);
+    job(input, output, measureClass, t1, t2, convergenceDelta, maxIterations, inputIsCanopies, runClustering, runSequential);
     return 0;
   }
 
@@ -212,9 +229,12 @@ public class MeanShiftCanopyDriver exten
               true if the input path already contains MeanShiftCanopies and does not need to be converted from Vectors
    * @param runClustering 
    *          true if the input points are to be clustered once the iterations complete
+   * @param runSequential if true run in sequential execution mode
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public void job(Path input,
                   Path output,
@@ -224,22 +244,91 @@ public class MeanShiftCanopyDriver exten
                   double convergenceDelta,
                   int maxIterations,
                   boolean inputIsCanopies,
-                  boolean runClustering) throws IOException, InterruptedException, ClassNotFoundException {
+                  boolean runClustering,
+                  boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
     Path clustersIn = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
     if (inputIsCanopies) {
       clustersIn = input;
     } else {
-      createCanopyFromVectors(input, clustersIn);
+      createCanopyFromVectors(input, clustersIn, runSequential);
     }
-    Path clustersOut = buildClusters(clustersIn, output, measureClassName, t1, t2, convergenceDelta, maxIterations);
+    ClassLoader ccl = Thread.currentThread().getContextClassLoader();
+    Class<?> cl = ccl.loadClass(measureClassName);
+    DistanceMeasure measure = (DistanceMeasure) cl.newInstance();
+
+    Path clustersOut = buildClusters(clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations, runSequential);
     if (runClustering) {
       clusterData(inputIsCanopies ? input : new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
                   clustersOut,
-                  new Path(output, Cluster.CLUSTERED_POINTS_DIR));
+                  new Path(output, Cluster.CLUSTERED_POINTS_DIR),
+                  runSequential,
+                  measure);
+    }
+  }
+
+  /**
+   * @param input
+   * @param output
+   * @param runSequential
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  public void createCanopyFromVectors(Path input, Path output, boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    if (runSequential) {
+      createCanopyFromVectorsSeq(input, output);
+    } else {
+      createCanopyFromVectorsMR(input, output);
     }
   }
 
-  public void createCanopyFromVectors(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+  /**
+   * @param input the Path to the input VectorWritable data
+   * @param output the Path to the initial clusters directory
+   * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  private void createCanopyFromVectorsSeq(Path input, Path output) throws IOException, InstantiationException,
+      IllegalAccessException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
+    FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
+    int part = 0;
+    int id = 0;
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(output, "part-m-" + part++),
+                                                           Text.class,
+                                                           MeanShiftCanopy.class);
+      try {
+        WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+        VectorWritable vw = (VectorWritable) reader.getValueClass().newInstance();
+        while (reader.next(key, vw)) {
+          writer.append(new Text(), new MeanShiftCanopy(vw.get(), id++));
+          vw = (VectorWritable) reader.getValueClass().newInstance();
+        }
+      } finally {
+        reader.close();
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * @param input
+   * @param output
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void createCanopyFromVectorsMR(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     Job job = new Job(conf);
     job.setOutputKeyClass(Text.class);
@@ -262,7 +351,7 @@ public class MeanShiftCanopyDriver exten
    *          the input directory Path
    * @param output
    *          the output Path
-   * @param measureClassName
+   * @param measure
    *          the DistanceMeasure class name
    * @param t1
    *          the T1 distance threshold
@@ -272,21 +361,114 @@ public class MeanShiftCanopyDriver exten
    *          the double convergence criteria
    * @param maxIterations
    *          an int number of iterations
+   * @param runSequential if true run in sequential execution mode
    * @param input
    *          the input pathname String
-   * 
    * @return
    * @throws IOException
    * @throws InterruptedException
    * @throws ClassNotFoundException
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
   public Path buildClusters(Path clustersIn,
                             Path output,
-                            String measureClassName,
+                            DistanceMeasure measure,
                             double t1,
                             double t2,
                             double convergenceDelta,
-                            int maxIterations) throws IOException, InterruptedException, ClassNotFoundException {
+                            int maxIterations,
+                            boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    if (runSequential) {
+      return buildClustersSeq(clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations);
+    } else {
+      return buildClustersMR(clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations);
+    }
+  }
+
+  /**
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param t1
+   * @param t2
+   * @param convergenceDelta
+   * @param maxIterations
+   * @return
+   * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  private Path buildClustersSeq(Path clustersIn,
+                                Path output,
+                                DistanceMeasure measure,
+                                double t1,
+                                double t2,
+                                double convergenceDelta,
+                                int maxIterations) throws IOException, InstantiationException, IllegalAccessException {
+    MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure, t1, t2, convergenceDelta);
+    List<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
+    FileStatus[] status = fs.listStatus(clustersIn, new OutputLogFilter());
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      try {
+        WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+        MeanShiftCanopy canopy = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        while (reader.next(key, canopy)) {
+          clusterer.mergeCanopy(canopy, clusters);
+          canopy = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    boolean[] converged = { false };
+    int iteration = 1;
+    while (!converged[0] && iteration <= maxIterations) {
+      log.info("Iteration: " + iteration);
+      clusters = clusterer.iterate(clusters, converged);
+      Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersOut, "part-r-00000"),
+                                                           Text.class,
+                                                           MeanShiftCanopy.class);
+      try {
+        for (MeanShiftCanopy cluster : clusters) {
+          writer.append(new Text(cluster.getIdentifier()), cluster);
+        }
+      } finally {
+        writer.close();
+      }
+      clustersIn = clustersOut;
+      iteration++;
+    }
+    return clustersIn;
+  }
+
+  /**
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @param t1
+   * @param t2
+   * @param convergenceDelta
+   * @param maxIterations
+   * @return
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private Path buildClustersMR(Path clustersIn,
+                               Path output,
+                               DistanceMeasure measure,
+                               double t1,
+                               double t2,
+                               double convergenceDelta,
+                               int maxIterations) throws IOException, InterruptedException, ClassNotFoundException {
     // iterate until the clusters converge
     boolean converged = false;
     int iteration = 1;
@@ -295,7 +477,7 @@ public class MeanShiftCanopyDriver exten
       // point the output to a new directory per iteration
       Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
       Path controlOut = new Path(output, CONTROL_CONVERGED);
-      runIteration(clustersIn, clustersOut, controlOut, measureClassName, t1, t2, convergenceDelta);
+      runIteration(clustersIn, clustersOut, controlOut, measure.getClass().getName(), t1, t2, convergenceDelta);
       converged = FileSystem.get(new Configuration()).exists(controlOut);
       // now point the input to the old output directory
       clustersIn = clustersOut;
@@ -313,13 +495,90 @@ public class MeanShiftCanopyDriver exten
    *          the directory pathname for input clusters
    * @param output
    *          the directory pathname for output clustered points
+   * @param runSequential if true run in sequential execution mode
+   * @param measure the DistanceMeasure to use
    * @throws ClassNotFoundException 
    * @throws InterruptedException 
    * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
    */
-  public void clusterData(Path input, Path clustersIn, Path output) throws IOException, InterruptedException,
-      ClassNotFoundException {
+  public void clusterData(Path input, Path clustersIn, Path output, boolean runSequential, DistanceMeasure measure)
+      throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+    if (runSequential) {
+      clusterDataSeq(input, clustersIn, output, measure);
+    } else {
+      clusterDataMR(input, clustersIn, output);
+    }
+  }
+
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @param measure
+   * @throws IOException 
+   * @throws IllegalAccessException 
+   * @throws InstantiationException 
+   */
+  private void clusterDataSeq(Path input, Path clustersIn, Path output, DistanceMeasure measure) throws IOException,
+      InstantiationException, IllegalAccessException {
+    MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure, 0, 0, 0);
+
+    List<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
+    FileStatus[] status = fs.listStatus(clustersIn, new OutputLogFilter());
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      try {
+        Text key = (Text) reader.getKeyClass().newInstance();
+        MeanShiftCanopy value = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        while (reader.next(key, value)) {
+          clusters.add(value);
+          value = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        }
+      } finally {
+        reader.close();
+      }
+    }
+    // iterate over all points, assigning each to the closest canopy and outputting that clustering
+    fs = FileSystem.get(input.toUri(), conf);
+    status = fs.listStatus(input, new OutputLogFilter());
+    Path outPath = new Path(output, CanopyDriver.DEFAULT_CLUSTERED_POINTS_DIRECTORY);
+    int part = 0;
+    for (FileStatus s : status) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(), conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(outPath, "part-m-" + part++),
+                                                           IntWritable.class,
+                                                           WeightedVectorWritable.class);
+      try {
+        WritableComparable<?> key = (WritableComparable<?>) reader.getKeyClass().newInstance();
+        MeanShiftCanopy canopy = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        while (reader.next(key, canopy)) {
+          MeanShiftCanopy closest = clusterer.findCoveringCanopy(canopy, clusters);
+          writer.append(new IntWritable(closest.getId()), new WeightedVectorWritable(1, new VectorWritable(canopy.getCenter())));
+          canopy = (MeanShiftCanopy) reader.getValueClass().newInstance();
+        }
+      } finally {
+        reader.close();
+        writer.close();
+      }
+    }
+  }
 
+  /**
+   * @param input
+   * @param clustersIn
+   * @param output
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  private void clusterDataMR(Path input, Path clustersIn, Path output) throws IOException, InterruptedException,
+      ClassNotFoundException {
     Configuration conf = new Configuration();
     conf.set(STATE_IN_KEY, clustersIn.toString());
     Job job = new Job(conf);

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java?rev=966816&r1=966815&r2=966816&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java Thu Jul 22 19:25:20 2010
@@ -183,6 +183,74 @@ public class TestFuzzyKmeansClustering e
     }
   }
 
+  public void testFuzzyKMeansSeqJob() throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
+  
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = new Configuration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+  
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println("testKFuzzyKMeansMRJob k= " + k);
+      // pick k initial cluster centers at random
+      SequenceFile.Writer writer = new SequenceFile.Writer(fs,
+                                                           conf,
+                                                           new Path(clustersPath, "part-00000"),
+                                                           Text.class,
+                                                           SoftCluster.class);
+      for (int i = 0; i < k + 1; i++) {
+        Vector vec = tweakValue(points.get(i).get());
+  
+        SoftCluster cluster = new SoftCluster(vec);
+        // add the center so the centroid will be correct upon output
+        cluster.addPoint(cluster.getCenter(), 1);
+        /*
+         * writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n');
+         */
+        writer.append(new Text(cluster.getIdentifier()), cluster);
+  
+      }
+      writer.close();
+  
+      // now run the Job using the run() command line options.
+      Path output = getTestTempDirPath("output");
+      /*      FuzzyKMeansDriver.runJob(pointsPath,
+                                     clustersPath,
+                                     output,
+                                     EuclideanDistanceMeasure.class.getName(),
+                                     0.001,
+                                     2,
+                                     k + 1,
+                                     2,
+                                     false,
+                                     true,
+                                     0);
+      */
+      String[] args = { 
+          optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), 
+          optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), 
+          optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+          optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+          optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", 
+          optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", 
+          optKey(FuzzyKMeansDriver.M_OPTION), "2.0", 
+          optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+          optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION), 
+          optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+          optKey(DefaultOptionCreator.METHOD_OPTION), DefaultOptionCreator.SEQUENTIAL_METHOD };
+      new FuzzyKMeansDriver().run(args);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(output, "clusteredPoints/part-m-0"), conf);
+      IntWritable key = new IntWritable();
+      WeightedVectorWritable out = new WeightedVectorWritable();
+      while (reader.next(key, out)) {
+        // make sure we can read all the clusters
+      }
+      reader.close();
+    }
+  
+  }
+
   public void testFuzzyKMeansMRJob() throws Exception {
     List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.reference);
 



Mime
View raw message