hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1435008 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Date Fri, 18 Jan 2013 02:07:26 GMT
Author: edwardyoon
Date: Fri Jan 18 02:07:25 2013
New Revision: 1435008

URL: http://svn.apache.org/viewvc?rev=1435008&view=rev
Log:
HAMA-716: Fix partitioning runner bug

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1435008&r1=1435007&r2=1435008&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Fri Jan 18 02:07:25
2013
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +38,7 @@ import org.apache.hama.util.KeyValuePair
 
 public class PartitioningRunner extends
     BSP<Writable, Writable, Writable, Writable, NullWritable> {
+  public static final Log LOG = LogFactory.getLog(PartitioningRunner.class);
 
   private Configuration conf;
   private int desiredNum;
@@ -91,10 +94,9 @@ public class PartitioningRunner extends
         KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
 
     public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks);
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks);
   }
 
   /**
@@ -111,10 +113,9 @@ public class PartitioningRunner extends
     @SuppressWarnings("unchecked")
     @Override
     public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
-        @SuppressWarnings("rawtypes")
-        Partitioner partitioner, Configuration conf,
-        @SuppressWarnings("rawtypes")
-        BSPPeer peer, int numTasks) {
+        @SuppressWarnings("rawtypes") Partitioner partitioner,
+        Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
+        int numTasks) {
       return Math.abs(partitioner.getPartition(outputRecord.getKey(),
           outputRecord.getValue(), numTasks));
     }
@@ -179,20 +180,27 @@ public class PartitioningRunner extends
     // scan.
     FileStatus[] status = fs.listStatus(partitionDir);
     for (int j = 0; j < status.length; j++) {
-      int idx = Integer.parseInt(status[j].getPath().getName().split("[-]")[1]);
-      int assignedID = idx / (desiredNum / peer.getNumPeers());
+      int partitionID = Integer.parseInt(status[j].getPath().getName()
+          .split("[-]")[1]);
+      int assignedID = partitionID / (desiredNum / peer.getNumPeers());
       if (assignedID == peer.getNumPeers())
         assignedID = assignedID - 1;
 
       // TODO set replica factor to 1.
       // TODO and check whether we can write to specific DataNode.
       if (assignedID == peer.getPeerIndex()) {
+        Path partitionFile = new Path(partitionDir + "/"
+            + getPartitionName(partitionID));
+
         FileStatus[] files = fs.listStatus(status[j].getPath());
         SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
-            new Path(partitionDir + "/" + getPartitionName(j)), outputKeyClass,
-            outputValueClass, CompressionType.NONE);
+            partitionFile, outputKeyClass, outputValueClass,
+            CompressionType.NONE);
 
         for (int i = 0; i < files.length; i++) {
+          LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
+              + "/" + getPartitionName(partitionID));
+
           SequenceFile.Reader reader = new SequenceFile.Reader(fs,
               files[i].getPath(), conf);
 



Mime
View raw message