mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gsing...@apache.org
Subject svn commit: r787776 [1/3] - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/clustering/canopy/ core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/ core/src/main/java/org/apache/mahout/clustering/kmeans/ core/src/main/java/org/apa...
Date Tue, 23 Jun 2009 18:23:20 GMT
Author: gsingers
Date: Tue Jun 23 18:23:18 2009
New Revision: 787776

URL: http://svn.apache.org/viewvc?rev=787776&view=rev
Log:
MAHOUT-137: convert kmeans, canopy and fuzzy kmeans to use writable instead of string based serialization

Added:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/JWriterTermInfoWriter.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/JWriterVectorWriter.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/SequenceFileVectorWriter.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/TermInfoWriter.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/VectorWriter.java
Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
    lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/Driver.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Tue Jun 23 18:23:18 2009
@@ -18,6 +18,7 @@
 package org.apache.mahout.clustering.canopy;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.mahout.matrix.AbstractVector;
@@ -26,6 +27,8 @@
 import org.apache.mahout.utils.DistanceMeasure;
 
 import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
 import java.util.List;
 
 /**
@@ -34,7 +37,7 @@
  * a point total which is the sum of all the points and is used to compute the
  * centroid when needed.
  */
-public class Canopy {
+public class Canopy implements Writable {
 
   // keys used by Driver, Mapper, Combiner & Reducer
   public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
@@ -58,7 +61,7 @@
   private static DistanceMeasure measure;
 
   // this canopy's canopyId
-  private final int canopyId;
+  private int canopyId;
 
   // the current center
   private Vector center = new SparseVector(0);
@@ -70,6 +73,12 @@
   private Vector pointTotal = null;
 
   /**
+   * Used when a Canopy is deserialized as a Writable; its state is then populated by readFields
+   */
+  public Canopy() {
+  }
+
+  /**
    * Create a new Canopy containing the given point
    * 
    * @param point a point in vector space
@@ -164,7 +173,7 @@
    * @param collector an OutputCollector in which to emit the point
    */
   public static void emitPointToNewCanopies(Vector point,
-      List<Canopy> canopies, OutputCollector<Text, Text> collector)
+      List<Canopy> canopies, OutputCollector<Text, Vector> collector)
       throws IOException {
     boolean pointStronglyBound = false;
     for (Canopy canopy : canopies) {
@@ -188,13 +197,11 @@
    * 
    * @param point the point to be added
    * @param canopies the List<Canopy> to be appended
-   * @param writable the original Writable from the input, may include arbitrary
-   *        payload information after the point [...]<payload>
    * @param collector an OutputCollector in which to emit the point
    */
   public static void emitPointToExistingCanopies(Vector point,
-      List<Canopy> canopies, Text writable,
-      OutputCollector<Text, Text> collector) throws IOException {
+      List<Canopy> canopies,
+      OutputCollector<Text, Vector> collector) throws IOException {
     double minDist = Double.MAX_VALUE;
     Canopy closest = null;
     boolean isCovered = false;
@@ -202,7 +209,7 @@
       double dist = measure.distance(canopy.getCenter(), point);
       if (dist < t1) {
         isCovered = true;
-        collector.collect(new Text(canopy.getIdentifier()), writable);
+        collector.collect(new Text(canopy.getIdentifier()), point);
       } else if (dist < minDist) {
         minDist = dist;
         closest = canopy;
@@ -211,7 +218,23 @@
     // if the point is not contained in any canopies (due to canopy centroid
     // clustering), emit the point to the closest covering canopy.
     if (!isCovered)
-      collector.collect(new Text(closest.getIdentifier()), writable);
+      collector.collect(new Text(closest.getIdentifier()), point);
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(canopyId);
+    computeCentroid().write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    canopyId = in.readInt();
+    this.center = new SparseVector();
+    center.readFields(in);
+    this.pointTotal = center.clone();
+    this.numPoints = 1;
   }
 
   /**
@@ -260,10 +283,9 @@
    * 
    * @param point a point to emit.
    */
-  public void emitPoint(Vector point, OutputCollector<Text, Text> collector)
+  public void emitPoint(Vector point, OutputCollector<Text, Vector> collector)
       throws IOException {
-    collector.collect(new Text(this.getIdentifier()), new Text(point
-        .asFormatString()));
+    collector.collect(new Text(this.getIdentifier()), point);
   }
 
   @Override
@@ -302,8 +324,8 @@
    * 
    * @return a SparseVector (required by Mapper) which is the new centroid
    */
-  public SparseVector computeCentroid() {
-    SparseVector result = new SparseVector(pointTotal.size());
+  public Vector computeCentroid() {
+    Vector result = new SparseVector(pointTotal.size());
     for (int i = 0; i < pointTotal.size(); i++)
       result.set(i, pointTotal.get(i) / numPoints);
     return result;

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Tue Jun 23 18:23:18 2009
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.clustering.canopy;
 
+import org.apache.mahout.matrix.Vector;
+
 import java.io.IOException;
 
 public class CanopyClusteringJob {
@@ -36,13 +38,15 @@
   /**
    * @param args
    */
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
     String input = args[0];
     String output = args[1];
     String measureClassName = args[2];
     double t1 = Double.parseDouble(args[3]);
     double t2 = Double.parseDouble(args[4]);
-    runJob(input, output, measureClassName, t1, t2);
+    String vectorClassName = args[5];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+    runJob(input, output, measureClassName, t1, t2, vectorClass);
   }
 
   /**
@@ -55,9 +59,9 @@
    * @param t2               the T2 distance threshold
    */
   public static void runJob(String input, String output,
-                            String measureClassName, double t1, double t2) throws IOException {
-    CanopyDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, measureClassName, t1, t2);
-    ClusterDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, output, measureClassName, t1, t2);
+                            String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
+    CanopyDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, measureClassName, t1, t2, vectorClass);
+    ClusterDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, output, measureClassName, t1, t2, vectorClass);
   }
 
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Tue Jun 23 18:23:18 2009
@@ -17,8 +17,6 @@
 
 package org.apache.mahout.clustering.canopy;
 
-import java.io.IOException;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -26,21 +24,26 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
+
+import java.io.IOException;
 
 public class CanopyDriver {
 
   private CanopyDriver() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
     String input = args[0];
     String output = args[1];
     String measureClassName = args[2];
     double t1 = Double.parseDouble(args[3]);
     double t2 = Double.parseDouble(args[4]);
-    runJob(input, output, measureClassName, t1, t2);
+    String vectorClassName = args[5];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+    runJob(input, output, measureClassName, t1, t2, vectorClass);
   }
 
   /**
@@ -51,9 +54,13 @@
    * @param measureClassName the DistanceMeasure class name
    * @param t1               the T1 distance threshold
    * @param t2               the T2 distance threshold
+   * @param vectorClass      the {@link Class} of Vector to use for the Map Output Value.  Must be a concrete type
+   *
+   * @see org.apache.mahout.matrix.SparseVector
+   * @see org.apache.mahout.matrix.DenseVector
    */
   public static void runJob(String input, String output,
-                            String measureClassName, double t1, double t2) throws IOException {
+                            String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(
             org.apache.mahout.clustering.canopy.CanopyDriver.class);
@@ -61,10 +68,12 @@
     conf.set(Canopy.T1_KEY, String.valueOf(t1));
     conf.set(Canopy.T2_KEY, String.valueOf(t2));
 
+    conf.setInputFormat(SequenceFileInputFormat.class);
+
     conf.setMapOutputKeyClass(Text.class);
-    conf.setMapOutputValueClass(SparseVector.class);
+    conf.setMapOutputValueClass(vectorClass);
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
+    conf.setOutputValueClass(Canopy.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(output);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Tue Jun 23 18:23:18 2009
@@ -33,17 +33,16 @@
 import java.util.List;
 
 public class CanopyMapper extends MapReduceBase implements
-    Mapper<WritableComparable<?>, Text, Text, Vector> {
+    Mapper<WritableComparable<?>, Vector, Text, Vector> {
 
   private final List<Canopy> canopies = new ArrayList<Canopy>();
 
   private OutputCollector<Text, Vector> outputCollector;
 
   @Override
-  public void map(WritableComparable<?> key, Text values,
+  public void map(WritableComparable<?> key, Vector point,
       OutputCollector<Text, Vector> output, Reporter reporter) throws IOException {
     outputCollector = output;
-    Vector point = AbstractVector.decodeVector(values.toString());
     Canopy.addPointToCanopies(point, canopies);
   }
 
@@ -61,7 +60,7 @@
   @Override
   public void close() throws IOException {
     for (Canopy canopy : canopies) {
-      SparseVector centroid = canopy.computeCentroid();
+      Vector centroid = canopy.computeCentroid();
       outputCollector.collect(new Text("centroid"), centroid);
     }
     super.close();

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Tue Jun 23 18:23:18 2009
@@ -31,20 +31,19 @@
 import org.apache.mahout.matrix.Vector;
 
 public class CanopyReducer extends MapReduceBase implements
-        Reducer<Text, Vector, Text, Text> {
+        Reducer<Text, Vector, Text, Canopy> {
 
   private final List<Canopy> canopies = new ArrayList<Canopy>();
 
   @Override
   public void reduce(Text key, Iterator<Vector> values,
-                     OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+                     OutputCollector<Text, Canopy> output, Reporter reporter) throws IOException {
     while (values.hasNext()) {
       Vector point = values.next();
       Canopy.addPointToCanopies(point, canopies);
     }
     for (Canopy canopy : canopies)
-      output.collect(new Text(canopy.getIdentifier()), new Text(Canopy
-              .formatCanopy(canopy)));
+      output.collect(new Text(canopy.getIdentifier()), canopy);
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Tue Jun 23 18:23:18 2009
@@ -24,7 +24,10 @@
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.mahout.matrix.Vector;
 
 import java.io.IOException;
 
@@ -35,14 +38,16 @@
   private ClusterDriver() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
     String points = args[0];
     String canopies = args[1];
     String output = args[2];
     String measureClassName = args[3];
     double t1 = Double.parseDouble(args[4]);
     double t2 = Double.parseDouble(args[5]);
-    runJob(points, canopies, output, measureClassName, t1, t2);
+    String vectorClassName = args[6];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+    runJob(points, canopies, output, measureClassName, t1, t2, vectorClass);
   }
 
   /**
@@ -54,9 +59,10 @@
    * @param measureClassName the DistanceMeasure class name
    * @param t1               the T1 distance threshold
    * @param t2               the T2 distance threshold
+   * @param vectorClass      The {@link Class} of Vector to use for the Output Value Class.  Must be concrete.
    */
   public static void runJob(String points, String canopies, String output,
-                            String measureClassName, double t1, double t2) throws IOException {
+                            String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(
             org.apache.mahout.clustering.canopy.ClusterDriver.class);
@@ -66,8 +72,13 @@
     conf.set(Canopy.T2_KEY, String.valueOf(t2));
     conf.set(Canopy.CANOPY_PATH_KEY, canopies);
 
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    
+    /*conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(SparseVector.class);*/
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
+    conf.setOutputValueClass(vectorClass);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
 
     FileInputFormat.setInputPaths(conf, new Path(points));
     Path outPath = new Path(output + DEFAULT_CLUSTER_OUTPUT_DIRECTORY);

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -29,21 +29,21 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.matrix.AbstractVector;
 import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class ClusterMapper extends MapReduceBase implements
-        Mapper<WritableComparable<?>, Text, Text, Text> {
+        Mapper<WritableComparable<?>, Vector, Text, Vector> {
 
   private List<Canopy> canopies;
 
   @Override
-  public void map(WritableComparable<?> key, Text values,
-                  OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-    Vector point = AbstractVector.decodeVector(values.toString());
-    Canopy.emitPointToExistingCanopies(point, canopies, values, output);
+  public void map(WritableComparable<?> key, Vector point,
+                  OutputCollector<Text, Vector> output, Reporter reporter) throws IOException {
+    Canopy.emitPointToExistingCanopies(point, canopies, output);
   }
 
   /**
@@ -69,10 +69,10 @@
       SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
       try {
         Text key = new Text();
-        Text value = new Text();
+        Canopy value = new Canopy();
         while (reader.next(key, value)) {
-          Canopy canopy = Canopy.decodeCanopy(value.toString());
-          canopies.add(canopy);
+          canopies.add(value);
+          value = new Canopy();
         }
       } finally {
         reader.close();

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -18,20 +18,51 @@
 package org.apache.mahout.clustering.fuzzykmeans;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.mahout.matrix.AbstractVector;
 import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.clustering.kmeans.Cluster;
 
-public class FuzzyKMeansClusterMapper extends FuzzyKMeansMapper {
+public class FuzzyKMeansClusterMapper extends MapReduceBase implements
+        Mapper<WritableComparable<?>, Vector, Text, FuzzyKMeansOutput> {
+  protected List<SoftCluster> clusters;
   @Override
-  public void map(WritableComparable<?> key, Text values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+  public void map(WritableComparable<?> key, Vector point,
+      OutputCollector<Text, FuzzyKMeansOutput> output, Reporter reporter) throws IOException
   {
-    Vector point = AbstractVector.decodeVector(values.toString());
-    SoftCluster.outputPointWithClusterProbabilities(key.toString(), point, clusters, values, output);
-  }  
+    SoftCluster.outputPointWithClusterProbabilities(key.toString(), point, clusters, output);
+  }
+
+  /**
+   * Configure the mapper by providing its clusters. Used by unit tests.
+   *
+   * @param clusters a List<SoftCluster>
+   */
+  void config(List<SoftCluster> clusters) {
+    this.clusters = clusters;
+  }
+
+  @Override
+  public void configure(JobConf job) {
+
+    super.configure(job);
+    SoftCluster.configure(job);
+    clusters = new ArrayList<SoftCluster>();
+
+    FuzzyKMeansUtil.configureWithClusterInfo(job
+        .get(SoftCluster.CLUSTER_PATH_KEY), clusters);
+
+    if (clusters.isEmpty())
+      throw new NullPointerException("Cluster is empty!!!");
+  }
+
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java Tue Jun 23 18:23:18 2009
@@ -26,44 +26,28 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.matrix.AbstractVector;
-import org.apache.mahout.matrix.Vector;
 
 public class FuzzyKMeansCombiner extends MapReduceBase implements
-    Reducer<Text, Text, Text, Text> {
+    Reducer<Text, FuzzyKMeansInfo, Text, FuzzyKMeansInfo> {
 
   @Override
-  public void reduce(Text key, Iterator<Text> values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<FuzzyKMeansInfo> values,
+      OutputCollector<Text, FuzzyKMeansInfo> output, Reporter reporter) throws IOException {
     SoftCluster cluster = new SoftCluster(key.toString().trim());
     while (values.hasNext()) {
-      String pointInfo = values.next().toString();
-      // check whether this is already processed
-      int mapperSepIndex = pointInfo
-          .indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR); // ~ separator is
-      // used in mapper
-      int combinerSepIndex = pointInfo
-          .indexOf(FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR); // tab separator
-      // is used in
-      // combiner
-      int index = mapperSepIndex == -1 ? combinerSepIndex : mapperSepIndex;// needed
-      // to
-      // split
-      // prob and vector
-      double pointProb = Double.parseDouble(pointInfo.substring(0, index));
+      //String pointInfo = values.next().toString();
+      FuzzyKMeansInfo info = values.next();
 
-      String encodedVector = pointInfo.substring(index + 1);
-      Vector v = AbstractVector.decodeVector(encodedVector);
-      if (mapperSepIndex != -1) // first time thru combiner
+      if (info.combinerPass == 0) // first time thru combiner
       {
-        cluster.addPoint(v, Math.pow(pointProb, SoftCluster.getM()));
+        cluster.addPoint(info.getVector(), Math.pow(info.getProbability(), SoftCluster.getM()));
       } else {
-        cluster.addPoints(v, pointProb);
+        cluster.addPoints(info.getVector(), info.getProbability());
       }
+      info.combinerPass++;
     }
-    output.collect(key, new Text(cluster.getPointProbSum()
-        + FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR
-        + cluster.getWeightedPointTotal().asFormatString()));
+    //TODO: how do we pass along the combinerPass?  Or do we not need to?
+    output.collect(key, new FuzzyKMeansInfo(cluster.getPointProbSum(), cluster.getWeightedPointTotal(), 1));
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Tue Jun 23 18:23:18 2009
@@ -28,12 +28,17 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.KeyValueLineRecordReader;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.kmeans.Cluster;
+import org.apache.mahout.matrix.Vector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,9 +47,6 @@
   private static final Logger log = LoggerFactory
       .getLogger(FuzzyKMeansDriver.class);
 
-  public static final String MAPPER_VALUE_SEPARATOR = "~";
-
-  public static final String COMBINER_VALUE_SEPARATOR = "\t";
 
   private FuzzyKMeansDriver() {
   }
@@ -54,8 +56,8 @@
         .println("Usage: input clusterIn output measureClass convergenceDelta maxIterations m [doClusteringOnly]");
   }
 
-  public static void main(String[] args) {
-    if (args.length < 7) {
+  public static void main(String[] args) throws ClassNotFoundException {
+    if (args.length < 8) {
       System.out.println("Expected number of arguments: 7 or 8 : received:"
           + args.length);
       printMessage();
@@ -68,16 +70,18 @@
     double convergenceDelta = Double.parseDouble(args[index++]);
     int maxIterations = Integer.parseInt(args[index++]);
     float m = Float.parseFloat(args[index++]);
+    String vectorClassName = args[index++];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
     boolean doClustering = false;
-    if (args.length > 7)
+    if (args.length > 8){
       doClustering = Boolean.parseBoolean(args[index++]);
+    }
     if (doClustering) {
       runClustering(input, clusters, output, measureClass, Double
-          .toString(convergenceDelta), 500, m);
+          .toString(convergenceDelta), 500, m, vectorClass);
     } else {
       runJob(input, clusters, output, measureClass, convergenceDelta,
-          maxIterations, 10, 10, m);
-
+          maxIterations, 10, 10, m, vectorClass);
     }
 
   }
@@ -92,10 +96,11 @@
    * @param convergenceDelta the convergence delta value
    * @param maxIterations the maximum number of iterations
    * @param numMapTasks the number of mapper tasks
+   * @param vectorClass
    */
   public static void runJob(String input, String clustersIn, String output,
-      String measureClass, double convergenceDelta, int maxIterations,
-      int numMapTasks, int numReduceTasks, float m) {
+                            String measureClass, double convergenceDelta, int maxIterations,
+                            int numMapTasks, int numReduceTasks, float m, Class<? extends Vector> vectorClass) {
 
     boolean converged = false;
     int iteration = 0;
@@ -119,7 +124,7 @@
     log.info("Clustering ");
 
     runClustering(input, clustersIn, output + File.separator + "points",
-        measureClass, delta, numMapTasks, m);
+        measureClass, delta, numMapTasks, m, vectorClass);
   }
 
   /**
@@ -142,13 +147,18 @@
     JobConf conf = new JobConf(FuzzyKMeansJob.class);
     conf.setJobName("Fuzzy K Means{" + iterationNumber + '}');
 
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(FuzzyKMeansInfo.class);
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
+    conf.setOutputValueClass(SoftCluster.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(clustersOut);
     FileOutputFormat.setOutputPath(conf, outPath);
 
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
     conf.setMapperClass(FuzzyKMeansMapper.class);
     conf.setCombinerClass(FuzzyKMeansCombiner.class);
     conf.setReducerClass(FuzzyKMeansReducer.class);
@@ -185,13 +195,15 @@
    */
   private static void runClustering(String input, String clustersIn,
       String output, String measureClass, String convergenceDelta,
-      int numMapTasks, float m) {
+      int numMapTasks, float m, Class<? extends Vector> vectorClass) {
 
     JobConf conf = new JobConf(FuzzyKMeansDriver.class);
     conf.setJobName("Fuzzy K Means Clustering");
 
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(vectorClass);
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
+    conf.setOutputValueClass(FuzzyKMeansOutput.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(output);
@@ -199,6 +211,9 @@
 
     conf.setMapperClass(FuzzyKMeansClusterMapper.class);
 
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
     // uncomment it to run locally
     // conf.set("mapred.job.tracker", "local");
     conf.setNumMapTasks(numMapTasks);
@@ -246,15 +261,16 @@
 
     for (Path p : result) {
 
-      KeyValueLineRecordReader reader = null;
+      SequenceFile.Reader reader = null;
 
       try {
-        reader = new KeyValueLineRecordReader(conf, new FileSplit(p, 0, fs
-            .getFileStatus(p).getLen(), (String[]) null));
+        reader = new SequenceFile.Reader(fs, p, conf);
+                /*new KeyValueLineRecordReader(conf, new FileSplit(p, 0, fs
+            .getFileStatus(p).getLen(), (String[]) null));*/
         Text key = new Text();
-        Text value = new Text();
+        SoftCluster value = new SoftCluster();
         while (converged && reader.next(key, value)) {
-          converged = value.toString().charAt(0) == 'V';
+          converged = value.isConverged();
         }
       } finally {
         if (reader != null) {

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,59 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.clustering.kmeans.Cluster;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+
+
+/**
+ *
+ *
+ **/
+public class FuzzyKMeansInfo implements Writable {
+
+  private double probability;
+  private Vector pointTotal;
+
+  public int combinerPass = 0;
+
+  public FuzzyKMeansInfo() {
+  }
+
+  public FuzzyKMeansInfo(double probability, Vector pointTotal) {
+    this.probability = probability;
+    this.pointTotal = pointTotal;
+  }
+
+  public FuzzyKMeansInfo(double probability, Vector pointTotal, int combinerPass) {
+    this.probability = probability;
+    this.pointTotal = pointTotal;
+    this.combinerPass = combinerPass;
+  }
+
+  public Vector getVector() {
+    return pointTotal;
+  }
+
+  public double getProbability() {
+    return probability;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(probability);
+    out.writeUTF(pointTotal.getClass().getSimpleName().toString());
+    pointTotal.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.probability = in.readDouble();
+    String className = in.readUTF();
+    pointTotal = Cluster.vectorNameToVector(className);
+    pointTotal.readFields(in);
+  }
+}
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java Tue Jun 23 18:23:18 2009
@@ -21,6 +21,7 @@
 
 import org.apache.mahout.clustering.canopy.CanopyDriver;
 import org.apache.mahout.utils.ManhattanDistanceMeasure;
+import org.apache.mahout.matrix.Vector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,10 +33,10 @@
   private FuzzyKMeansJob() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
 
-    if (args.length != 10) {
-      log.warn("Expected num Arguments: 10  received: {}", args.length);
+    if (args.length != 11) {
+      log.warn("Expected num Arguments: 11  received: {}", args.length);
       printMessage();
       return;
     }
@@ -50,9 +51,10 @@
     int numReduceTasks = Integer.parseInt(args[index++]);
     boolean doCanopy = Boolean.parseBoolean(args[index++]);
     float m = Float.parseFloat(args[index++]);
-
+    String vectorClassName = args[index++];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
     runJob(input, clusters, output, measureClass, convergenceDelta,
-        maxIterations, numMapTasks, numReduceTasks, doCanopy, m);
+        maxIterations, numMapTasks, numReduceTasks, doCanopy, m, vectorClass);
   }
 
   /**
@@ -78,18 +80,18 @@
    */
   public static void runJob(String input, String clustersIn, String output,
       String measureClass, double convergenceDelta, int maxIterations,
-      int numMapTasks, int numReduceTasks, boolean doCanopy, float m)
+      int numMapTasks, int numReduceTasks, boolean doCanopy, float m, Class<? extends Vector> vectorClass)
       throws IOException {
 
     // run canopy to find initial clusters
     if (doCanopy) {
       CanopyDriver.runJob(input, clustersIn, ManhattanDistanceMeasure.class
-          .getName(), 100.1, 50.1);
+          .getName(), 100.1, 50.1, vectorClass);
 
     }
     // run fuzzy k -means
     FuzzyKMeansDriver.runJob(input, clustersIn, output, measureClass,
-        convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m);
+        convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m, vectorClass);
 
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java Tue Jun 23 18:23:18 2009
@@ -34,17 +34,16 @@
 import org.slf4j.LoggerFactory;
 
 public class FuzzyKMeansMapper extends MapReduceBase implements
-    Mapper<WritableComparable<?>, Text, Text, Text> {
+    Mapper<WritableComparable<?>, Vector, Text, FuzzyKMeansInfo> {
 
   private static final Logger log = LoggerFactory.getLogger(FuzzyKMeansMapper.class);
 
   protected List<SoftCluster> clusters;
 
   @Override
-  public void map(WritableComparable<?> key, Text values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-    Vector point = AbstractVector.decodeVector(values.toString());
-    SoftCluster.emitPointProbToCluster(point, clusters, values, output);
+  public void map(WritableComparable<?> key, Vector point,
+      OutputCollector<Text, FuzzyKMeansInfo> output, Reporter reporter) throws IOException {
+    SoftCluster.emitPointProbToCluster(point, clusters, output);
   }
 
   /**

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,66 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ *
+ *
+ **/
+public class FuzzyKMeansOutput implements Writable {
+  //parallel arrays
+  private SoftCluster[] clusters;
+  private double[] probabilities;
+
+  public FuzzyKMeansOutput() {
+  }
+
+  public FuzzyKMeansOutput(int size) {
+    clusters = new SoftCluster[size];
+    probabilities = new double[size];
+  }
+
+  public SoftCluster[] getClusters() {
+    return clusters;
+  }
+
+  public double[] getProbabilities() {
+    return probabilities;
+  }
+
+  public void add(int i, SoftCluster softCluster, double probWeight) {
+    clusters[i] = softCluster;
+    probabilities[i] = probWeight;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(clusters.length);
+    for (int i = 0; i < clusters.length; i++) {
+      clusters[i].write(out);
+    }
+    out.writeInt(probabilities.length);
+    for (int i = 0; i < probabilities.length; i++) {
+      out.writeDouble(probabilities[i]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numClusters = in.readInt();
+    clusters = new SoftCluster[numClusters];
+    for (int i = 0; i < numClusters; i++) {
+      clusters[i] = new SoftCluster();
+      clusters[i].readFields(in);
+    }
+    int numProbs = in.readInt();
+    probabilities = new double[numProbs];
+    for (int i = 0; i < numProbs; i++) {
+      probabilities[i] = in.readDouble();
+    }
+  }
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java Tue Jun 23 18:23:18 2009
@@ -34,44 +34,30 @@
 import org.apache.mahout.matrix.Vector;
 
 public class FuzzyKMeansReducer extends MapReduceBase implements
-    Reducer<Text, Text, Text, Text> {
+    Reducer<Text, FuzzyKMeansInfo, Text, SoftCluster> {
 
   protected Map<String, SoftCluster> clusterMap;
 
   @Override
-  public void reduce(Text key, Iterator<Text> values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<FuzzyKMeansInfo> values,
+      OutputCollector<Text, SoftCluster> output, Reporter reporter) throws IOException {
 
     SoftCluster cluster = clusterMap.get(key.toString());
 
     while (values.hasNext()) {
-      String value = values.next().toString();
-      int mapperSepIndex = value
-          .indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR); // tild separator
-      // is used in
-      // mapper
-      int combinerSepIndex = value
-          .indexOf(FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR); // tab separator
-      // is used in
-      // combiner
-      int index = mapperSepIndex == -1 ? combinerSepIndex : mapperSepIndex;// needed
-      // to
-      // split
-      // prob and vector
-      double partialSumPtProb = Double.parseDouble(value.substring(0, index));
-      Vector total = AbstractVector.decodeVector(value.substring(index + 1));
-      if (mapperSepIndex != -1) // escaped from combiner
+      FuzzyKMeansInfo value = values.next();
+
+      if (value.combinerPass == 0) // escaped from combiner
       {
-        cluster.addPoint(total, Math.pow(partialSumPtProb, SoftCluster.getM()));
+        cluster.addPoint(value.getVector(), Math.pow(value.getProbability(), SoftCluster.getM()));
       } else {
-        cluster.addPoints(total, partialSumPtProb);
+        cluster.addPoints(value.getVector(), value.getProbability());
       }
 
     }
     // force convergence calculation
     cluster.computeConvergence();
-    output.collect(new Text(cluster.getIdentifier()), new Text(SoftCluster
-        .formatCluster(cluster)));
+    output.collect(new Text(cluster.getIdentifier()), cluster); 
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java Tue Jun 23 18:23:18 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.KeyValueLineRecordReader;
@@ -72,24 +73,23 @@
 
       //iterate thru the result path list
       for (Path path : result) {
-        RecordReader<Text, Text> recordReader = null;
-//        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+        //RecordReader<Text, Text> recordReader = null;
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
         try {
-          recordReader = new KeyValueLineRecordReader(job, new FileSplit(path, 0, fs.getFileStatus(path).getLen(), (String[]) null));
+          //recordReader = new KeyValueLineRecordReader(job, new FileSplit(path, 0, fs.getFileStatus(path).getLen(), (String[]) null));
           Text key = new Text();
-          Text value = new Text();
+          SoftCluster value = new SoftCluster();
           //int counter = 1;
-          while (recordReader.next(key, value)) {
+          while (reader.next(key, value)) {
             //get the cluster info
-            SoftCluster cluster = SoftCluster.decodeCluster(value.toString());
             // add the center so the centroid will be correct on output
             // formatting
 //            cluster.addPoint(cluster.getCenter(), 1);
-            clusters.add(cluster);
+            clusters.add(value);
           }
         } finally {
-          if (recordReader != null) {
-            recordReader.close();
+          if (reader != null) {
+            reader.close();
           }
 
         }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java Tue Jun 23 18:23:18 2009
@@ -18,10 +18,13 @@
 package org.apache.mahout.clustering.fuzzykmeans;
 
 import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.mahout.matrix.AbstractVector;
@@ -29,8 +32,9 @@
 import org.apache.mahout.matrix.SquareRootFunction;
 import org.apache.mahout.matrix.Vector;
 import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.clustering.kmeans.Cluster;
 
-public class SoftCluster {
+public class SoftCluster implements Writable {
 
   public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
 
@@ -55,7 +59,7 @@
   private static int nextClusterId = 0;
 
   // this cluster's clusterId
-  private final int clusterId;
+  private int clusterId;
 
   // the current center
   private Vector center = new SparseVector(0);
@@ -115,6 +119,27 @@
     return null;
   }
 
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(clusterId);
+    out.writeBoolean(converged);
+    Vector vector = computeCentroid();
+    out.writeUTF(vector.getClass().getSimpleName().toString());
+    vector.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    clusterId = in.readInt();
+    converged = in.readBoolean();
+    String className = in.readUTF();
+    this.center = Cluster.vectorNameToVector(className);
+    center.readFields(in);
+    this.pointProbSum = 0;
+    this.weightedPointTotal = center.like();
+  }
+
+
   /**
    * Configure the distance measure from the job
    * 
@@ -155,14 +180,12 @@
    * 
    * @param point a point
    * @param clusters a List<SoftCluster>
-   * @param values a Writable containing the input point and possible other
-   *        values of interest (payload)
    * @param output the OutputCollector to emit into
    * @throws IOException
    */
   public static void emitPointProbToCluster(Vector point,
-      List<SoftCluster> clusters, Text values,
-      OutputCollector<Text, Text> output) throws IOException {
+      List<SoftCluster> clusters,
+      OutputCollector<Text, FuzzyKMeansInfo> output) throws IOException {
     List<Double> clusterDistanceList = new ArrayList<Double>();
     for (SoftCluster cluster : clusters) {
       clusterDistanceList.add(measure.distance(cluster.getCenter(), point));
@@ -175,8 +198,9 @@
       // identifier,avoids
       // too much data
       // traffic
-      Text value = new Text(Double.toString(probWeight)
-          + FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR + values.toString());
+      /*Text value = new Text(Double.toString(probWeight)
+          + FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR + values.toString());*/
+      FuzzyKMeansInfo value = new FuzzyKMeansInfo(probWeight, point);
       output.collect(key, value);
     }
   }
@@ -186,33 +210,31 @@
    * 
    * @param point a point
    * @param clusters a List<SoftCluster> to test
-   * @param values a Writable containing the input point and possible other
-   *        values of interest (payload)
    * @param output the OutputCollector to emit into
    * @throws IOException
    */
   public static void outputPointWithClusterProbabilities(String key,
-      Vector point, List<SoftCluster> clusters, Text values,
-      OutputCollector<Text, Text> output) throws IOException {
-
-    String outputKey = values.toString();
-    StringBuilder outputValue = new StringBuilder("[");
+      Vector point, List<SoftCluster> clusters,
+      OutputCollector<Text, FuzzyKMeansOutput> output) throws IOException {
     List<Double> clusterDistanceList = new ArrayList<Double>();
 
     for (SoftCluster cluster : clusters) {
       clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
     }
-
+    FuzzyKMeansOutput fOutput = new FuzzyKMeansOutput(clusters.size());
     for (int i = 0; i < clusters.size(); i++) {
       // System.out.print("cluster:" + i + "\t" + clusterDistanceList.get(i));
 
       double probWeight = computeProbWeight(clusterDistanceList.get(i),
           clusterDistanceList);
-      outputValue.append(clusters.get(i).clusterId).append(':').append(
-          probWeight).append(' ');
+      /*outputValue.append(clusters.get(i).clusterId).append(':').append(
+          probWeight).append(' ');*/
+      fOutput.add(i, clusters.get(i), probWeight);
     }
-    output.collect(new Text(outputKey.trim()), new Text(outputValue.toString()
-        .trim() + ']'));
+    String name = point.getName();
+    output.collect(new Text(name != null && name.equals("") == false ? name
+            : point.asFormatString()),
+            fOutput);
   }
 
   /**
@@ -252,6 +274,10 @@
     return centroid;
   }
 
+  //For Writable
+  public SoftCluster() {
+  }
+
   /**
    * Construct a new SoftCluster with the given point as its center
    * 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java Tue Jun 23 18:23:18 2009
@@ -17,18 +17,22 @@
 package org.apache.mahout.clustering.kmeans;
 
 import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.mahout.matrix.AbstractVector;
 import org.apache.mahout.matrix.SparseVector;
 import org.apache.mahout.matrix.SquareRootFunction;
 import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.DenseVector;
 import org.apache.mahout.utils.DistanceMeasure;
 
-public class Cluster {
+public class Cluster implements Writable {
 
   private static final String ERROR_UNKNOWN_CLUSTER_FORMAT = "Unknown cluster format:\n";
 
@@ -38,10 +42,19 @@
 
   public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
 
+  /**
+   * The number of iterations that have taken place
+   */
+  public static final String ITERATION_NUMBER = "org.apache.mahout.clustering.kmeans.iteration";
+  /**
+   * Boolean value indicating whether the initial input is from Canopy clustering
+   */
+  public static final String CANOPY_INPUT = "org.apache.mahout.clustering.kmeans.canopyInput";
+
   private static int nextClusterId = 0;
 
   // this cluster's clusterId
-  private final int clusterId;
+  private int clusterId;
 
   // the current center
   private Vector center = new SparseVector(0);
@@ -63,9 +76,7 @@
 
   // has the centroid converged with the center?
   private boolean converged = false;
-
   private static DistanceMeasure measure;
-
   private static double convergenceDelta = 0;
 
   /**
@@ -104,6 +115,43 @@
           + formattedString);
   }
 
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(clusterId);
+    out.writeBoolean(converged);
+    Vector vector = computeCentroid();
+    out.writeUTF(vector.getClass().getSimpleName().toString());
+    vector.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    clusterId = in.readInt();
+    converged = in.readBoolean();
+    String className = in.readUTF();
+    this.center = vectorNameToVector(className);
+    center.readFields(in);
+    this.numPoints = 0;
+    this.pointTotal = center.like();
+    this.pointSquaredTotal = center.like();
+  }
+
+  /**
+   * Simplified version that only handles SparseVector and DenseVector
+   * @param className
+   * @return
+   */
+  public static Vector vectorNameToVector(String className) {
+    Vector vector;
+    if (className.endsWith("SparseVector")){
+      vector = new SparseVector();
+    } else {
+      vector = new DenseVector();
+    }
+    return vector;
+  }
+
   /**
    * Configure the distance measure from the job
    * 
@@ -143,13 +191,11 @@
    * 
    * @param point a point
    * @param clusters a List<Cluster> to test
-   * @param values a Writable containing the input point and possible other
-   *        values of interest (payload)
    * @param output the OutputCollector to emit into
    * @throws IOException
    */
   public static void emitPointToNearestCluster(Vector point,
-      List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+      List<Cluster> clusters, OutputCollector<Text, KMeansInfo> output)
       throws IOException {
     Cluster nearestCluster = null;
     double nearestDistance = Double.MAX_VALUE;
@@ -161,13 +207,11 @@
       }
     }
     // emit only clusterID
-    String outKey = nearestCluster.getIdentifier();
-    String value = "1\t" + values.toString();
-    output.collect(new Text(outKey), new Text(value));
+    output.collect(new Text(nearestCluster.getIdentifier()), new KMeansInfo(1, point));
   }
 
-  public static void outputPointWithClusterInfo(String key, Vector point,
-      List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+  public static void outputPointWithClusterInfo(Vector point,
+      List<Cluster> clusters, OutputCollector<Text, Text> output)
       throws IOException {
     Cluster nearestCluster = null;
     double nearestDistance = Double.MAX_VALUE;
@@ -178,8 +222,9 @@
         nearestDistance = distance;
       }
     }
-    output.collect(new Text(key), new Text(Integer
-        .toString(nearestCluster.clusterId)));
+    //TODO: this is ugly
+    String name = point.getName();
+    output.collect(new Text(name != null && name.equals("") == false ? name : point.asFormatString()), new Text(String.valueOf(nearestCluster.clusterId)));
   }
 
   /**
@@ -216,6 +261,12 @@
   }
 
   /**
+   * For (de)serialization as a Writable
+   */
+  public Cluster() {
+  }
+
+  /**
    * Construct a new cluster with the given point as its center
    * 
    * @param center the center point

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -16,22 +16,50 @@
  * limitations under the License.
  */
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.matrix.AbstractVector;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.mahout.matrix.Vector;
 
-public class KMeansClusterMapper extends KMeansMapper {
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KMeansClusterMapper extends MapReduceBase  implements
+        Mapper<WritableComparable<?>, Vector, Text, Text> {
+  protected List<Cluster> clusters;
+
+
+  @Override
+  public void map(WritableComparable<?> key, Vector point, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    Cluster.outputPointWithClusterInfo(point, clusters, output);
+  }
+
+  /**
+   * Configure the mapper by providing its clusters. Used by unit tests.
+   *
+   * @param clusters a List<Cluster>
+   */
+  void config(List<Cluster> clusters) {
+    this.clusters = clusters;
+  }
+
   @Override
-  public void map(WritableComparable<?> key, Text values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-    final String valuesAsString = values.toString();
-    final Vector point = AbstractVector.decodeVector(valuesAsString);
-    Cluster.outputPointWithClusterInfo(valuesAsString, point, clusters, values, output);
+  public void configure(JobConf job) {
+    super.configure(job);
+    Cluster.configure(job);
+
+    clusters = new ArrayList<Cluster>();
+
+    KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY),
+            clusters);
+
+    if (clusters.isEmpty())
+      throw new NullPointerException("Cluster is empty!!!");
   }
 
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java Tue Jun 23 18:23:18 2009
@@ -28,19 +28,18 @@
 import org.apache.mahout.matrix.AbstractVector;
 
 public class KMeansCombiner extends MapReduceBase implements
-    Reducer<Text, Text, Text, Text> {
+    Reducer<Text, KMeansInfo, Text, KMeansInfo> {
 
   @Override
-  public void reduce(Text key, Iterator<Text> values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<KMeansInfo> values,
+      OutputCollector<Text, KMeansInfo> output, Reporter reporter) throws IOException {
     Cluster cluster = new Cluster(key.toString());
     while (values.hasNext()) {
-      String[] numPointnValue = values.next().toString().split("\t");
-      cluster.addPoints(Integer.parseInt(numPointnValue[0].trim()),
-          AbstractVector.decodeVector(numPointnValue[1].trim()));
+      KMeansInfo next = values.next();
+      cluster.addPoints(next.getPoints(),
+          next.getPointTotal());
     }
-    output.collect(key, new Text(cluster.getNumPoints() + "\t"
-        + cluster.getPointTotal().asFormatString()));
+    output.collect(key, new KMeansInfo(cluster.getNumPoints(), cluster.getPointTotal())); 
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Tue Jun 23 18:23:18 2009
@@ -27,6 +27,8 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.mahout.matrix.Vector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,15 +48,17 @@
    * 
   * @param args Expects 7 args: the first six are, in order, the leading params of {@link #runJob}; the seventh is the Vector class name
    */
-  public static void main(String[] args) {
+  public static void main(String[] args) throws ClassNotFoundException {
     String input = args[0];
     String clusters = args[1];
     String output = args[2];
     String measureClass = args[3];
     double convergenceDelta = Double.parseDouble(args[4]);
     int maxIterations = Integer.parseInt(args[5]);
+    String vectorClassName = args[6];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
     runJob(input, clusters, output, measureClass, convergenceDelta,
-        maxIterations, 2);
+        maxIterations, 2, vectorClass);
   }
 
   /**
@@ -67,10 +71,11 @@
    * @param convergenceDelta the convergence delta value
    * @param maxIterations the maximum number of iterations
    * @param numReduceTasks the number of reducers
+   * @param vectorClass the Vector implementation class, set as the map output value class of the clustering job
    */
   public static void runJob(String input, String clustersIn, String output,
-      String measureClass, double convergenceDelta, int maxIterations,
-      int numReduceTasks) {
+                            String measureClass, double convergenceDelta, int maxIterations,
+                            int numReduceTasks, Class<? extends Vector> vectorClass) {
     // iterate until the clusters converge
     boolean converged = false;
     int iteration = 0;
@@ -81,14 +86,14 @@
       // point the output to a new directory per iteration
       String clustersOut = output + "/clusters-" + iteration;
       converged = runIteration(input, clustersIn, clustersOut, measureClass,
-          delta, numReduceTasks);
+          delta, numReduceTasks, iteration);
       // now point the input to the old output directory
       clustersIn = output + "/clusters-" + iteration;
       iteration++;
     }
     // now actually cluster the points
     log.info("Clustering ");
-    runClustering(input, clustersIn, output + DEFAULT_OUTPUT_DIRECTORY, measureClass, delta);
+    runClustering(input, clustersIn, output + DEFAULT_OUTPUT_DIRECTORY, measureClass, delta, vectorClass);
   }
 
   /**
@@ -100,20 +105,23 @@
    * @param measureClass the classname of the DistanceMeasure
    * @param convergenceDelta the convergence delta value
    * @param numReduceTasks the number of reducer tasks
+   * @param iteration The iteration number
    * @return true if the iteration successfully runs
    */
   private static boolean runIteration(String input, String clustersIn,
-      String clustersOut, String measureClass, String convergenceDelta,
-      int numReduceTasks) {
+                                      String clustersOut, String measureClass, String convergenceDelta,
+                                      int numReduceTasks, int iteration) {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(KMeansDriver.class);
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(KMeansInfo.class);
     conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
+    conf.setOutputValueClass(Cluster.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
     Path outPath = new Path(clustersOut);
     FileOutputFormat.setOutputPath(conf, outPath);
-
+    conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setOutputFormat(SequenceFileOutputFormat.class);
     conf.setMapperClass(KMeansMapper.class);
     conf.setCombinerClass(KMeansCombiner.class);
@@ -122,6 +130,8 @@
     conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
     conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
     conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+    conf.setInt(Cluster.ITERATION_NUMBER, iteration);
+    
     client.setConf(conf);
     try {
       JobClient.runJob(conf);
@@ -143,11 +153,16 @@
    * @param convergenceDelta the convergence delta value
    */
   private static void runClustering(String input, String clustersIn,
-      String output, String measureClass, String convergenceDelta) {
+      String output, String measureClass, String convergenceDelta, Class<? extends Vector> vectorClass) {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(KMeansDriver.class);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
 
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(vectorClass);
     conf.setOutputKeyClass(Text.class);
+    //the output is the cluster id
     conf.setOutputValueClass(Text.class);
 
     FileInputFormat.setInputPaths(conf, new Path(input));
@@ -182,10 +197,10 @@
     Path outPart = new Path(filePath);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
     Text key = new Text();
-    Text value = new Text();
+    Cluster value = new Cluster();
     boolean converged = true;
     while (converged && reader.next(key, value)) {
-      converged = value.toString().charAt(0) == 'V';
+      converged = value.isConverged();
     }
     return converged;
   }

Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,50 @@
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.Vector;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+
+
+/**
+ * Writable that carries a point count together with a vector total between the
+ * k-means map, combine and reduce steps, which feed both into Cluster.addPoints.
+ */
+public class KMeansInfo implements Writable {
+
+  private int points;
+  private Vector pointTotal;
+  /** No-arg constructor; the fields stay unset until {@link #readFields(DataInput)} fills them. */
+  public KMeansInfo() {
+  }
+  /** Creates an instance holding the given point count and vector total. */
+  public KMeansInfo(int points, Vector pointTotal) {
+    this.points = points;
+    this.pointTotal = pointTotal;
+  }
+  /** @return the point count this record carries */
+  public int getPoints() {
+    return this.points;
+  }
+  /** @return the vector total this record carries */
+  public Vector getPointTotal() {
+    return this.pointTotal;
+  }
+  /** Writes the count, then the vector's simple class name, then the vector itself. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.points);
+    out.writeUTF(this.pointTotal.getClass().getSimpleName());
+    this.pointTotal.write(out);
+  }
+  /** Reads the fields back in the same order that {@link #write(DataOutput)} emitted them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    points = in.readInt();
+    // the class name written by write() is handed to Cluster.vectorNameToVector to obtain the vector to fill
+    pointTotal = Cluster.vectorNameToVector(in.readUTF());
+    pointTotal.readFields(in);
+  }
+}

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java Tue Jun 23 18:23:18 2009
@@ -23,13 +23,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.matrix.Vector;
 
 public class KMeansJob {
 
   private KMeansJob() {
   }
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws IOException, ClassNotFoundException {
 
     if (args.length != 7) {
       System.err.println("Expected number of arguments 10 and received:"
@@ -46,9 +47,10 @@
     double convergenceDelta = Double.parseDouble(args[index++]);
     int maxIterations = Integer.parseInt(args[index++]);
     int numCentroids = Integer.parseInt(args[index++]);
-
+    String vectorClassName = args[6];
+    Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
     runJob(input, clusters, output, measureClass, convergenceDelta,
-        maxIterations, numCentroids);
+        maxIterations, numCentroids, vectorClass);
   }
 
   /**
@@ -61,10 +63,11 @@
    * @param measureClass the classname of the DistanceMeasure
    * @param convergenceDelta the convergence delta value
    * @param maxIterations the maximum number of iterations
+   * @param vectorClass the Vector implementation class, passed through unchanged to KMeansDriver.runJob
    */
   public static void runJob(String input, String clustersIn, String output,
-      String measureClass, double convergenceDelta, int maxIterations,
-      int numCentroids) throws IOException {
+                            String measureClass, double convergenceDelta, int maxIterations,
+                            int numCentroids, Class<? extends Vector> vectorClass) throws IOException {
     // delete the output directory
     JobConf conf = new JobConf(KMeansJob.class);
     Path outPath = new Path(output);
@@ -75,6 +78,6 @@
     fs.mkdirs(outPath);
 
     KMeansDriver.runJob(input, clustersIn, output, measureClass,
-        convergenceDelta, maxIterations, numCentroids);
+        convergenceDelta, maxIterations, numCentroids, vectorClass);
   }
 }

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java Tue Jun 23 18:23:18 2009
@@ -31,15 +31,14 @@
 import org.apache.mahout.matrix.Vector;
 
 public class KMeansMapper extends MapReduceBase implements
-    Mapper<WritableComparable<?>, Text, Text, Text> {
+    Mapper<WritableComparable<?>, Vector, Text, KMeansInfo> {
 
   protected List<Cluster> clusters;
 
   @Override
-  public void map(WritableComparable<?> key, Text values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-    Vector point = AbstractVector.decodeVector(values.toString());
-    Cluster.emitPointToNearestCluster(point, clusters, values, output);
+  public void map(WritableComparable<?> key, Vector point,
+      OutputCollector<Text, KMeansInfo> output, Reporter reporter) throws IOException {
+    Cluster.emitPointToNearestCluster(point, clusters,  output);
   }
 
   /**
@@ -57,7 +56,7 @@
     Cluster.configure(job);
 
     clusters = new ArrayList<Cluster>();
-
+    int iteration = job.getInt(Cluster.ITERATION_NUMBER, -1);
     KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY),
         clusters);
 

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java Tue Jun 23 18:23:18 2009
@@ -30,27 +30,25 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
 
 public class KMeansReducer extends MapReduceBase implements
-    Reducer<Text, Text, Text, Text> {
+    Reducer<Text, KMeansInfo, Text, Cluster> {
 
   private Map<String, Cluster> clusterMap;
 
   @Override
-  public void reduce(Text key, Iterator<Text> values,
-      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<KMeansInfo> values,
+      OutputCollector<Text, Cluster> output, Reporter reporter) throws IOException {
     Cluster cluster = clusterMap.get(key.toString());
 
     while (values.hasNext()) {
-      String value = values.next().toString();
-      String[] numNValue = value.split("\t");
-      cluster.addPoints(Integer.parseInt(numNValue[0].trim()), AbstractVector
-          .decodeVector(numNValue[1].trim()));
+      KMeansInfo delta = values.next();
+      cluster.addPoints(delta.getPoints(), delta.getPointTotal());
     }
     // force convergence calculation
     cluster.computeConvergence();
-    output.collect(new Text(cluster.getIdentifier()), new Text(Cluster
-        .formatCluster(cluster)));
+    output.collect(new Text(cluster.getIdentifier()), cluster);
   }
 
   @Override

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java Tue Jun 23 18:23:18 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.canopy.Canopy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,13 +72,24 @@
       for (Path path : result) {
         SequenceFile.Reader reader = null;
         try {
-          reader =new SequenceFile.Reader(fs, path, job); 
+          reader =new SequenceFile.Reader(fs, path, job);
+          Class valueClass = reader.getValueClass();
           Text key = new Text();
-          Text value = new Text();
-          while (reader.next(key, value)) {
-            // get the cluster info
-            Cluster cluster = Cluster.decodeCluster(value.toString());
-            clusters.add(cluster);
+          if (valueClass.equals(Cluster.class)){
+            Cluster value = new Cluster();
+            while (reader.next(key, value)) {
+              // get the cluster info
+              clusters.add(value);
+              value = new Cluster();
+            }
+          } else if (valueClass.equals(Canopy.class)){
+            Canopy value = new Canopy();
+            while (reader.next(key, value)) {
+              // get the cluster info
+              Cluster cluster = new Cluster(value.getCenter(), value.getCanopyId());
+              clusters.add(cluster);
+              value = new Canopy();
+            }
           }
         } finally {
           if (reader != null) {

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java Tue Jun 23 18:23:18 2009
@@ -23,6 +23,9 @@
 
 /**
  * The basic interface including numerous convenience functions
+ * <p/>
+ * NOTE: All implementing classes must have a constructor that takes an int for cardinality
+ * and a no-arg constructor that can be used for marshalling the Writable instance
  */
 public interface Vector extends Iterable<Vector.Element>, Cloneable, Writable {
 

Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,31 @@
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.List;
+import java.io.IOException;
+
+/** Test helpers for writing clustering input points to a SequenceFile. */
+public class ClusteringTestUtils {
+  /** Appends every point to a new SequenceFile at fileName, keyed by a LongWritable record number counting from 0;
+   *  the writer is closed even when an append throws. NOTE(review): the file's value class is declared as
+   *  SparseVector, so presumably every point must be a SparseVector - confirm against SequenceFile.Writer. */
+  public static void writePointsToFile(List<Vector> points, String fileName, FileSystem fs, Configuration conf)
+          throws IOException {
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(fileName), LongWritable.class, SparseVector.class);
+    try {
+      long recNum = 0;
+      for (Vector point : points) {
+        writer.append(new LongWritable(recNum++), point);
+      }
+    } finally {
+      writer.close(); // release the underlying stream even if an append failed
+    }
+  }
+}



Mime
View raw message