mahout-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gsing...@apache.org
Subject svn commit: r755972 - in /lucene/mahout/trunk: core/src/main/java/org/apache/mahout/clustering/kmeans/ core/src/test/java/org/apache/mahout/clustering/kmeans/ examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/
Date Thu, 19 Mar 2009 12:26:30 GMT
Author: gsingers
Date: Thu Mar 19 12:26:30 2009
New Revision: 755972

URL: http://svn.apache.org/viewvc?rev=755972&view=rev
Log:
MAHOUT-99: fix issue with reading cluster input

Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
    lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=755972&r1=755971&r2=755972&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java	(original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java	Thu Mar 19 12:26:30 2009
@@ -24,15 +24,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.KeyValueLineRecordReader;
-import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +101,6 @@
       int numReduceTasks) {
     JobClient client = new JobClient();
     JobConf conf = new JobConf(KMeansDriver.class);
-    conf.setInputFormat(TextInputFormat.class);
     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(Text.class);
 
@@ -110,6 +108,7 @@
     Path outPath = new Path(clustersOut);
     FileOutputFormat.setOutputPath(conf, outPath);
 
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
     conf.setMapperClass(KMeansMapper.class);
     conf.setCombinerClass(KMeansCombiner.class);
     conf.setReducerClass(KMeansReducer.class);
@@ -119,9 +118,9 @@
     conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
     conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
 
-    conf.set("mapred.child.java.opts", "-Xmx1536m");
+//    conf.set("mapred.child.java.opts", "-Xmx1536m");
     // uncomment it to run locally
-    conf.set("mapred.job.tracker", "local");
+//    conf.set("mapred.job.tracker", "local");
 
     client.setConf(conf);
     try {
@@ -164,7 +163,7 @@
     client.setConf(conf);
     // uncomment it to run locally
     // conf.set("mapred.job.tracker", "local");
-    conf.set("mapred.child.java.opts", "-Xmx1536m");
+//    conf.set("mapred.child.java.opts", "-Xmx1536m");
     try {
       JobClient.runJob(conf);
     } catch (IOException e) {
@@ -174,52 +173,24 @@
 
   /**
    * Return if all of the Clusters in the filePath have converged or not
-   * 
+   *
    * @param filePath the file path to the single file containing the clusters
-   * @param conf the JobConf
-   * @param fs the FileSystem
+   * @param conf     the JobConf
+   * @param fs       the FileSystem
    * @return true if all Clusters are converged
    * @throws IOException if there was an IO error
    */
-  private static boolean isConverged(String filePath, JobConf conf,
-      FileSystem fs) throws IOException {
-    Path clusterPath = new Path(filePath);
-    List<Path> result = new ArrayList<Path>();
-
-    PathFilter clusterFileFilter = new PathFilter() {
-      public boolean accept(Path path) {
-        return path.getName().startsWith("part");
-      }
-    };
-
-    FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(
-        clusterPath, clusterFileFilter)), clusterFileFilter);
-
-    for (FileStatus match : matches) {
-      result.add(fs.makeQualified(match.getPath()));
-    }
+ private static boolean isConverged(String filePath, JobConf conf, FileSystem fs)
+          throws IOException {
+    Path outPart = new Path(filePath);
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
+    Text key = new Text();
+    Text value = new Text();
     boolean converged = true;
-
-    for (Path p : result) {
-      KeyValueLineRecordReader reader = null;
-
-      try {
-        reader = new KeyValueLineRecordReader(conf, new FileSplit(p, 0, fs
-            .getFileStatus(p).getLen(), (String[]) null));
-        Text key = new Text();
-        Text value = new Text();
-
-        while (converged && reader.next(key, value)) {
-          converged = value.toString().startsWith("V");
-        }
-      } finally {
-        if (reader != null) {
-          reader.close();
-        }
-      }
-
+    while (converged && reader.next(key, value)) {
+      converged = value.toString().charAt(0) == 'V';
     }
-
     return converged;
   }
 }
+   
\ No newline at end of file

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java?rev=755972&r1=755971&r2=755972&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java	(original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java	Thu Mar 19 12:26:30 2009
@@ -25,11 +25,9 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.KeyValueLineRecordReader;
-import org.apache.hadoop.mapred.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +41,8 @@
    * @param clusters
    */
   public static void configureWithClusterInfo(String clusterPathStr,
-      List<Cluster> clusters) {
+      List<Cluster> clusters) {    
+    
     // Get the path location where the cluster Info is stored
     JobConf job = new JobConf(KMeansUtil.class);
     Path clusterPath = new Path(clusterPathStr);
@@ -68,23 +67,22 @@
 
       // iterate thru the result path list
       for (Path path : result) {
-        RecordReader<Text, Text> recordReader = null;
+        SequenceFile.Reader reader = null;
+//        RecordReader<Text, Text> recordReader = null;
         try {
-          recordReader = new KeyValueLineRecordReader(job, new FileSplit(path,
-              0, fs.getFileStatus(path).getLen(), (String[]) null));
+          reader =new SequenceFile.Reader(fs, path, job); 
           Text key = new Text();
           Text value = new Text();
           int counter = 1;
-          while (recordReader.next(key, value)) {
+          while (reader.next(key, value)) {
             // get the cluster info
             Cluster cluster = Cluster.decodeCluster(value.toString());
             clusters.add(cluster);
           }
         } finally {
-          if (recordReader != null) {
-            recordReader.close();
+          if (reader != null) {
+            reader.close();
           }
-
         }
       }
 

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java?rev=755972&r1=755971&r2=755972&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java	(original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java	Thu Mar 19 12:26:30 2009
@@ -17,24 +17,11 @@
 
 package org.apache.mahout.clustering.kmeans;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -48,6 +35,20 @@
 import org.apache.mahout.utils.EuclideanDistanceMeasure;
 import org.apache.mahout.utils.ManhattanDistanceMeasure;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.InputStreamReader;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.nio.charset.Charset;
+
 public class TestKmeansClustering extends TestCase {
 
   public static final double[][] reference = { { 1, 1 }, { 2, 1 }, { 1, 2 },
@@ -155,7 +156,7 @@
     Cluster.config(measure, 0.001);
     // try all possible values of k
     for (int k = 0; k < points.size(); k++) {
-      System.out.println("Test k=" + (k + 1) + ":");
+      System.out.println("Test k=" + (k + 1) + ':');
       // pick k initial cluster centers at random
       List<Cluster> clusters = new ArrayList<Cluster>();
       for (int i = 0; i < k + 1; i++) {
@@ -390,8 +391,8 @@
       JobConf job = new JobConf(KMeansDriver.class);
       FileSystem fs = FileSystem.get(job);
       Path path = new Path("testdata/clusters/part-00000");
-      BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs
-          .create(path)));
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path,
+          Text.class, Text.class);
 
       for (int i = 0; i < k + 1; i++) {
         Vector vec = points.get(i);
@@ -399,8 +400,8 @@
         Cluster cluster = new Cluster(vec, i);
         // add the center so the centroid will be correct upon output
         cluster.addPoint(cluster.getCenter());
-        writer.write(cluster.getIdentifier() + "\t"
-            + Cluster.formatCluster(cluster) + "\n");
+        writer.append(new Text(cluster.getIdentifier()), new Text(Cluster
+            .formatCluster(cluster)));
       }
       writer.close();
       // now run the Job

Modified: lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java?rev=755972&r1=755971&r2=755972&view=diff
==============================================================================
--- lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java	(original)
+++ lucene/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java	Thu Mar 19 12:26:30 2009
@@ -81,6 +81,6 @@
         .runJob(output + "/data", output, measureClass, t1, t2);
     KMeansDriver.runJob(output + "/data", output + "/canopies", output,
         measureClass, convergenceDelta, maxIterations,1);
-    OutputDriver.runJob(output + "/points", output + "/clustered-points");
+//    OutputDriver.runJob(output + "/points", output + "/clustered-points");
   }
 }



Mime
View raw message