Subject: svn commit: r1099088 - in /hadoop/common/branches/branch-0.20-security-203: CHANGES.txt src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Date: Tue, 03 May 2011 15:11:36 -0000
To: common-commits@hadoop.apache.org
From: acmurthy@apache.org

Author: acmurthy
Date: Tue May  3 15:11:35 2011
New Revision: 1099088

URL: http://svn.apache.org/viewvc?rev=1099088&view=rev
Log:
HADOOP-5759. Fix for IllegalArgumentException when CombineFileInputFormat
is used as job InputFormat. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/common/branches/branch-0.20-security-203/CHANGES.txt
    hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
    hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java

Modified: hadoop/common/branches/branch-0.20-security-203/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/CHANGES.txt?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-203/CHANGES.txt Tue May  3 15:11:35 2011
@@ -1717,6 +1717,9 @@ Release 0.20.2 - Unreleased
     HADOOP-6269. Fix threading issue with defaultResource in Configuration.
     (Sreekanth Ramakrishnan via cdouglas)
 
+    HADOOP-5759. Fix for IllegalArgumentException when CombineFileInputFormat
+    is used as job InputFormat. (Amareshwari Sriramadasu via zshao)
+
 Release 0.20.1 - 2009-09-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-203/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue May  3 15:11:35 2011
@@ -20,12 +20,12 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -73,6 +73,9 @@ public abstract class CombineFileInputFo
   // across multiple pools.
   private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
 
+  // mapping from a rack name to the set of Nodes in the rack
+  private static HashMap<String, Set<String>> rackToNodes =
+                            new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
    * approximately equal to the specified size.
@@ -214,6 +217,8 @@ public abstract class CombineFileInputFo
     getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
                   maxSize, minSizeNode, minSizeRack, splits);
 
+    // free up rackToNodes map
+    rackToNodes.clear();
     return splits.toArray(new CombineFileSplit[splits.size()]);
   }
@@ -341,7 +346,7 @@ public abstract class CombineFileInputFo
         // create this split.
         if (maxSize != 0 && curSplitSize >= maxSize) {
           // create an input split and add it to the splits array
-          addCreatedSplit(job, splits, racks, validBlocks);
+          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
           createdSplit = true;
           break;
         }
@@ -360,7 +365,7 @@ public abstract class CombineFileInputFo
       if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
         // if there is a mimimum size specified, then create a single split
         // otherwise, store these blocks into overflow data structure
-        addCreatedSplit(job, splits, racks, validBlocks);
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
       } else {
         // There were a few blocks in this rack that remained to be processed.
         // Keep them in 'overflow' block list. These will be combined later.
@@ -393,7 +398,7 @@ public abstract class CombineFileInputFo
       // create this split.
       if (maxSize != 0 && curSplitSize >= maxSize) {
         // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, racks, validBlocks);
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
         curSplitSize = 0;
         validBlocks.clear();
         racks.clear();
@@ -402,7 +407,7 @@ public abstract class CombineFileInputFo
 
     // Process any remaining blocks, if any.
     if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, racks, validBlocks);
+      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
     }
   }
@@ -412,13 +417,12 @@ public abstract class CombineFileInputFo
    */
  private void addCreatedSplit(JobConf job,
                               List<CombineFileSplit> splitList,
-                              List<String> racks,
+                              List<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
-   String[] rackLocations = racks.toArray(new String[racks.size()]);
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
@@ -427,7 +431,7 @@ public abstract class CombineFileInputFo
 
     // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset,
-                                   length, rackLocations);
+                                   length, locations.toArray(new String[0]));
     splitList.add(thissplit);
   }
@@ -484,7 +488,9 @@ public abstract class CombineFileInputFo
           rackToBlocks.put(rack, blklist);
         }
         blklist.add(oneblock);
-      }
+        // Add this host to rackToNodes map
+        addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+      }
 
       // add this block to the node --> block map
       for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -547,6 +553,23 @@ public abstract class CombineFileInputFo
     }
   }
 
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+
   /**
    * Accept a path only if any one of filters given in the
    * constructor do.
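The substance of the change, in brief: getSplits() used to label each CombineFileSplit with the rack names (e.g. "/r1") accumulated while grouping blocks, and downstream code that expects host names in split locations failed with the IllegalArgumentException reported in HADOOP-5759. The patch records which hosts were seen in each rack while blocks are bucketed (addHostToRack) and translates racks to hosts (getHosts) whenever a split is created. Below is a minimal, self-contained sketch of that bookkeeping outside Hadoop; the class name and host names are illustrative only, not part of the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the rackToNodes bookkeeping added by this patch.
// The class name and host names are made up for demonstration.
public class RackToHostsSketch {

  // rack name (e.g. "/r1") -> hosts known to hold blocks in that rack
  private static final Map<String, Set<String>> rackToNodes =
      new HashMap<String, Set<String>>();

  // Called once per block replica while blocks are bucketed by rack.
  static void addHostToRack(String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  // Called when a split is created: translate the racks the split spans
  // into host names, which is what split locations are expected to hold.
  static List<String> getHosts(List<String> racks) {
    List<String> hosts = new ArrayList<String>();
    for (String rack : racks) {
      hosts.addAll(rackToNodes.get(rack));
    }
    return hosts;
  }

  public static void main(String[] args) {
    addHostToRack("/r1", "host1.example.com");
    addHostToRack("/r2", "host2.example.com");
    // Before the patch a split spanning rack /r1 was labelled "/r1";
    // now it is labelled with the hosts in that rack.
    System.out.println(getHosts(Arrays.asList("/r1")));  // [host1.example.com]
  }
}

One design note: rackToNodes is a static field cleared at the end of getSplits(), so two jobs computing splits concurrently in the same JVM would share it; that is a trade-off this branch-line fix appears to accept.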
Modified: hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1099088&r1=1099087&r2=1099088&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-203/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Tue May  3 15:11:35 2011
@@ -18,11 +18,6 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.io.DataOutputStream;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
 
 import junit.framework.TestCase;
 
@@ -30,17 +25,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -151,14 +141,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create another file on 3 datanodes and 3 racks.
       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -186,7 +176,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -196,14 +186,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create file4 on all three racks
       Path file4 = new Path(dir4 + "/file4");
@@ -229,7 +219,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -239,14 +229,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 2 blocks
       inFormat = new DummyInputFormat();
@@ -385,7 +375,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 7 blocks and min is 3 blocks
       inFormat = new DummyInputFormat();
@@ -431,15 +421,15 @@ public class TestCombineFileInputFormat
       fileSplit = (CombineFileSplit) splits[0];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 6);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
     } finally {
       if (dfs != null) {
        dfs.shutdown();
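Every test change above follows one pattern: where the old assertions expected a rack path such as "/r1" as a split location, the new ones expect the hostname of a datanode in that rack (hosts1[0] and friends). For completeness, here is how one might observe the new behavior end to end with the old mapred API: a throwaway driver in the spirit of the test's DummyInputFormat that prints the locations getSplits() assigns. This is a sketch, not part of the commit; the class name and the input-path argument are hypothetical.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

// Hypothetical driver, not part of the commit: prints the locations that
// CombineFileInputFormat assigns to each split.
public class PrintCombineSplits {

  // Minimal concrete subclass, in the spirit of the test's DummyInputFormat;
  // only the splits are inspected, so no record reader is needed.
  static class Dummy extends CombineFileInputFormat<Text, Text> {
    public RecordReader<Text, Text> getRecordReader(
        InputSplit split, JobConf job, Reporter reporter) throws IOException {
      return null;
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path(args[0])); // e.g. an HDFS dir
    InputSplit[] splits = new Dummy().getSplits(job, 1);
    for (InputSplit s : splits) {
      CombineFileSplit cs = (CombineFileSplit) s;
      System.out.println(cs.getNumPaths() + " paths at "
          + Arrays.toString(cs.getLocations()));
    }
  }
}

Run against a cluster, the printed locations are now datanode host names; before this patch they could be rack paths such as "/r1", which is exactly what the updated assertions no longer accept.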