hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1679351 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/BSPJobClient.java test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Date Thu, 14 May 2015 12:07:03 GMT
Author: edwardyoon
Date: Thu May 14 12:07:03 2015
New Revision: 1679351

URL: http://svn.apache.org/r1679351
Log:
debugging

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679351&r1=1679350&r2=1679351&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 12:07:03
2015
@@ -465,6 +465,13 @@ public class BSPJobClient extends Config
         LOG.debug("partitioningJob input: "
             + partitioningJob.get(Constants.JOB_INPUT_DIR));
 
+        partitioningJob.getConfiguration().setClass(
+            MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
+            OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
+        partitioningJob.getConfiguration().setClass(
+            MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+            MessageQueue.class);
+        
         partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679351&r1=1679350&r2=1679351&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu
May 14 12:07:03 2015
@@ -41,8 +41,7 @@ import org.junit.Test;
 
 public class TestKeyValueTextInputFormat extends TestCase {
 
-  public static class KeyValueHashPartitionedBSP 
-      extends 
+  public static class KeyValueHashPartitionedBSP extends
       BSP<Text, Text, NullWritable, NullWritable, MapWritable> {
     public static final String TEST_INPUT_VALUES = "test.bsp.max.input";
     public static final String TEST_UNEXPECTED_KEYS = "test.bsp.keys.unexpected";
@@ -51,7 +50,8 @@ public class TestKeyValueTextInputFormat
     private int numTasks = 0;
     private int maxValue = 0;
     private MapWritable expectedKeys = new MapWritable();
-    //private Set<Text> expectedKeys = new HashSet<Text>();
+
+    // private Set<Text> expectedKeys = new HashSet<Text>();
 
     @Override
     public void setup(
@@ -69,10 +69,11 @@ public class TestKeyValueTextInputFormat
       Text key = null;
       Text value = null;
       MapWritable message = new MapWritable();
-      message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(false));
+      message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+          new BooleanWritable(false));
       KeyValuePair<Text, Text> tmp = null;
 
-      while ( (tmp = peer.readNext()) != null) {
+      while ((tmp = peer.readNext()) != null) {
         key = tmp.getKey();
         value = tmp.getValue();
 
@@ -80,60 +81,69 @@ public class TestKeyValueTextInputFormat
 
         if (expectedPeerId == peer.getPeerIndex()) {
           if (expectedKeys.containsKey(key)) {
+            System.out.println("duplicate: " + value + ", " + expectedKeys.get(key));
             // same key twice, incorrect
-            message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(true));
+            message.put(new Text(
+                KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+                new BooleanWritable(true));
             break;
           } else {
             expectedKeys.put(new Text(key), new Text(value));
           }
         } else {
-          message.put(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS), new BooleanWritable(true));
+          System.out.println("duplicate: " + value + ", " + expectedKeys.get(key));
+          message.put(
+              new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS),
+              new BooleanWritable(true));
           break;
-        } //if (expectedPeerId == peer.getPeerIndex())
-      } //while (peer.readNext(key, value) != false)
-      message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES), expectedKeys);
-      
+        } // if (expectedPeerId == peer.getPeerIndex())
+      } // while (peer.readNext(key, value) != false)
+      message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
+          expectedKeys);
+
       int master = peer.getNumPeers() / 2;
-      String masterName = peer.getPeerName(master);
-      peer.send(masterName, message);
+      peer.send(peer.getPeerName(master), message);
       peer.sync();
 
-      if(peer.getPeerIndex() == master) {
+      if (peer.getPeerIndex() == master) {
         MapWritable msg = null;
         MapWritable values = null;
         BooleanWritable blValue = null;
         HashMap<Integer, Integer> input = new HashMap<Integer, Integer>();
 
-        while ( (msg = peer.getCurrentMessage()) != null ) {
-          blValue = (BooleanWritable) msg.get(new Text(KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
-          System.out.println(">>>>> " + peer.getPeerName() + ", "+ blValue.get());
+        while ((msg = peer.getCurrentMessage()) != null) {
+          blValue = (BooleanWritable) msg.get(new Text(
+              KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
+          System.out.println(">>>>> " + peer.getPeerName() + ", " + blValue.get());
           assertEquals(false, blValue.get());
-          values = (MapWritable) msg.get(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
-          for (Map.Entry<Writable,Writable> w : values.entrySet()) {
-            input.put( Integer.valueOf( w.getKey().toString() ), Integer.valueOf( w.getValue().toString()
));
+          values = (MapWritable) msg.get(new Text(
+              KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));
+          for (Map.Entry<Writable, Writable> w : values.entrySet()) {
+            input.put(Integer.valueOf(w.getKey().toString()),
+                Integer.valueOf(w.getValue().toString()));
           }
         }
-        
-        for (int i=0; i<maxValue; i++) {
+
+        for (int i = 0; i < maxValue; i++) {
           assertEquals(true, input.containsKey(Integer.valueOf(i)));
-          assertEquals(i*i, input.get(Integer.valueOf(i)).intValue());
+          assertEquals(i * i, input.get(Integer.valueOf(i)).intValue());
         }
       }
       peer.sync();
     }
   }
-  
+
   @Test
   public void testInput() throws IOException {
-    
+
     Configuration fsConf = new Configuration();
     String strDataPath = "/tmp/test_keyvalueinputformat";
     Path dataPath = new Path(strDataPath);
     Path outPath = new Path("/tmp/test_keyvalueinputformat_out");
-    
+
     int maxValue = 1000;
     FileSystem fs = null;
-    
+
     try {
       URI uri = new URI(strDataPath);
       fs = FileSystem.get(uri, fsConf);
@@ -147,7 +157,7 @@ public class TestKeyValueTextInputFormat
       for (int i = 0; i < maxValue; ++i) {
         str.append(i);
         str.append("\t");
-        str.append(i*i);
+        str.append(i * i);
         str.append("\n");
       }
       fileOut.writeBytes(str.toString());
@@ -156,18 +166,17 @@ public class TestKeyValueTextInputFormat
     } catch (Exception e) {
       e.printStackTrace();
     }
-    
-    
+
     try {
       HamaConfiguration conf = new HamaConfiguration();
       conf.setInt(KeyValueHashPartitionedBSP.TEST_MAX_VALUE, maxValue);
       BSPJob job = new BSPJob(conf, TestKeyValueTextInputFormat.class);
       job.setJobName("Test KeyValueTextInputFormat together with HashPartitioner");
       job.setBspClass(KeyValueHashPartitionedBSP.class);
-      
+
       job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
       job.setPartitioner(HashPartitioner.class);
-      
+
       job.setNumBspTask(2);
       job.setInputPath(dataPath);
       job.setInputFormat(KeyValueTextInputFormat.class);



Mime
View raw message