hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1458017 - in /hadoop/common/branches/branch-1.2: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Date Mon, 18 Mar 2013 21:31:31 GMT
Author: tucu
Date: Mon Mar 18 21:31:30 2013
New Revision: 1458017

URL: http://svn.apache.org/r1458017
Log:
Reverting MAPREDUCE-5038 (commit 1454129)

Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1458017&r1=1458016&r2=1458017&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Mon Mar 18 21:31:30 2013
@@ -509,9 +509,6 @@ Release 1.2.0 - unreleased
     HADOOP-9375. Port HADOOP-7290 to branch-1 to fix TestUserGroupInformation
     failure. (Xiaobo Peng via suresh)
 
-    MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are in 
-    new API. (sandyr via tucu)
-
     MAPREDUCE-5049. CombineFileInputFormat counts all compressed files 
     non-splitable. (sandyr via tucu)
 

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1458017&r1=1458016&r2=1458017&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Mon Mar 18 21:31:30 2013
@@ -20,9 +20,7 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
@@ -35,8 +33,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
@@ -79,7 +75,7 @@ public abstract class CombineFileInputFo
   private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
 
   // mapping from a rack name to the set of Nodes in the rack 
-  private HashMap<String, Set<String>> rackToNodes = 
+  private static HashMap<String, Set<String>> rackToNodes = 
                             new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
@@ -132,16 +128,6 @@ public abstract class CombineFileInputFo
     pools.add(multi);
   }
 
-  @Override
-  protected boolean isSplitable(FileSystem fs, Path file) {
-    final CompressionCodec codec =
-      new CompressionCodecFactory(fs.getConf()).getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
-  }
-  
   /**
    * default constructor
    */
@@ -195,31 +181,24 @@ public abstract class CombineFileInputFo
     if (paths.length == 0) {
       return splits.toArray(new CombineFileSplit[splits.size()]);    
     }
-    
-    // Convert them to Paths first. This is a costly operation and 
-    // we should do it first, otherwise we will incur doing it multiple
-    // times, one time each for each pool in the next loop.
-    List<Path> newpaths = new LinkedList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(job);
-      Path p = fs.makeQualified(paths[i]);
-      newpaths.add(p);
-    }
-    paths = null;
 
     // In one single iteration, process all the paths in a single pool.
-    // Processing one pool at a time ensures that a split contains paths
+    // Processing one pool at a time ensures that a split contans paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
       ArrayList<Path> myPaths = new ArrayList<Path>();
       
       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
-        Path p = iter.next();
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) {  // already processed
+          continue;
+        }
+        FileSystem fs = paths[i].getFileSystem(job);
+        Path p = new Path(paths[i].toUri().getPath());
         if (onepool.accept(p)) {
-          myPaths.add(p); // add it to my output set
-          iter.remove();
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null;       // already processed
         }
       }
       // create splits for all files in this pool.
@@ -227,8 +206,16 @@ public abstract class CombineFileInputFo
                     maxSize, minSizeNode, minSizeRack, splits);
     }
 
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) {  // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
     // create splits for all files that are not in any pool.
-    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
+    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
                   maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
@@ -267,14 +254,13 @@ public abstract class CombineFileInputFo
     // populate all the blocks for all files
     long totLength = 0;
     for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], job,
-                                  isSplitable(paths[i].getFileSystem(job), paths[i]),
-                                 rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
+      files[i] = new OneFileInfo(paths[i], job, 
+                                 rackToBlocks, blockToNodes, nodeToBlocks);
       totLength += files[i].getLength();
     }
 
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> nodes = new HashSet<String>();
+    ArrayList<String> nodes = new ArrayList<String>();
     long curSplitSize = 0;
 
     // process all nodes and create splits that are local
@@ -327,7 +313,7 @@ public abstract class CombineFileInputFo
     // in 'overflow'. After the processing of all racks is complete, these overflow
     // blocks will be combined into splits.
     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> racks = new HashSet<String>();
+    ArrayList<String> racks = new ArrayList<String>();
 
     // Process all racks over and over again until there is no more work to do.
     while (blockToNodes.size() > 0) {
@@ -432,7 +418,7 @@ public abstract class CombineFileInputFo
    */
   private void addCreatedSplit(JobConf job,
                                List<CombineFileSplit> splitList, 
-                               Collection<String> locations, 
+                               List<String> locations, 
                                ArrayList<OneBlockInfo> validBlocks) {
     // create an input split
     Path[] fl = new Path[validBlocks.size()];
@@ -465,11 +451,9 @@ public abstract class CombineFileInputFo
     private OneBlockInfo[] blocks;       // all blocks in this file
 
     OneFileInfo(Path path, JobConf job,
-                boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                HashMap<String, Set<String>> rackToNodes)
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
                 throws IOException {
       this.fileSize = 0;
 
@@ -482,28 +466,17 @@ public abstract class CombineFileInputFo
       if (locations == null) {
         blocks = new OneBlockInfo[0];
       } else {
-        if (!isSplitable) {
-          // if the file is not splitable, just create the one block with
-          // full file length
-          blocks = new OneBlockInfo[1];
-          fileSize = stat.getLen();
-          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
-              .getHosts(), locations[0].getTopologyPaths());
-        } else {
-          blocks = new OneBlockInfo[locations.length];
-          for (int i = 0; i < locations.length; i++) {
-
-            fileSize += locations[i].getLength();
-            OneBlockInfo oneblock =  new OneBlockInfo(path, 
-                locations[i].getOffset(), 
-                locations[i].getLength(),
-                locations[i].getHosts(),
-                locations[i].getTopologyPaths()); 
-            blocks[i] = oneblock;
-          }
-        }
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+           
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock =  new OneBlockInfo(path, 
+                                       locations[i].getOffset(), 
+                                       locations[i].getLength(),
+                                       locations[i].getHosts(),
+                                       locations[i].getTopologyPaths()); 
+          blocks[i] = oneblock;
 
-        for (OneBlockInfo oneblock : blocks) {
           // add this block to the block --> node locations map
           blockToNodes.put(oneblock, oneblock.hosts);
 
@@ -517,8 +490,8 @@ public abstract class CombineFileInputFo
             }
             blklist.add(oneblock);
             // Add this host to rackToNodes map
-            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
-          }
+            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+         }
 
           // add this block to the node --> block map
           for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -581,8 +554,7 @@ public abstract class CombineFileInputFo
     }
   }
 
-  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
-      String rack, String host) {
+  private static void addHostToRack(String rack, String host) {
     Set<String> hosts = rackToNodes.get(rack);
     if (hosts == null) {
       hosts = new HashSet<String>();
@@ -590,10 +562,9 @@ public abstract class CombineFileInputFo
     }
     hosts.add(host);
   }
-
   
-  private Set<String> getHosts(Set<String> racks) {
-    Set<String> hosts = new HashSet<String>();
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
     for (String rack : racks) {
       hosts.addAll(rackToNodes.get(rack));
     }

Modified: hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1458017&r1=1458016&r2=1458017&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Mon Mar 18 21:31:30 2013
@@ -18,17 +18,12 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -37,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -70,11 +64,9 @@ public class TestCombineFileInputFormat 
   final Path dir2 = new Path(inDir, "/dir2");
   final Path dir3 = new Path(inDir, "/dir3");
   final Path dir4 = new Path(inDir, "/dir4");
-  final Path dir5 = new Path(inDir, "/dir5");
 
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
-  private static final String DUMMY_FS_URI = "dummyfs:///";
 
   private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
   
@@ -86,24 +78,6 @@ public class TestCombineFileInputFormat 
       return null;
     }
   }
-  
-  /** Dummy class to extend CombineFileInputFormat. It allows 
-   * non-existent files to be passed into the CombineFileInputFormat, allows
-   * for easy testing without having to create real files.
-   */
-  private class DummyInputFormat1 extends DummyInputFormat {
-    @Override
-    protected FileStatus[] listStatus(JobConf job) throws IOException {
-      Path[] files = getInputPaths(job);
-      FileStatus[] results = new FileStatus[files.length];
-      for (int i = 0; i < files.length; i++) {
-        Path p = files[i];
-        FileSystem fs = p.getFileSystem(job);
-        results[i] = fs.getFileStatus(p);
-      }
-      return results;
-    }
-  }
 
   public void testSplitPlacement() throws IOException {
     String namenode = null;
@@ -112,16 +86,16 @@ public class TestCombineFileInputFormat 
     FileSystem fileSys = null;
     String testName = "TestSplitPlacement";
     try {
-      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
-       * 1) file1 and file5, just after starting the datanode on r1, with 
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
+       * 1) file1, just after starting the datanode on r1, with 
        *    a repl factor of 1, and,
        * 2) file2, just after starting the datanode on r2, with 
        *    a repl factor of 2, and,
-       * 3) file3, file4 after starting the all three datanodes, with a repl 
+       * 3) file3 after starting the all three datanodes, with a repl 
        *    factor of 3.
-       * At the end, file1, file5 will be present on only datanode1, file2 will 
-       * be present on datanode 1 and datanode2 and 
-       * file3, file4 will be present on all datanodes. 
+       * At the end, file1 will be present on only datanode1, file2 will be
+       * present on datanode 1 and datanode2 and 
+       * file3 will be present on all datanodes. 
        */
       JobConf conf = new JobConf();
       conf.setBoolean("dfs.replication.considerLoad", false);
@@ -137,30 +111,6 @@ public class TestCombineFileInputFormat 
       }
       Path file1 = new Path(dir1 + "/file1");
       writeFile(conf, file1, (short)1, 1);
-      // create another file on the same datanode
-      Path file5 = new Path(dir5 + "/file5");
-      writeFile(conf, file5, (short)1, 1);
-      // split it using a CombinedFile input format
-      DummyInputFormat inFormat = new DummyInputFormat();
-      JobConf job = new JobConf(conf);
-      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
-      InputSplit[] splits = inFormat.getSplits(job, 1);
-      System.out.println("Made splits(Test0): " + splits.length);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test0): " + split);
-      }
-      assertEquals(splits.length, 1);
-      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
       dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
       dfs.waitActive();
 
@@ -169,14 +119,14 @@ public class TestCombineFileInputFormat 
       writeFile(conf, file2, (short)2, 2);
 
       // split it using a CombinedFile input format
-      inFormat = new DummyInputFormat();
+      DummyInputFormat inFormat = new DummyInputFormat();
       inFormat.setInputPaths(conf, dir1 + "," + dir2);
       inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      splits = inFormat.getSplits(conf, 1);
+      InputSplit[] splits = inFormat.getSplits(conf, 1);
       System.out.println("Made splits(Test1): " + splits.length);
 
       // make sure that each split has different locations
-      fileSplit = null;
+      CombineFileSplit fileSplit = null;
       for (int i = 0; i < splits.length; ++i) {
         fileSplit = (CombineFileSplit) splits[i];
         System.out.println("File split(Test1): " + fileSplit);
@@ -486,7 +436,7 @@ public class TestCombineFileInputFormat 
       }
     }
   }
-  
+
   static void writeFile(Configuration conf, Path name,
       short replication, int numBlocks) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);
@@ -494,409 +444,12 @@ public class TestCombineFileInputFormat 
     FSDataOutputStream stm = fileSys.create(name, true,
                                             conf.getInt("io.file.buffer.size", 4096),
                                             replication, (long)BLOCKSIZE);
-    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
-  }
-
-  // Creates the gzip file and return the FileStatus
-  static FileStatus writeGzipFile(Configuration conf, Path name,
-      short replication, int numBlocks) throws IOException {
-    FileSystem fileSys = FileSystem.get(conf);
-
-    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
-        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
-    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
-    return fileSys.getFileStatus(name);
-  }
-
-  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
-      OutputStream out, short replication, int numBlocks) throws IOException {
     for (int i = 0; i < numBlocks; i++) {
-      out.write(databuf);
+      stm.write(databuf);
     }
-    out.close();
+    stm.close();
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
-  
-  public void testSplitPlacementForCompressedFiles() throws IOException {
-    MiniDFSCluster dfs = null;
-    FileSystem fileSys = null;
-    try {
-      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
-       *  files
-       * 1) file1 and file5, just after starting the datanode on r1, with 
-       *    a repl factor of 1, and,
-       * 2) file2, just after starting the datanode on r2, with 
-       *    a repl factor of 2, and,
-       * 3) file3, file4 after starting the all three datanodes, with a repl 
-       *    factor of 3.
-       * At the end, file1, file5 will be present on only datanode1, file2 will 
-       * be present on datanode 1 and datanode2 and 
-       * file3, file4 will be present on all datanodes. 
-       */
-      Configuration conf = new Configuration();
-      conf.setBoolean("dfs.replication.considerLoad", false);
-      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
-      dfs.waitActive();
-
-      fileSys = dfs.getFileSystem();
-      if (!fileSys.mkdirs(inDir)) {
-        throw new IOException("Mkdirs failed to create " + inDir.toString());
-      }
-      Path file1 = new Path(dir1 + "/file1.gz");
-      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
-      // create another file on the same datanode
-      Path file5 = new Path(dir5 + "/file5.gz");
-      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
-      // split it using a CombinedFile input format
-      DummyInputFormat inFormat = new DummyInputFormat();
-      JobConf job = new JobConf(conf);
-      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
-      InputSplit[] splits = inFormat.getSplits(job, 1);
-      System.out.println("Made splits(Test0): " + splits.length);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test0): " + split);
-      }
-      assertEquals(splits.length, 1);
-      CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f5.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-      
-      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
-      dfs.waitActive();
-
-      // create file on two datanodes.
-      Path file2 = new Path(dir2 + "/file2.gz");
-      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
-
-      // split it using a CombinedFile input format
-      inFormat = new DummyInputFormat();
-      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
-      inFormat.setMinSplitSizeRack(f1.getLen());
-      splits = inFormat.getSplits(job, 1);
-      System.out.println("Made splits(Test1): " + splits.length);
-
-      // make sure that each split has different locations
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test1): " + split);
-      }
-      assertEquals(2, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // create another file on 3 datanodes and 3 racks.
-      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
-      dfs.waitActive();
-      Path file3 = new Path(dir3 + "/file3.gz");
-      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
-      inFormat = new DummyInputFormat();
-      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
-      inFormat.setMinSplitSizeRack(f1.getLen());
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test2): " + split);
-      }
-      assertEquals(3, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // create file4 on all three racks
-      Path file4 = new Path(dir4 + "/file4.gz");
-      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
-      inFormat = new DummyInputFormat();
-      FileInputFormat.setInputPaths(job,
-          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      inFormat.setMinSplitSizeRack(f1.getLen());
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test3): " + split);
-      }
-      assertEquals(3, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // maximum split size is file1's length
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(f1.getLen());
-      inFormat.setMaxSplitSize(f1.getLen());
-      FileInputFormat.setInputPaths(job, 
-        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test4): " + split);
-      }
-      assertEquals(4, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f4.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[3];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // maximum split size is twice file1's length
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(f1.getLen());
-      inFormat.setMaxSplitSize(2 * f1.getLen());
-      FileInputFormat.setInputPaths(job, 
-        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test5): " + split);
-      }
-      assertEquals(3, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // maximum split size is 4 times file1's length 
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(2 * f1.getLen());
-      inFormat.setMaxSplitSize(4 * f1.getLen());
-      FileInputFormat.setInputPaths(job,
-          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test6): " + split);
-      }
-      assertEquals(2, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(f2.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
-      // maximum split size and min-split-size per rack is 4 times file1's length
-      inFormat = new DummyInputFormat();
-      inFormat.setMaxSplitSize(4 * f1.getLen());
-      inFormat.setMinSplitSizeRack(4 * f1.getLen());
-      FileInputFormat.setInputPaths(job, 
-        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test7): " + split);
-      }
-      assertEquals(1, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(4, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
-      // minimum split size per node is 4 times file1's length
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(4 * f1.getLen());
-      FileInputFormat.setInputPaths(job, 
-        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test8): " + split);
-      }
-      assertEquals(1, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(4, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
-      // Rack 1 has file1, file2 and file3 and file4
-      // Rack 2 has file2 and file3 and file4
-      // Rack 3 has file3 and file4
-      // setup a filter so that only file1 and file2 can be combined
-      inFormat = new DummyInputFormat();
-      FileInputFormat.addInputPath(job, inDir);
-      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
-      inFormat.createPool(job, new TestFilter(dir1), 
-                          new TestFilter(dir2));
-      splits = inFormat.getSplits(job, 1);
-      for (InputSplit split : splits) {
-        System.out.println("File split(Test9): " + split);
-      }
-      assertEquals(3, splits.length);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-
-      // measure performance when there are multiple pools and
-      // many files in each pool.
-      int numPools = 100;
-      int numFiles = 1000;
-      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
-      for (int i = 0; i < numFiles; i++) {
-        FileInputFormat.setInputPaths(job, file1);
-      }
-      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
-      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
-      final Path dirNoMatch2 = new Path(inDir, "/diryy");
-      for (int i = 0; i < numPools; i++) {
-        inFormat1.createPool(job, new TestFilter(dirNoMatch1), 
-                            new TestFilter(dirNoMatch2));
-      }
-      long start = System.currentTimeMillis();
-      splits = inFormat1.getSplits(job, 1);
-      long end = System.currentTimeMillis();
-      System.out.println("Elapsed time for " + numPools + " pools " +
-                         " and " + numFiles + " files is " + 
-                         ((end - start)) + " milli seconds.");
-    } finally {
-      if (dfs != null) {
-        dfs.shutdown();
-      }
-    }
-  }
-  
-  /**
-   * Test when input files are from non-default file systems
-   */
-  public void testForNonDefaultFileSystem() throws Throwable {
-    Configuration conf = new Configuration();
-
-    // use a fake file system scheme as default
-    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
-
-    // default fs path
-    assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
-    // add a local file
-    Path localPath = new Path("testFile1");
-    FileSystem lfs = FileSystem.getLocal(conf);
-    FSDataOutputStream dos = lfs.create(localPath);
-    dos.writeChars("Local file for CFIF");
-    dos.close();
-
-    conf.set("mapred.working.dir", "/");
-    JobConf job = new JobConf(conf);
-
-    FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
-    DummyInputFormat inFormat = new DummyInputFormat();
-    InputSplit[] splits = inFormat.getSplits(job, 1);
-    assertTrue(splits.length > 0);
-    for (InputSplit s : splits) {
-      CombineFileSplit cfs = (CombineFileSplit)s;
-      for (Path p : cfs.getPaths()) {
-        assertEquals(p.toUri().getScheme(), "file");
-      }
-    }
-  }
 
   static class TestFilter implements PathFilter {
     private Path p;
@@ -909,7 +462,7 @@ public class TestCombineFileInputFormat 
     // returns true if the specified path matches the prefix stored
     // in this TestFilter.
     public boolean accept(Path path) {
-      if (path.toUri().getPath().indexOf(p.toString()) == 0) {
+      if (path.toString().indexOf(p.toString()) == 0) {
         return true;
       }
       return false;



Mime
View raw message