hadoop-common-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r779939 [2/2] - in /hadoop/core/trunk: ./ src/examples/org/apache/hadoop/examples/ src/mapred/org/apache/hadoop/mapred/lib/ src/mapred/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapred/lib/ src/test/mapred/org/...
Date: Fri, 29 May 2009 11:59:03 GMT
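
At a glance, this commit strips the old org.apache.hadoop.mapred.lib TestCombineFileInputFormat (every line of it is removed below), adds a port of it under org.apache.hadoop.mapreduce.lib.input, and wires a MultiFileWordCount run into TestMapReduceLocal. As a rough orientation sketch (the class names here are illustrative and the null reader is only a placeholder, mirroring the DummyInputFormat stubs in the diffs below), the subclassing contract changes like this:

import java.io.IOException;
import org.apache.hadoop.io.Text;

// Old mapred API (file emptied below): readers come from getRecordReader(...),
// and getSplits(JobConf, int) returns an InputSplit[].
class OldApiDummyFormat
    extends org.apache.hadoop.mapred.lib.CombineFileInputFormat<Text, Text> {
  @Override
  public org.apache.hadoop.mapred.RecordReader<Text, Text> getRecordReader(
      org.apache.hadoop.mapred.InputSplit split,
      org.apache.hadoop.mapred.JobConf job,
      org.apache.hadoop.mapred.Reporter reporter) throws IOException {
    return null; // placeholder, exactly as in the removed test
  }
}

// New mapreduce API (file added below): readers come from createRecordReader(...),
// and getSplits(JobContext) returns a List<InputSplit>.
class NewApiDummyFormat
    extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<Text, Text> {
  @Override
  public org.apache.hadoop.mapreduce.RecordReader<Text, Text> createRecordReader(
      org.apache.hadoop.mapreduce.InputSplit split,
      org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException {
    return null; // placeholder, exactly as in the added test
  }
}

The fully qualified names are used only to keep the two APIs side by side in one snippet; the tests below import the types normally.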
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Fri May 29 11:59:03 2009
@@ -1,513 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred.lib;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.MiniMRCluster;
-
-public class TestCombineFileInputFormat extends TestCase{
-
-  private static final String rack1[] = new String[] {
-    "/r1"
-  };
-  private static final String hosts1[] = new String[] {
-    "host1.rack1.com"
-  };
-  private static final String rack2[] = new String[] {
-    "/r2"
-  };
-  private static final String hosts2[] = new String[] {
-    "host2.rack2.com"
-  };
-  private static final String rack3[] = new String[] {
-    "/r3"
-  };
-  private static final String hosts3[] = new String[] {
-    "host3.rack3.com"
-  };
-  final Path inDir = new Path("/racktesting");
-  final Path outputPath = new Path("/output");
-  final Path dir1 = new Path(inDir, "/dir1");
-  final Path dir2 = new Path(inDir, "/dir2");
-  final Path dir3 = new Path(inDir, "/dir3");
-  final Path dir4 = new Path(inDir, "/dir4");
-
-  static final int BLOCKSIZE = 1024;
-  static final byte[] databuf = new byte[BLOCKSIZE];
-
-  private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
-  
-  /** Dummy class to extend CombineFileInputFormat*/
-  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
-    @Override
-    public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
-        , Reporter reporter) throws IOException {
-      return null;
-    }
-  }
-
-  public void testSplitPlacement() throws IOException {
-    String namenode = null;
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    String testName = "TestSplitPlacement";
-    try {
-      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
-       * 1) file1, just after starting the datanode on r1, with 
-       *    a repl factor of 1, and,
-       * 2) file2, just after starting the datanode on r2, with 
-       *    a repl factor of 2, and,
-       * 3) file3 after starting the all three datanodes, with a repl 
-       *    factor of 3.
-       * At the end, file1 will be present on only datanode1, file2 will be
-       * present on datanode 1 and datanode2 and 
-       * file3 will be present on all datanodes. 
-       */
-      JobConf conf = new JobConf();
-      conf.setBoolean("dfs.replication.considerLoad", false);
-      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
-      dfs.waitActive();
-
-      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
-                 (dfs.getFileSystem()).getUri().getPort();
-
-      fileSys = dfs.getFileSystem();
-      if (!fileSys.mkdirs(inDir)) {
-        throw new IOException("Mkdirs failed to create " + inDir.toString());
-      }
-      Path file1 = new Path(dir1 + "/file1");
-      writeFile(conf, file1, (short)1, 1);
-      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
-      dfs.waitActive();
-
-      // create file on two datanodes.
-      Path file2 = new Path(dir2 + "/file2");
-      writeFile(conf, file2, (short)2, 2);
-
-      // split it using a CombinedFile input format
-      DummyInputFormat inFormat = new DummyInputFormat();
-      inFormat.setInputPaths(conf, dir1 + "," + dir2);
-      inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
-      System.out.println("Made splits(Test1): " + splits.length);
-
-      // make sure that each split has different locations
-      CombineFileSplit fileSplit = null;
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test1): " + fileSplit);
-      }
-      assertEquals(splits.length, 2);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
-
-      // create another file on 3 datanodes and 3 racks.
-      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
-      dfs.waitActive();
-      Path file3 = new Path(dir3 + "/file3");
-      writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
-      inFormat = new DummyInputFormat();
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3);
-      inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test2): " + fileSplit);
-      }
-      assertEquals(splits.length, 3);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 3);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
-
-      // create file4 on all three racks
-      Path file4 = new Path(dir4 + "/file4");
-      writeFile(conf, file4, (short)3, 3);
-      inFormat = new DummyInputFormat();
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      inFormat.setMinSplitSizeRack(BLOCKSIZE);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test3): " + fileSplit);
-      }
-      assertEquals(splits.length, 3);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 6);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
-
-      // maximum split size is 2 blocks 
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(BLOCKSIZE);
-      inFormat.setMaxSplitSize(2*BLOCKSIZE);
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test4): " + fileSplit);
-      }
-      assertEquals(splits.length, 5);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(1), 0);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-
-      // maximum split size is 3 blocks 
-      inFormat = new DummyInputFormat();
-      inFormat.setMinSplitSizeNode(BLOCKSIZE);
-      inFormat.setMaxSplitSize(3*BLOCKSIZE);
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test5): " + fileSplit);
-      }
-      assertEquals(splits.length, 4);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 3);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(2),  2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
-      fileSplit = (CombineFileSplit) splits[3];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
-
-      // maximum split size is 4 blocks 
-      inFormat = new DummyInputFormat();
-      inFormat.setMaxSplitSize(4*BLOCKSIZE);
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test6): " + fileSplit);
-      }
-      assertEquals(splits.length, 3);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 4);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
-      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 4);
-      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
-      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getPath(3).getName(), file4.getName());
-      assertEquals(fileSplit.getOffset(3),  2 * BLOCKSIZE);
-      assertEquals(fileSplit.getLength(3), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
-      assertEquals(fileSplit.getOffset(0), 0);
-      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
-
-      // maximum split size is 7 blocks and min is 3 blocks
-      inFormat = new DummyInputFormat();
-      inFormat.setMaxSplitSize(7*BLOCKSIZE);
-      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
-      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
-      inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(Test7): " + fileSplit);
-      }
-      assertEquals(splits.length, 2);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 6);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 3);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
-
-      // Rack 1 has file1, file2 and file3 and file4
-      // Rack 2 has file2 and file3 and file4
-      // Rack 3 has file3 and file4
-      file1 = new Path(conf.getWorkingDirectory(), file1);
-      file2 = new Path(conf.getWorkingDirectory(), file2);
-      file3 = new Path(conf.getWorkingDirectory(), file3);
-      file4 = new Path(conf.getWorkingDirectory(), file4);
-
-      // setup a filter so that only file1 and file2 can be combined
-      inFormat = new DummyInputFormat();
-      inFormat.addInputPath(conf, inDir);
-      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
-      inFormat.createPool(conf, new TestFilter(dir1), 
-                          new TestFilter(dir2));
-      splits = inFormat.getSplits(conf, 1);
-      for (int i = 0; i < splits.length; ++i) {
-        fileSplit = (CombineFileSplit) splits[i];
-        System.out.println("File split(TestPool1): " + fileSplit);
-      }
-      assertEquals(splits.length, 3);
-      fileSplit = (CombineFileSplit) splits[0];
-      assertEquals(fileSplit.getNumPaths(), 2);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits[1];
-      assertEquals(fileSplit.getNumPaths(), 1);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
-      fileSplit = (CombineFileSplit) splits[2];
-      assertEquals(fileSplit.getNumPaths(), 6);
-      assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
-    } finally {
-      if (dfs != null) {
-        dfs.shutdown();
-      }
-    }
-  }
-
-  static void writeFile(Configuration conf, Path name,
-      short replication, int numBlocks) throws IOException {
-    FileSystem fileSys = FileSystem.get(conf);
-
-    FSDataOutputStream stm = fileSys.create(name, true,
-                                            conf.getInt("io.file.buffer.size", 4096),
-                                            replication, (long)BLOCKSIZE);
-    for (int i = 0; i < numBlocks; i++) {
-      stm.write(databuf);
-    }
-    stm.close();
-    DFSTestUtil.waitReplication(fileSys, name, replication);
-  }
-
-  static class TestFilter implements PathFilter {
-    private Path p;
-
-    // store a path prefix in this TestFilter
-    public TestFilter(Path p) {
-      this.p = p;
-    }
-
-    // returns true if the specified path matches the prefix stored
-    // in this TestFilter.
-    public boolean accept(Path path) {
-      if (path.toString().indexOf(p.toString()) == 0) {
-        return true;
-      }
-      return false;
-    }
-
-    public String toString() {
-      return "PathFilter:" + p;
-    }
-  }
-
-  /*
-   * Prints out the input splits for the specified files
-   */
-  private void splitRealFiles(String[] args) throws IOException {
-    JobConf conf = new JobConf();
-    FileSystem fs = FileSystem.get(conf);
-    if (!(fs instanceof DistributedFileSystem)) {
-      throw new IOException("Wrong file system: " + fs.getClass().getName());
-    }
-    int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
-
-    DummyInputFormat inFormat = new DummyInputFormat();
-    for (int i = 0; i < args.length; i++) {
-      inFormat.addInputPaths(conf, args[i]);
-    }
-    inFormat.setMinSplitSizeRack(blockSize);
-    inFormat.setMaxSplitSize(10 * blockSize);
-
-    InputSplit[] splits = inFormat.getSplits(conf, 1);
-    System.out.println("Total number of splits " + splits.length);
-    for (int i = 0; i < splits.length; ++i) {
-      CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
-      System.out.println("Split[" + i + "] " + fileSplit);
-    }
-  }
-
-  public static void main(String[] args) throws Exception{
-
-    // if there are some parameters specified, then use those paths
-    if (args.length != 0) {
-      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
-      test.splitRealFiles(args);
-    } else {
-      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
-      test.testSplitPlacement();
-    }
-  }
-}
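
Where split counts like the 5 asserted in Test4 above come from: with maxSplitSize set to 2*BLOCKSIZE, the six blocks of file3 and file4 (each file written with replication 3, so fully present on host3) end up packed into three two-block splits located on host3.rack3.com, while file2's two blocks on r2 form one split and file1's single block on r1 forms the last. A standalone arithmetic sketch consistent with those assertions (no Hadoop types; the helper is made up):

public class SplitCountSketch {
  public static void main(String[] args) {
    int rack3Blocks = 3 + 3; // file3 + file4, both written with replication 3
    int rack2Blocks = 2;     // file2, replication 2
    int rack1Blocks = 1;     // file1, replication 1
    int maxSplitBlocks = 2;  // setMaxSplitSize(2 * BLOCKSIZE) in Test4
    int total = ceilDiv(rack3Blocks, maxSplitBlocks)
              + ceilDiv(rack2Blocks, maxSplitBlocks)
              + ceilDiv(rack1Blocks, maxSplitBlocks);
    System.out.println(total); // 3 + 1 + 1 = 5, matching assertEquals(splits.length, 5)
  }
  private static int ceilDiv(int a, int b) {
    return (a + b - 1) / b;
  }
}

The same tally matches Test5 (maximum of 3 blocks per split: 2 + 1 + 1 = 4 splits); with larger maxima the packing starts mixing files across racks, as the Test6 and Test7 assertions show.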

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=779939&r1=779938&r2=779939&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Fri May 29 11:59:03 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.MultiFileWordCount;
 import org.apache.hadoop.examples.SecondarySort;
 import org.apache.hadoop.examples.WordCount;
 import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A JUnit test to test min map-reduce cluster with local file system.
@@ -88,6 +90,7 @@
       Configuration conf = mr.createJobConf();
       runWordCount(conf);
       runSecondarySort(conf);
+      runMultiFileWordCount(conf);
     } finally {
       if (mr != null) { mr.shutdown(); }
     }
@@ -172,5 +175,21 @@
                  "------------------------------------------------\n" +
                  "10\t20\n10\t25\n10\t30\n", out);
   }
-  
+ 
+  public void runMultiFileWordCount(Configuration  conf) throws Exception  {
+    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
+    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);    
+    writeFile("in/part1", "this is a test\nof " +
+              "multi file word count test\ntest\n");
+    writeFile("in/part2", "more test");
+
+    int ret = ToolRunner.run(conf, new MultiFileWordCount(), 
+                new String[] {TEST_ROOT_DIR + "/in", TEST_ROOT_DIR + "/out"});
+    assertTrue("MultiFileWordCount failed", ret == 0);
+    String out = readFile("out/part-r-00000");
+    System.out.println(out);
+    assertEquals("a\t1\ncount\t1\nfile\t1\nis\t1\n" +
+      "more\t1\nmulti\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n", out);
+  }
+
 }
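
The expected output asserted in runMultiFileWordCount above is just an alphabetical word tally over the two input files, with "test" appearing four times (three times in part1, once in part2) and every other word once. A standalone sketch of the same count (plain java.util, no MapReduce):

import java.util.Map;
import java.util.TreeMap;

public class WordTallySketch {
  public static void main(String[] args) {
    String part1 = "this is a test\nof multi file word count test\ntest\n";
    String part2 = "more test";
    Map<String, Integer> counts = new TreeMap<String, Integer>();
    for (String token : (part1 + "\n" + part2).split("\\s+")) {
      if (token.isEmpty()) {
        continue;
      }
      Integer seen = counts.get(token);
      counts.put(token, seen == null ? 1 : seen + 1);
    }
    // Prints the same word\tcount lines the assertEquals above expects.
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      System.out.println(entry.getKey() + "\t" + entry.getValue());
    }
  }
}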

Added: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=779939&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (added)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri May 29 11:59:03 2009
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestCombineFileInputFormat extends TestCase{
+
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.rack1.com"
+  };
+  private static final String rack2[] = new String[] {
+    "/r2"
+  };
+  private static final String hosts2[] = new String[] {
+    "host2.rack2.com"
+  };
+  private static final String rack3[] = new String[] {
+    "/r3"
+  };
+  private static final String hosts3[] = new String[] {
+    "host3.rack3.com"
+  };
+  final Path inDir = new Path("/racktesting");
+  final Path outputPath = new Path("/output");
+  final Path dir1 = new Path(inDir, "/dir1");
+  final Path dir2 = new Path(inDir, "/dir2");
+  final Path dir3 = new Path(inDir, "/dir3");
+  final Path dir4 = new Path(inDir, "/dir4");
+
+  static final int BLOCKSIZE = 1024;
+  static final byte[] databuf = new byte[BLOCKSIZE];
+
+  /** Dummy class to extend CombineFileInputFormat*/
+  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
+        TaskAttemptContext context) throws IOException {
+      return null;
+    }
+  }
+
+  public void testSplitPlacement() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
+       * 1) file1, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3 after starting the all three datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1 will be present on only datanode1, file2 will be
+       * present on datanode 1 and datanode2 and 
+       * file3 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1");
+      writeFile(conf, file1, (short)1, 1);
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2");
+      writeFile(conf, file2, (short)2, 2);
+
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = new Job(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test1): " + splits.size());
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 2);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3");
+      writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4");
+      writeFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // maximum split size is 2 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(2*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(splits.size(), 5);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), 0);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+
+      // maximum split size is 3 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(3*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 4);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(2),  2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
+      fileSplit = (CombineFileSplit) splits.get(3);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+
+      // maximum split size is 4 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 4);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 4);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(3).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(3),  2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(3), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // maximum split size is 7 blocks and min is 3 blocks
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(7*BLOCKSIZE);
+      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
+      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 2);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+
+      // Rack 1 has file1, file2 and file3 and file4
+      // Rack 2 has file2 and file3 and file4
+      // Rack 3 has file3 and file4
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(new TestFilter(dir1), 
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  static void writeFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            conf.getInt("io.file.buffer.size", 4096),
+                                            replication, (long)BLOCKSIZE);
+    for (int i = 0; i < numBlocks; i++) {
+      stm.write(databuf);
+    }
+    stm.close();
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+
+  static class TestFilter implements PathFilter {
+    private Path p;
+
+    // store a path prefix in this TestFilter
+    public TestFilter(Path p) {
+      this.p = p;
+    }
+
+    // returns true if the specified path matches the prefix stored
+    // in this TestFilter.
+    public boolean accept(Path path) {
+      if (path.toString().indexOf(p.toString()) == 0) {
+        return true;
+      }
+      return false;
+    }
+
+    public String toString() {
+      return "PathFilter:" + p;
+    }
+  }
+
+  /*
+   * Prints out the input splits for the specified files
+   */
+  private void splitRealFiles(String[] args) throws IOException {
+    Configuration conf = new Configuration();
+    Job job = new Job();
+    FileSystem fs = FileSystem.get(conf);
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IOException("Wrong file system: " + fs.getClass().getName());
+    }
+    int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
+
+    DummyInputFormat inFormat = new DummyInputFormat();
+    for (int i = 0; i < args.length; i++) {
+      FileInputFormat.addInputPaths(job, args[i]);
+    }
+    inFormat.setMinSplitSizeRack(blockSize);
+    inFormat.setMaxSplitSize(10 * blockSize);
+
+    List<InputSplit> splits = inFormat.getSplits(job);
+    System.out.println("Total number of splits " + splits.size());
+    for (int i = 0; i < splits.size(); ++i) {
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(i);
+      System.out.println("Split[" + i + "] " + fileSplit);
+    }
+  }
+
+  public static void main(String[] args) throws Exception{
+
+    // if there are some parameters specified, then use those paths
+    if (args.length != 0) {
+      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
+      test.splitRealFiles(args);
+    } else {
+      TestCombineFileInputFormat test = new TestCombineFileInputFormat();
+      test.testSplitPlacement();
+    }
+  }
+}


