hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1555747 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/pipes/util/ examples/src/main/java/org/apache/hama/examples/ ml/src/main/java/org/apache/hama/ml/kmeans/ ml/src/test/java/org/apache/hama/ml/kmeans/
Date Mon, 06 Jan 2014 13:28:06 GMT
Author: millecker
Date: Mon Jan  6 13:28:05 2014
New Revision: 1555747

URL: http://svn.apache.org/r1555747
Log:
HAMA-834: Fix KMeans example

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1555747&r1=1555746&r2=1555747&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jan  6 13:28:05 2014
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-834: Fix KMeans example (Martin Illecker)
    HAMA-831: Support for multi records with same vertexID (edwardyoon)
    HAMA-830: KMeans and NeuralNetwork doesn't load config file (edwardyoon)
    HAMA-812: In local mode BSPJobClient.close throws Exception (Martin Illecker)

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java?rev=1555747&r1=1555746&r2=1555747&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/util/SequenceFileDumper.java Mon Jan  6 13:28:05 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
@@ -123,8 +124,18 @@ public class SequenceFileDumper {
           sub = Integer.parseInt(cmdLine.getOptionValue("substring"));
         }
 
-        Writable key = (Writable) reader.getKeyClass().newInstance();
-        Writable value = (Writable) reader.getValueClass().newInstance();
+        Writable key;
+        if (reader.getKeyClass() != NullWritable.class) {
+          key = (Writable) reader.getKeyClass().newInstance();
+        } else {
+          key = NullWritable.get();
+        }
+        Writable value;
+        if (reader.getValueClass() != NullWritable.class) {
+          value = (Writable) reader.getValueClass().newInstance();
+        } else {
+          value = NullWritable.get();
+        }
 
         writer.append("Key class: ")
             .append(String.valueOf(reader.getKeyClass()))

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java?rev=1555747&r1=1555746&r2=1555747&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/Kmeans.java Mon Jan  6 13:28:05 2014
@@ -67,10 +67,11 @@ public class Kmeans {
     Path out = new Path(args[1]);
     FileSystem fs = FileSystem.get(conf);
     Path center = null;
-    if (fs.isFile(in))
+    if (fs.isFile(in)) {
       center = new Path(in.getParent(), "center/cen.seq");
-    else
+    } else {
       center = new Path(in, "center/cen.seq");
+    }
     Path centerOut = new Path(out, "center/center_output.seq");
     conf.set(KMeansBSP.CENTER_IN_PATH, center.toString());
     conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString());
@@ -84,12 +85,18 @@ public class Kmeans {
       int dimension = Integer.parseInt(args[6]);
       System.out.println("N: " + count + " Dimension: " + dimension
           + " Iterations: " + iterations);
+      if (!fs.isFile(in)) {
+        in = new Path(in, "input.seq");
+      }
       // prepare the input, like deleting old versions and creating centers
       KMeansBSP.prepareInput(count, k, dimension, conf, in, center, out, fs);
     } else {
+      if (!fs.isFile(in)) {
+        System.out.println("Cannot read text input file: " + in.toString());
+        return;
+      }
       // Set the last argument to TRUE if first column is required to be the key
-      KMeansBSP.prepareInputText(k, conf, in, center, out, fs, true);
-      in = new Path(in.getParent(), "textinput/in.seq");
+      in = KMeansBSP.prepareInputText(k, conf, in, center, out, fs, true);
     }
 
     BSPJob job = KMeansBSP.createJob(conf, in, out, true);

Modified: hama/trunk/ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java?rev=1555747&r1=1555746&r2=1555747&view=diff
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java (original)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kmeans/KMeansBSP.java Mon Jan  6 13:28:05 2014
@@ -112,7 +112,8 @@ public final class KMeansBSP
       try {
         distanceMeasurer = ReflectionUtils.newInstance(distanceClass);
       } catch (ClassNotFoundException e) {
-        throw new RuntimeException("Wrong DistanceMeasurer implementation " + distanceClass + " provided");
+        throw new RuntimeException("Wrong DistanceMeasurer implementation "
+            + distanceClass + " provided");
       }
     } else {
       distanceMeasurer = new EuclidianDistance();
@@ -244,8 +245,8 @@ public final class KMeansBSP
       // add the vector to the center
       newCenterArray[lowestDistantCenter] = newCenterArray[lowestDistantCenter]
           .addUnsafe(key);
-      summationCount[lowestDistantCenter]++;
     }
+    summationCount[lowestDistantCenter]++;
   }
 
   private int getNearestCenter(DoubleVector key) {
@@ -514,7 +515,7 @@ public final class KMeansBSP
       fs.delete(out, true);
 
     if (fs.exists(center))
-      fs.delete(out, true);
+      fs.delete(center, true);
 
     if (fs.exists(in))
       fs.delete(in, true);

Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java?rev=1555747&r1=1555746&r2=1555747&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java Mon Jan  6 13:28:05 2014
@@ -18,70 +18,125 @@
 package org.apache.hama.ml.kmeans;
 
 import java.io.BufferedWriter;
+import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.HashMap;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.commons.math.DoubleVector;
 
 public class TestKMeansBSP extends TestCase {
+  public static final String TMP_OUTPUT = "/tmp/clustering/";
 
   public void testRunJob() throws Exception {
-    Configuration conf = new Configuration();
-    Path in = new Path("/tmp/clustering/in/in.txt");
-    Path out = new Path("/tmp/clustering/out/");
+    Configuration conf = new HamaConfiguration();
     FileSystem fs = FileSystem.get(conf);
-    Path center = null;
-
-    try {
-      center = new Path(in.getParent(), "center/cen.seq");
+    if (fs.exists(new Path(TMP_OUTPUT))) {
+      fs.delete(new Path(TMP_OUTPUT), true);
+    }
 
-      Path centerOut = new Path(out, "center/center_output.seq");
-      conf.set(KMeansBSP.CENTER_IN_PATH, center.toString());
-      conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString());
-      int iterations = 10;
-      conf.setInt(KMeansBSP.MAX_ITERATIONS_KEY, iterations);
-      int k = 1;
-
-      FSDataOutputStream create = fs.create(in);
-      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(create));
-      StringBuilder sb = new StringBuilder();
-
-      for (int i = 0; i < 100; i++) {
-        sb.append(i);
-        sb.append('\t');
-        sb.append(i);
-        sb.append('\n');
+    // test for bspTaskNum 1 to 4
+    for (int i = 1; i < 5; i++) {
+      try {
+        test(conf, fs, i);
+      } finally {
+        fs.delete(new Path(TMP_OUTPUT), true);
       }
+    }
+  }
 
-      bw.write(sb.toString());
-      bw.close();
-
-      in = KMeansBSP.prepareInputText(k, conf, in, center, out, fs, false);
+  /**
+   * Test
+   * 
+   * Create 101 input vectors of dimension two
+   * 
+   * Input vectors: (0,0) (1,1) (2,2) ... (100,100)
+   * 
+   * k = 1, maxIterations = 10
+   * 
+   * Resulting center should be (50,50)
+   */
+  private void test(Configuration conf, FileSystem fs, int numBspTask)
+      throws IOException, InterruptedException, ClassNotFoundException {
+
+    Path in = new Path(TMP_OUTPUT + "in");
+    Path out = new Path(TMP_OUTPUT + "out");
+    Path centerIn = new Path(TMP_OUTPUT + "center/center_input.seq");
+    Path centerOut = new Path(TMP_OUTPUT + "center/center_output.seq");
+    conf.set(KMeansBSP.CENTER_IN_PATH, centerIn.toString());
+    conf.set(KMeansBSP.CENTER_OUT_PATH, centerOut.toString());
+
+    int k = 1;
+    int iterations = 10;
+    conf.setInt(KMeansBSP.MAX_ITERATIONS_KEY, iterations);
+
+    in = generateInputText(k, conf, fs, in, centerIn, out, numBspTask);
+
+    BSPJob job = KMeansBSP.createJob(conf, in, out, true);
+    job.setNumBspTask(numBspTask);
+
+    // just submit the job
+    boolean result = job.waitForCompletion(true);
+
+    assertEquals(true, result);
+
+    HashMap<Integer, DoubleVector> centerMap = KMeansBSP.readClusterCenters(
+        conf, out, centerOut, fs);
+    System.out.println(centerMap);
+
+    assertEquals(1, centerMap.size()); // because k = 1
+
+    DoubleVector doubleVector = centerMap.get(0);
+    assertEquals(Double.valueOf(50), doubleVector.get(0));
+    assertEquals(Double.valueOf(50), doubleVector.get(1));
+  }
 
-      BSPJob job = KMeansBSP.createJob(conf, in, out, true);
+  private Path generateInputText(int k, Configuration conf, FileSystem fs,
+      Path in, Path centerIn, Path out, int numBspTask) throws IOException {
+    int totalNumberOfPoints = 100;
+    int interval = totalNumberOfPoints / numBspTask;
+    Path parts = new Path(in, "parts");
+
+    for (int part = 0; part < numBspTask; part++) {
+      Path partIn = new Path(parts, "part" + part + "/input.txt");
+      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
+          fs.create(partIn)));
+
+      int start = interval * part;
+      int end = start + interval - 1;
+      if ((numBspTask - 1) == part) {
+        end = totalNumberOfPoints;
+      }
+      System.out
+          .println("Partition " + part + ": from " + start + " to " + end);
 
-      // just submit the job
-      boolean result = job.waitForCompletion(true);
+      for (int i = start; i <= end; i++) {
+        bw.append(i + "\t" + i + "\n");
+      }
+      bw.close();
 
-      assertEquals(true, result);
+      // Convert input text to sequence file
+      Path seqFile = null;
+      if (part == 0) {
+        seqFile = KMeansBSP.prepareInputText(k, conf, partIn, centerIn, out,
+            fs, false);
+      } else {
+        seqFile = KMeansBSP.prepareInputText(0, conf, partIn, new Path(centerIn
+            + "_empty.seq"), out, fs, false);
+      }
 
-      HashMap<Integer, DoubleVector> centerMap = KMeansBSP.readClusterCenters(
-          conf, out, centerOut, fs);
-      System.out.println(centerMap);
-      assertEquals(1, centerMap.size());
-      DoubleVector doubleVector = centerMap.get(0);
-      assertTrue(doubleVector.get(0) >= 50 && doubleVector.get(0) < 51);
-      assertTrue(doubleVector.get(1) >= 50 && doubleVector.get(1) < 51);
-    } finally {
-      fs.delete(new Path("/tmp/clustering"), true);
+      fs.moveFromLocalFile(seqFile, new Path(parts, "part" + part + ".seq"));
+      fs.delete(seqFile.getParent(), true);
+      fs.delete(partIn.getParent(), true);
     }
+
+    return parts;
   }
 
 }



Mime
View raw message