hadoop-common-commits mailing list archives

From: tomwh...@apache.org
Subject: svn commit: r1235548 [6/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...
Date: Tue, 24 Jan 2012 23:22:01 GMT
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TestIntegerSplitter extends TestCase {
+  private long [] toLongArray(List<Long> in) {
+    long [] out = new long[in.size()];
+    for (int i = 0; i < in.size(); i++) {
+      out[i] = in.get(i).longValue();
+    }
+
+    return out;
+  }
+
+  public String formatLongArray(long [] ar) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    boolean first = true;
+    for (long val : ar) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      sb.append(Long.toString(val));
+      first = false;
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public void assertLongArrayEquals(long [] expected, long [] actual) {
+    for (int i = 0; i < expected.length; i++) {
+      try {
+        assertEquals("Failure at position " + i + "; got " + actual[i]
+            + " instead of " + expected[i] + "; actual array is " + formatLongArray(actual),
+            expected[i], actual[i]);
+      } catch (ArrayIndexOutOfBoundsException oob) {
+        fail("Expected array with " + expected.length + " elements; got " + actual.length
+            + ". Actual array is " + formatLongArray(actual));
+      }
+    }
+
+    if (actual.length > expected.length) {
+      fail("Actual array has " + actual.length + " elements; expected " + expected.length
+          + ". ACtual array is " + formatLongArray(actual));
+    }
+  }
+
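+  // The tests below document the behavior of
+  // IntegerSplitter.split(numSplits, minVal, maxVal): it returns the split
+  // boundary points in ascending order, with both endpoints included, so
+  // an even ten-way split of [0, 100] yields eleven values.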
+  public void testEvenSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 100);
+    long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testOddSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 95);
+    long [] expected = { 0, 9, 18, 27, 36, 45, 54, 63, 72, 81, 90, 95 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(1, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit2() throws SQLException {
+    // Same test, but with an overly high numSplits.
+    List<Long> splits = new IntegerSplitter().split(5, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testTooManySplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 3, 5);
+    long [] expected = { 3, 4, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+}
+

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native
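
As context for the asserts above: the boundary list returned by split() is consumed pairwise to build per-split ranges (consumers such as DataDrivenDBInputFormat turn each pair into a WHERE clause). A minimal sketch of that pairing, placed in the same package as the test (the three-argument split() overload is not public); the class name SplitBoundsSketch and the column name "id" are hypothetical:

    package org.apache.hadoop.mapreduce.lib.db;

    import java.sql.SQLException;
    import java.util.List;

    public class SplitBoundsSketch {
      public static void main(String[] args) throws SQLException {
        // Eleven boundary points delimit ten splits of [0, 100].
        List<Long> bounds = new IntegerSplitter().split(10, 0, 100);
        for (int i = 0; i < bounds.size() - 1; i++) {
          long lo = bounds.get(i);      // lower bound of split i
          long hi = bounds.get(i + 1);  // upper bound, shared with split i+1
          boolean last = (i == bounds.size() - 2);
          // Half-open ranges, except the last split closes the range.
          System.out.println(
              "id >= " + lo + " AND id " + (last ? "<=" : "<") + " " + hi);
        }
      }
    }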

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TestTextSplitter extends TestCase {
+
+  public String formatArray(Object [] ar) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    boolean first = true;
+    for (Object val : ar) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      sb.append(val.toString());
+      first = false;
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public void assertArrayEquals(Object [] expected, Object [] actual) {
+    for (int i = 0; i < expected.length; i++) {
+      try {
+        assertEquals("Failure at position " + i + "; got " + actual[i]
+            + " instead of " + expected[i] + "; actual array is " + formatArray(actual),
+            expected[i], actual[i]);
+      } catch (ArrayIndexOutOfBoundsException oob) {
+        fail("Expected array with " + expected.length + " elements; got " + actual.length
+            + ". Actual array is " + formatArray(actual));
+      }
+    }
+
+    if (actual.length > expected.length) {
+      fail("Actual array has " + actual.length + " elements; expected " + expected.length
+          + ". Actual array is " + formatArray(actual));
+    }
+  }
+
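+  /*
+   * TextSplitter maps strings to BigDecimal values in [0, 1) so that split
+   * points can be computed numerically and then mapped back to strings.
+   * The round-trip tests below show the mapping is lossless for short
+   * strings; testConvertChomped shows it is only exact to eight characters.
+   */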
+  public void testStringConvertEmpty() {
+    TextSplitter splitter = new TextSplitter();
+    BigDecimal emptyBigDec = splitter.stringToBigDecimal("");
+    assertEquals(BigDecimal.ZERO, emptyBigDec);
+  }
+
+  public void testBigDecConvertEmpty() {
+    TextSplitter splitter = new TextSplitter();
+    String emptyStr = splitter.bigDecimalToString(BigDecimal.ZERO);
+    assertEquals("", emptyStr);
+  }
+
+  public void testConvertA() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("A"));
+    assertEquals("A", out);
+  }
+
+  public void testConvertZ() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("Z"));
+    assertEquals("Z", out);
+  }
+
+  public void testConvertThreeChars() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("abc"));
+    assertEquals("abc", out);
+  }
+
+  public void testConvertStr() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("big str"));
+    assertEquals("big str", out);
+  }
+
+  public void testConvertChomped() {
+    TextSplitter splitter = new TextSplitter();
+    String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("AVeryLongStringIndeed"));
+    assertEquals("AVeryLon", out);
+  }
+
+  public void testAlphabetSplit() throws SQLException {
+    // This should give us 25 splits, with a boundary point at each letter.
+    TextSplitter splitter = new TextSplitter();
+    List<String> splits = splitter.split(25, "A", "Z", "");
+    String [] expected = { "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K",
+        "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" };
+    assertArrayEquals(expected, splits.toArray(new String [0]));
+  }
+
+  public void testCommonPrefix() throws SQLException {
+    // Splits between 'Hand' and 'Hardy'
+    TextSplitter splitter = new TextSplitter();
+    List<String> splits = splitter.split(5, "nd", "rdy", "Ha");
+    // Don't check for exact values in the middle, because the splitter generates some
+    // ugly Unicode-isms. But do check that we get multiple splits and that it starts
+    // and ends on the correct points.
+    assertEquals("Hand", splits.get(0));
+    assertEquals("Hardy", splits.get(splits.size() -1));
+    assertEquals(6, splits.size());
+  }
+}
+

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.fieldsel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+
+import junit.framework.TestCase;
+import java.text.NumberFormat;
+
+public class TestMRFieldSelection extends TestCase {
+
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(4);
+    idFormat.setGroupingUsed(false);
+  }
+
+  public void testFieldSelection() throws Exception {
+    launch();
+  }
+  private static Path testDir = new Path(
+    System.getProperty("test.build.data", "/tmp"), "field");
+  
+  public static void launch() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    int numOfInputLines = 10;
+
+    Path outDir = new Path(testDir, "output_for_field_selection_test");
+    Path inDir = new Path(testDir, "input_for_field_selection_test");
+
+    StringBuffer inputData = new StringBuffer();
+    StringBuffer expectedOutput = new StringBuffer();
+    constructInputOutputData(inputData, expectedOutput, numOfInputLines);
+    
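+    // Each spec below has the form "keyFields:valueFields". Both sides are
+    // comma-separated, zero-based field indices or ranges: "1-3" selects
+    // fields 1 through 3, and a trailing "-" (as in "0-") selects every
+    // field from that index to the end of the record.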
+    conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "-");
+    conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "6,5,1-3:0-");
+    conf.set(
+      FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, ":4,3,2,1,0,0-");
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir,
+      1, 1, inputData.toString());
+    job.setMapperClass(FieldSelectionMapper.class);
+    job.setReducerClass(FieldSelectionReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(1);
+
+    job.waitForCompletion(true);
+    assertTrue("Job Failed!", job.isSuccessful());
+
+    //
+    // Finally, compare the output produced by the job with the
+    // expected output constructed above.
+    //
+    String outdata = MapReduceTestUtil.readOutput(outDir, conf);
+    assertEquals("Outputs doesnt match.",expectedOutput.toString(), outdata);
+    fs.delete(outDir, true);
+  }
+
+  public static void constructInputOutputData(StringBuffer inputData,
+      StringBuffer expectedOutput, int numOfInputLines) {
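+    // For row i, the map spec "6,5,1-3:0-" selects key fields 6,5,1,2,3 and
+    // all seven fields as the value; the reduce spec ":4,3,2,1,0,0-" then
+    // re-selects from the 12 combined key+value fields, yielding the
+    // 17-field line built below.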
+    for (int i = 0; i < numOfInputLines; i++) {
+      inputData.append(idFormat.format(i));
+      inputData.append("-").append(idFormat.format(i+1));
+      inputData.append("-").append(idFormat.format(i+2));
+      inputData.append("-").append(idFormat.format(i+3));
+      inputData.append("-").append(idFormat.format(i+4));
+      inputData.append("-").append(idFormat.format(i+5));
+      inputData.append("-").append(idFormat.format(i+6));
+      inputData.append("\n");
+
+      expectedOutput.append(idFormat.format(i+3));
+      expectedOutput.append("-" ).append (idFormat.format(i+2));
+      expectedOutput.append("-" ).append (idFormat.format(i+1));
+      expectedOutput.append("-" ).append (idFormat.format(i+5));
+      expectedOutput.append("-" ).append (idFormat.format(i+6));
+
+      expectedOutput.append("-" ).append (idFormat.format(i+6));
+      expectedOutput.append("-" ).append (idFormat.format(i+5));
+      expectedOutput.append("-" ).append (idFormat.format(i+1));
+      expectedOutput.append("-" ).append (idFormat.format(i+2));
+      expectedOutput.append("-" ).append (idFormat.format(i+3));
+      expectedOutput.append("-" ).append (idFormat.format(i+0));
+      expectedOutput.append("-" ).append (idFormat.format(i+1));
+      expectedOutput.append("-" ).append (idFormat.format(i+2));
+      expectedOutput.append("-" ).append (idFormat.format(i+3));
+      expectedOutput.append("-" ).append (idFormat.format(i+4));
+      expectedOutput.append("-" ).append (idFormat.format(i+5));
+      expectedOutput.append("-" ).append (idFormat.format(i+6));
+      expectedOutput.append("\n");
+    }
+    System.out.println("inputData:");
+    System.out.println(inputData.toString());
+    System.out.println("ExpectedData:");
+    System.out.println(expectedOutput.toString());
+  }
+  
+  /**
+   * Launches all the tasks in order.
+   */
+  public static void main(String[] argv) throws Exception {
+    launch();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,1174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.zip.GZIPOutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class TestCombineFileInputFormat extends TestCase {
+
+  private static final String[] rack1 = new String[] { "/r1" };
+  private static final String[] hosts1 = new String[] { "host1.rack1.com" };
+  private static final String[] rack2 = new String[] { "/r2" };
+  private static final String[] hosts2 = new String[] { "host2.rack2.com" };
+  private static final String[] rack3 = new String[] { "/r3" };
+  private static final String[] hosts3 = new String[] { "host3.rack3.com" };
+  final Path inDir = new Path("/racktesting");
+  final Path outputPath = new Path("/output");
+  final Path dir1 = new Path(inDir, "/dir1");
+  final Path dir2 = new Path(inDir, "/dir2");
+  final Path dir3 = new Path(inDir, "/dir3");
+  final Path dir4 = new Path(inDir, "/dir4");
+  final Path dir5 = new Path(inDir, "/dir5");
+
+  static final int BLOCKSIZE = 1024;
+  static final byte[] databuf = new byte[BLOCKSIZE];
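+  // databuf is exactly one block long, so each write of it in
+  // writeDataAndSetReplication below adds one full block to a file.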
+
+  /** Dummy class to extend CombineFileInputFormat. */
+  private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
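+    // createRecordReader returns null because the placement tests below
+    // only exercise getSplits(); no records are read through this format.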
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
+        TaskAttemptContext context) throws IOException {
+      return null;
+    }
+  }
+
+  /** Dummy class to extend CombineFileInputFormat. It allows 
+   * non-existent files to be passed into the CombineFileInputFormat,
+   * allowing for easy testing without having to create real files.
+   */
+  private class DummyInputFormat1 extends DummyInputFormat {
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+      Path[] files = getInputPaths(job);
+      List<FileStatus> results = new ArrayList<FileStatus>();
+      for (int i = 0; i < files.length; i++) {
+        Path p = files[i];
+        FileSystem fs = p.getFileSystem(job.getConfiguration());
+        results.add(fs.getFileStatus(p));
+      }
+      return results;
+    }
+  }
+
+  /** Dummy class to extend CombineFileInputFormat. It allows
+   * testing with files having missing blocks without actually removing replicas.
+   */
+  public static class MissingBlockFileSystem extends DistributedFileSystem {
+    String fileWithMissingBlocks;
+
+    @Override
+    public void initialize(URI name, Configuration conf) throws IOException {
+      fileWithMissingBlocks = "";
+      super.initialize(name, conf);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(
+        FileStatus stat, long start, long len) throws IOException {
+      if (stat.isDir()) {
+        return null;
+      }
+      System.out.println("File " + stat.getPath());
+      String name = stat.getPath().toUri().getPath();
+      BlockLocation[] locs =
+        super.getFileBlockLocations(stat, start, len);
+      if (name.equals(fileWithMissingBlocks)) {
+        System.out.println("Returing missing blocks for " + fileWithMissingBlocks);
+        locs[0] = new BlockLocation(new String[0], new String[0],
+            locs[0].getOffset(), locs[0].getLength());
+      }
+      return locs;
+    }
+
+    public void setFileWithMissingBlocks(String f) {
+      fileWithMissingBlocks = f;
+    }
+  }
+
+  private static final String DUMMY_KEY = "dummy.rr.key";
+
+  private static class DummyRecordReader extends RecordReader<Text, Text> {
+    private TaskAttemptContext context;
+    private CombineFileSplit s;
+    private int idx;
+    private boolean used;
+
+    public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
+        Integer i) {
+      this.context = context;
+      this.idx = i;
+      this.s = split;
+      this.used = true;
+    }
+
+    /** @return a value specified in the context to check whether the
+     * context is properly updated by the initialize() method.
+     */
+    public String getDummyConfVal() {
+      return this.context.getConfiguration().get(DUMMY_KEY);
+    }
+
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      this.context = context;
+      this.s = (CombineFileSplit) split;
+
+      // By setting used to true in the c'tor, but false in initialize,
+      // we can check that initialize() is always called before use
+      // (e.g., in testReinit()).
+      this.used = false;
+    }
+
+    public boolean nextKeyValue() {
+      boolean ret = !used;
+      this.used = true;
+      return ret;
+    }
+
+    public Text getCurrentKey() {
+      return new Text(this.context.getConfiguration().get(DUMMY_KEY));
+    }
+
+    public Text getCurrentValue() {
+      return new Text(this.s.getPath(idx).toString());
+    }
+
+    public float getProgress() {
+      return used ? 1.0f : 0.0f;
+    }
+
+    public void close() {
+    }
+  }
+
+  /** Extend CFIF to use CFRR with DummyRecordReader */
+  private class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split, 
+        TaskAttemptContext context) throws IOException {
+      return new CombineFileRecordReader((CombineFileSplit) split, context,
+          (Class) DummyRecordReader.class);
+    }
+  }
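+
+  // CombineFileRecordReader constructs one DummyRecordReader per sub-split
+  // via reflection; that is why DummyRecordReader above declares the
+  // (CombineFileSplit, TaskAttemptContext, Integer) constructor.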
+
+  public void testRecordReaderInit() throws InterruptedException, IOException {
+    // Test that we properly initialize the child recordreader when
+    // CombineFileInputFormat and CombineFileRecordReader are used.
+
+    TaskAttemptID taskId = new TaskAttemptID("jt", 0, true, 0, 0);
+    Configuration conf1 = new Configuration();
+    conf1.set(DUMMY_KEY, "STATE1");
+    TaskAttemptContext context1 = new TaskAttemptContext(conf1, taskId);
+
+    // This will create a CombineFileRecordReader that itself contains a
+    // DummyRecordReader.
+    InputFormat inputFormat = new ChildRRInputFormat();
+
+    Path [] files = { new Path("file1") };
+    long [] lengths = { 1 };
+
+    CombineFileSplit split = new CombineFileSplit(files, lengths);
+
+    RecordReader rr = inputFormat.createRecordReader(split, context1);
+    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+    // Verify that the initial configuration is the one being used.
+    // Right after construction the dummy key should have value "STATE1"
+    assertEquals("Invalid initial dummy key value", "STATE1",
+      rr.getCurrentKey().toString());
+
+    // Switch the active context for the RecordReader...
+    Configuration conf2 = new Configuration();
+    conf2.set(DUMMY_KEY, "STATE2");
+    TaskAttemptContext context2 = new TaskAttemptContext(conf2, taskId);
+    rr.initialize(split, context2);
+
+    // And verify that the new context is updated into the child record reader.
+    assertEquals("Invalid secondary dummy key value", "STATE2",
+      rr.getCurrentKey().toString());
+  }
+
+  public void testReinit() throws Exception {
+    // Test that a split containing multiple files works correctly,
+    // with the child RecordReader getting its initialize() method
+    // called a second time.
+    TaskAttemptID taskId = new TaskAttemptID("jt", 0, true, 0, 0);
+    Configuration conf = new Configuration();
+    TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
+
+    // This will create a CombineFileRecordReader that itself contains a
+    // DummyRecordReader.
+    InputFormat inputFormat = new ChildRRInputFormat();
+
+    Path [] files = { new Path("file1"), new Path("file2") };
+    long [] lengths = { 1, 1 };
+
+    CombineFileSplit split = new CombineFileSplit(files, lengths);
+    RecordReader rr = inputFormat.createRecordReader(split, context);
+    assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
+
+    // first initialize() call comes from MapTask. We'll do it here.
+    rr.initialize(split, context);
+
+    // First value is first filename.
+    assertTrue(rr.nextKeyValue());
+    assertEquals("file1", rr.getCurrentValue().toString());
+
+    // The inner RR will return false, because it only emits one (k, v) pair.
+    // But there's another sub-split to process. This returns true to us.
+    assertTrue(rr.nextKeyValue());
+    
+    // And the 2nd rr will have its initialize method called correctly.
+    assertEquals("file2", rr.getCurrentValue().toString());
+    
+    // But after both child RR's have returned their singleton (k, v), this
+    // should also return false.
+    assertFalse(rr.nextKeyValue());
+  }
+
+  public void testSplitPlacement() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, file4 after starting all three datanodes, with a repl
+       *    factor of 3.
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1");
+      writeFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5");
+      writeFile(conf, file5, (short)1, 1);
+      // split it using a CombineFileInputFormat
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = new Job(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.size(), 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+      
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2");
+      writeFile(conf, file2, (short)2, 2);
+
+      // split it using a CombineFileInputFormat
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test1): " + splits.size());
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 2);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3");
+      writeFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4");
+      writeFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(BLOCKSIZE);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // maximum split size is 2 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(2*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(splits.size(), 5);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), 0);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+
+      // maximum split size is 3 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(BLOCKSIZE);
+      inFormat.setMaxSplitSize(3*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(splits.size(), 4);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getPath(0).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
+      fileSplit = (CombineFileSplit) splits.get(3);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+
+      // maximum split size is 4 blocks 
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 4);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file3.getName());
+      assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 4);
+      assertEquals(fileSplit.getPath(0).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(1).getName(), file2.getName());
+      assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(1), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(2).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
+      assertEquals(fileSplit.getLength(2), BLOCKSIZE);
+      assertEquals(fileSplit.getPath(3).getName(), file4.getName());
+      assertEquals(fileSplit.getOffset(3), 2 * BLOCKSIZE);
+      assertEquals(fileSplit.getLength(3), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getPath(0).getName(), file1.getName());
+      assertEquals(fileSplit.getOffset(0), 0);
+      assertEquals(fileSplit.getLength(0), BLOCKSIZE);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+
+      // maximum split size is 7 blocks and min is 3 blocks
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(7*BLOCKSIZE);
+      inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
+      inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(splits.size(), 2);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 3);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
+
+      // Rack 1 has file1, file2, file3 and file4
+      // Rack 2 has file2, file3 and file4
+      // Rack 3 has file3 and file4
+      // Set up a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(new TestFilter(dir1), 
+                          new TestFilter(dir2));
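+      // A file that matches one of a pool's filters is combined only with
+      // other files in that pool, never with files outside it.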
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(splits.size(), 3);
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(fileSplit.getNumPaths(), 2);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(fileSplit.getNumPaths(), 1);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(fileSplit.getNumPaths(), 6);
+      assertEquals(fileSplit.getLocations().length, 1);
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(new TestFilter(dirNoMatch1), 
+                            new TestFilter(dirNoMatch2));
+      }
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools " +
+                         " and " + numFiles + " files is " + 
+                         ((end - start)/1000) + " seconds.");
+
+      // This file has three whole blocks. If the max split size is
+      // half the block size, then there should be six splits.
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(BLOCKSIZE/2);
+      FileInputFormat.setInputPaths(job, dir3);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(6, splits.size());
+
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  static void writeFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            conf.getInt("io.file.buffer.size", 4096),
+                                            replication, (long)BLOCKSIZE);
+    writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+  }
+
+  // Creates the gzip file and returns the FileStatus
+  static FileStatus writeGzipFile(Configuration conf, Path name,
+      short replication, int numBlocks) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+
+    GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+        .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+    writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+    return fileSys.getFileStatus(name);
+  }
+
+  private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+      OutputStream out, short replication, int numBlocks) throws IOException {
+    for (int i = 0; i < numBlocks; i++) {
+      out.write(databuf);
+    }
+    out.close();
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+  
+  public void testSplitPlacementForCompressedFiles() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+       *  files
+       * 1) file1 and file5, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2, just after starting the datanode on r2, with 
+       *    a repl factor of 2, and,
+       * 3) file3, file4 after starting all three datanodes, with a repl
+       *    factor of 3.
+       * At the end, file1, file5 will be present on only datanode1, file2 will 
+       * be present on datanode 1 and datanode2 and 
+       * file3, file4 will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1.gz");
+      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5.gz");
+      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+      // split it using a CombineFileInputFormat
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = new Job(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(splits.size(), 1);
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f5.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+      
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2.gz");
+      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+      // split it using a CombineFileInputFormat
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test1): " + splits.size());
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3.gz");
+      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4.gz");
+      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(4, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(3);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is twice file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(2 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is 4 times file1's length 
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(2 * f1.getLen());
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f2.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size and min-split-size per rack is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      inFormat.setMinSplitSizeRack(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // minimum split size per node is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job, 
+        dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // Rack 1 has file1, file2, file3 and file4
+      // Rack 2 has file2, file3 and file4
+      // Rack 3 has file3 and file4
+      // set up filters so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(new TestFilter(dir1), 
+                          new TestFilter(dir2));
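+      // files accepted by the pool's filters (anything under dir1 or dir2)
+      // are combined only with each other; the rest split outside the pool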
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test9): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(new TestFilter(dirNoMatch1), 
+                            new TestFilter(dirNoMatch2));
+      }
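+      // none of these pool filters match any input path, so the timing below
+      // isolates the per-pool scanning overhead inside getSplits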
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools " +
+                         " and " + numFiles + " files is " + 
+                         ((end - start)) + " milli seconds.");
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test that CFIF can handle missing blocks.
+   */
+  public void testMissingBlocks() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    String testName = "testMissingBlocks";
+    try {
+      Configuration conf = new Configuration();
+      conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName());
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      Path file1 = new Path(dir1 + "/file1");
+      writeFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5");
+      writeFile(conf, file5, (short)1, 1);
+
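+      // MissingBlockFileSystem reports no block locations for file1; the
+      // input format should still produce a split for it instead of failing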
+      ((MissingBlockFileSystem)fileSys).setFileWithMissingBlocks(file1.toUri().getPath());
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = new Job(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
+  static class TestFilter implements PathFilter {
+    private Path p;
+
+    // store a path prefix in this TestFilter
+    public TestFilter(Path p) {
+      this.p = p;
+    }
+
+    // returns true if the specified path matches the prefix stored
+    // in this TestFilter.
+    public boolean accept(Path path) {
+      return path.toString().startsWith(p.toString());
+    }
+
+    public String toString() {
+      return "PathFilter:" + p;
+    }
+  }
+
+  /*
+   * Prints out the input splits for the specified files
+   */
+  private void splitRealFiles(String[] args) throws IOException {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.get(conf);
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IOException("Wrong file system: " + fs.getClass().getName());
+    }
+    int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
+
+    DummyInputFormat inFormat = new DummyInputFormat();
+    for (int i = 0; i < args.length; i++) {
+      FileInputFormat.addInputPaths(job, args[i]);
+    }
+    inFormat.setMinSplitSizeRack(blockSize);
+    inFormat.setMaxSplitSize(10 * blockSize);
+
+    List<InputSplit> splits = inFormat.getSplits(job);
+    System.out.println("Total number of splits " + splits.size());
+    for (int i = 0; i < splits.size(); ++i) {
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(i);
+      System.out.println("Split[" + i + "] " + fileSplit);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestCombineFileInputFormat test = new TestCombineFileInputFormat();
+    // if paths were specified on the command line, split those;
+    // otherwise run the split-placement test
+    if (args.length != 0) {
+      test.splitRealFiles(args);
+    } else {
+      test.testSplitPlacement();
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class TestDelegatingInputFormat extends TestCase {
+
+  @SuppressWarnings("unchecked")
+  public void testSplitting() throws Exception {
+    Job job = new Job();
+    MiniDFSCluster dfs = null;
+    try {
+      dfs = new MiniDFSCluster(job.getConfiguration(), 4, true, new String[] { "/rack0",
+         "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1",
+         "host2", "host3" });
+      FileSystem fs = dfs.getFileSystem();
+
+      Path path = getPath("/foo/bar", fs);
+      Path path2 = getPath("/foo/baz", fs);
+      Path path3 = getPath("/bar/bar", fs);
+      Path path4 = getPath("/bar/baz", fs);
+
+      final int numSplits = 100;
+
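+      // cap the split size at 1/numSplits of the (identical) file lengths so
+      // each input path should yield roughly numSplits splits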
+      FileInputFormat.setMaxInputSplitSize(job, 
+              fs.getFileStatus(path).getLen() / numSplits);
+      MultipleInputs.addInputPath(job, path, TextInputFormat.class,
+         MapClass.class);
+      MultipleInputs.addInputPath(job, path2, TextInputFormat.class,
+         MapClass2.class);
+      MultipleInputs.addInputPath(job, path3, KeyValueTextInputFormat.class,
+         MapClass.class);
+      MultipleInputs.addInputPath(job, path4, TextInputFormat.class,
+         MapClass2.class);
+      DelegatingInputFormat inFormat = new DelegatingInputFormat();
+
+      int[] bins = new int[3];
+      for (InputSplit split : (List<InputSplit>)inFormat.getSplits(job)) {
+       assertTrue(split instanceof TaggedInputSplit);
+       final TaggedInputSplit tis = (TaggedInputSplit) split;
+       int index = -1;
+
+       if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
+         // path3
+         index = 0;
+       } else if (tis.getMapperClass().equals(MapClass.class)) {
+         // path
+         index = 1;
+       } else {
+         // path2 and path4
+         index = 2;
+       }
+
+       bins[index]++;
+      }
+
+      assertEquals("count is not equal to num splits", numSplits, bins[0]);
+      assertEquals("count is not equal to num splits", numSplits, bins[1]);
+      assertEquals("count is not equal to 2 * num splits",
+        numSplits * 2, bins[2]);
+    } finally {
+      if (dfs != null) {
+       dfs.shutdown();
+      }
+    }
+  }
+
+  static Path getPath(final String location, final FileSystem fs)
+      throws IOException {
+    Path path = new Path(location);
+
+    // create a multi-block file on hdfs
+    DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null);
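+    // 1000 writes of 6 chars at 2 bytes each is ~12000 bytes; with a
+    // 512-byte block size the file spans roughly 24 blocks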
+    for (int i = 0; i < 1000; ++i) {
+      out.writeChars("Hello\n");
+    }
+    out.close();
+
+    return path;
+  }
+
+  static class MapClass extends Mapper<String, String, String, String> {
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class TestKeyValueTextInputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestKeyValueTextInputFormat.class.getName());
+
+  private static final int MAX_LENGTH = 10000;
+  
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null; 
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestKeyValueTextInputFormat");
+  
+  public void testFormat() throws Exception {
+    Job job = new Job(defaultConf);
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      LOG.debug("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i*2));
+          writer.write("\t");
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+      JobContext jobContext = new JobContext(job.getConfiguration(), new JobID());
+      List<InputSplit> splits = format.getSplits(jobContext);
+      LOG.debug("splitting: got =        " + splits.size());
+      
+      TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+
+      // check each split
+      BitSet bits = new BitSet(length);
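+      // a repeated bit would mean the same record showed up in two splits;
+      // a bit left clear at the end means a record was dropped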
+      for (InputSplit split : splits) {
+        LOG.debug("split= " + split);
+        RecordReader<Text, Text> reader =
+          format.createRecordReader(split, context);
+        Class<?> readerClass = reader.getClass();
+        assertEquals("reader class is KeyValueLineRecordReader.",
+                     KeyValueLineRecordReader.class, readerClass);
+
+        reader.initialize(split, context);
+        try {
+          int count = 0;
+          while (reader.nextKeyValue()) {
+            int v = Integer.parseInt(reader.getCurrentValue().toString());
+            LOG.debug("read " + v);
+            if (bits.get(v)) {
+              LOG.warn("conflict with " + v + 
+                       " in split " + split +
+                       " at "+reader.getProgress());
+            }
+            assertFalse("Key in multiple partitions.", bits.get(v));
+            bits.set(v);
+            count++;
+          }
+          LOG.debug("split="+split+" count=" + count);
+        } finally {
+          reader.close();
+        }
+      }
+      assertEquals("Some keys in no partition.", length, bits.cardinality());
+
+    }
+  }
+
+  private LineReader makeStream(String str) throws IOException {
+    return new LineReader(new ByteArrayInputStream(str.getBytes("UTF-8")),
+                          defaultConf);
+  }
+  
+  public void testUTF8() throws Exception {
+    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
+    Text line = new Text();
+    in.readLine(line);
+    assertEquals("readLine changed utf8 characters", 
+                 "abcd\u20acbdcd\u20ac", line.toString());
+    in = makeStream("abc\u200axyz");
+    in.readLine(line);
+    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+  }
+
+  public void testNewLines() throws Exception {
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out);
+    assertEquals("line2 length", 2, out.getLength());
+    in.readLine(out);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
+  }
+  
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+  
+  private static List<Text> readSplit(KeyValueTextInputFormat format, 
+                                      InputSplit split, 
+                                      TaskAttemptContext context) throws IOException, InterruptedException {
+    List<Text> result = new ArrayList<Text>();
+    RecordReader<Text, Text> reader = format.createRecordReader(split, context);
+    reader.initialize(split, context);
+    while (reader.nextKeyValue()) {
+      result.add(new Text(reader.getCurrentValue()));
+    }
+    return result;
+  }
+  
+  /**
+   * Test using the gzip codec for reading
+   */
+  public void testGzip() throws Exception {
+    Job job = new Job();
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, job.getConfiguration());
+    localFs.delete(workDir, true);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+              "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "line-1\tthis is a test\nline-1\tof gzip\n");
+    FileInputFormat.setInputPaths(job, workDir);
+    
+    KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+    JobContext jobContext = new JobContext(job.getConfiguration(), new JobID());
+    TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID());
+    List<InputSplit> splits = format.getSplits(jobContext);
+    assertEquals("compressed splits == 2", 2, splits.size());
+    FileSplit tmp = (FileSplit) splits.get(0);
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits.set(0, splits.get(1));
+      splits.set(1, tmp);
+    }
+    List<Text> results = readSplit(format, splits.get(0), context);
+    assertEquals("splits[0] length", 6, results.size());
+    assertEquals("splits[0][5]", " dog", results.get(5).toString());
+    results = readSplit(format, splits.get(1), context);
+    assertEquals("splits[1] length", 2, results.size());
+    assertEquals("splits[1][0]", "this is a test", 
+                 results.get(0).toString());    
+    assertEquals("splits[1][1]", "of gzip", 
+                 results.get(1).toString());    
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestKeyValueTextInputFormat().testFormat();
+  }
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native


