hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1454129 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Date Thu, 07 Mar 2013 22:18:35 GMT
Author: tucu
Date: Thu Mar  7 22:18:35 2013
New Revision: 1454129

URL: http://svn.apache.org/r1454129
Log:
MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are in new API. (sandyr
via tucu)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1454129&r1=1454128&r2=1454129&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Mar  7 22:18:35 2013
@@ -528,6 +528,9 @@ Release 1.2.0 - unreleased
     HADOOP-9375. Port HADOOP-7290 to branch-1 to fix TestUserGroupInformation
     failure. (Xiaobo Peng via suresh)
 
+    MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are in 
+    new API. (sandyr via tucu)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1454129&r1=1454128&r2=1454129&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
(original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
Thu Mar  7 22:18:35 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
@@ -33,6 +35,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
@@ -74,7 +78,7 @@ public abstract class CombineFileInputFo
   private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
 
   // mapping from a rack name to the set of Nodes in the rack 
-  private static HashMap<String, Set<String>> rackToNodes = 
+  private HashMap<String, Set<String>> rackToNodes = 
                             new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
@@ -127,6 +131,13 @@ public abstract class CombineFileInputFo
     pools.add(multi);
   }
 
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec =
+      new CompressionCodecFactory(fs.getConf()).getCodec(file);
+    return codec == null;
+  }
+  
   /**
    * default constructor
    */
@@ -180,24 +191,31 @@ public abstract class CombineFileInputFo
     if (paths.length == 0) {
       return splits.toArray(new CombineFileSplit[splits.size()]);    
     }
+    
+    // Convert them to Paths first. This is a costly operation and 
+    // we should do it first, otherwise we will incur doing it multiple
+    // times, one time each for each pool in the next loop.
+    List<Path> newpaths = new LinkedList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      FileSystem fs = paths[i].getFileSystem(job);
+      Path p = fs.makeQualified(paths[i]);
+      newpaths.add(p);
+    }
+    paths = null;
 
     // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contans paths
+    // Processing one pool at a time ensures that a split contains paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
       ArrayList<Path> myPaths = new ArrayList<Path>();
       
       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (int i = 0; i < paths.length; i++) {
-        if (paths[i] == null) {  // already processed
-          continue;
-        }
-        FileSystem fs = paths[i].getFileSystem(job);
-        Path p = new Path(paths[i].toUri().getPath());
+      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
+        Path p = iter.next();
         if (onepool.accept(p)) {
-          myPaths.add(paths[i]); // add it to my output set
-          paths[i] = null;       // already processed
+          myPaths.add(p); // add it to my output set
+          iter.remove();
         }
       }
       // create splits for all files in this pool.
@@ -205,16 +223,8 @@ public abstract class CombineFileInputFo
                     maxSize, minSizeNode, minSizeRack, splits);
     }
 
-    // Finally, process all paths that do not belong to any pool.
-    ArrayList<Path> myPaths = new ArrayList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      if (paths[i] == null) {  // already processed
-        continue;
-      }
-      myPaths.add(paths[i]);
-    }
     // create splits for all files that are not in any pool.
-    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
+    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
                   maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
@@ -253,13 +263,14 @@ public abstract class CombineFileInputFo
     // populate all the blocks for all files
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job, 
-                                 rackToBlocks, blockToNodes, nodeToBlocks);
+      files[i] = new OneFileInfo(paths[i], job,
+                                  isSplitable(paths[i].getFileSystem(job), paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
       totLength += files[i].getLength();
     }
 
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> nodes = new ArrayList<String>();
+    Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
 
     // process all nodes and create splits that are local
@@ -312,7 +323,7 @@ public abstract class CombineFileInputFo
     // in 'overflow'. After the processing of all racks is complete, these overflow
     // blocks will be combined into splits.
     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    ArrayList<String> racks = new ArrayList<String>();
+    Set<String> racks = new HashSet<String>();
 
     // Process all racks over and over again until there is no more work to do.
     while (blockToNodes.size() > 0) {
@@ -417,7 +428,7 @@ public abstract class CombineFileInputFo
    */
   private void addCreatedSplit(JobConf job,
                                List<CombineFileSplit> splitList, 
-                               List<String> locations, 
+                               Collection<String> locations, 
                                ArrayList<OneBlockInfo> validBlocks) {
     // create an input split
     Path[] fl = new Path[validBlocks.size()];
@@ -450,9 +461,11 @@ public abstract class CombineFileInputFo
     private OneBlockInfo[] blocks;       // all blocks in this file
 
     OneFileInfo(Path path, JobConf job,
+                boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes)
                 throws IOException {
       this.fileSize = 0;
 
@@ -465,17 +478,28 @@ public abstract class CombineFileInputFo
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        blocks = new OneBlockInfo[locations.length];
-        for (int i = 0; i < locations.length; i++) {
-           
-          fileSize += locations[i].getLength();
-          OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                                       locations[i].getOffset(), 
-                                       locations[i].getLength(),
-                                       locations[i].getHosts(),
-                                       locations[i].getTopologyPaths()); 
-          blocks[i] = oneblock;
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          blocks = new OneBlockInfo[locations.length];
+          for (int i = 0; i < locations.length; i++) {
+
+            fileSize += locations[i].getLength();
+            OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                locations[i].getOffset(), 
+                locations[i].getLength(),
+                locations[i].getHosts(),
+                locations[i].getTopologyPaths()); 
+            blocks[i] = oneblock;
+          }
+        }
 
+        for (OneBlockInfo oneblock : blocks) {
           // add this block to the block --> node locations map
           blockToNodes.put(oneblock, oneblock.hosts);
 
@@ -489,8 +513,8 @@ public abstract class CombineFileInputFo
             }
             blklist.add(oneblock);
             // Add this host to rackToNodes map
-            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
-         }
+            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+          }
 
           // add this block to the node --> block map
           for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -553,7 +577,8 @@ public abstract class CombineFileInputFo
     }
   }
 
-  private static void addHostToRack(String rack, String host) {
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+      String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {
       hosts = new HashSet<String>();
@@ -561,9 +586,10 @@ public abstract class CombineFileInputFo
     }
     hosts.add(host);
   }
+
   
-  private static List<String> getHosts(List<String> racks) {
-    List<String> hosts = new ArrayList<String>();
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
     for (String rack : racks) {
       hosts.addAll(rackToNodes.get(rack));
     }

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1454129&r1=1454128&r2=1454129&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
(original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Thu Mar  7 22:18:35 2013
@@ -18,12 +18,17 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -32,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -64,9 +70,11 @@ public class TestCombineFileInputFormat 
   final Path dir2 = new Path(inDir, "/dir2");
   final Path dir3 = new Path(inDir, "/dir3");
   final Path dir4 = new Path(inDir, "/dir4");
+  final Path dir5 = new Path(inDir, "/dir5");
 
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
+  private static final String DUMMY_FS_URI = "dummyfs:///";
 
   private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
   
@@ -78,6 +86,24 @@ public class TestCombineFileInputFormat 
       return null;
     }
   }
+  
+  /** Dummy class to extend CombineFileInputFormat. It allows 
+   * non-existent files to be passed into the CombineFileInputFormat, allows
+   * for easy testing without having to create real files.
+   */
+  private class DummyInputFormat1 extends DummyInputFormat {
+    @Override
+    protected FileStatus[] listStatus(JobConf job) throws IOException {
+      Path[] files = getInputPaths(job);
+      FileStatus[] results = new FileStatus[files.length];
+      for (int i = 0; i < files.length; i++) {
+        Path p = files[i];
+        FileSystem fs = p.getFileSystem(job);
+        results[i] = fs.getFileStatus(p);
+      }
+      return results;
+    }
+  }
 
   public void testSplitPlacement() throws IOException {
     String namenode = null;
@@ -86,16 +112,16 @@ public class TestCombineFileInputFormat 
     FileSystem fileSys = null;
     String testName = "TestSplitPlacement";
     try {
-      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
-       * 1) file1, just after starting the datanode on r1, with 
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
        *    a repl factor of 1, and,
        * 2) file2, just after starting the datanode on r2, with 
        *    a repl factor of 2, and,
-       * 3) file3 after starting the all three datanodes, with a repl 
+       * 3) file3, file4 after starting the all three datanodes, with a repl 
        *    factor of 3.
-       * At the end, file1 will be present on only datanode1, file2 will be
-       * present on datanode 1 and datanode2 and 
-       * file3 will be present on all datanodes. 
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
        */
       JobConf conf = new JobConf();
       conf.setBoolean("dfs.replication.considerLoad", false);
@@ -111,6 +137,30 @@ public class TestCombineFileInputFormat 
       }
       Path file1 = new Path(dir1 + "/file1");
       writeFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5");
+      writeFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      JobConf job = new JobConf(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      InputSplit[] splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test0): " + splits.length);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.length, 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
       dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
       dfs.waitActive();
 
@@ -119,14 +169,14 @@ public class TestCombineFileInputFormat 
       writeFile(conf, file2, (short)2, 2);
 
       // split it using a CombinedFile input format
-      DummyInputFormat inFormat = new DummyInputFormat();
+      inFormat = new DummyInputFormat();
       inFormat.setInputPaths(conf, dir1 + "," + dir2);
       inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
+      splits = inFormat.getSplits(conf, 1);
       System.out.println("Made splits(Test1): " + splits.length);
 
       // make sure that each split has different locations
-      CombineFileSplit fileSplit = null;
+      fileSplit = null;
       for (int i = 0; i < splits.length; ++i) {
         fileSplit = (CombineFileSplit) splits[i];
         System.out.println("File split(Test1): " + fileSplit);
@@ -436,7 +486,7 @@ public class TestCombineFileInputFormat 
       }
     }
   }
-
+  
   static void writeFile(Configuration conf, Path name,
       short replication, int numBlocks) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);
@@ -444,12 +494,409 @@ public class TestCombineFileInputFormat 
     FSDataOutputStream stm = fileSys.create(name, true,
                                             conf.getInt("io.file.buffer.size", 4096),
                                             replication, (long)BLOCKSIZE);
+    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+  }
+
+  // Creates the gzip file and return the FileStatus
+  static FileStatus writeGzipFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+    return fileSys.getFileStatus(name);
+  }
+
+  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+      OutputStream out, short replication, int numBlocks) throws IOException {
     for (int i = 0; i < numBlocks; i++) {
-      stm.write(databuf);
+      out.write(databuf);
     }
-    stm.close();
+    out.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
+  
+  public void testSplitPlacementForCompressedFiles() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+       *  files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, file4 after starting the all three datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1.gz");
+      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5.gz");
+      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      JobConf job = new JobConf(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      InputSplit[] splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test0): " + splits.length);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.length, 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f5.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+      
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2.gz");
+      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+      // split it using a CombinedFile input format
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      System.out.println("Made splits(Test1): " + splits.length);
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(2, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3.gz");
+      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4.gz");
+      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(4, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[3];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is twice file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(2 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is 4 times file1's length 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(2 * f1.getLen());
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(2, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(f2.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size and min-split-size per rack is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      inFormat.setMinSplitSizeRack(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(1, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // minimum split size per node is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(1, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // Rack 1 has file1, file2 and file3 and file4
+      // Rack 2 has file2 and file3 and file4
+      // Rack 3 has file3 and file4
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(job, new TestFilter(dir1), 
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(job, 1);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test9): " + split);
+      }
+      assertEquals(3, splits.length);
+      fileSplit = (CombineFileSplit) splits[0];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits[1];
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits[2];
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(job, new TestFilter(dirNoMatch1), 
+                            new TestFilter(dirNoMatch2));
+      }
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job, 1);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools " +
+                         " and " + numFiles + " files is " + 
+                         ((end - start)) + " milli seconds.");
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Test when input files are from non-default file systems
+   */
+  public void testForNonDefaultFileSystem() throws Throwable {
+    Configuration conf = new Configuration();
+
+    // use a fake file system scheme as default
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
+
+    // default fs path
+    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
+    // add a local file
+    Path localPath = new Path("testFile1");
+    FileSystem lfs = FileSystem.getLocal(conf);
+    FSDataOutputStream dos = lfs.create(localPath);
+    dos.writeChars("Local file for CFIF");
+    dos.close();
+
+    conf.set("mapred.working.dir", "/");
+    JobConf job = new JobConf(conf);
+
+    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
+    DummyInputFormat inFormat = new DummyInputFormat();
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+    assertTrue(splits.length > 0);
+    for (InputSplit s : splits) {
+      CombineFileSplit cfs = (CombineFileSplit)s;
+      for (Path p : cfs.getPaths()) {
+        assertEquals(p.toUri().getScheme(), "file");
+      }
+    }
+  }
 
   static class TestFilter implements PathFilter {
     private Path p;
@@ -462,7 +909,7 @@ public class TestCombineFileInputFormat 
     // returns true if the specified path matches the prefix stored
     // in this TestFilter.
     public boolean accept(Path path) {
-      if (path.toString().indexOf(p.toString()) == 0) {
+      if (path.toUri().getPath().indexOf(p.toString()) == 0) {
         return true;
       }
       return false;



Mime
View raw message