incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1200322 - in /incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp: BSPJobClient.java FileInputFormat.java
Date Thu, 10 Nov 2011 13:16:43 GMT
Author: tjungblut
Date: Thu Nov 10 13:16:42 2011
New Revision: 1200322

URL: http://svn.apache.org/viewvc?rev=1200322&view=rev
Log:
Fix for partitioning

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1200322&r1=1200321&r2=1200322&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Nov 10 13:16:42 2011
@@ -451,7 +451,7 @@ public class BSPJobClient extends Config
             Object key = recordReader.createKey();
             Object value = recordReader.createValue();
             while (recordReader.next(key, value)) {
-              int index = partitioner.getPartition(key, value, numOfTasks);
+              int index = Math.abs(partitioner.getPartition(key, value, numOfTasks));
               writers.get(index).append(key, value);
             }
             LOG.debug("Done with split " + i);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1200322&r1=1200321&r2=1200322&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Thu Nov 10 13:16:42 2011
@@ -185,12 +185,21 @@ public abstract class FileInputFormat<K,
       totalSize += file.getLen();
     }
 
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    // take the short circuit path if we have already partitioned
+    if (numSplits == files.length) {
+      for (FileStatus file : files) {
+        splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
+            new String[0]));
+      }
+      return splits.toArray(new FileSplit[splits.size()]);
+    }
+
     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
     long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
         minSplitSize);
 
     // generate splits
-    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
     NetworkTopology clusterMap = new NetworkTopology();
     for (FileStatus file : files) {
       Path path = file.getPath();
@@ -255,8 +264,8 @@ public abstract class FileInputFormat<K,
    *          inputs for the map-reduce job.
    */
   public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
-    setInputPaths(conf, StringUtils
-        .stringToPath(getPathStrings(commaSeparatedPaths)));
+    setInputPaths(conf,
+        StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
   }
 
   /**



Mime
View raw message