hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1679360 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/BSPJobClient.java main/java/org/apache/hama/bsp/PartitioningRunner.java test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Date Thu, 14 May 2015 13:01:19 GMT
Author: edwardyoon
Date: Thu May 14 13:01:18 2015
New Revision: 1679360

URL: http://svn.apache.org/r1679360
Log: (empty)

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May 14 13:01:18 2015
@@ -471,7 +471,7 @@ public class BSPJobClient extends Config
         partitioningJob.getConfiguration().setClass(
             MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
             MessageQueue.class);
-        
+
         partitioningJob.setBoolean(Constants.FORCE_SET_BSP_TASKS, true);
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
@@ -578,6 +578,15 @@ public class BSPJobClient extends Config
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();
       for (InputSplit split : splits) {
+
+        // set partitionID to rawSplit
+        if (split.getClass().getName().equals(FileSplit.class.getName())
+            && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null
+            && job.get("bsp.partitioning.runner.job") == null) {
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
+          rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
+        }
+
         rawSplit.setClassName(split.getClass().getName());
         buffer.reset();
         split.write(buffer);
@@ -629,7 +638,10 @@ public class BSPJobClient extends Config
     for (int i = 0; i < len; ++i) {
       RawSplit split = new RawSplit();
       split.readFields(in);
-      result[i] = split;
+      if (split.getPartitionID() != Integer.MIN_VALUE)
+        result[split.getPartitionID()] = split;
+      else
+        result[i] = split;
     }
     return result;
   }
@@ -1075,12 +1087,21 @@ public class BSPJobClient extends Config
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
+    private int partitionID = Integer.MIN_VALUE;
     long dataLength;
 
     public void setBytes(byte[] data, int offset, int length) {
       bytes.set(data, offset, length);
     }
 
+    public void setPartitionID(int id) {
+      this.partitionID = id;
+    }
+
+    public int getPartitionID() {
+      return partitionID;
+    }
+
     public void setClassName(String className) {
       splitClass = className;
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Thu May 14 13:01:18 2015
@@ -144,7 +144,6 @@ public class PartitioningRunner extends
       raw = new MapWritable();
       raw.put(rawRecord.getKey(), rawRecord.getValue());
       
-      System.out.println(peer.getPeerName(index) + ", " + rawRecord.getKey() + ", " + rawRecord.getValue());
       peer.send(peer.getPeerName(index), raw);
     }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679360&r1=1679359&r2=1679360&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java Thu May 14 13:01:18 2015
@@ -78,8 +78,6 @@ public class TestKeyValueTextInputFormat
 
         int expectedPeerId = Math.abs(key.hashCode() % numTasks);
 
-        System.out.println(peer.getPeerName() + ": " + key + ", " + value + ", " + expectedPeerId);
-        /*
         if (expectedPeerId == peer.getPeerIndex()) {
           expectedKeys.put(new Text(key), new Text(value));
         } else {
@@ -88,7 +86,6 @@ public class TestKeyValueTextInputFormat
               new BooleanWritable(true));
           break;
         }
-        */
       }
       message.put(new Text(KeyValueHashPartitionedBSP.TEST_INPUT_VALUES),
           expectedKeys);
@@ -106,7 +103,6 @@ public class TestKeyValueTextInputFormat
         while ((msg = peer.getCurrentMessage()) != null) {
           blValue = (BooleanWritable) msg.get(new Text(
               KeyValueHashPartitionedBSP.TEST_UNEXPECTED_KEYS));
-          System.out.println(">>>>> " + peer.getPeerName() + ", " + blValue.get());
           assertEquals(false, blValue.get());
           values = (MapWritable) msg.get(new Text(
               KeyValueHashPartitionedBSP.TEST_INPUT_VALUES));



Mime
View raw message