hadoop-mapreduce-commits mailing list archives

From: amareshw...@apache.org
Subject: svn commit: r992989 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Date: Mon, 06 Sep 2010 10:23:24 GMT
Author: amareshwari
Date: Mon Sep  6 10:23:24 2010
New Revision: 992989

URL: http://svn.apache.org/viewvc?rev=992989&view=rev
Log:
MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits with size less than DFS block
size. Contributed by dhruba borthakur
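
Before this change, CombineFileInputFormat built exactly one OneBlockInfo per DFS block, so no split could be smaller than a block and a max split size below the block size was effectively ignored. With this change each block is carved into chunks of at most maxSize; for example, a file of three 64 MB blocks with a 32 MB max split size now yields six 32 MB splits, as the new test case below asserts.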

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=992989&r1=992988&r2=992989&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep  6 10:23:24 2010
@@ -277,6 +277,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2031. Fixes test failures TestTaskLauncher and
     TestTaskTrackerLocalization. (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-2046. Fixes CombineFileInputFormat to allow splits with size
+    less than DFS block size. (dhruba borthakur via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=992989&r1=992988&r2=992989&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Mon Sep  6 10:23:24 2010
@@ -268,7 +268,8 @@ public abstract class CombineFileInputFo
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
       files[i] = new OneFileInfo(paths[i], conf, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
+                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes,
+                                 maxSize);
       totLength += files[i].getLength();
     }
 
@@ -467,7 +468,8 @@ public abstract class CombineFileInputFo
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
                 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                HashMap<String, Set<String>> rackToNodes)
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
                 throws IOException {
       this.fileSize = 0;
 
@@ -480,44 +482,69 @@ public abstract class CombineFileInputFo
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        blocks = new OneBlockInfo[locations.length];
+        ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(locations.length);
         for (int i = 0; i < locations.length; i++) {
            
           fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
+
+          // each split can be a maximum of maxSize
+          long left = locations[i].getLength();
+          long myOffset = locations[i].getOffset();
+          long myLength = 0;
+          while (left > 0) {
+            if (maxSize == 0) {
+              myLength = left;
+            } else {
+              if (left > maxSize && left < 2 * maxSize) {
+                // if remainder is between max and 2*max - then 
+                // instead of creating splits of size max, left-max we
+                // create splits of size left/2 and left/2. This is
+                // a heuristic to avoid creating really really small
+                // splits.
+                myLength = left / 2;
+              } else {
+                myLength = Math.min(maxSize, left);
+              }
+            }
+            OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                                       myOffset,
+                                       myLength,
                                        locations[i].getHosts(),
                                        locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
+            left -= myLength;
+            myOffset += myLength;
 
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
+            blocksList.add(oneblock);
 
-          // add this block to the rack --> block map
-          for (int j = 0; j < oneblock.racks.length; j++) {
-            String rack = oneblock.racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            // Add this host to rackToNodes map
-            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
-         }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
+            // add this block to the block --> node locations map
+            blockToNodes.put(oneblock, oneblock.hosts);
+
+            // add this block to the rack --> block map
+            for (int j = 0; j < oneblock.racks.length; j++) {
+              String rack = oneblock.racks[j];
+              List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+              if (blklist == null) {
+                blklist = new ArrayList<OneBlockInfo>();
+                rackToBlocks.put(rack, blklist);
+              }
+              blklist.add(oneblock);
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+           }
+
+            // add this block to the node --> block map
+            for (int j = 0; j < oneblock.hosts.length; j++) {
+              String node = oneblock.hosts[j];
+              List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+              if (blklist == null) {
+                blklist = new ArrayList<OneBlockInfo>();
+                nodeToBlocks.put(node, blklist);
+              }
+              blklist.add(oneblock);
             }
-            blklist.add(oneblock);
           }
         }
+        blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
       }
     }
 

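The heart of the patch is the loop above: each DFS block is now carved into chunks of at most maxSize, and a remainder that falls strictly between maxSize and 2*maxSize is halved so that no tiny tail split is produced. A minimal standalone sketch of just that sizing logic (the splitSizes helper and its List<Long> return type are illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;

public class SplitSizing {
  /**
   * Returns the lengths of the sub-blocks that a single DFS block of
   * the given length is carved into, mirroring the loop added by this
   * patch. A maxSize of 0 means "no limit": the block stays whole.
   */
  static List<Long> splitSizes(long blockLength, long maxSize) {
    List<Long> sizes = new ArrayList<Long>();
    long left = blockLength;
    while (left > 0) {
      long myLength;
      if (maxSize == 0) {
        myLength = left;
      } else if (left > maxSize && left < 2 * maxSize) {
        // Remainder strictly between maxSize and 2*maxSize: halve it
        // instead of producing (maxSize, tiny remainder).
        myLength = left / 2;
      } else {
        myLength = Math.min(maxSize, left);
      }
      sizes.add(myLength);
      left -= myLength;
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(splitSizes(100, 40)); // [40, 30, 30]
    System.out.println(splitSizes(41, 40));  // [20, 21], not [40, 1]
    System.out.println(splitSizes(100, 0));  // [100]
  }
}

For instance, a remainder of 41 with a maxSize of 40 becomes splits of 20 and 21 rather than 40 and 1, which is exactly the pathological case the comment in the patch describes.
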
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=992989&r1=992988&r2=992989&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Mon Sep  6 10:23:24 2010
@@ -463,7 +463,7 @@ public class TestCombineFileInputFormat 
         dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test5): " + split);
       }
       assertEquals(splits.size(), 4);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -514,7 +514,7 @@ public class TestCombineFileInputFormat 
       FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test6): " + split);
       }
       assertEquals(splits.size(), 3);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -562,7 +562,7 @@ public class TestCombineFileInputFormat 
         dir1 + "," + dir2 + "," + dir3 + "," + dir4);
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
+        System.out.println("File split(Test7): " + split);
       }
       assertEquals(splits.size(), 2);
       fileSplit = (CombineFileSplit) splits.get(0);
@@ -622,6 +622,18 @@ public class TestCombineFileInputFormat 
       System.out.println("Elapsed time for " + numPools + " pools " +
                          " and " + numFiles + " files is " + 
                          ((end - start)/1000) + " seconds.");
+
+      // This file has three whole blocks. If the maxsplit size is
+      // half the block size, then there should be six splits.
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(BLOCKSIZE/2);
+      FileInputFormat.setInputPaths(job, dir3);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(6, splits.size());
+
     } finally {
       if (dfs != null) {
         dfs.shutdown();
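
For completeness, a sketch of how an application could opt into sub-block splits after this change. setMaxSplitSize is protected, so it is typically called from within a CombineFileInputFormat subclass; the class name HalfBlockCombineFormat and the 32 MB figure are hypothetical, and a real subclass would return a working record reader:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

// Hypothetical subclass for illustration only.
public class HalfBlockCombineFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  public HalfBlockCombineFormat() {
    // Assuming a 64 MB DFS block size, request 32 MB splits; after
    // MAPREDUCE-2046 a max split size below the block size is honored.
    setMaxSplitSize(32 * 1024 * 1024);
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // A real implementation would return a CombineFileRecordReader
    // wired to a per-file reader; omitted in this sketch.
    throw new UnsupportedOperationException("sketch only");
  }
}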


