From: jlowe@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1507388 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/o...
Date: Fri, 26 Jul 2013 18:22:26 -0000
Message-Id: <20130726182226.B2BBA2388900@eris.apache.org>
X-Mailer: svnmailer-1.0.9

Author: jlowe
Date: Fri Jul 26 18:22:26 2013
New Revision: 1507388

URL: http://svn.apache.org/r1507388
Log:
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus.
Contributed by Hairong Kuang and Jason Lowe

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java   (with props)
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 18:22:26 2013
@@ -10,6 +10,9 @@ Release 0.23.10 - UNRELEASED

   OPTIMIZATIONS

+    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+    (Hairong Kuang and Jason Lowe via jlowe)
+
   BUG FIXES

     MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
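The optimization is simple to state: instead of listing a directory and then issuing one getFileBlockLocations round trip per file, getSplits now asks the FileSystem for located statuses up front, so each file comes back with its block locations already attached. A minimal standalone sketch of that pattern against the public FileSystem API follows; the input path argument and the printout are illustrative, not part of the patch:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListLocatedStatusSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]);   // e.g. an HDFS input directory
    FileSystem fs = dir.getFileSystem(conf);

    // One listLocatedStatus call streams back each file's status *and*
    // its block locations, instead of one getFileBlockLocations RPC
    // per file on top of the directory listing.
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      for (BlockLocation loc : stat.getBlockLocations()) {
        System.out.println(stat.getPath() + " @" + loc.getOffset()
            + " on " + Arrays.toString(loc.getHosts()));
      }
    }
  }
}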
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -36,8 +36,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -164,13 +166,17 @@ public abstract class FileInputFormat<K, V>
                                           List<FileStatus> result,
                                           FileSystem fs, Path path,
                                           PathFilter inputFilter)
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }
+    }
   }

   /** List input directories.
@@ -216,14 +222,19 @@ public abstract class FileInputFormat<K, V>
         if (globStat.isDirectory()) {
-          for(FileStatus stat: fs.listStatus(globStat.getPath(),
-              inputFilter)) {
-            if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(),
-                  inputFilter);
-            } else {
-              result.add(stat);
+          RemoteIterator<LocatedFileStatus> iter =
+              fs.listLocatedStatus(globStat.getPath());
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            if (inputFilter.accept(stat.getPath())) {
+              if (recursive && stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(),
+                    inputFilter);
+              } else {
+                result.add(stat);
+              }
             }
-          }
+          }
         } else {
           result.add(globStat);
         }
@@ -249,7 +260,6 @@ public abstract class FileInputFormat<K, V>
     for (FileStatus file: files) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job);
       long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(fs, path)) {
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          String[] splitHosts = getSplitHosts(blkLocations,
-              length-bytesRemaining, splitSize, clusterMap);
-          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-              splitHosts));
-          bytesRemaining -= splitSize;
+      if (length != 0) {
+        FileSystem fs = path.getFileSystem(job);
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
+        if (isSplitable(fs, path)) {
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+          long bytesRemaining = length;
+          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+            String[] splitHosts = getSplitHosts(blkLocations,
+                length-bytesRemaining, splitSize, clusterMap);
+            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+                splitHosts));
+            bytesRemaining -= splitSize;
+          }
+
+          if (bytesRemaining != 0) {
+            String[] splitHosts = getSplitHosts(blkLocations, length
+                - bytesRemaining, bytesRemaining, clusterMap);
+            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+                splitHosts));
+          }
+        } else {
+          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts));
         }
-
-        if (bytesRemaining != 0) {
-          String[] splitHosts = getSplitHosts(blkLocations, length
-              - bytesRemaining, bytesRemaining, clusterMap);
-          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-              splitHosts));
-        }
-      } else if (length != 0) {
-        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-        splits.add(makeSplit(path, 0, length, splitHosts));
       } else {
         //Create empty hosts array for zero length files
         splits.add(makeSplit(path, 0, length, new String[0]));
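Both getSplits implementations now reuse block locations when the status object already carries them, and fall back to a getFileBlockLocations RPC only for a plain FileStatus. A minimal sketch of that fallback, factored as a standalone helper; the class and method names here are illustrative, not part of the patch:

import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;

final class BlockLocationHelper {
  private BlockLocationHelper() {}

  /**
   * Prefer locations already attached by listLocatedStatus; otherwise
   * fall back to one getFileBlockLocations round trip per file.
   */
  static BlockLocation[] blockLocationsOf(FileSystem fs, FileStatus stat)
      throws IOException {
    if (stat instanceof LocatedFileStatus) {
      return ((LocatedFileStatus) stat).getBlockLocations();
    }
    return fs.getFileBlockLocations(stat, 0, stat.getLen());
  }
}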
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.input;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
@@ -33,7 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -203,47 +202,33 @@ public abstract class CombineFileInputFormat
     }

     // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(
-        listStatus(job).toArray(new FileStatus[0]));
+    List<FileStatus> stats = listStatus(job);
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    if (paths.length == 0) {
+    if (stats.size() == 0) {
       return splits;
     }

-    // Convert them to Paths first. This is a costly operation and
-    // we should do it first, otherwise we will incur doing it multiple
-    // times, one time each for each pool in the next loop.
-    List<Path> newpaths = new LinkedList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-      Path p = fs.makeQualified(paths[i]);
-      newpaths.add(p);
-    }
-    paths = null;
-
     // In one single iteration, process all the paths in a single pool.
     // Processing one pool at a time ensures that a split contains paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
+      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
-        Path p = iter.next();
-        if (onepool.accept(p)) {
+      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
+        FileStatus p = iter.next();
+        if (onepool.accept(p.getPath())) {
           myPaths.add(p); // add it to my output set
           iter.remove();
         }
       }
       // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
-          maxSize, minSizeNode, minSizeRack, splits);
+      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
     }

     // create splits for all files that are not in any pool.
-    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
-        maxSize, minSizeNode, minSizeRack, splits);
+    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

     // free up rackToNodes map
     rackToNodes.clear();
@@ -253,7 +238,7 @@ public abstract class CombineFileInputFormat
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(JobContext job, Path[] paths,
+  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                              long maxSize, long minSizeNode, long minSizeRack,
                              List<InputSplit> splits)
     throws IOException {
@@ -274,18 +259,20 @@ public abstract class CombineFileInputFormat
     HashMap<String, List<OneBlockInfo>> nodeToBlocks =
                               new HashMap<String, List<OneBlockInfo>>();

-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
+    files = new OneFileInfo[stats.size()];
+    if (stats.size() == 0) {
       return;
     }

     // populate all the blocks for all files
     long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+    int fileIdx = 0;
+    for (FileStatus stat : stats) {
+      files[fileIdx] = new OneFileInfo(stat, conf,
+          isSplitable(job, stat.getPath()),
           rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes, maxSize);
-      totLength += files[i].getLength();
+      totLength += files[fileIdx].getLength();
     }

     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
@@ -479,7 +466,7 @@ public abstract class CombineFileInputFormat
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file

-    OneFileInfo(Path path, Configuration conf,
+    OneFileInfo(FileStatus stat, Configuration conf,
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
@@ -490,10 +477,13 @@ public abstract class CombineFileInputFormat
       this.fileSize = 0;

       // get block locations from file system
-      FileSystem fs = path.getFileSystem(conf);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
-                                                           stat.getLen());
+      BlockLocation[] locations;
+      if (stat instanceof LocatedFileStatus) {
+        locations = ((LocatedFileStatus) stat).getBlockLocations();
+      } else {
+        FileSystem fs = stat.getPath().getFileSystem(conf);
+        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
+      }
       // create a list of all block and their locations
       if (locations == null) {
         blocks = new OneBlockInfo[0];
@@ -508,8 +498,8 @@ public abstract class CombineFileInputFormat
         // full file length
         blocks = new OneBlockInfo[1];
         fileSize = stat.getLen();
-        blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
-            .getHosts(), locations[0].getTopologyPaths());
+        blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
+            locations[0].getHosts(), locations[0].getTopologyPaths());
       } else {
         ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
             locations.length);
@@ -535,9 +525,9 @@ public abstract class CombineFileInputFormat
               myLength = Math.min(maxSize, left);
             }
           }
-          OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
-              myLength, locations[i].getHosts(), locations[i]
-                  .getTopologyPaths());
+          OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
+              myOffset, myLength, locations[i].getHosts(),
+              locations[i].getTopologyPaths());
           left -= myLength;
           myOffset += myLength;

@@ -638,6 +628,9 @@ public abstract class CombineFileInputFormat
   protected BlockLocation[] getFileBlockLocations(
       FileSystem fs, FileStatus stat) throws IOException {
+    if (stat instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) stat).getBlockLocations();
+    }
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
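Because the pool loop now consumes FileStatus objects directly, the old qualify-every-path pass is gone; each status already comes back qualified from listStatus. The per-pool partitioning reduces to the small drain-by-filter pattern below, shown as a standalone sketch; the class and method names are illustrative, not part of the patch:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;

final class PoolPartitioner {
  private PoolPartitioner() {}

  /**
   * Drain every status accepted by the pool's filter into its own list,
   * mirroring how getSplits partitions List<FileStatus> per pool before
   * calling getMoreSplits. Removal from the shared list ensures a file
   * belongs to at most one pool.
   */
  static List<FileStatus> drainMatching(List<FileStatus> stats,
      PathFilter pool) {
    List<FileStatus> mine = new ArrayList<FileStatus>();
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus stat = iter.next();
      if (pool.accept(stat.getPath())) {
        mine.add(stat);
        iter.remove();
      }
    }
    return mine;
  }
}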
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -254,14 +256,19 @@ public abstract class FileInputFormat<K, V>
         if (globStat.isDirectory()) {
-          for(FileStatus stat: fs.listStatus(globStat.getPath(),
-              inputFilter)) {
-            if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(),
-                  inputFilter);
-            } else {
-              result.add(stat);
+          RemoteIterator<LocatedFileStatus> iter =
+              fs.listLocatedStatus(globStat.getPath());
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            if (inputFilter.accept(stat.getPath())) {
+              if (recursive && stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(),
+                    inputFilter);
+              } else {
+                result.add(stat);
+              }
             }
-          }
+          }
         } else {
           result.add(globStat);
         }
@@ -291,13 +298,17 @@ public abstract class FileInputFormat<K, V>
                                           List<FileStatus> result,
                                           FileSystem fs, Path path,
                                           PathFilter inputFilter)
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }
+    }
   }

@@ -326,8 +337,13 @@ public abstract class FileInputFormat<K, V>
     for (FileStatus file: files) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job.getConfiguration());
       long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(job, path)) {
+      if (length != 0) {
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          FileSystem fs = path.getFileSystem(job.getConfiguration());
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
+        if (isSplitable(job, path)) {

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (with props)
==============================================================================
[...]
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
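Both new tests verify the optimization the same way: they subclass a local FileSystem, count listLocatedStatus invocations, and assert that getSplits triggers exactly one. The same instrumentation trick is useful for ad hoc auditing of listing round trips; a minimal sketch follows, with the class name CountingFileSystem being illustrative:

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * Counts listLocatedStatus calls so a test can assert how many listing
 * round trips an InputFormat really makes against the file system.
 */
class CountingFileSystem extends RawLocalFileSystem {
  int numListLocatedStatusCalls = 0;

  @Override
  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
      PathFilter filter) throws FileNotFoundException, IOException {
    ++numListLocatedStatusCalls;
    return super.listLocatedStatus(f, filter);
  }
}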
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -24,11 +24,14 @@ import java.util.List;
 import junit.framework.Assert;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
         .toString());
   }

+  @Test
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("listLocatedStatus calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
   }

   static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;

     @Override
     public FileStatus[] listStatus(Path f) throws FileNotFoundException,
         IOException {
       if (f.toString().equals("test:/a1")) {
         return new FileStatus[] {
-            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
             new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
       } else if (f.toString().equals("test:/a1/a2")) {
         return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
         throws FileNotFoundException, IOException {
       return this.listStatus(f);
     }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
   }
 }
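For reference, the code path the new test exercises is the same one any job submission takes. A client-side sketch of driving getSplits directly against an input directory, using only public API from this branch; the driver class name and the command-line argument are illustrative:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class GetSplitsDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // Any FileInputFormat subclass now lists its inputs with a single
    // listLocatedStatus call per directory instead of per-file RPCs.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    System.out.println("computed " + splits.size() + " splits");
  }
}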