From: tomwhite@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 24 Jan 2012 23:22:01 -0000
Subject: svn commit: r1235548 [6/8] - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/ src/mapred/org/apache/hadoop/mapreduce/ src/mapred/org/apache/hadoop/mapreduce/lib/db/ src/mapred/org/apache/hadoop/map...

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.db;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+public class TestIntegerSplitter extends TestCase {
+  private long [] toLongArray(List<Long> in) {
+    long [] out = new long[in.size()];
+    for (int i = 0; i < in.size(); i++) {
+      out[i] = in.get(i).longValue();
+    }
+
+    return out;
+  }
+
+  public String formatLongArray(long [] ar) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    boolean first = true;
+    for (long val : ar) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      sb.append(Long.toString(val));
+      first = false;
+    }
+
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public void assertLongArrayEquals(long [] expected, long [] actual) {
+    for (int i = 0; i < expected.length; i++) {
+      try {
+        assertEquals("Failure at position " + i + "; got " + actual[i]
+            + " instead of " + expected[i] + "; actual array is " + formatLongArray(actual),
+            expected[i], actual[i]);
+      } catch (ArrayIndexOutOfBoundsException oob) {
+        fail("Expected array with " + expected.length + " elements; got " + actual.length
+            + ". Actual array is " + formatLongArray(actual));
+      }
+    }
+
+    if (actual.length > expected.length) {
+      fail("Actual array has " + actual.length + " elements; expected " + expected.length
+          + ". Actual array is " + formatLongArray(actual));
+    }
+  }
+
+  public void testEvenSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 100);
+    long [] expected = { 0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testOddSplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(10, 0, 95);
+    long [] expected = { 0, 9, 18, 27, 36, 45, 54, 63, 72, 81, 90, 95 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(1, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testSingletonSplit2() throws SQLException {
+    // Same test, but overly-high numSplits
+    List<Long> splits = new IntegerSplitter().split(5, 5, 5);
+    long [] expected = { 5, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+  public void testTooManySplits() throws SQLException {
+    List<Long> splits = new IntegerSplitter().split(5, 3, 5);
+    long [] expected = { 3, 4, 5 };
+    assertLongArrayEquals(expected, toLongArray(splits));
+  }
+
+}

Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestIntegerSplitter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java?rev=1235548&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java Tue Jan 24 23:21:58 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.db; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import junit.framework.TestCase; + +public class TestTextSplitter extends TestCase { + + public String formatArray(Object [] ar) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + boolean first = true; + for (Object val : ar) { + if (!first) { + sb.append(", "); + } + + sb.append(val.toString()); + first = false; + } + + sb.append("]"); + return sb.toString(); + } + + public void assertArrayEquals(Object [] expected, Object [] actual) { + for (int i = 0; i < expected.length; i++) { + try { + assertEquals("Failure at position " + i + "; got " + actual[i] + + " instead of " + expected[i] + "; actual array is " + formatArray(actual), + expected[i], actual[i]); + } catch (ArrayIndexOutOfBoundsException oob) { + fail("Expected array with " + expected.length + " elements; got " + actual.length + + ". Actual array is " + formatArray(actual)); + } + } + + if (actual.length > expected.length) { + fail("Actual array has " + actual.length + " elements; expected " + expected.length + + ". Actual array is " + formatArray(actual)); + } + } + + public void testStringConvertEmpty() { + TextSplitter splitter = new TextSplitter(); + BigDecimal emptyBigDec = splitter.stringToBigDecimal(""); + assertEquals(BigDecimal.ZERO, emptyBigDec); + } + + public void testBigDecConvertEmpty() { + TextSplitter splitter = new TextSplitter(); + String emptyStr = splitter.bigDecimalToString(BigDecimal.ZERO); + assertEquals("", emptyStr); + } + + public void testConvertA() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("A")); + assertEquals("A", out); + } + + public void testConvertZ() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("Z")); + assertEquals("Z", out); + } + + public void testConvertThreeChars() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("abc")); + assertEquals("abc", out); + } + + public void testConvertStr() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("big str")); + assertEquals("big str", out); + } + + public void testConvertChomped() { + TextSplitter splitter = new TextSplitter(); + String out = splitter.bigDecimalToString(splitter.stringToBigDecimal("AVeryLongStringIndeed")); + assertEquals("AVeryLon", out); + } + + public void testAlphabetSplit() throws SQLException { + // This should give us 25 splits, one per letter. 
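+    // More precisely: asking for 25 splits between "A" and "Z" should yield the
+    // 26 boundary points "A".."Z" checked by the expected array below, since 25
+    // intervals need 26 endpoints. For example, the first interval ["A", "B")
+    // would cover column values such as "Apple" or "Azure".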
+ TextSplitter splitter = new TextSplitter(); + List splits = splitter.split(25, "A", "Z", ""); + String [] expected = { "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", + "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" }; + assertArrayEquals(expected, splits.toArray(new String [0])); + } + + public void testCommonPrefix() throws SQLException { + // Splits between 'Hand' and 'Hardy' + TextSplitter splitter = new TextSplitter(); + List splits = splitter.split(5, "nd", "rdy", "Ha"); + // Don't check for exact values in the middle, because the splitter generates some + // ugly Unicode-isms. But do check that we get multiple splits and that it starts + // and ends on the correct points. + assertEquals("Hand", splits.get(0)); + assertEquals("Hardy", splits.get(splits.size() -1)); + assertEquals(6, splits.size()); + } +} + Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/db/TestTextSplitter.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java?rev=1235548&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java Tue Jan 24 23:21:58 2012 @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.lib.fieldsel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; + +import junit.framework.TestCase; +import java.text.NumberFormat; + +public class TestMRFieldSelection extends TestCase { + +private static NumberFormat idFormat = NumberFormat.getInstance(); + static { + idFormat.setMinimumIntegerDigits(4); + idFormat.setGroupingUsed(false); + } + + public void testFieldSelection() throws Exception { + launch(); + } + private static Path testDir = new Path( + System.getProperty("test.build.data", "/tmp"), "field"); + + public static void launch() throws Exception { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + int numOfInputLines = 10; + + Path outDir = new Path(testDir, "output_for_field_selection_test"); + Path inDir = new Path(testDir, "input_for_field_selection_test"); + + StringBuffer inputData = new StringBuffer(); + StringBuffer expectedOutput = new StringBuffer(); + constructInputOutputData(inputData, expectedOutput, numOfInputLines); + + conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "-"); + conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "6,5,1-3:0-"); + conf.set( + FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, ":4,3,2,1,0,0-"); + Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, + 1, 1, inputData.toString()); + job.setMapperClass(FieldSelectionMapper.class); + job.setReducerClass(FieldSelectionReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(1); + + job.waitForCompletion(true); + assertTrue("Job Failed!", job.isSuccessful()); + + // + // Finally, we compare the reconstructed answer key with the + // original one. Remember, we need to ignore zero-count items + // in the original key. 
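+    // (In this test that simply means comparing the text read back from outDir
+    // with the expectedOutput buffer built by constructInputOutputData below;
+    // the specs "6,5,1-3:0-" and ":4,3,2,1,0,0-" set above determine how the
+    // input fields are reordered.)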
+ // + String outdata = MapReduceTestUtil.readOutput(outDir, conf); + assertEquals("Outputs doesnt match.",expectedOutput.toString(), outdata); + fs.delete(outDir, true); + } + + public static void constructInputOutputData(StringBuffer inputData, + StringBuffer expectedOutput, int numOfInputLines) { + for (int i = 0; i < numOfInputLines; i++) { + inputData.append(idFormat.format(i)); + inputData.append("-").append(idFormat.format(i+1)); + inputData.append("-").append(idFormat.format(i+2)); + inputData.append("-").append(idFormat.format(i+3)); + inputData.append("-").append(idFormat.format(i+4)); + inputData.append("-").append(idFormat.format(i+5)); + inputData.append("-").append(idFormat.format(i+6)); + inputData.append("\n"); + + expectedOutput.append(idFormat.format(i+3)); + expectedOutput.append("-" ).append (idFormat.format(i+2)); + expectedOutput.append("-" ).append (idFormat.format(i+1)); + expectedOutput.append("-" ).append (idFormat.format(i+5)); + expectedOutput.append("-" ).append (idFormat.format(i+6)); + + expectedOutput.append("-" ).append (idFormat.format(i+6)); + expectedOutput.append("-" ).append (idFormat.format(i+5)); + expectedOutput.append("-" ).append (idFormat.format(i+1)); + expectedOutput.append("-" ).append (idFormat.format(i+2)); + expectedOutput.append("-" ).append (idFormat.format(i+3)); + expectedOutput.append("-" ).append (idFormat.format(i+0)); + expectedOutput.append("-" ).append (idFormat.format(i+1)); + expectedOutput.append("-" ).append (idFormat.format(i+2)); + expectedOutput.append("-" ).append (idFormat.format(i+3)); + expectedOutput.append("-" ).append (idFormat.format(i+4)); + expectedOutput.append("-" ).append (idFormat.format(i+5)); + expectedOutput.append("-" ).append (idFormat.format(i+6)); + expectedOutput.append("\n"); + } + System.out.println("inputData:"); + System.out.println(inputData.toString()); + System.out.println("ExpectedData:"); + System.out.println(expectedOutput.toString()); + } + + /** + * Launches all the tasks in order. + */ + public static void main(String[] argv) throws Exception { + launch(); + } +} Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/fieldsel/TestMRFieldSelection.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1235548&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Tue Jan 24 23:21:58 2012 @@ -0,0 +1,1174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.List; +import java.util.ArrayList; +import java.util.zip.GZIPOutputStream; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; + +public class TestCombineFileInputFormat extends TestCase { + + private static final String rack1[] = new String[] { + "/r1" + }; + private static final String hosts1[] = new String[] { + "host1.rack1.com" + }; + private static final String rack2[] = new String[] { + "/r2" + }; + private static final String hosts2[] = new String[] { + "host2.rack2.com" + }; + private static final String rack3[] = new String[] { + "/r3" + }; + private static final String hosts3[] = new String[] { + "host3.rack3.com" + }; + final Path inDir = new Path("/racktesting"); + final Path outputPath = new Path("/output"); + final Path dir1 = new Path(inDir, "/dir1"); + final Path dir2 = new Path(inDir, "/dir2"); + final Path dir3 = new Path(inDir, "/dir3"); + final Path dir4 = new Path(inDir, "/dir4"); + final Path dir5 = new Path(inDir, "/dir5"); + + static final int BLOCKSIZE = 1024; + static final byte[] databuf = new byte[BLOCKSIZE]; + + /** Dummy class to extend CombineFileInputFormat*/ + private class DummyInputFormat extends CombineFileInputFormat { + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return null; + } + } + + /** Dummy class to extend CombineFileInputFormat. It allows + * non-existent files to be passed into the CombineFileInputFormat, allows + * for easy testing without having to create real files. + */ + private class DummyInputFormat1 extends DummyInputFormat { + @Override + protected List listStatus(JobContext job) throws IOException { + Path[] files = getInputPaths(job); + List results = new ArrayList(); + for (int i = 0; i < files.length; i++) { + Path p = files[i]; + FileSystem fs = p.getFileSystem(job.getConfiguration()); + results.add(fs.getFileStatus(p)); + } + return results; + } + } + + /** Dummy class to extend CombineFileInputFormat. It allows + * testing with files having missing blocks without actually removing replicas. 
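+ * The getFileBlockLocations() override below returns a BlockLocation with
+ * empty host and rack arrays for one configured path, which is how a block
+ * with no live replicas appears to the input format.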
+ */ + public static class MissingBlockFileSystem extends DistributedFileSystem { + String fileWithMissingBlocks; + + @Override + public void initialize(URI name, Configuration conf) throws IOException { + fileWithMissingBlocks = ""; + super.initialize(name, conf); + } + + @Override + public BlockLocation[] getFileBlockLocations( + FileStatus stat, long start, long len) throws IOException { + if (stat.isDir()) { + return null; + } + System.out.println("File " + stat.getPath()); + String name = stat.getPath().toUri().getPath(); + BlockLocation[] locs = + super.getFileBlockLocations(stat, start, len); + if (name.equals(fileWithMissingBlocks)) { + System.out.println("Returing missing blocks for " + fileWithMissingBlocks); + locs[0] = new BlockLocation(new String[0], new String[0], + locs[0].getOffset(), locs[0].getLength()); + } + return locs; + } + + public void setFileWithMissingBlocks(String f) { + fileWithMissingBlocks = f; + } + } + + private static final String DUMMY_KEY = "dummy.rr.key"; + + private static class DummyRecordReader extends RecordReader { + private TaskAttemptContext context; + private CombineFileSplit s; + private int idx; + private boolean used; + + public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context, + Integer i) { + this.context = context; + this.idx = i; + this.s = split; + this.used = true; + } + + /** @return a value specified in the context to check whether the + * context is properly updated by the initialize() method. + */ + public String getDummyConfVal() { + return this.context.getConfiguration().get(DUMMY_KEY); + } + + public void initialize(InputSplit split, TaskAttemptContext context) { + this.context = context; + this.s = (CombineFileSplit) split; + + // By setting used to true in the c'tor, but false in initialize, + // we can check that initialize() is always called before use + // (e.g., in testReinit()). + this.used = false; + } + + public boolean nextKeyValue() { + boolean ret = !used; + this.used = true; + return ret; + } + + public Text getCurrentKey() { + return new Text(this.context.getConfiguration().get(DUMMY_KEY)); + } + + public Text getCurrentValue() { + return new Text(this.s.getPath(idx).toString()); + } + + public float getProgress() { + return used ? 1.0f : 0.0f; + } + + public void close() { + } + } + + /** Extend CFIF to use CFRR with DummyRecordReader */ + private class ChildRRInputFormat extends CombineFileInputFormat { + @SuppressWarnings("unchecked") + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException { + return new CombineFileRecordReader((CombineFileSplit) split, context, + (Class) DummyRecordReader.class); + } + } + + public void testRecordReaderInit() throws InterruptedException, IOException { + // Test that we properly initialize the child recordreader when + // CombineFileInputFormat and CombineFileRecordReader are used. + + TaskAttemptID taskId = new TaskAttemptID("jt", 0, true, 0, 0); + Configuration conf1 = new Configuration(); + conf1.set(DUMMY_KEY, "STATE1"); + TaskAttemptContext context1 = new TaskAttemptContext(conf1, taskId); + + // This will create a CombineFileRecordReader that itself contains a + // DummyRecordReader. 
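+    // CombineFileRecordReader delegates to one child reader per chunk of the
+    // CombineFileSplit: it constructs the child (here DummyRecordReader, via the
+    // (CombineFileSplit, TaskAttemptContext, Integer) constructor it provides)
+    // and then calls initialize() on it, which is what this test verifies.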
+ InputFormat inputFormat = new ChildRRInputFormat(); + + Path [] files = { new Path("file1") }; + long [] lengths = { 1 }; + + CombineFileSplit split = new CombineFileSplit(files, lengths); + + RecordReader rr = inputFormat.createRecordReader(split, context1); + assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader); + + // Verify that the initial configuration is the one being used. + // Right after construction the dummy key should have value "STATE1" + assertEquals("Invalid initial dummy key value", "STATE1", + rr.getCurrentKey().toString()); + + // Switch the active context for the RecordReader... + Configuration conf2 = new Configuration(); + conf2.set(DUMMY_KEY, "STATE2"); + TaskAttemptContext context2 = new TaskAttemptContext(conf2, taskId); + rr.initialize(split, context2); + + // And verify that the new context is updated into the child record reader. + assertEquals("Invalid secondary dummy key value", "STATE2", + rr.getCurrentKey().toString()); + } + + public void testReinit() throws Exception { + // Test that a split containing multiple files works correctly, + // with the child RecordReader getting its initialize() method + // called a second time. + TaskAttemptID taskId = new TaskAttemptID("jt", 0, true, 0, 0); + Configuration conf = new Configuration(); + TaskAttemptContext context = new TaskAttemptContext(conf, taskId); + + // This will create a CombineFileRecordReader that itself contains a + // DummyRecordReader. + InputFormat inputFormat = new ChildRRInputFormat(); + + Path [] files = { new Path("file1"), new Path("file2") }; + long [] lengths = { 1, 1 }; + + CombineFileSplit split = new CombineFileSplit(files, lengths); + RecordReader rr = inputFormat.createRecordReader(split, context); + assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader); + + // first initialize() call comes from MapTask. We'll do it here. + rr.initialize(split, context); + + // First value is first filename. + assertTrue(rr.nextKeyValue()); + assertEquals("file1", rr.getCurrentValue().toString()); + + // The inner RR will return false, because it only emits one (k, v) pair. + // But there's another sub-split to process. This returns true to us. + assertTrue(rr.nextKeyValue()); + + // And the 2nd rr will have its initialize method called correctly. + assertEquals("file2", rr.getCurrentValue().toString()); + + // But after both child RR's have returned their singleton (k, v), this + // should also return false. + assertFalse(rr.nextKeyValue()); + } + + public void testSplitPlacement() throws IOException { + MiniDFSCluster dfs = null; + FileSystem fileSys = null; + try { + /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files + * 1) file1 and file5, just after starting the datanode on r1, with + * a repl factor of 1, and, + * 2) file2, just after starting the datanode on r2, with + * a repl factor of 2, and, + * 3) file3, file4 after starting the all three datanodes, with a repl + * factor of 3. + * At the end, file1, file5 will be present on only datanode1, file2 will + * be present on datanode 1 and datanode2 and + * file3, file4 will be present on all datanodes. 
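+     * The assertions below then check that CombineFileInputFormat groups the
+     * blocks into splits whose single reported location matches this placement
+     * (host1 for file1/file5, host2 for file2, host3 for file3/file4).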
+ */ + Configuration conf = new Configuration(); + conf.setBoolean("dfs.replication.considerLoad", false); + dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1); + dfs.waitActive(); + + fileSys = dfs.getFileSystem(); + if (!fileSys.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + Path file1 = new Path(dir1 + "/file1"); + writeFile(conf, file1, (short)1, 1); + // create another file on the same datanode + Path file5 = new Path(dir5 + "/file5"); + writeFile(conf, file5, (short)1, 1); + // split it using a CombinedFile input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = new Job(conf); + FileInputFormat.setInputPaths(job, dir1 + "," + dir5); + List splits = inFormat.getSplits(job); + System.out.println("Made splits(Test0): " + splits.size()); + for (InputSplit split : splits) { + System.out.println("File split(Test0): " + split); + } + assertEquals(splits.size(), 1); + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file5.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + + dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null); + dfs.waitActive(); + + // create file on two datanodes. + Path file2 = new Path(dir2 + "/file2"); + writeFile(conf, file2, (short)2, 2); + + // split it using a CombinedFile input format + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2); + inFormat.setMinSplitSizeRack(BLOCKSIZE); + splits = inFormat.getSplits(job); + System.out.println("Made splits(Test1): " + splits.size()); + + // make sure that each split has different locations + for (InputSplit split : splits) { + System.out.println("File split(Test1): " + split); + } + assertEquals(splits.size(), 2); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file1.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + + // create another file on 3 datanodes and 3 racks. 
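+      // file3 is written with three blocks (see writeFile below), so the
+      // rack3-local split is expected to carry three chunks of file3, one per
+      // block, as the Test2 assertions check.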
+ dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null); + dfs.waitActive(); + Path file3 = new Path(dir3 + "/file3"); + writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3); + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3); + inFormat.setMinSplitSizeRack(BLOCKSIZE); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test2): " + split); + } + assertEquals(splits.size(), 3); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 3); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file1.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + + // create file4 on all three racks + Path file4 = new Path(dir4 + "/file4"); + writeFile(conf, file4, (short)3, 3); + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4); + inFormat.setMinSplitSizeRack(BLOCKSIZE); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test3): " + split); + } + assertEquals(splits.size(), 3); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 6); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(0), 0); + 
assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file1.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + + // maximum split size is 2 blocks + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(BLOCKSIZE); + inFormat.setMaxSplitSize(2*BLOCKSIZE); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test4): " + split); + } + assertEquals(splits.size(), 5); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(1), 0); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(0), BLOCKSIZE); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + + // maximum split size is 3 blocks + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(BLOCKSIZE); + inFormat.setMaxSplitSize(3*BLOCKSIZE); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test5): " + split); + } + assertEquals(splits.size(), 4); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 3); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file3.getName()); + 
assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getPath(0).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host2.rack2.com"); + fileSplit = (CombineFileSplit) splits.get(3); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file1.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host1.rack1.com"); + + // maximum split size is 4 blocks + inFormat = new DummyInputFormat(); + inFormat.setMaxSplitSize(4*BLOCKSIZE); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test6): " + split); + } + assertEquals(splits.size(), 3); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 4); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file3.getName()); + assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 4); + assertEquals(fileSplit.getPath(0).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getPath(1).getName(), file2.getName()); + assertEquals(fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(fileSplit.getLength(1), BLOCKSIZE); + assertEquals(fileSplit.getPath(2).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(2), BLOCKSIZE); + assertEquals(fileSplit.getLength(2), BLOCKSIZE); + assertEquals(fileSplit.getPath(3).getName(), file4.getName()); + assertEquals(fileSplit.getOffset(3), 2 * BLOCKSIZE); + assertEquals(fileSplit.getLength(3), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], 
"host2.rack2.com"); + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getPath(0).getName(), file1.getName()); + assertEquals(fileSplit.getOffset(0), 0); + assertEquals(fileSplit.getLength(0), BLOCKSIZE); + assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + + // maximum split size is 7 blocks and min is 3 blocks + inFormat = new DummyInputFormat(); + inFormat.setMaxSplitSize(7*BLOCKSIZE); + inFormat.setMinSplitSizeNode(3*BLOCKSIZE); + inFormat.setMinSplitSizeRack(3*BLOCKSIZE); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test7): " + split); + } + assertEquals(splits.size(), 2); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 6); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getLocations()[0], "host3.rack3.com"); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 3); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getLocations()[0], "host1.rack1.com"); + + // Rack 1 has file1, file2 and file3 and file4 + // Rack 2 has file2 and file3 and file4 + // Rack 3 has file3 and file4 + // setup a filter so that only file1 and file2 can be combined + inFormat = new DummyInputFormat(); + FileInputFormat.addInputPath(job, inDir); + inFormat.setMinSplitSizeRack(1); // everything is at least rack local + inFormat.createPool(new TestFilter(dir1), + new TestFilter(dir2)); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test1): " + split); + } + assertEquals(splits.size(), 3); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(fileSplit.getNumPaths(), 2); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(fileSplit.getNumPaths(), 1); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(fileSplit.getNumPaths(), 6); + assertEquals(fileSplit.getLocations().length, 1); + assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3 + + // measure performance when there are multiple pools and + // many files in each pool. + int numPools = 100; + int numFiles = 1000; + DummyInputFormat1 inFormat1 = new DummyInputFormat1(); + for (int i = 0; i < numFiles; i++) { + FileInputFormat.setInputPaths(job, file1); + } + inFormat1.setMinSplitSizeRack(1); // everything is at least rack local + final Path dirNoMatch1 = new Path(inDir, "/dirxx"); + final Path dirNoMatch2 = new Path(inDir, "/diryy"); + for (int i = 0; i < numPools; i++) { + inFormat1.createPool(new TestFilter(dirNoMatch1), + new TestFilter(dirNoMatch2)); + } + long start = System.currentTimeMillis(); + splits = inFormat1.getSplits(job); + long end = System.currentTimeMillis(); + System.out.println("Elapsed time for " + numPools + " pools " + + " and " + numFiles + " files is " + + ((end - start)/1000) + " seconds."); + + // This file has three whole blocks. If the maxsplit size is + // half the block size, then there should be six splits. 
+ inFormat = new DummyInputFormat(); + inFormat.setMaxSplitSize(BLOCKSIZE/2); + FileInputFormat.setInputPaths(job, dir3); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test8): " + split); + } + assertEquals(6, splits.size()); + + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + + static void writeFile(Configuration conf, Path name, + short replication, int numBlocks) throws IOException { + FileSystem fileSys = FileSystem.get(conf); + + FSDataOutputStream stm = fileSys.create(name, true, + conf.getInt("io.file.buffer.size", 4096), + replication, (long)BLOCKSIZE); + writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks); + } + + // Creates the gzip file and return the FileStatus + static FileStatus writeGzipFile(Configuration conf, Path name, + short replication, int numBlocks) throws IOException { + FileSystem fileSys = FileSystem.get(conf); + + GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf + .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE)); + writeDataAndSetReplication(fileSys, name, out, replication, numBlocks); + return fileSys.getFileStatus(name); + } + + private static void writeDataAndSetReplication(FileSystem fileSys, Path name, + OutputStream out, short replication, int numBlocks) throws IOException { + for (int i = 0; i < numBlocks; i++) { + out.write(databuf); + } + out.close(); + DFSTestUtil.waitReplication(fileSys, name, replication); + } + + public void testSplitPlacementForCompressedFiles() throws IOException { + MiniDFSCluster dfs = null; + FileSystem fileSys = null; + try { + /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped + * files + * 1) file1 and file5, just after starting the datanode on r1, with + * a repl factor of 1, and, + * 2) file2, just after starting the datanode on r2, with + * a repl factor of 2, and, + * 3) file3, file4 after starting the all three datanodes, with a repl + * factor of 3. + * At the end, file1, file5 will be present on only datanode1, file2 will + * be present on datanode 1 and datanode2 and + * file3, file4 will be present on all datanodes. 
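+     * Because the files are gzipped they should not be split mid-stream: every
+     * chunk asserted below covers a whole file (offset 0, length equal to the
+     * FileStatus length) rather than individual blocks.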
+ */ + Configuration conf = new Configuration(); + conf.setBoolean("dfs.replication.considerLoad", false); + dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1); + dfs.waitActive(); + + fileSys = dfs.getFileSystem(); + if (!fileSys.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + Path file1 = new Path(dir1 + "/file1.gz"); + FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1); + // create another file on the same datanode + Path file5 = new Path(dir5 + "/file5.gz"); + FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1); + // split it using a CombinedFile input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = new Job(conf); + FileInputFormat.setInputPaths(job, dir1 + "," + dir5); + List splits = inFormat.getSplits(job); + System.out.println("Made splits(Test0): " + splits.size()); + for (InputSplit split : splits) { + System.out.println("File split(Test0): " + split); + } + assertEquals(splits.size(), 1); + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(file5.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(f5.getLen(), fileSplit.getLength(1)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + + dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null); + dfs.waitActive(); + + // create file on two datanodes. + Path file2 = new Path(dir2 + "/file2.gz"); + FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2); + + // split it using a CombinedFile input format + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2); + inFormat.setMinSplitSizeRack(f1.getLen()); + splits = inFormat.getSplits(job); + System.out.println("Made splits(Test1): " + splits.size()); + + // make sure that each split has different locations + for (InputSplit split : splits) { + System.out.println("File split(Test1): " + split); + } + assertEquals(2, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // create another file on 3 datanodes and 3 racks. 
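+      // file3.gz is replicated to all three racks; unlike the uncompressed case
+      // above, it should still appear as a single whole-file chunk in the splits.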
+ dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null); + dfs.waitActive(); + Path file3 = new Path(dir3 + "/file3.gz"); + FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3); + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3); + inFormat.setMinSplitSizeRack(f1.getLen()); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test2): " + split); + } + assertEquals(3, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // create file4 on all three racks + Path file4 = new Path(dir4 + "/file4.gz"); + FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3); + inFormat = new DummyInputFormat(); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + inFormat.setMinSplitSizeRack(f1.getLen()); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test3): " + split); + } + assertEquals(3, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(f4.getLen(), fileSplit.getLength(1)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // maximum split size is file1's length + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(f1.getLen()); + 
inFormat.setMaxSplitSize(f1.getLen()); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test4): " + split); + } + assertEquals(4, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(file4.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f4.getLen(), fileSplit.getLength(0)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(3); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // maximum split size is twice file1's length + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(f1.getLen()); + inFormat.setMaxSplitSize(2 * f1.getLen()); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test5): " + split); + } + assertEquals(3, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(f4.getLen(), fileSplit.getLength(1)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // maximum split size is 4 times file1's length + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(2 * 
f1.getLen()); + inFormat.setMaxSplitSize(4 * f1.getLen()); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test6): " + split); + } + assertEquals(2, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(file4.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(f4.getLen(), fileSplit.getLength(1)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE); + assertEquals(f2.getLen(), fileSplit.getLength(1)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + // maximum split size and min-split-size per rack is 4 times file1's length + inFormat = new DummyInputFormat(); + inFormat.setMaxSplitSize(4 * f1.getLen()); + inFormat.setMinSplitSizeRack(4 * f1.getLen()); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test7): " + split); + } + assertEquals(1, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(4, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + + // minimum split size per node is 4 times file1's length + inFormat = new DummyInputFormat(); + inFormat.setMinSplitSizeNode(4 * f1.getLen()); + FileInputFormat.setInputPaths(job, + dir1 + "," + dir2 + "," + dir3 + "," + dir4); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test8): " + split); + } + assertEquals(1, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(4, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + + // Rack 1 has file1, file2 and file3 and file4 + // Rack 2 has file2 and file3 and file4 + // Rack 3 has file3 and file4 + // setup a filter so that only file1 and file2 can be combined + inFormat = new DummyInputFormat(); + FileInputFormat.addInputPath(job, inDir); + inFormat.setMinSplitSizeRack(1); // everything is at least rack local + inFormat.createPool(new TestFilter(dir1), + new TestFilter(dir2)); + splits = inFormat.getSplits(job); + for (InputSplit split : splits) { + System.out.println("File split(Test9): " + split); + } + assertEquals(3, splits.size()); + fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 + fileSplit = (CombineFileSplit) splits.get(1); + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], 
fileSplit.getLocations()[0]); // should be on r1 + fileSplit = (CombineFileSplit) splits.get(2); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + + // measure performance when there are multiple pools and + // many files in each pool. + int numPools = 100; + int numFiles = 1000; + DummyInputFormat1 inFormat1 = new DummyInputFormat1(); + for (int i = 0; i < numFiles; i++) { + FileInputFormat.setInputPaths(job, file1); + } + inFormat1.setMinSplitSizeRack(1); // everything is at least rack local + final Path dirNoMatch1 = new Path(inDir, "/dirxx"); + final Path dirNoMatch2 = new Path(inDir, "/diryy"); + for (int i = 0; i < numPools; i++) { + inFormat1.createPool(new TestFilter(dirNoMatch1), + new TestFilter(dirNoMatch2)); + } + long start = System.currentTimeMillis(); + splits = inFormat1.getSplits(job); + long end = System.currentTimeMillis(); + System.out.println("Elapsed time for " + numPools + " pools " + + " and " + numFiles + " files is " + + ((end - start)) + " milli seconds."); + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + + /** + * Test that CFIF can handle missing blocks. + */ + public void testMissingBlocks() throws IOException { + String namenode = null; + MiniDFSCluster dfs = null; + FileSystem fileSys = null; + String testName = "testMissingBlocks"; + try { + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", MissingBlockFileSystem.class.getName()); + conf.setBoolean("dfs.replication.considerLoad", false); + dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1); + dfs.waitActive(); + + namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + + (dfs.getFileSystem()).getUri().getPort(); + + fileSys = dfs.getFileSystem(); + if (!fileSys.mkdirs(inDir)) { + throw new IOException("Mkdirs failed to create " + inDir.toString()); + } + + Path file1 = new Path(dir1 + "/file1"); + writeFile(conf, file1, (short)1, 1); + // create another file on the same datanode + Path file5 = new Path(dir5 + "/file5"); + writeFile(conf, file5, (short)1, 1); + + ((MissingBlockFileSystem)fileSys).setFileWithMissingBlocks(file1.toUri().getPath()); + // split it using a CombinedFile input format + DummyInputFormat inFormat = new DummyInputFormat(); + Job job = new Job(conf); + FileInputFormat.setInputPaths(job, dir1 + "," + dir5); + List splits = inFormat.getSplits(job); + System.out.println("Made splits(Test0): " + splits.size()); + for (InputSplit split : splits) { + System.out.println("File split(Test0): " + split); + } + assertEquals(splits.size(), 1); + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0); + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file5.getName(), fileSplit.getPath(1).getName()); + assertEquals(0, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + + static class TestFilter implements PathFilter { + private Path p; + + // store a path prefix in this TestFilter + public TestFilter(Path p) { + this.p = p; + } + + // returns true if the specified path matches the prefix stored + // in this TestFilter. 
+ public boolean accept(Path path) { + if (path.toString().indexOf(p.toString()) == 0) { + return true; + } + return false; + } + + public String toString() { + return "PathFilter:" + p; + } + } + + /* + * Prints out the input splits for the specified files + */ + private void splitRealFiles(String[] args) throws IOException { + Configuration conf = new Configuration(); + Job job = new Job(); + FileSystem fs = FileSystem.get(conf); + if (!(fs instanceof DistributedFileSystem)) { + throw new IOException("Wrong file system: " + fs.getClass().getName()); + } + int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024); + + DummyInputFormat inFormat = new DummyInputFormat(); + for (int i = 0; i < args.length; i++) { + FileInputFormat.addInputPaths(job, args[i]); + } + inFormat.setMinSplitSizeRack(blockSize); + inFormat.setMaxSplitSize(10 * blockSize); + + List splits = inFormat.getSplits(job); + System.out.println("Total number of splits " + splits.size()); + for (int i = 0; i < splits.size(); ++i) { + CombineFileSplit fileSplit = (CombineFileSplit) splits.get(i); + System.out.println("Split[" + i + "] " + fileSplit); + } + } + + public static void main(String[] args) throws Exception{ + + // if there are some parameters specified, then use those paths + if (args.length != 0) { + TestCombineFileInputFormat test = new TestCombineFileInputFormat(); + test.splitRealFiles(args); + } else { + TestCombineFileInputFormat test = new TestCombineFileInputFormat(); + test.testSplitPlacement(); + } + } +} Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java?rev=1235548&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java Tue Jan 24 23:21:58 2012 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; + +public class TestDelegatingInputFormat extends TestCase { + + @SuppressWarnings("unchecked") + public void testSplitting() throws Exception { + Job job = new Job(); + MiniDFSCluster dfs = null; + try { + dfs = new MiniDFSCluster(job.getConfiguration(), 4, true, new String[] { "/rack0", + "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1", + "host2", "host3" }); + FileSystem fs = dfs.getFileSystem(); + + Path path = getPath("/foo/bar", fs); + Path path2 = getPath("/foo/baz", fs); + Path path3 = getPath("/bar/bar", fs); + Path path4 = getPath("/bar/baz", fs); + + final int numSplits = 100; + + FileInputFormat.setMaxInputSplitSize(job, + fs.getFileStatus(path).getLen() / numSplits); + MultipleInputs.addInputPath(job, path, TextInputFormat.class, + MapClass.class); + MultipleInputs.addInputPath(job, path2, TextInputFormat.class, + MapClass2.class); + MultipleInputs.addInputPath(job, path3, KeyValueTextInputFormat.class, + MapClass.class); + MultipleInputs.addInputPath(job, path4, TextInputFormat.class, + MapClass2.class); + DelegatingInputFormat inFormat = new DelegatingInputFormat(); + + int[] bins = new int[3]; + for (InputSplit split : (List)inFormat.getSplits(job)) { + assertTrue(split instanceof TaggedInputSplit); + final TaggedInputSplit tis = (TaggedInputSplit) split; + int index = -1; + + if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) { + // path3 + index = 0; + } else if (tis.getMapperClass().equals(MapClass.class)) { + // path + index = 1; + } else { + // path2 and path4 + index = 2; + } + + bins[index]++; + } + + assertEquals("count is not equal to num splits", numSplits, bins[0]); + assertEquals("count is not equal to num splits", numSplits, bins[1]); + assertEquals("count is not equal to 2 * num splits", + numSplits * 2, bins[2]); + } finally { + if (dfs != null) { + dfs.shutdown(); + } + } + } + + static Path getPath(final String location, final FileSystem fs) + throws IOException { + Path path = new Path(location); + + // create a multi-block file on hdfs + DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null); + for (int i = 0; i < 1000; ++i) { + out.writeChars("Hello\n"); + } + out.close(); + + return path; + } + + static class MapClass extends Mapper { + } + + static class MapClass2 extends MapClass { + } + +} Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java?rev=1235548&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java (added) +++ 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java Tue Jan 24 23:21:58 2012 @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.*; +import java.util.*; +import junit.framework.TestCase; + +import org.apache.commons.logging.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.util.LineReader; +import org.apache.hadoop.util.ReflectionUtils; + +public class TestKeyValueTextInputFormat extends TestCase { + private static final Log LOG = + LogFactory.getLog(TestKeyValueTextInputFormat.class.getName()); + + private static int MAX_LENGTH = 10000; + + private static Configuration defaultConf = new Configuration(); + private static FileSystem localFs = null; + static { + try { + localFs = FileSystem.getLocal(defaultConf); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + private static Path workDir = + new Path(new Path(System.getProperty("test.build.data", "."), "data"), + "TestKeyValueTextInputFormat"); + + public void testFormat() throws Exception { + Job job = new Job(defaultConf); + Path file = new Path(workDir, "test.txt"); + + int seed = new Random().nextInt(); + LOG.info("seed = "+seed); + Random random = new Random(seed); + + localFs.delete(workDir, true); + FileInputFormat.setInputPaths(job, workDir); + + // for a variety of lengths + for (int length = 0; length < MAX_LENGTH; + length+= random.nextInt(MAX_LENGTH/10)+1) { + + LOG.debug("creating; entries = " + length); + + // create a file with length entries + Writer writer = new OutputStreamWriter(localFs.create(file)); + try { + for (int i = 0; i < length; i++) { + writer.write(Integer.toString(i*2)); + writer.write("\t"); + writer.write(Integer.toString(i)); + writer.write("\n"); + } + } finally { + writer.close(); + } + + KeyValueTextInputFormat format = new KeyValueTextInputFormat(); + JobContext jobContext = new JobContext(job.getConfiguration(), new JobID()); + List splits = format.getSplits(jobContext); + LOG.debug("splitting: got = " + splits.size()); + + TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + + // check each split + BitSet bits = new BitSet(length); + for (InputSplit split : splits) 
{ + LOG.debug("split= " + split); + RecordReader reader = + format.createRecordReader(split, context); + Class readerClass = reader.getClass(); + assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass); + + reader.initialize(split, context); + try { + int count = 0; + while (reader.nextKeyValue()) { + int v = Integer.parseInt(reader.getCurrentValue().toString()); + LOG.debug("read " + v); + if (bits.get(v)) { + LOG.warn("conflict with " + v + + " in split " + split + + " at "+reader.getProgress()); + } + assertFalse("Key in multiple partitions.", bits.get(v)); + bits.set(v); + count++; + } + LOG.debug("split="+split+" count=" + count); + } finally { + reader.close(); + } + } + assertEquals("Some keys in no partition.", length, bits.cardinality()); + + } + } + private LineReader makeStream(String str) throws IOException { + return new LineReader(new ByteArrayInputStream + (str.getBytes("UTF-8")), + defaultConf); + } + + public void testUTF8() throws Exception { + LineReader in = makeStream("abcd\u20acbdcd\u20ac"); + Text line = new Text(); + in.readLine(line); + assertEquals("readLine changed utf8 characters", + "abcd\u20acbdcd\u20ac", line.toString()); + in = makeStream("abc\u200axyz"); + in.readLine(line); + assertEquals("split on fake newline", "abc\u200axyz", line.toString()); + } + + public void testNewLines() throws Exception { + LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + Text out = new Text(); + in.readLine(out); + assertEquals("line1 length", 1, out.getLength()); + in.readLine(out); + assertEquals("line2 length", 2, out.getLength()); + in.readLine(out); + assertEquals("line3 length", 0, out.getLength()); + in.readLine(out); + assertEquals("line4 length", 3, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 4, out.getLength()); + in.readLine(out); + assertEquals("line6 length", 5, out.getLength()); + assertEquals("end of file", 0, in.readLine(out)); + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(KeyValueTextInputFormat format, + InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + List result = new ArrayList(); + RecordReader reader = format.createRecordReader(split, context); + reader.initialize(split, context); + while (reader.nextKeyValue()) { + result.add(new Text(reader.getCurrentValue())); + } + return result; + } + + /** + * Test using the gzip codec for reading + */ + public static void testGzip() throws Exception { + Job job = new Job(); + CompressionCodec gzip = new GzipCodec(); + ReflectionUtils.setConf(gzip, job.getConfiguration()); + localFs.delete(workDir, true); + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n"); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "line-1\tthis is a test\nline-1\tof gzip\n"); + FileInputFormat.setInputPaths(job, workDir); + + KeyValueTextInputFormat format = new KeyValueTextInputFormat(); + JobContext jobContext = new JobContext(job.getConfiguration(), new JobID()); + TaskAttemptContext context = new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()); + List splits =
format.getSplits(jobContext); + assertEquals("compressed splits == 2", 2, splits.size()); + FileSplit tmp = (FileSplit) splits.get(0); + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits.set(0, splits.get(1)); + splits.set(1, tmp); + } + List results = readSplit(format, splits.get(0), context); + assertEquals("splits[0] length", 6, results.size()); + assertEquals("splits[0][5]", " dog", results.get(5).toString()); + results = readSplit(format, splits.get(1), context); + assertEquals("splits[1] length", 2, results.size()); + assertEquals("splits[1][0]", "this is a test", + results.get(0).toString()); + assertEquals("splits[1][1]", "of gzip", + results.get(1).toString()); + } + + public static void main(String[] args) throws Exception { + new TestKeyValueTextInputFormat().testFormat(); + } +} Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/lib/input/TestKeyValueTextInputFormat.java ------------------------------------------------------------------------------ svn:eol-style = native
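For context on the new tests: the CombineFileInputFormat tests above drive split packing through three knobs (setMaxSplitSize, setMinSplitSizeNode, setMinSplitSizeRack) plus per-pool PathFilters. The sketch below, which is not part of this commit, shows how a job-side subclass might use those knobs; the class name, path prefix, and block size are hypothetical, and the record reader is stubbed out because only split computation is illustrated.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

public class SmallFilesInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  public SmallFilesInputFormat() {
    long blockSize = 64 * 1024 * 1024;   // assumed HDFS block size, hypothetical value
    setMaxSplitSize(4 * blockSize);      // upper bound on one combined split
    setMinSplitSizeNode(blockSize);      // pack per-node leftovers once they reach this size
    setMinSplitSizeRack(blockSize);      // then pack per-rack leftovers
    // Only paths under this (hypothetical) prefix are combined with one another,
    // analogous to the TestFilter pools used in the tests above.
    createPool(new PathFilter() {
      public boolean accept(Path path) {
        return path.toString().startsWith("/data/small-files");
      }
    });
  }

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    return null; // stubbed: only split computation is illustrated here
  }
}

Similarly, TestKeyValueTextInputFormat exercises the contract that each input line is split at the first tab into a Text key and a Text value. A small driver sketch, with hypothetical class and path names and also not part of this commit, wires the format into a job as follows.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KeyValueEchoDriver {

  // Keys and values arrive already separated on the first tab of each line.
  public static class EchoMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "kv-echo");
    job.setJarByClass(KeyValueEchoDriver.class);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setMapperClass(EchoMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}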