hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1554740 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/ test/java/org/apache/hama/pipes/
Date Thu, 02 Jan 2014 05:56:52 GMT
Author: edwardyoon
Date: Thu Jan  2 05:56:51 2014
New Revision: 1554740

URL: http://svn.apache.org/r1554740
Log:
Fix bug in BSPJobClient

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Jan  2 05:56:51 2014
@@ -412,14 +412,13 @@ public class BSPJobClient extends Config
                 Constants.RUNTIME_PARTITIONING_CLASS)));
       }
 
-      if ((numTasks > 0 && numTasks != numSplits)
-          || (job.getConfiguration().getBoolean(
-              Constants.ENABLE_RUNTIME_PARTITIONING, false) && job
-              .getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null)) {
+      if (numTasks == 0) {
+        numTasks = numSplits;
+      }
 
-        if (numTasks == 0) {
-          numTasks = numSplits;
-        }
+      if (job.getConfiguration().getBoolean(
+          Constants.ENABLE_RUNTIME_PARTITIONING, false)
+          && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
 
         HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
 
@@ -428,10 +427,9 @@ public class BSPJobClient extends Config
           conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
               .get(Constants.RUNTIME_PARTITIONING_DIR));
         }
-        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null) {
-          conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
-              job.get(Constants.RUNTIME_PARTITIONING_CLASS));
-        }
+
+        conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
+            job.get(Constants.RUNTIME_PARTITIONING_CLASS));
         BSPJob partitioningJob = new BSPJob(conf);
         LOG.debug("partitioningJob input: "
             + partitioningJob.get(Constants.JOB_INPUT_DIR));

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Jan  2 05:56:51 2014
@@ -107,7 +107,8 @@ public class TestBSPMasterGroomServer ex
 
   public static void checkOutput(FileSystem fileSys, Configuration conf,
       int tasks) throws Exception {
-    FileStatus[] listStatus = fileSys.listStatus(OUTPUT_PATH);
+    FileStatus[] listStatus = fileSys.globStatus(new Path(OUTPUT_PATH + "/part-*"));
+    
     assertEquals(listStatus.length, tasks);
     for (FileStatus status : listStatus) {
       if (!status.isDir()) {

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu Jan  2 05:56:51 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
@@ -159,6 +160,7 @@ public class TestKeyValueTextInputFormat
       job.setJobName("Test KeyValueTextInputFormat together with HashPartitioner");
       job.setBspClass(KeyValueHashPartitionedBSP.class);
       
+      job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
       job.setPartitioner(HashPartitioner.class);
 
       job.setInputPath(dataPath);

Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1554740&r1=1554739&r2=1554740&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Jan  2 05:56:51 2014
@@ -52,7 +52,6 @@ import org.apache.hama.commons.io.PipesK
 import org.apache.hama.commons.io.PipesVectorWritable;
 import org.apache.hama.commons.math.DenseDoubleVector;
 import org.apache.hama.commons.math.DoubleVector;
-import org.junit.Test;
 
 /**
  * Test case for {@link PipesBSP}
@@ -65,18 +64,19 @@ public class TestPipes extends HamaClust
   public static final String EXAMPLE_SUMMATION_EXEC = "/examples/summation";
   public static final String EXAMPLE_MATRIXMULTIPLICATION_EXEC = "/examples/matrixmultiplication";
   public static final String EXAMPLE_TMP_OUTPUT = "/tmp/test-example/";
-  public static final String HAMA_TMP_OUTPUT = "/tmp/hama-test/";
+  public static final String HAMA_TMP_OUTPUT = "/tmp/hama-pipes/";
   public static final String HAMA_TMP_DISK_QUEUE_OUTPUT = "/tmp/messageQueue";
   public static final int DOUBLE_PRECISION = 6;
 
-  protected HamaConfiguration configuration;
+  private HamaConfiguration configuration;
+  private static FileSystem fs = null;
 
   public TestPipes() {
     configuration = new HamaConfiguration();
 
     try {
       // Cleanup temp Hama locations
-      FileSystem fs = FileSystem.get(configuration);
+      fs = FileSystem.get(configuration);
       cleanup(fs, new Path(HAMA_TMP_OUTPUT));
       cleanup(fs, new Path(HAMA_TMP_DISK_QUEUE_OUTPUT));
       // Remove local temp folder
@@ -109,8 +109,9 @@ public class TestPipes extends HamaClust
     super.tearDown();
   }
 
-  @Test
   public void testPipes() throws Exception {
+    System.setProperty(EXAMPLES_INSTALL_PROPERTY,
+        "/home/edward/workspace/hama-trunk/c++/target/native/");
 
     assertNotNull("System property " + EXAMPLES_INSTALL_PROPERTY
         + " is not defined!", System.getProperty(EXAMPLES_INSTALL_PROPERTY));
@@ -121,35 +122,45 @@ public class TestPipes extends HamaClust
       return;
     }
 
-    LOG.info(EXAMPLES_INSTALL_PROPERTY + " is defined: '"
-        + System.getProperty(EXAMPLES_INSTALL_PROPERTY) + "'");
+    // *** Summation Test ***
+    summation();
+
+    // *** MatrixMultiplication Test ***
+    matrixMult();
+    
+    // Remove local temp folder
+    cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+  }
 
+  private void summation() throws Exception {
     // Setup Paths
     String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
     Path summationExec = new Path(examplesInstallPath + EXAMPLE_SUMMATION_EXEC);
-    Path matrixmultiplicationExec = new Path(examplesInstallPath
-        + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
-    Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/in");
-    Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "testing/out");
+    Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/in");
+    Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "summation/out");
 
-    FileSystem fs = FileSystem.get(configuration);
-
-    // *** Summation Test ***
     // Generate Summation input
     BigDecimal sum = writeSummationInputFile(fs, inputPath);
 
     // Run Summation example
     runProgram(getSummationJob(configuration), summationExec, inputPath,
-        outputPath, 3, this.numOfGroom);
+        outputPath, 1, this.numOfGroom);
 
     // Verify output
     verifySummationOutput(configuration, outputPath, sum);
-
     // Clean input and output folder
     cleanup(fs, inputPath);
     cleanup(fs, outputPath);
+  }
+
+  private void matrixMult() throws Exception {
+    String examplesInstallPath = System.getProperty(EXAMPLES_INSTALL_PROPERTY);
+    Path matrixmultiplicationExec = new Path(examplesInstallPath
+        + EXAMPLE_MATRIXMULTIPLICATION_EXEC);
+
+    Path inputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/in");
+    Path outputPath = new Path(EXAMPLE_TMP_OUTPUT + "matmult/out");
 
-    // *** MatrixMultiplication Test ***
     // Generate matrix dimensions
     Random rand = new Random();
     // (0-19) + 11 -> between 11-30
@@ -168,14 +179,14 @@ public class TestPipes extends HamaClust
     // Run MatrixMultiplication example
     runProgram(
         getMatrixMultiplicationJob(configuration, transposedMatrixBPath),
-        matrixmultiplicationExec, matrixAPath, outputPath, 3, this.numOfGroom);
+        matrixmultiplicationExec, matrixAPath, outputPath, 2, this.numOfGroom);
 
     // Verify output
     double[][] matrixC = multiplyMatrix(matrixA, matrixB);
     verifyMatrixMultiplicationOutput(configuration, outputPath, matrixC);
 
-    // Remove local temp folder
-    cleanup(fs, new Path(EXAMPLE_TMP_OUTPUT));
+    cleanup(fs, inputPath);
+    cleanup(fs, outputPath);
   }
 
   static BSPJob getSummationJob(HamaConfiguration conf) throws IOException {
@@ -187,6 +198,7 @@ public class TestPipes extends HamaClust
     bsp.setOutputKeyClass(Text.class);
     bsp.setOutputValueClass(DoubleWritable.class);
     bsp.set("bsp.message.class", DoubleWritable.class.getName());
+
     return bsp;
   }
 
@@ -199,8 +211,13 @@ public class TestPipes extends HamaClust
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(IntWritable.class);
     bsp.setOutputValueClass(PipesVectorWritable.class);
+
+    bsp.set(Constants.RUNTIME_PARTITIONING_DIR, HAMA_TMP_OUTPUT + "/parts");
     bsp.set("bsp.message.class", PipesKeyValueWritable.class.getName());
+    
+    bsp.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
     bsp.setPartitioner(PipesPartitioner.class);
+    
     // sort sent messages
     bsp.set(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
         "org.apache.hama.bsp.message.queue.SortedMessageTransferProtocol");
@@ -226,14 +243,12 @@ public class TestPipes extends HamaClust
       out.writeBytes(line);
 
       sum = sum.add(new BigDecimal(truncatedValue));
-      LOG.info("input[" + i + "]: '" + line + "' sum: " + sum.toString());
     }
     out.close();
     return sum;
   }
 
   static double[][] createRandomMatrix(int rows, int columns, Random rand) {
-    LOG.info("createRandomMatrix rows: " + rows + " cols: " + columns);
     final double[][] matrix = new double[rows][columns];
     double rangeMin = 0;
     double rangeMax = 100;
@@ -255,7 +270,6 @@ public class TestPipes extends HamaClust
     // Write matrix to DFS
     SequenceFile.Writer writer = null;
     try {
-      FileSystem fs = FileSystem.get(conf);
       writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class,
           PipesVectorWritable.class);
 
@@ -314,7 +328,6 @@ public class TestPipes extends HamaClust
 
   static void verifyOutput(HamaConfiguration conf, Path outputPath,
       String[] expectedResults) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
     FileStatus[] listStatus = fs.listStatus(outputPath);
     for (FileStatus status : listStatus) {
       if (!status.isDir()) {
@@ -342,7 +355,6 @@ public class TestPipes extends HamaClust
 
   static void verifySummationOutput(HamaConfiguration conf, Path outputPath,
       BigDecimal sum) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
     FileStatus[] listStatus = fs.listStatus(outputPath);
     for (FileStatus status : listStatus) {
       if (!status.isDir()) {
@@ -365,31 +377,22 @@ public class TestPipes extends HamaClust
 
   static void verifyMatrixMultiplicationOutput(HamaConfiguration conf,
       Path outputPath, double[][] matrix) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
     FileStatus[] listStatus = fs.listStatus(outputPath);
     for (FileStatus status : listStatus) {
       if (!status.isDir()) {
-        LOG.info("Output File: " + status.getPath());
         SequenceFile.Reader reader = new SequenceFile.Reader(fs,
             status.getPath(), conf);
         IntWritable key = new IntWritable();
         PipesVectorWritable value = new PipesVectorWritable();
         int rowIdx = 0;
         while (reader.next(key, value)) {
-
           assertEquals("Expected rowIdx: '" + rowIdx + "' != '" + key.get()
               + "'", rowIdx, key.get());
 
           DoubleVector rowVector = value.getVector();
-          LOG.info("key: " + key.get() + " value: " + rowVector.toString());
 
           for (int colIdx = 0; colIdx < rowVector.getLength(); colIdx++) {
-
             double colValue = rowVector.get(colIdx);
-
-            LOG.info("value[" + rowIdx + "," + colIdx + "]: " + colValue
-                + " expected: " + matrix[rowIdx][colIdx]);
-
             assertEquals("Expected colValue: '" + matrix[rowIdx][colIdx]
                 + "' != '" + colValue + "' in row: " + rowIdx + " values: "
                 + rowVector.toString(), matrix[rowIdx][colIdx], colValue,
@@ -408,8 +411,8 @@ public class TestPipes extends HamaClust
   }
 
   static void runProgram(BSPJob bsp, Path program, Path inputPath,
-      Path outputPath, int numBspTasks, int numOfGroom) throws IOException {
-
+      Path outputPath, int numBspTasks, int numOfGroom) throws IOException,
+      ClassNotFoundException, InterruptedException {
     HamaConfiguration conf = (HamaConfiguration) bsp.getConfiguration();
     bsp.setJobName("Test Hama Pipes " + program.getName());
     bsp.setBspClass(PipesBSP.class);
@@ -421,7 +424,6 @@ public class TestPipes extends HamaClust
     Submitter.setIsJavaRecordWriter(conf, true);
 
     BSPJobClient jobClient = new BSPJobClient(conf);
-    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
 
     // Set bspTaskNum
     ClusterStatus cluster = jobClient.getClusterStatus(false);
@@ -430,7 +432,6 @@ public class TestPipes extends HamaClust
 
     // Copy binary to DFS
     Path testExec = new Path(EXAMPLE_TMP_OUTPUT + "testing/bin/application");
-    FileSystem fs = FileSystem.get(conf);
     fs.delete(testExec.getParent(), true);
     fs.copyFromLocalFile(program, testExec);
 



Mime
View raw message