Author: gsingers
Date: Tue Jun 23 18:23:18 2009
New Revision: 787776
URL: http://svn.apache.org/viewvc?rev=787776&view=rev
Log:
MAHOUT-137: convert kmeans, canopy and fuzzy kmeans to use Writable instead of String-based serialization
Added:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/JWriterTermInfoWriter.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/JWriterVectorWriter.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/SequenceFileVectorWriter.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/TermInfoWriter.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/io/VectorWriter.java
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputDriver.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/InputMapper.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/dirichlet/Job.java
lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/Driver.java
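Context for the change summarized in the log: the converted classes now implement Hadoop's Writable contract instead of encoding vectors as formatted Strings. Below is a minimal, self-contained sketch of that contract using a hypothetical PointInfo type (not a Mahout class); the no-argument constructor and the write/readFields pair are the essential pieces, and the buffer round trip mirrors what a unit test might do.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical value type illustrating the Writable pattern adopted by this commit.
public class PointInfo implements Writable {
  private int id;
  private double weight;

  // No-argument constructor: Hadoop instantiates the class, then calls readFields().
  public PointInfo() {
  }

  public PointInfo(int id, double weight) {
    this.id = id;
    this.weight = weight;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(id);          // binary fields instead of a formatted String
    out.writeDouble(weight);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    id = in.readInt();         // fields must be read in the order they were written
    weight = in.readDouble();
  }

  // Round trip through Hadoop's in-memory buffers.
  public static void main(String[] args) throws IOException {
    PointInfo original = new PointInfo(42, 0.75);
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    PointInfo copy = new PointInfo();
    copy.readFields(in);
    System.out.println(copy.id + " " + copy.weight);   // prints: 42 0.75
  }
}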
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java Tue Jun 23 18:23:18 2009
@@ -18,6 +18,7 @@
package org.apache.mahout.clustering.canopy;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.matrix.AbstractVector;
@@ -26,6 +27,8 @@
import org.apache.mahout.utils.DistanceMeasure;
import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
import java.util.List;
/**
@@ -34,7 +37,7 @@
* a point total which is the sum of all the points and is used to compute the
* centroid when needed.
*/
-public class Canopy {
+public class Canopy implements Writable {
// keys used by Driver, Mapper, Combiner & Reducer
public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
@@ -58,7 +61,7 @@
private static DistanceMeasure measure;
// this canopy's canopyId
- private final int canopyId;
+ private int canopyId;
// the current center
private Vector center = new SparseVector(0);
@@ -70,6 +73,12 @@
private Vector pointTotal = null;
/**
+ * Used for (de)serialization as a Writable
+ */
+ public Canopy() {
+ }
+
+ /**
* Create a new Canopy containing the given point
*
* @param point a point in vector space
@@ -164,7 +173,7 @@
* @param collector an OutputCollector in which to emit the point
*/
public static void emitPointToNewCanopies(Vector point,
- List<Canopy> canopies, OutputCollector<Text, Text> collector)
+ List<Canopy> canopies, OutputCollector<Text, Vector> collector)
throws IOException {
boolean pointStronglyBound = false;
for (Canopy canopy : canopies) {
@@ -188,13 +197,11 @@
*
* @param point the point to be added
* @param canopies the List<Canopy> to be appended
- * @param writable the original Writable from the input, may include arbitrary
- * payload information after the point [...]<payload>
* @param collector an OutputCollector in which to emit the point
*/
public static void emitPointToExistingCanopies(Vector point,
- List<Canopy> canopies, Text writable,
- OutputCollector<Text, Text> collector) throws IOException {
+ List<Canopy> canopies,
+ OutputCollector<Text, Vector> collector) throws IOException {
double minDist = Double.MAX_VALUE;
Canopy closest = null;
boolean isCovered = false;
@@ -202,7 +209,7 @@
double dist = measure.distance(canopy.getCenter(), point);
if (dist < t1) {
isCovered = true;
- collector.collect(new Text(canopy.getIdentifier()), writable);
+ collector.collect(new Text(canopy.getIdentifier()), point);
} else if (dist < minDist) {
minDist = dist;
closest = canopy;
@@ -211,7 +218,23 @@
// if the point is not contained in any canopies (due to canopy centroid
// clustering), emit the point to the closest covering canopy.
if (!isCovered)
- collector.collect(new Text(closest.getIdentifier()), writable);
+ collector.collect(new Text(closest.getIdentifier()), point);
+ }
+
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(canopyId);
+ computeCentroid().write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ canopyId = in.readInt();
+ this.center = new SparseVector();
+ center.readFields(in);
+ this.pointTotal = center.clone();
+ this.numPoints = 1;
}
/**
@@ -260,10 +283,9 @@
*
* @param point a point to emit.
*/
- public void emitPoint(Vector point, OutputCollector<Text, Text> collector)
+ public void emitPoint(Vector point, OutputCollector<Text, Vector> collector)
throws IOException {
- collector.collect(new Text(this.getIdentifier()), new Text(point
- .asFormatString()));
+ collector.collect(new Text(this.getIdentifier()), point);
}
@Override
@@ -302,8 +324,8 @@
*
* @return a SparseVector (required by Mapper) which is the new centroid
*/
- public SparseVector computeCentroid() {
- SparseVector result = new SparseVector(pointTotal.size());
+ public Vector computeCentroid() {
+ Vector result = new SparseVector(pointTotal.size());
for (int i = 0; i < pointTotal.size(); i++)
result.set(i, pointTotal.get(i) / numPoints);
return result;
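Since Canopy implements Writable as of this change, canopies can be appended to and read from SequenceFiles directly, which is what CanopyReducer and ClusterMapper do below. A sketch of that round trip; the local configuration and path are illustrative, not taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.matrix.SparseVector;
import org.apache.mahout.matrix.Vector;

public class CanopySequenceFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("testdata/canopies/part-00000");   // illustrative path

    Vector point = new SparseVector(2);
    point.set(0, 1.0);
    point.set(1, 2.0);

    // Write a canopy keyed by its identifier, mirroring what CanopyReducer emits.
    Canopy canopy = new Canopy(point);
    SequenceFile.Writer writer =
        SequenceFile.createWriter(fs, conf, path, Text.class, Canopy.class);
    writer.append(new Text(canopy.getIdentifier()), canopy);
    writer.close();

    // Read it back the way ClusterMapper.configure loads its canopies.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    Text key = new Text();
    Canopy value = new Canopy();
    while (reader.next(key, value)) {
      System.out.println(key + " -> " + value.getCenter().asFormatString());
      value = new Canopy();   // fresh instance per record, as in the mapper
    }
    reader.close();
  }
}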
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusteringJob.java Tue Jun 23 18:23:18 2009
@@ -17,6 +17,8 @@
package org.apache.mahout.clustering.canopy;
+import org.apache.mahout.matrix.Vector;
+
import java.io.IOException;
public class CanopyClusteringJob {
@@ -36,13 +38,15 @@
/**
* @param args
*/
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
String input = args[0];
String output = args[1];
String measureClassName = args[2];
double t1 = Double.parseDouble(args[3]);
double t2 = Double.parseDouble(args[4]);
- runJob(input, output, measureClassName, t1, t2);
+ String vectorClassName = args[5];
+ Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+ runJob(input, output, measureClassName, t1, t2, vectorClass);
}
/**
@@ -55,9 +59,9 @@
* @param t2 the T2 distance threshold
*/
public static void runJob(String input, String output,
- String measureClassName, double t1, double t2) throws IOException {
- CanopyDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, measureClassName, t1, t2);
- ClusterDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, output, measureClassName, t1, t2);
+ String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
+ CanopyDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, measureClassName, t1, t2, vectorClass);
+ ClusterDriver.runJob(input, output + DEFAULT_CANOPIES_OUTPUT_DIRECTORY, output, measureClassName, t1, t2, vectorClass);
}
}
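The job now expects the concrete Vector implementation class name as a sixth argument, resolved with Class.forName, and (per the SequenceFileInputFormat setting in CanopyDriver) the input must be SequenceFiles of vectors rather than text. A sketch of calling the new runJob signature directly; the paths and thresholds are illustrative.

import org.apache.mahout.clustering.canopy.CanopyClusteringJob;
import org.apache.mahout.matrix.SparseVector;
import org.apache.mahout.utils.ManhattanDistanceMeasure;

public class RunCanopySketch {
  public static void main(String[] args) throws Exception {
    // Equivalent command line:
    //   CanopyClusteringJob <input> <output> <measureClass> <t1> <t2> <vectorClass>
    CanopyClusteringJob.runJob(
        "testdata/points",                          // SequenceFile input of vectors (illustrative)
        "output",                                   // output directory (illustrative)
        ManhattanDistanceMeasure.class.getName(),   // DistanceMeasure class name
        80.0, 55.0,                                 // T1 and T2 thresholds (illustrative)
        SparseVector.class);                        // new: concrete Vector implementation to use
  }
}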
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Tue Jun 23 18:23:18 2009
@@ -17,8 +17,6 @@
package org.apache.mahout.clustering.canopy;
-import java.io.IOException;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -26,21 +24,26 @@
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.mahout.matrix.SparseVector;
+import org.apache.mahout.matrix.Vector;
+
+import java.io.IOException;
public class CanopyDriver {
private CanopyDriver() {
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
String input = args[0];
String output = args[1];
String measureClassName = args[2];
double t1 = Double.parseDouble(args[3]);
double t2 = Double.parseDouble(args[4]);
- runJob(input, output, measureClassName, t1, t2);
+ String vectorClassName = args[5];
+ Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+ runJob(input, output, measureClassName, t1, t2, vectorClass);
}
/**
@@ -51,9 +54,13 @@
* @param measureClassName the DistanceMeasure class name
* @param t1 the T1 distance threshold
* @param t2 the T2 distance threshold
+ * @param vectorClass the {@link Class} of Vector to use for the Map Output Value. Must be a concrete type
+ *
+ * @see org.apache.mahout.matrix.SparseVector
+ * @see org.apache.mahout.matrix.DenseVector
*/
public static void runJob(String input, String output,
- String measureClassName, double t1, double t2) throws IOException {
+ String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
JobClient client = new JobClient();
JobConf conf = new JobConf(
org.apache.mahout.clustering.canopy.CanopyDriver.class);
@@ -61,10 +68,12 @@
conf.set(Canopy.T1_KEY, String.valueOf(t1));
conf.set(Canopy.T2_KEY, String.valueOf(t2));
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(SparseVector.class);
+ conf.setMapOutputValueClass(vectorClass);
conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputValueClass(Canopy.class);
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(output);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java Tue Jun 23 18:23:18 2009
@@ -33,17 +33,16 @@
import java.util.List;
public class CanopyMapper extends MapReduceBase implements
- Mapper<WritableComparable<?>, Text, Text, Vector> {
+ Mapper<WritableComparable<?>, Vector, Text, Vector> {
private final List<Canopy> canopies = new ArrayList<Canopy>();
private OutputCollector<Text, Vector> outputCollector;
@Override
- public void map(WritableComparable<?> key, Text values,
+ public void map(WritableComparable<?> key, Vector point,
OutputCollector<Text, Vector> output, Reporter reporter) throws IOException {
outputCollector = output;
- Vector point = AbstractVector.decodeVector(values.toString());
Canopy.addPointToCanopies(point, canopies);
}
@@ -61,7 +60,7 @@
@Override
public void close() throws IOException {
for (Canopy canopy : canopies) {
- SparseVector centroid = canopy.computeCentroid();
+ Vector centroid = canopy.computeCentroid();
outputCollector.collect(new Text("centroid"), centroid);
}
super.close();
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java Tue Jun 23 18:23:18 2009
@@ -31,20 +31,19 @@
import org.apache.mahout.matrix.Vector;
public class CanopyReducer extends MapReduceBase implements
- Reducer<Text, Vector, Text, Text> {
+ Reducer<Text, Vector, Text, Canopy> {
private final List<Canopy> canopies = new ArrayList<Canopy>();
@Override
public void reduce(Text key, Iterator<Vector> values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ OutputCollector<Text, Canopy> output, Reporter reporter) throws IOException {
while (values.hasNext()) {
Vector point = values.next();
Canopy.addPointToCanopies(point, canopies);
}
for (Canopy canopy : canopies)
- output.collect(new Text(canopy.getIdentifier()), new Text(Canopy
- .formatCanopy(canopy)));
+ output.collect(new Text(canopy.getIdentifier()), canopy);
}
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterDriver.java Tue Jun 23 18:23:18 2009
@@ -24,7 +24,10 @@
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.mahout.matrix.Vector;
import java.io.IOException;
@@ -35,14 +38,16 @@
private ClusterDriver() {
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
String points = args[0];
String canopies = args[1];
String output = args[2];
String measureClassName = args[3];
double t1 = Double.parseDouble(args[4]);
double t2 = Double.parseDouble(args[5]);
- runJob(points, canopies, output, measureClassName, t1, t2);
+ String vectorClassName = args[6];
+ Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
+ runJob(points, canopies, output, measureClassName, t1, t2, vectorClass);
}
/**
@@ -54,9 +59,10 @@
* @param measureClassName the DistanceMeasure class name
* @param t1 the T1 distance threshold
* @param t2 the T2 distance threshold
+ * @param vectorClass The {@link Class} of Vector to use for the Output Value Class. Must be concrete.
*/
public static void runJob(String points, String canopies, String output,
- String measureClassName, double t1, double t2) throws IOException {
+ String measureClassName, double t1, double t2, Class<? extends Vector> vectorClass) throws IOException {
JobClient client = new JobClient();
JobConf conf = new JobConf(
org.apache.mahout.clustering.canopy.ClusterDriver.class);
@@ -66,8 +72,13 @@
conf.set(Canopy.T2_KEY, String.valueOf(t2));
conf.set(Canopy.CANOPY_PATH_KEY, canopies);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ /*conf.setMapOutputKeyClass(Text.class);
+ conf.setMapOutputValueClass(SparseVector.class);*/
conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputValueClass(vectorClass);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(points));
Path outPath = new Path(output + DEFAULT_CLUSTER_OUTPUT_DIRECTORY);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -29,21 +29,21 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.matrix.AbstractVector;
import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ClusterMapper extends MapReduceBase implements
- Mapper<WritableComparable<?>, Text, Text, Text> {
+ Mapper<WritableComparable<?>, Vector, Text, Vector> {
private List<Canopy> canopies;
@Override
- public void map(WritableComparable<?> key, Text values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- Vector point = AbstractVector.decodeVector(values.toString());
- Canopy.emitPointToExistingCanopies(point, canopies, values, output);
+ public void map(WritableComparable<?> key, Vector point,
+ OutputCollector<Text, Vector> output, Reporter reporter) throws IOException {
+ Canopy.emitPointToExistingCanopies(point, canopies, output);
}
/**
@@ -69,10 +69,10 @@
SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
try {
Text key = new Text();
- Text value = new Text();
+ Canopy value = new Canopy();
while (reader.next(key, value)) {
- Canopy canopy = Canopy.decodeCanopy(value.toString());
- canopies.add(canopy);
+ canopies.add(value);
+ value = new Canopy();
}
} finally {
reader.close();
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -18,20 +18,51 @@
package org.apache.mahout.clustering.fuzzykmeans;
import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.matrix.AbstractVector;
import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.clustering.kmeans.Cluster;
-public class FuzzyKMeansClusterMapper extends FuzzyKMeansMapper {
+public class FuzzyKMeansClusterMapper extends MapReduceBase implements
+ Mapper<WritableComparable<?>, Vector, Text, FuzzyKMeansOutput> {
+ protected List<SoftCluster> clusters;
@Override
- public void map(WritableComparable<?> key, Text values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException
+ public void map(WritableComparable<?> key, Vector point,
+ OutputCollector<Text, FuzzyKMeansOutput> output, Reporter reporter) throws IOException
{
- Vector point = AbstractVector.decodeVector(values.toString());
- SoftCluster.outputPointWithClusterProbabilities(key.toString(), point, clusters, values, output);
- }
+ SoftCluster.outputPointWithClusterProbabilities(key.toString(), point, clusters, output);
+ }
+
+ /**
+ * Configure the mapper by providing its clusters. Used by unit tests.
+ *
+ * @param clusters a List<SoftCluster>
+ */
+ void config(List<SoftCluster> clusters) {
+ this.clusters = clusters;
+ }
+
+ @Override
+ public void configure(JobConf job) {
+
+ super.configure(job);
+ SoftCluster.configure(job);
+ clusters = new ArrayList<SoftCluster>();
+
+ FuzzyKMeansUtil.configureWithClusterInfo(job
+ .get(SoftCluster.CLUSTER_PATH_KEY), clusters);
+
+ if (clusters.isEmpty())
+ throw new NullPointerException("Cluster is empty!!!");
+ }
+
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansCombiner.java Tue Jun 23 18:23:18 2009
@@ -26,44 +26,28 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.matrix.AbstractVector;
-import org.apache.mahout.matrix.Vector;
public class FuzzyKMeansCombiner extends MapReduceBase implements
- Reducer<Text, Text, Text, Text> {
+ Reducer<Text, FuzzyKMeansInfo, Text, FuzzyKMeansInfo> {
@Override
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<FuzzyKMeansInfo> values,
+ OutputCollector<Text, FuzzyKMeansInfo> output, Reporter reporter) throws IOException {
SoftCluster cluster = new SoftCluster(key.toString().trim());
while (values.hasNext()) {
- String pointInfo = values.next().toString();
- // check whether this is already processed
- int mapperSepIndex = pointInfo
- .indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR); // ~ separator is
- // used in mapper
- int combinerSepIndex = pointInfo
- .indexOf(FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR); // tab separator
- // is used in
- // combiner
- int index = mapperSepIndex == -1 ? combinerSepIndex : mapperSepIndex;// needed
- // to
- // split
- // prob and vector
- double pointProb = Double.parseDouble(pointInfo.substring(0, index));
+ //String pointInfo = values.next().toString();
+ FuzzyKMeansInfo info = values.next();
- String encodedVector = pointInfo.substring(index + 1);
- Vector v = AbstractVector.decodeVector(encodedVector);
- if (mapperSepIndex != -1) // first time thru combiner
+ if (info.combinerPass == 0) // first time thru combiner
{
- cluster.addPoint(v, Math.pow(pointProb, SoftCluster.getM()));
+ cluster.addPoint(info.getVector(), Math.pow(info.getProbability(), SoftCluster.getM()));
} else {
- cluster.addPoints(v, pointProb);
+ cluster.addPoints(info.getVector(), info.getProbability());
}
+ info.combinerPass++;
}
- output.collect(key, new Text(cluster.getPointProbSum()
- + FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR
- + cluster.getWeightedPointTotal().asFormatString()));
+ //TODO: how do we pass along the combinerPass? Or do we not need to?
+ output.collect(key, new FuzzyKMeansInfo(cluster.getPointProbSum(), cluster.getWeightedPointTotal(), 1));
}
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Tue Jun 23 18:23:18 2009
@@ -28,12 +28,17 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.kmeans.Cluster;
+import org.apache.mahout.matrix.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +47,6 @@
private static final Logger log = LoggerFactory
.getLogger(FuzzyKMeansDriver.class);
- public static final String MAPPER_VALUE_SEPARATOR = "~";
-
- public static final String COMBINER_VALUE_SEPARATOR = "\t";
private FuzzyKMeansDriver() {
}
@@ -54,8 +56,8 @@
.println("Usage: input clusterIn output measureClass convergenceDelta maxIterations m [doClusteringOnly]");
}
- public static void main(String[] args) {
- if (args.length < 7) {
+ public static void main(String[] args) throws ClassNotFoundException {
+ if (args.length < 8) {
System.out.println("Expected number of arguments: 7 or 8 : received:"
+ args.length);
printMessage();
@@ -68,16 +70,18 @@
double convergenceDelta = Double.parseDouble(args[index++]);
int maxIterations = Integer.parseInt(args[index++]);
float m = Float.parseFloat(args[index++]);
+ String vectorClassName = args[index++];
+ Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
boolean doClustering = false;
- if (args.length > 7)
+ if (args.length > 8){
doClustering = Boolean.parseBoolean(args[index++]);
+ }
if (doClustering) {
runClustering(input, clusters, output, measureClass, Double
- .toString(convergenceDelta), 500, m);
+ .toString(convergenceDelta), 500, m, vectorClass);
} else {
runJob(input, clusters, output, measureClass, convergenceDelta,
- maxIterations, 10, 10, m);
-
+ maxIterations, 10, 10, m, vectorClass);
}
}
@@ -92,10 +96,11 @@
* @param convergenceDelta the convergence delta value
* @param maxIterations the maximum number of iterations
* @param numMapTasks the number of mapper tasks
+ * @param vectorClass the concrete {@link Vector} implementation class to use
*/
public static void runJob(String input, String clustersIn, String output,
- String measureClass, double convergenceDelta, int maxIterations,
- int numMapTasks, int numReduceTasks, float m) {
+ String measureClass, double convergenceDelta, int maxIterations,
+ int numMapTasks, int numReduceTasks, float m, Class<? extends Vector> vectorClass) {
boolean converged = false;
int iteration = 0;
@@ -119,7 +124,7 @@
log.info("Clustering ");
runClustering(input, clustersIn, output + File.separator + "points",
- measureClass, delta, numMapTasks, m);
+ measureClass, delta, numMapTasks, m, vectorClass);
}
/**
@@ -142,13 +147,18 @@
JobConf conf = new JobConf(FuzzyKMeansJob.class);
conf.setJobName("Fuzzy K Means{" + iterationNumber + '}');
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setMapOutputValueClass(FuzzyKMeansInfo.class);
conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputValueClass(SoftCluster.class);
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(clustersOut);
FileOutputFormat.setOutputPath(conf, outPath);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
conf.setMapperClass(FuzzyKMeansMapper.class);
conf.setCombinerClass(FuzzyKMeansCombiner.class);
conf.setReducerClass(FuzzyKMeansReducer.class);
@@ -185,13 +195,15 @@
*/
private static void runClustering(String input, String clustersIn,
String output, String measureClass, String convergenceDelta,
- int numMapTasks, float m) {
+ int numMapTasks, float m, Class<? extends Vector> vectorClass) {
JobConf conf = new JobConf(FuzzyKMeansDriver.class);
conf.setJobName("Fuzzy K Means Clustering");
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setMapOutputValueClass(vectorClass);
conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputValueClass(FuzzyKMeansOutput.class);
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(output);
@@ -199,6 +211,9 @@
conf.setMapperClass(FuzzyKMeansClusterMapper.class);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
// uncomment it to run locally
// conf.set("mapred.job.tracker", "local");
conf.setNumMapTasks(numMapTasks);
@@ -246,15 +261,16 @@
for (Path p : result) {
- KeyValueLineRecordReader reader = null;
+ SequenceFile.Reader reader = null;
try {
- reader = new KeyValueLineRecordReader(conf, new FileSplit(p, 0, fs
- .getFileStatus(p).getLen(), (String[]) null));
+ reader = new SequenceFile.Reader(fs, p, conf);
+ /*new KeyValueLineRecordReader(conf, new FileSplit(p, 0, fs
+ .getFileStatus(p).getLen(), (String[]) null));*/
Text key = new Text();
- Text value = new Text();
+ SoftCluster value = new SoftCluster();
while (converged && reader.next(key, value)) {
- converged = value.toString().charAt(0) == 'V';
+ converged = value.isConverged();
}
} finally {
if (reader != null) {
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansInfo.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,59 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.clustering.kmeans.Cluster;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+
+
+/**
+ * Carries a probability weight and its associated vector (a point from the
+ * mapper, or a weighted point total from the combiner) between fuzzy k-means stages.
+ **/
+public class FuzzyKMeansInfo implements Writable {
+
+ private double probability;
+ private Vector pointTotal;
+
+ public int combinerPass = 0;
+
+ public FuzzyKMeansInfo() {
+ }
+
+ public FuzzyKMeansInfo(double probability, Vector pointTotal) {
+ this.probability = probability;
+ this.pointTotal = pointTotal;
+ }
+
+ public FuzzyKMeansInfo(double probability, Vector pointTotal, int combinerPass) {
+ this.probability = probability;
+ this.pointTotal = pointTotal;
+ this.combinerPass = combinerPass;
+ }
+
+ public Vector getVector() {
+ return pointTotal;
+ }
+
+ public double getProbability() {
+ return probability;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeDouble(probability);
+ out.writeUTF(pointTotal.getClass().getSimpleName().toString());
+ pointTotal.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.probability = in.readDouble();
+ String className = in.readUTF();
+ pointTotal = Cluster.vectorNameToVector(className);
+ pointTotal.readFields(in);
+ }
+}
\ No newline at end of file
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansJob.java Tue Jun 23 18:23:18 2009
@@ -21,6 +21,7 @@
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.utils.ManhattanDistanceMeasure;
+import org.apache.mahout.matrix.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +33,10 @@
private FuzzyKMeansJob() {
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
- if (args.length != 10) {
- log.warn("Expected num Arguments: 10 received: {}", args.length);
+ if (args.length != 11) {
+ log.warn("Expected num Arguments: 11 received: {}", args.length);
printMessage();
return;
}
@@ -50,9 +51,10 @@
int numReduceTasks = Integer.parseInt(args[index++]);
boolean doCanopy = Boolean.parseBoolean(args[index++]);
float m = Float.parseFloat(args[index++]);
-
+ String vectorClassName = args[index++];
+ Class<? extends Vector> vectorClass = (Class<? extends Vector>) Class.forName(vectorClassName);
runJob(input, clusters, output, measureClass, convergenceDelta,
- maxIterations, numMapTasks, numReduceTasks, doCanopy, m);
+ maxIterations, numMapTasks, numReduceTasks, doCanopy, m, vectorClass);
}
/**
@@ -78,18 +80,18 @@
*/
public static void runJob(String input, String clustersIn, String output,
String measureClass, double convergenceDelta, int maxIterations,
- int numMapTasks, int numReduceTasks, boolean doCanopy, float m)
+ int numMapTasks, int numReduceTasks, boolean doCanopy, float m, Class<? extends Vector> vectorClass)
throws IOException {
// run canopy to find initial clusters
if (doCanopy) {
CanopyDriver.runJob(input, clustersIn, ManhattanDistanceMeasure.class
- .getName(), 100.1, 50.1);
+ .getName(), 100.1, 50.1, vectorClass);
}
// run fuzzy k -means
FuzzyKMeansDriver.runJob(input, clustersIn, output, measureClass,
- convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m);
+ convergenceDelta, maxIterations, numMapTasks, numReduceTasks, m, vectorClass);
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansMapper.java Tue Jun 23 18:23:18 2009
@@ -34,17 +34,16 @@
import org.slf4j.LoggerFactory;
public class FuzzyKMeansMapper extends MapReduceBase implements
- Mapper<WritableComparable<?>, Text, Text, Text> {
+ Mapper<WritableComparable<?>, Vector, Text, FuzzyKMeansInfo> {
private static final Logger log = LoggerFactory.getLogger(FuzzyKMeansMapper.class);
protected List<SoftCluster> clusters;
@Override
- public void map(WritableComparable<?> key, Text values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- Vector point = AbstractVector.decodeVector(values.toString());
- SoftCluster.emitPointProbToCluster(point, clusters, values, output);
+ public void map(WritableComparable<?> key, Vector point,
+ OutputCollector<Text, FuzzyKMeansInfo> output, Reporter reporter) throws IOException {
+ SoftCluster.emitPointProbToCluster(point, clusters, output);
}
/**
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansOutput.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,66 @@
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * The fuzzy k-means result for a single point: parallel arrays holding the
+ * clusters and the point's membership probability in each.
+ **/
+public class FuzzyKMeansOutput implements Writable {
+ //parallel arrays
+ private SoftCluster[] clusters;
+ private double[] probabilities;
+
+ public FuzzyKMeansOutput() {
+ }
+
+ public FuzzyKMeansOutput(int size) {
+ clusters = new SoftCluster[size];
+ probabilities = new double[size];
+ }
+
+ public SoftCluster[] getClusters() {
+ return clusters;
+ }
+
+ public double[] getProbabilities() {
+ return probabilities;
+ }
+
+ public void add(int i, SoftCluster softCluster, double probWeight) {
+ clusters[i] = softCluster;
+ probabilities[i] = probWeight;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(clusters.length);
+ for (int i = 0; i < clusters.length; i++) {
+ clusters[i].write(out);
+ }
+ out.writeInt(probabilities.length);
+ for (int i = 0; i < probabilities.length; i++) {
+ out.writeDouble(probabilities[i]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numClusters = in.readInt();
+ clusters = new SoftCluster[numClusters];
+ for (int i = 0; i < numClusters; i++) {
+ clusters[i] = new SoftCluster();
+ clusters[i].readFields(in);
+ }
+ int numProbs = in.readInt();
+ probabilities = new double[numProbs];
+ for (int i = 0; i < numProbs; i++) {
+ probabilities[i] = in.readDouble();
+ }
+ }
+}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansReducer.java Tue Jun 23 18:23:18 2009
@@ -34,44 +34,30 @@
import org.apache.mahout.matrix.Vector;
public class FuzzyKMeansReducer extends MapReduceBase implements
- Reducer<Text, Text, Text, Text> {
+ Reducer<Text, FuzzyKMeansInfo, Text, SoftCluster> {
protected Map<String, SoftCluster> clusterMap;
@Override
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<FuzzyKMeansInfo> values,
+ OutputCollector<Text, SoftCluster> output, Reporter reporter) throws IOException {
SoftCluster cluster = clusterMap.get(key.toString());
while (values.hasNext()) {
- String value = values.next().toString();
- int mapperSepIndex = value
- .indexOf(FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR); // tild separator
- // is used in
- // mapper
- int combinerSepIndex = value
- .indexOf(FuzzyKMeansDriver.COMBINER_VALUE_SEPARATOR); // tab separator
- // is used in
- // combiner
- int index = mapperSepIndex == -1 ? combinerSepIndex : mapperSepIndex;// needed
- // to
- // split
- // prob and vector
- double partialSumPtProb = Double.parseDouble(value.substring(0, index));
- Vector total = AbstractVector.decodeVector(value.substring(index + 1));
- if (mapperSepIndex != -1) // escaped from combiner
+ FuzzyKMeansInfo value = values.next();
+
+ if (value.combinerPass == 0) // escaped from combiner
{
- cluster.addPoint(total, Math.pow(partialSumPtProb, SoftCluster.getM()));
+ cluster.addPoint(value.getVector(), Math.pow(value.getProbability(), SoftCluster.getM()));
} else {
- cluster.addPoints(total, partialSumPtProb);
+ cluster.addPoints(value.getVector(), value.getProbability());
}
}
// force convergence calculation
cluster.computeConvergence();
- output.collect(new Text(cluster.getIdentifier()), new Text(SoftCluster
- .formatCluster(cluster)));
+ output.collect(new Text(cluster.getIdentifier()), cluster);
}
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java Tue Jun 23 18:23:18 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
@@ -72,24 +73,23 @@
//iterate thru the result path list
for (Path path : result) {
- RecordReader<Text, Text> recordReader = null;
-// SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
+ //RecordReader<Text, Text> recordReader = null;
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, job);
try {
- recordReader = new KeyValueLineRecordReader(job, new FileSplit(path, 0, fs.getFileStatus(path).getLen(), (String[]) null));
+ //recordReader = new KeyValueLineRecordReader(job, new FileSplit(path, 0, fs.getFileStatus(path).getLen(), (String[]) null));
Text key = new Text();
- Text value = new Text();
+ SoftCluster value = new SoftCluster();
//int counter = 1;
- while (recordReader.next(key, value)) {
+ while (reader.next(key, value)) {
//get the cluster info
- SoftCluster cluster = SoftCluster.decodeCluster(value.toString());
// add the center so the centroid will be correct on output
// formatting
// cluster.addPoint(cluster.getCenter(), 1);
- clusters.add(cluster);
+ clusters.add(value);
}
} finally {
- if (recordReader != null) {
- recordReader.close();
+ if (reader != null) {
+ reader.close();
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java Tue Jun 23 18:23:18 2009
@@ -18,10 +18,13 @@
package org.apache.mahout.clustering.fuzzykmeans;
import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.matrix.AbstractVector;
@@ -29,8 +32,9 @@
import org.apache.mahout.matrix.SquareRootFunction;
import org.apache.mahout.matrix.Vector;
import org.apache.mahout.utils.DistanceMeasure;
+import org.apache.mahout.clustering.kmeans.Cluster;
-public class SoftCluster {
+public class SoftCluster implements Writable {
public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
@@ -55,7 +59,7 @@
private static int nextClusterId = 0;
// this cluster's clusterId
- private final int clusterId;
+ private int clusterId;
// the current center
private Vector center = new SparseVector(0);
@@ -115,6 +119,27 @@
return null;
}
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(clusterId);
+ out.writeBoolean(converged);
+ Vector vector = computeCentroid();
+ out.writeUTF(vector.getClass().getSimpleName().toString());
+ vector.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ clusterId = in.readInt();
+ converged = in.readBoolean();
+ String className = in.readUTF();
+ this.center = Cluster.vectorNameToVector(className);
+ center.readFields(in);
+ this.pointProbSum = 0;
+ this.weightedPointTotal = center.like();
+ }
+
+
/**
* Configure the distance measure from the job
*
@@ -155,14 +180,12 @@
*
* @param point a point
* @param clusters a List<SoftCluster>
- * @param values a Writable containing the input point and possible other
- * values of interest (payload)
* @param output the OutputCollector to emit into
* @throws IOException
*/
public static void emitPointProbToCluster(Vector point,
- List<SoftCluster> clusters, Text values,
- OutputCollector<Text, Text> output) throws IOException {
+ List<SoftCluster> clusters,
+ OutputCollector<Text, FuzzyKMeansInfo> output) throws IOException {
List<Double> clusterDistanceList = new ArrayList<Double>();
for (SoftCluster cluster : clusters) {
clusterDistanceList.add(measure.distance(cluster.getCenter(), point));
@@ -175,8 +198,9 @@
// identifier,avoids
// too much data
// traffic
- Text value = new Text(Double.toString(probWeight)
- + FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR + values.toString());
+ /*Text value = new Text(Double.toString(probWeight)
+ + FuzzyKMeansDriver.MAPPER_VALUE_SEPARATOR + values.toString());*/
+ FuzzyKMeansInfo value = new FuzzyKMeansInfo(probWeight, point);
output.collect(key, value);
}
}
@@ -186,33 +210,31 @@
*
* @param point a point
* @param clusters a List<SoftCluster> to test
- * @param values a Writable containing the input point and possible other
- * values of interest (payload)
* @param output the OutputCollector to emit into
* @throws IOException
*/
public static void outputPointWithClusterProbabilities(String key,
- Vector point, List<SoftCluster> clusters, Text values,
- OutputCollector<Text, Text> output) throws IOException {
-
- String outputKey = values.toString();
- StringBuilder outputValue = new StringBuilder("[");
+ Vector point, List<SoftCluster> clusters,
+ OutputCollector<Text, FuzzyKMeansOutput> output) throws IOException {
List<Double> clusterDistanceList = new ArrayList<Double>();
for (SoftCluster cluster : clusters) {
clusterDistanceList.add(measure.distance(point, cluster.getCenter()));
}
-
+ FuzzyKMeansOutput fOutput = new FuzzyKMeansOutput(clusters.size());
for (int i = 0; i < clusters.size(); i++) {
// System.out.print("cluster:" + i + "\t" + clusterDistanceList.get(i));
double probWeight = computeProbWeight(clusterDistanceList.get(i),
clusterDistanceList);
- outputValue.append(clusters.get(i).clusterId).append(':').append(
- probWeight).append(' ');
+ /*outputValue.append(clusters.get(i).clusterId).append(':').append(
+ probWeight).append(' ');*/
+ fOutput.add(i, clusters.get(i), probWeight);
}
- output.collect(new Text(outputKey.trim()), new Text(outputValue.toString()
- .trim() + ']'));
+ String name = point.getName();
+ output.collect(new Text(name != null && name.equals("") == false ? name
+ : point.asFormatString()),
+ fOutput);
}
/**
@@ -252,6 +274,10 @@
return centroid;
}
+ //For Writable
+ public SoftCluster() {
+ }
+
/**
* Construct a new SoftCluster with the given point as its center
*
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Cluster.java Tue Jun 23 18:23:18 2009
@@ -17,18 +17,22 @@
package org.apache.mahout.clustering.kmeans;
import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
import java.util.List;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.matrix.AbstractVector;
import org.apache.mahout.matrix.SparseVector;
import org.apache.mahout.matrix.SquareRootFunction;
import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.DenseVector;
import org.apache.mahout.utils.DistanceMeasure;
-public class Cluster {
+public class Cluster implements Writable {
private static final String ERROR_UNKNOWN_CLUSTER_FORMAT = "Unknown cluster format:\n";
@@ -38,10 +42,19 @@
public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";
+ /**
+ * The number of iterations that have taken place
+ */
+ public static final String ITERATION_NUMBER = "org.apache.mahout.clustering.kmeans.iteration";
+ /**
+ * Boolean value indicating whether the initial input is from Canopy clustering
+ */
+ public static final String CANOPY_INPUT = "org.apache.mahout.clustering.kmeans.canopyInput";
+
private static int nextClusterId = 0;
// this cluster's clusterId
- private final int clusterId;
+ private int clusterId;
// the current center
private Vector center = new SparseVector(0);
@@ -63,9 +76,7 @@
// has the centroid converged with the center?
private boolean converged = false;
-
private static DistanceMeasure measure;
-
private static double convergenceDelta = 0;
/**
@@ -104,6 +115,43 @@
+ formattedString);
}
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(clusterId);
+ out.writeBoolean(converged);
+ Vector vector = computeCentroid();
+ out.writeUTF(vector.getClass().getSimpleName().toString());
+ vector.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ clusterId = in.readInt();
+ converged = in.readBoolean();
+ String className = in.readUTF();
+ this.center = vectorNameToVector(className);
+ center.readFields(in);
+ this.numPoints = 0;
+ this.pointTotal = center.like();
+ this.pointSquaredTotal = center.like();
+ }
+
+ /**
+ * Simplified version that only handles SparseVector and DenseVector
+ * @param className the simple class name of the Vector implementation
+ * @return a new, empty Vector of the matching concrete type
+ */
+ public static Vector vectorNameToVector(String className) {
+ Vector vector;
+ if (className.endsWith("SparseVector")){
+ vector = new SparseVector();
+ } else {
+ vector = new DenseVector();
+ }
+ return vector;
+ }
+
/**
* Configure the distance measure from the job
*
@@ -143,13 +191,11 @@
*
* @param point a point
* @param clusters a List<Cluster> to test
- * @param values a Writable containing the input point and possible other
- * values of interest (payload)
* @param output the OutputCollector to emit into
* @throws IOException
*/
public static void emitPointToNearestCluster(Vector point,
- List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+ List<Cluster> clusters, OutputCollector<Text, KMeansInfo> output)
throws IOException {
Cluster nearestCluster = null;
double nearestDistance = Double.MAX_VALUE;
@@ -161,13 +207,11 @@
}
}
// emit only clusterID
- String outKey = nearestCluster.getIdentifier();
- String value = "1\t" + values.toString();
- output.collect(new Text(outKey), new Text(value));
+ output.collect(new Text(nearestCluster.getIdentifier()), new KMeansInfo(1, point));
}
- public static void outputPointWithClusterInfo(String key, Vector point,
- List<Cluster> clusters, Text values, OutputCollector<Text, Text> output)
+ public static void outputPointWithClusterInfo(Vector point,
+ List<Cluster> clusters, OutputCollector<Text, Text> output)
throws IOException {
Cluster nearestCluster = null;
double nearestDistance = Double.MAX_VALUE;
@@ -178,8 +222,9 @@
nearestDistance = distance;
}
}
- output.collect(new Text(key), new Text(Integer
- .toString(nearestCluster.clusterId)));
+ //TODO: this is ugly
+ String name = point.getName();
+ output.collect(new Text(name != null && name.equals("") == false ? name : point.asFormatString()), new Text(String.valueOf(nearestCluster.clusterId)));
}
/**
@@ -216,6 +261,12 @@
}
/**
+ * For (de)serialization as a Writable
+ */
+ public Cluster() {
+ }
+
+ /**
* Construct a new cluster with the given point as its center
*
* @param center the center point
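The write/readFields methods added here prefix the serialized center with its simple class name so that readFields can pick the matching implementation via vectorNameToVector. A minimal sketch of that idiom in isolation, assuming the Vector implementations in this revision are themselves Writable as the calls above imply; the buffer round trip is illustrative.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.matrix.SparseVector;
import org.apache.mahout.matrix.Vector;

public class VectorNameSketch {
  public static void main(String[] args) throws Exception {
    Vector original = new SparseVector(3);
    original.set(0, 1.0);
    original.set(2, 3.0);

    // Write: the class's simple name first, then the vector's own Writable payload.
    DataOutputBuffer out = new DataOutputBuffer();
    out.writeUTF(original.getClass().getSimpleName());
    original.write(out);

    // Read: choose the concrete type from the name, then populate it.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    Vector copy = Cluster.vectorNameToVector(in.readUTF());   // SparseVector or DenseVector
    copy.readFields(in);
    System.out.println(copy.asFormatString());
  }
}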
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansClusterMapper.java Tue Jun 23 18:23:18 2009
@@ -16,22 +16,50 @@
* limitations under the License.
*/
-import java.io.IOException;
-
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.mahout.matrix.AbstractVector;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.mahout.matrix.Vector;
-public class KMeansClusterMapper extends KMeansMapper {
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KMeansClusterMapper extends MapReduceBase implements
+ Mapper<WritableComparable<?>, Vector, Text, Text> {
+ protected List<Cluster> clusters;
+
+
+ @Override
+ public void map(WritableComparable<?> key, Vector point, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ Cluster.outputPointWithClusterInfo(point, clusters, output);
+ }
+
+ /**
+ * Configure the mapper by providing its clusters. Used by unit tests.
+ *
+ * @param clusters a List<Cluster>
+ */
+ void config(List<Cluster> clusters) {
+ this.clusters = clusters;
+ }
+
@Override
- public void map(WritableComparable<?> key, Text values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- final String valuesAsString = values.toString();
- final Vector point = AbstractVector.decodeVector(valuesAsString);
- Cluster.outputPointWithClusterInfo(valuesAsString, point, clusters, values, output);
+ public void configure(JobConf job) {
+ super.configure(job);
+ Cluster.configure(job);
+
+ clusters = new ArrayList<Cluster>();
+
+ KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY),
+ clusters);
+
+ if (clusters.isEmpty())
+ throw new IllegalStateException("No clusters found at " + job.get(Cluster.CLUSTER_PATH_KEY));
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansCombiner.java Tue Jun 23 18:23:18 2009
@@ -28,19 +28,18 @@
import org.apache.mahout.matrix.AbstractVector;
public class KMeansCombiner extends MapReduceBase implements
- Reducer<Text, Text, Text, Text> {
+ Reducer<Text, KMeansInfo, Text, KMeansInfo> {
@Override
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<KMeansInfo> values,
+ OutputCollector<Text, KMeansInfo> output, Reporter reporter) throws IOException {
Cluster cluster = new Cluster(key.toString());
while (values.hasNext()) {
- String[] numPointnValue = values.next().toString().split("\t");
- cluster.addPoints(Integer.parseInt(numPointnValue[0].trim()),
- AbstractVector.decodeVector(numPointnValue[1].trim()));
+ KMeansInfo next = values.next();
+ cluster.addPoints(next.getPoints(),
+ next.getPointTotal());
}
- output.collect(key, new Text(cluster.getNumPoints() + "\t"
- + cluster.getPointTotal().asFormatString()));
+ output.collect(key, new KMeansInfo(cluster.getNumPoints(), cluster.getPointTotal()));
}
@Override
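To make the combiner's bookkeeping concrete, here is a worked example of how two partial KMeansInfo records for the same cluster key merge, done with plain vector arithmetic rather than the Cluster helper (a sketch only; it assumes DenseVector's double[] constructor):

    import org.apache.mahout.clustering.kmeans.KMeansInfo;
    import org.apache.mahout.matrix.DenseVector;
    import org.apache.mahout.matrix.Vector;

    public class CombineSketch {
      public static void main(String[] args) {
        KMeansInfo a = new KMeansInfo(2, new DenseVector(new double[]{1.0, 2.0})); // sum of 2 points
        KMeansInfo b = new KMeansInfo(3, new DenseVector(new double[]{3.0, 4.0})); // sum of 3 points
        int points = a.getPoints() + b.getPoints();               // 5
        Vector total = a.getPointTotal().plus(b.getPointTotal()); // [4.0, 6.0]
        KMeansInfo combined = new KMeansInfo(points, total);      // what the combiner forwards
        // the reducer's centroid for this key becomes total / points = [0.8, 1.2]
      }
    }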
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Tue Jun 23 18:23:18 2009
@@ -27,6 +27,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.mahout.matrix.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,15 +48,17 @@
*
* @param args Expects 6 args and they all correspond to the order of the params in {@link #runJob}
*/
- public static void main(String[] args) {
+ public static void main(String[] args) throws ClassNotFoundException {
String input = args[0];
String clusters = args[1];
String output = args[2];
String measureClass = args[3];
double convergenceDelta = Double.parseDouble(args[4]);
int maxIterations = Integer.parseInt(args[5]);
+ String vectorClassName = args[6];
+ Class<? extends Vector> vectorClass = Class.forName(vectorClassName).asSubclass(Vector.class);
runJob(input, clusters, output, measureClass, convergenceDelta,
- maxIterations, 2);
+ maxIterations, 2, vectorClass);
}
/**
@@ -67,10 +71,11 @@
* @param convergenceDelta the convergence delta value
* @param maxIterations the maximum number of iterations
* @param numReduceTasks the number of reducers
+ * @param vectorClass the Vector implementation class used for the points being clustered
*/
public static void runJob(String input, String clustersIn, String output,
- String measureClass, double convergenceDelta, int maxIterations,
- int numReduceTasks) {
+ String measureClass, double convergenceDelta, int maxIterations,
+ int numReduceTasks, Class<? extends Vector> vectorClass) {
// iterate until the clusters converge
boolean converged = false;
int iteration = 0;
@@ -81,14 +86,14 @@
// point the output to a new directory per iteration
String clustersOut = output + "/clusters-" + iteration;
converged = runIteration(input, clustersIn, clustersOut, measureClass,
- delta, numReduceTasks);
+ delta, numReduceTasks, iteration);
// now point the input to the old output directory
clustersIn = output + "/clusters-" + iteration;
iteration++;
}
// now actually cluster the points
log.info("Clustering ");
- runClustering(input, clustersIn, output + DEFAULT_OUTPUT_DIRECTORY, measureClass, delta);
+ runClustering(input, clustersIn, output + DEFAULT_OUTPUT_DIRECTORY, measureClass, delta, vectorClass);
}
/**
@@ -100,20 +105,23 @@
* @param measureClass the classname of the DistanceMeasure
* @param convergenceDelta the convergence delta value
* @param numReduceTasks the number of reducer tasks
+ * @param iteration The iteration number
* @return true if the iteration successfully runs
*/
private static boolean runIteration(String input, String clustersIn,
- String clustersOut, String measureClass, String convergenceDelta,
- int numReduceTasks) {
+ String clustersOut, String measureClass, String convergenceDelta,
+ int numReduceTasks, int iteration) {
JobClient client = new JobClient();
JobConf conf = new JobConf(KMeansDriver.class);
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setMapOutputValueClass(KMeansInfo.class);
conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
+ conf.setOutputValueClass(Cluster.class);
FileInputFormat.setInputPaths(conf, new Path(input));
Path outPath = new Path(clustersOut);
FileOutputFormat.setOutputPath(conf, outPath);
-
+ conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setMapperClass(KMeansMapper.class);
conf.setCombinerClass(KMeansCombiner.class);
@@ -122,6 +130,8 @@
conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
+ conf.setInt(Cluster.ITERATION_NUMBER, iteration);
+
client.setConf(conf);
try {
JobClient.runJob(conf);
@@ -143,11 +153,16 @@
* @param convergenceDelta the convergence delta value
*/
private static void runClustering(String input, String clustersIn,
- String output, String measureClass, String convergenceDelta) {
+ String output, String measureClass, String convergenceDelta, Class<? extends Vector> vectorClass) {
JobClient client = new JobClient();
JobConf conf = new JobConf(KMeansDriver.class);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setMapOutputKeyClass(Text.class);
+ conf.setMapOutputValueClass(vectorClass);
conf.setOutputKeyClass(Text.class);
+ //the output is the cluster id
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, new Path(input));
@@ -182,10 +197,10 @@
Path outPart = new Path(filePath);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
Text key = new Text();
- Text value = new Text();
+ Cluster value = new Cluster();
boolean converged = true;
while (converged && reader.next(key, value)) {
- converged = value.toString().charAt(0) == 'V';
+ converged = value.isConverged();
}
return converged;
}
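Callers of the driver now pass the Vector class explicitly. A hypothetical invocation of the new runJob() signature (paths and the distance-measure class name are placeholders, and the measure's package is an assumption):

    import org.apache.mahout.clustering.kmeans.KMeansDriver;
    import org.apache.mahout.matrix.SparseVector;

    public class RunKMeansSketch {
      public static void main(String[] args) {
        KMeansDriver.runJob("testdata/points",     // SequenceFile of Vector points
            "testdata/clusters",                   // initial Cluster (or Canopy) seeds
            "output",                              // gets clusters-N dirs plus the final point assignments
            "org.apache.mahout.utils.EuclideanDistanceMeasure", // assumed package for the measure
            0.001,                                 // convergence delta
            10,                                    // max iterations
            1,                                     // reduce tasks
            SparseVector.class);                   // vector class used for the clustering pass
      }
    }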
Added: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java (added)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansInfo.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,50 @@
+package org.apache.mahout.clustering.kmeans;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.matrix.Vector;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+
+
+/**
+ * Mapper/combiner output value for k-means: the number of points seen so far
+ * together with their running vector sum.
+ **/
+public class KMeansInfo implements Writable {
+
+ private int points;
+ private Vector pointTotal;
+
+ public KMeansInfo() {
+ }
+
+ public KMeansInfo(int points, Vector pointTotal) {
+ this.points = points;
+ this.pointTotal = pointTotal;
+ }
+
+ public int getPoints() {
+ return points;
+ }
+
+ public Vector getPointTotal() {
+ return pointTotal;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(points);
+ out.writeUTF(pointTotal.getClass().getSimpleName());
+ pointTotal.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.points = in.readInt();
+ String className = in.readUTF();
+ pointTotal = Cluster.vectorNameToVector(className);
+ pointTotal.readFields(in);
+ }
+}
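The Writable contract of KMeansInfo can be checked without a cluster running: write() emits the point count, the vector's simple class name, and the vector; readFields() reverses it through Cluster.vectorNameToVector(). A round-trip sketch (assumes DenseVector's double[] constructor):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.mahout.clustering.kmeans.KMeansInfo;
    import org.apache.mahout.matrix.DenseVector;

    public class KMeansInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        KMeansInfo original = new KMeansInfo(3, new DenseVector(new double[]{3.0, 6.0}));

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        original.write(out);   // count, vector class name, then the vector itself
        out.close();

        KMeansInfo copy = new KMeansInfo();   // the no-arg constructor Hadoop needs for Writables
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        // copy.getPoints() == 3 and copy.getPointTotal() is a DenseVector equal to [3.0, 6.0]
      }
    }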
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansJob.java Tue Jun 23 18:23:18 2009
@@ -23,13 +23,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.matrix.Vector;
public class KMeansJob {
private KMeansJob() {
}
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
if (args.length != 7) {
System.err.println("Expected number of arguments 10 and received:"
@@ -46,9 +47,10 @@
double convergenceDelta = Double.parseDouble(args[index++]);
int maxIterations = Integer.parseInt(args[index++]);
int numCentroids = Integer.parseInt(args[index++]);
-
+ String vectorClassName = args[6];
+ Class<? extends Vector> vectorClass = Class.forName(vectorClassName).asSubclass(Vector.class);
runJob(input, clusters, output, measureClass, convergenceDelta,
- maxIterations, numCentroids);
+ maxIterations, numCentroids, vectorClass);
}
/**
@@ -61,10 +63,11 @@
* @param measureClass the classname of the DistanceMeasure
* @param convergenceDelta the convergence delta value
* @param maxIterations the maximum number of iterations
+ * @param vectorClass the Vector implementation class used for the points being clustered
*/
public static void runJob(String input, String clustersIn, String output,
- String measureClass, double convergenceDelta, int maxIterations,
- int numCentroids) throws IOException {
+ String measureClass, double convergenceDelta, int maxIterations,
+ int numCentroids, Class<? extends Vector> vectorClass) throws IOException {
// delete the output directory
JobConf conf = new JobConf(KMeansJob.class);
Path outPath = new Path(output);
@@ -75,6 +78,6 @@
fs.mkdirs(outPath);
KMeansDriver.runJob(input, clustersIn, output, measureClass,
- convergenceDelta, maxIterations, numCentroids);
+ convergenceDelta, maxIterations, numCentroids, vectorClass);
}
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansMapper.java Tue Jun 23 18:23:18 2009
@@ -31,15 +31,14 @@
import org.apache.mahout.matrix.Vector;
public class KMeansMapper extends MapReduceBase implements
- Mapper<WritableComparable<?>, Text, Text, Text> {
+ Mapper<WritableComparable<?>, Vector, Text, KMeansInfo> {
protected List<Cluster> clusters;
@Override
- public void map(WritableComparable<?> key, Text values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- Vector point = AbstractVector.decodeVector(values.toString());
- Cluster.emitPointToNearestCluster(point, clusters, values, output);
+ public void map(WritableComparable<?> key, Vector point,
+ OutputCollector<Text, KMeansInfo> output, Reporter reporter) throws IOException {
+ Cluster.emitPointToNearestCluster(point, clusters, output);
}
/**
@@ -57,7 +56,7 @@
Cluster.configure(job);
clusters = new ArrayList<Cluster>();
-
+ int iteration = job.getInt(Cluster.ITERATION_NUMBER, -1);
KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY),
clusters);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansReducer.java Tue Jun 23 18:23:18 2009
@@ -30,27 +30,25 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.mahout.matrix.AbstractVector;
+import org.apache.mahout.matrix.Vector;
public class KMeansReducer extends MapReduceBase implements
- Reducer<Text, Text, Text, Text> {
+ Reducer<Text, KMeansInfo, Text, Cluster> {
private Map<String, Cluster> clusterMap;
@Override
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ public void reduce(Text key, Iterator<KMeansInfo> values,
+ OutputCollector<Text, Cluster> output, Reporter reporter) throws IOException {
Cluster cluster = clusterMap.get(key.toString());
while (values.hasNext()) {
- String value = values.next().toString();
- String[] numNValue = value.split("\t");
- cluster.addPoints(Integer.parseInt(numNValue[0].trim()), AbstractVector
- .decodeVector(numNValue[1].trim()));
+ KMeansInfo delta = values.next();
+ cluster.addPoints(delta.getPoints(), delta.getPointTotal());
}
// force convergence calculation
cluster.computeConvergence();
- output.collect(new Text(cluster.getIdentifier()), new Text(Cluster
- .formatCluster(cluster)));
+ output.collect(new Text(cluster.getIdentifier()), cluster);
}
@Override
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java Tue Jun 23 18:23:18 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.canopy.Canopy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,13 +72,24 @@
for (Path path : result) {
SequenceFile.Reader reader = null;
try {
- reader =new SequenceFile.Reader(fs, path, job);
+ reader = new SequenceFile.Reader(fs, path, job);
+ Class<?> valueClass = reader.getValueClass();
Text key = new Text();
- Text value = new Text();
- while (reader.next(key, value)) {
- // get the cluster info
- Cluster cluster = Cluster.decodeCluster(value.toString());
- clusters.add(cluster);
+ if (valueClass.equals(Cluster.class)){
+ Cluster value = new Cluster();
+ while (reader.next(key, value)) {
+ // get the cluster info
+ clusters.add(value);
+ value = new Cluster();
+ }
+ } else if (valueClass.equals(Canopy.class)){
+ Canopy value = new Canopy();
+ while (reader.next(key, value)) {
+ // get the cluster info
+ Cluster cluster = new Cluster(value.getCenter(), value.getCanopyId());
+ clusters.add(cluster);
+ value = new Canopy();
+ }
}
} finally {
if (reader != null) {
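Since configureWithClusterInfo() now dispatches on the reader's value class, the clusters-in path can hold either Cluster or Canopy records, so canopy output can seed k-means directly. A sketch of writing a Cluster seed file by hand (illustrative; what write() emits as the centroid for a cluster with no added points depends on computeCentroid(), which is not shown in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    import org.apache.mahout.clustering.kmeans.Cluster;
    import org.apache.mahout.matrix.DenseVector;

    public class WriteSeedsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path seeds = new Path("testdata/clusters/part-00000");
        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, seeds, Text.class, Cluster.class);
        Cluster seed = new Cluster(new DenseVector(new double[]{1.0, 1.0}), 0);
        writer.append(new Text(seed.getIdentifier()), seed);   // serialized via Cluster.write()
        writer.close();
      }
    }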
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java?rev=787776&r1=787775&r2=787776&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/matrix/Vector.java Tue Jun 23 18:23:18 2009
@@ -23,6 +23,9 @@
/**
* The basic interface including numerous convenience functions
+ * <p/>
+ * NOTE: All implementing classes must have a constructor that takes an int for cardinality
+ * and a no-arg constructor that can be used for marshalling the Writable instance
*/
public interface Vector extends Iterable<Vector.Element>, Cloneable, Writable {
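The two-constructor requirement in the new note is what lets readers such as Cluster.vectorNameToVector() and the SequenceFile machinery instantiate a vector before streaming its contents in. A skeleton of what an implementation must provide (illustrative, not a real Mahout class; the rest of the Vector methods are omitted):

    public class ExampleVector /* implements Vector */ {
      private double[] values;

      public ExampleVector() {                // no-arg constructor: used when readFields() rebuilds the instance
        this(0);
      }

      public ExampleVector(int cardinality) { // cardinality constructor required by the note
        this.values = new double[cardinality];
      }

      // write()/readFields() would then stream the cardinality followed by the elements.
    }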
Added: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java?rev=787776&view=auto
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java (added)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/ClusteringTestUtils.java Tue Jun 23 18:23:18 2009
@@ -0,0 +1,31 @@
+package org.apache.mahout.clustering;
+
+import org.apache.mahout.matrix.Vector;
+import org.apache.mahout.matrix.SparseVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.List;
+import java.io.IOException;
+
+/**
+ * Test utilities for the clustering tests: writes point Vectors to a
+ * SequenceFile in the form the Writable-based jobs now read as input.
+ **/
+public class ClusteringTestUtils {
+
+ public static void writePointsToFile(List<Vector> points, String fileName, FileSystem fs, Configuration conf)
+ throws IOException {
+ Path path = new Path(fileName);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, SparseVector.class);
+ long recNum = 0;
+ for (Vector point : points) {
+ // each point is appended as (recNum, point); the vector handles its own serialization
+ writer.append(new LongWritable(recNum++), point);
+ }
+ writer.close();
+ }
+}
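The helper declares SparseVector.class as the SequenceFile value class, so the points handed to it should be SparseVectors. A sketch of how a test might call it (the path is a placeholder):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    import org.apache.mahout.clustering.ClusteringTestUtils;
    import org.apache.mahout.matrix.SparseVector;
    import org.apache.mahout.matrix.Vector;

    public class WritePointsSketch {
      public static void main(String[] args) throws Exception {
        List<Vector> points = new ArrayList<Vector>();
        SparseVector v = new SparseVector(2);   // the int-cardinality constructor required on Vector
        v.set(0, 1.0);
        v.set(1, 2.0);
        points.add(v);

        Configuration conf = new Configuration();
        ClusteringTestUtils.writePointsToFile(points, "testdata/points/file1", FileSystem.get(conf), conf);
      }
    }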