hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amareshw...@apache.org
Subject svn commit: r993293 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Date Tue, 07 Sep 2010 09:06:13 GMT
Author: amareshwari
Date: Tue Sep  7 09:06:12 2010
New Revision: 993293

URL: http://svn.apache.org/viewvc?rev=993293&view=rev
Log:
MAPREDUCE-1597. Fixes CombineFileInputFormat to work with non-splittable files. Contributed
by Amareshwari Sriramadasu

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep  7 09:06:12 2010
@@ -283,6 +283,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1975. Fixes unnecessary InterruptedException log in gridmix.
     (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-1597. Fixes CombineFileInputFormat to work with non-splittable
+    files. (amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
Tue Sep  7 09:06:12 2010
@@ -38,6 +38,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -140,6 +143,16 @@ public abstract class CombineFileInputFo
     }
     pools.add(multi);
   }
+  
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
 
   /**
    * default constructor
@@ -223,12 +236,12 @@ public abstract class CombineFileInputFo
         }
       }
       // create splits for all files in this pool.
-      getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]), 
+      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
                     maxSize, minSizeNode, minSizeRack, splits);
     }
 
     // create splits for all files that are not in any pool.
-    getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]), 
+    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
                   maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
@@ -239,10 +252,11 @@ public abstract class CombineFileInputFo
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(Configuration conf, Path[] paths, 
+  private void getMoreSplits(JobContext job, Path[] paths, 
                              long maxSize, long minSizeNode, long minSizeRack,
                              List<InputSplit> splits)
     throws IOException {
+    Configuration conf = job.getConfiguration();
 
     // all blocks for all the files in input set
     OneFileInfo[] files;
@@ -267,9 +281,9 @@ public abstract class CombineFileInputFo
     // populate all the blocks for all files
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes,
-                                 maxSize);
+      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
 
@@ -465,6 +479,7 @@ public abstract class CombineFileInputFo
     private OneBlockInfo[] blocks;       // all blocks in this file
 
     OneFileInfo(Path path, Configuration conf,
+                boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
                 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
@@ -482,69 +497,78 @@ public abstract class CombineFileInputFo
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(locations.length);
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-
-          // each split can be a maximum of maxSize
-          long left = locations[i].getLength();
-          long myOffset = locations[i].getOffset();
-          long myLength = 0;
-          while (left > 0) {
-            if (maxSize == 0) {
-              myLength = left;
-            } else {
-              if (left > maxSize && left < 2 * maxSize) {
-                // if remainder is between max and 2*max - then 
-                // instead of creating splits of size max, left-max we
-                // create splits of size left/2 and left/2. This is
-                // a heuristic to avoid creating really really small
-                // splits.
-                myLength = left / 2;
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            while (left > 0) {
+              if (maxSize == 0) {
+                myLength = left;
               } else {
-                myLength = Math.min(maxSize, left);
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if remainder is between max and 2*max - then
+                  // instead of creating splits of size max, left-max we
+                  // create splits of size left/2 and left/2. This is
+                  // a heuristic to avoid creating really really small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
               }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
+
+              blocksList.add(oneblock);
             }
-            OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       myOffset,
-                                       myLength,
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-            left -= myLength;
-            myOffset += myLength;
-
-            blocksList.add(oneblock);
-
-            // add this block to the block --> node locations map
-            blockToNodes.put(oneblock, oneblock.hosts);
-
-            // add this block to the rack --> block map
-            for (int j = 0; j < oneblock.racks.length; j++) {
-              String rack = oneblock.racks[j];
-              List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-              if (blklist == null) {
-                blklist = new ArrayList<OneBlockInfo>();
-                rackToBlocks.put(rack, blklist);
-              }
-              blklist.add(oneblock);
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
-           }
-
-            // add this block to the node --> block map
-            for (int j = 0; j < oneblock.hosts.length; j++) {
-              String node = oneblock.hosts[j];
-              List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-              if (blklist == null) {
-                blklist = new ArrayList<OneBlockInfo>();
-                nodeToBlocks.put(node, blklist);
-              }
-              blklist.add(oneblock);
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
             }
+            blklist.add(oneblock);
           }
         }
-        blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
       }
     }
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Tue Sep  7 09:06:12 2010
@@ -18,8 +18,10 @@
 package org.apache.hadoop.mapreduce.lib.input;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
@@ -648,12 +650,376 @@ public class TestCombineFileInputFormat 
     FSDataOutputStream stm = fileSys.create(name, true,
                                             conf.getInt("io.file.buffer.size", 4096),
                                             replication, (long)BLOCKSIZE);
+    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+  }
+
+  // Creates the gzip file and return the FileStatus
+  static FileStatus writeGzipFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+    return fileSys.getFileStatus(name);
+  }
+
+  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+      OutputStream out, short replication, int numBlocks) throws IOException {
     for (int i = 0; i < numBlocks; i++) {
-      stm.write(databuf);
+      out.write(databuf);
     }
-    stm.close();
+    out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+  
+  public void testSplitPlacementForCompressedFiles() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+       *  files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, file4 after starting the all three datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1.gz");
+      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5.gz");
+      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.size(), 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f5.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+      
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2.gz");
+      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+      // split it using a CombinedFile input format
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test1): " + splits.size());
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3.gz");
+      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4.gz");
+      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(4, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(3);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is twice file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(2 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is 4 times file1's length 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(2 * f1.getLen());
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(f2.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size and min-split-size per rack is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      inFormat.setMinSplitSizeRack(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // minimum split size per node is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // Rack 1 has file1, file2 and file3 and file4
+      // Rack 2 has file2 and file3 and file4
+      // Rack 3 has file3 and file4
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(new TestFilter(dir1), 
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test9): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(new TestFilter(dirNoMatch1), 
+                            new TestFilter(dirNoMatch2));
+      }
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools " +
+                         " and " + numFiles + " files is " + 
+                         ((end - start)) + " milli seconds.");
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
 
   static class TestFilter implements PathFilter {
     private Path p;



Mime
View raw message