hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1389701 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
Date Tue, 25 Sep 2012 03:56:23 GMT
Author: edwardyoon
Date: Tue Sep 25 03:56:22 2012
New Revision: 1389701

URL: http://svn.apache.org/viewvc?rev=1389701&view=rev
Log:
Make the input splitter robust

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1389701&r1=1389700&r2=1389701&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 25 03:56:22 2012
@@ -8,6 +8,7 @@ Release 0.6 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-647: Make the input spliter robustly (Yuesheng Hu via edwardyoon)
    HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)
    HAMA-633: Fix CI Failures (tjungblut)
    HAMA-631: Add "commons-httpclient-3.1.jar" (Paul Gyuho Song via edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1389701&r1=1389700&r2=1389701&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Sep 25 03:56:22 2012
@@ -97,7 +97,7 @@ public abstract class FileInputFormat<K,
       BSPJob job) throws IOException;
 
   /**
-   * Set a PathFilter to be applied to the input paths for the map-reduce job.
+   * Set a PathFilter to be applied to the input paths for the BSP job.
    * 
    * @param filter the PathFilter class use for filtering the input paths.
    */
@@ -205,6 +205,7 @@ public abstract class FileInputFormat<K,
     }
 
     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    long goalSize = 0;
     // take the short circuit path if we have already partitioned
     if (numSplits == files.length) {
       for (FileStatus file : files) {
@@ -214,9 +215,13 @@ public abstract class FileInputFormat<K,
         }
       }
       return splits.toArray(new FileSplit[splits.size()]);
+    } else if (files.length == 1) {
+      goalSize = totalSize / (numSplits == 0 ? 1 : numSplits - 1);
+    } else {
+      goalSize = totalSize
+          / (numSplits == 0 ? 1 : numSplits - files.length / 2 + 1);
     }
-
-    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+    LOG.debug("numSplits: " + numSplits); 
     long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
         minSplitSize);
 
@@ -232,7 +237,7 @@ public abstract class FileInputFormat<K,
         if ((length != 0) && isSplitable(fs, path)) {
           long blockSize = file.getBlockSize();
           long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-          LOG.debug("computeSplitSize: " + splitSize + " (" + goalSize + ", "
+          LOG.info("computeSplitSize: " + splitSize + " (" + goalSize + ", "
               + minSize + ", " + blockSize + ")");
 
           long bytesRemaining = length;
@@ -264,7 +269,11 @@ public abstract class FileInputFormat<K,
   }
 
   protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
-    return Math.max(minSize, Math.min(goalSize, blockSize));
+    if (goalSize > blockSize) {
+      return Math.max(minSize, Math.max(goalSize, blockSize));
+    } else {
+      return Math.max(minSize, Math.min(goalSize, blockSize));
+    }
   }
 
   protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
@@ -283,12 +292,11 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Sets the given comma separated paths as the list of inputs for the
-   * map-reduce job.
+   * Sets the given comma separated paths as the list of inputs for the BSP job.
    * 
    * @param conf Configuration of the job
    * @param commaSeparatedPaths Comma separated paths to be set as the list of
-   *          inputs for the map-reduce job.
+   *          inputs for the BSP job.
    */
   public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
     setInputPaths(conf,
@@ -296,12 +304,11 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Add the given comma separated paths to the list of inputs for the
-   * map-reduce job.
+   * Add the given comma separated paths to the list of inputs for the BSP job.
    * 
    * @param conf The configuration of the job
    * @param commaSeparatedPaths Comma separated paths to be added to the list of
-   *          inputs for the map-reduce job.
+   *          inputs for the BSP job.
    */
   public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) {
     for (String str : getPathStrings(commaSeparatedPaths)) {
@@ -310,12 +317,11 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Set the array of {@link Path}s as the list of inputs for the map-reduce
-   * job.
+   * Set the array of {@link Path}s as the list of inputs for the BSP job.
    * 
    * @param conf Configuration of the job.
    * @param inputPaths the {@link Path}s of the input directories/files for the
-   *          map-reduce job.
+   *          BSP job.
    */
   public static void setInputPaths(BSPJob conf, Path... inputPaths) {
     Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
@@ -330,11 +336,10 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Add a {@link Path} to the list of inputs for the map-reduce job.
+   * Add a {@link Path} to the list of inputs for the BSP job.
    * 
    * @param conf The configuration of the job
-   * @param path {@link Path} to be added to the list of inputs for the
-   *          map-reduce job.
+   * @param path {@link Path} to be added to the list of inputs for the BSP job.
    */
   public static void addInputPath(BSPJob conf, Path p) {
     Path path = new Path(conf.getWorkingDirectory(), p);
@@ -384,10 +389,10 @@ public abstract class FileInputFormat<K,
   }
 
   /**
-   * Get the list of input {@link Path}s for the map-reduce job.
+   * Get the list of input {@link Path}s for the BSP job.
    * 
    * @param conf The configuration of the job
-   * @return the list of input {@link Path}s for the map-reduce job.
+   * @return the list of input {@link Path}s for the BSP job.
    */
   public static Path[] getInputPaths(BSPJob conf) {
     String dirs = conf.getConf().get("bsp.input.dir", "");



Mime
View raw message