Subject: svn commit: r720698 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java src/mapred/org/apache/hadoop/mapred/Merger.java
Date: Wed, 26 Nov 2008 01:40:22 -0000
To: core-commits@hadoop.apache.org
From: cdouglas@apache.org
Reply-To: core-dev@hadoop.apache.org
Message-Id: <20081126014023.1031E2388879@eris.apache.org>

Author: cdouglas
Date: Tue Nov 25 17:40:22 2008
New Revision: 720698

URL: http://svn.apache.org/viewvc?rev=720698&view=rev
Log:
HADOOP-4614.
Lazily open segments when merging map spills to avoid using too many file descriptors. Contributed by Yuri Pradkin.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 25 17:40:22 2008
@@ -1250,6 +1250,9 @@
     HADOOP-4659. Root cause of connection failure is being ost to code that
     uses it for delaying startup. (Steve Loughran and Hairong via hairong)
 
+    HADOOP-4614. Lazily open segments when merging map spills to avoid using
+    too many file descriptors. (Yuri Pradkin via cdouglas)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 25 17:40:22 2008
@@ -1236,27 +1236,23 @@
             new ArrayList<Segment<K, V>>(numSpills);
           TaskAttemptID mapId = getTaskID();
           for(int i = 0; i < numSpills; i++) {
-            IndexRecord indexRecord =
+            final IndexRecord indexRecord =
               getIndexInformation(mapId, i, parts);
 
             long segmentOffset = indexRecord.startOffset;
-            long rawSegmentLength = indexRecord.rawLength;
             long segmentLength = indexRecord.partLength;
 
-            FSDataInputStream in = rfs.open(filename[i]);
-            in.seek(segmentOffset);
-
-            Segment<K, V> s =
-              new Segment<K, V>(new Reader<K, V>(job, in, segmentLength,
-                                                 codec, null), true);
+            Segment<K, V> s =
+              new Segment<K, V>(job, rfs, filename[i], segmentOffset,
+                                segmentLength, codec, true);
             segmentList.add(i, s);
 
             if (LOG.isDebugEnabled()) {
+              long rawSegmentLength = indexRecord.rawLength;
               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                   "Spill =" + i + "(" + segmentOffset + ","+
                         rawSegmentLength + ", " + segmentLength + ")");
             }
-            indexRecord = null;
           }
 
           //merge

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=720698&r1=720697&r2=720698&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Tue Nov 25 17:40:22 2008
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -135,17 +136,25 @@
     Path file = null;
     boolean preserve = false;
     CompressionCodec codec = null;
+    long segmentOffset = 0;
     long segmentLength = -1;
 
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+        long segmentOffset, long segmentLength, CompressionCodec codec,
+        boolean preserve) throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
       this.codec = codec;
       this.preserve = preserve;
-
-      this.segmentLength = fs.getFileStatus(file).getLen();
+
+      this.segmentOffset = segmentOffset;
+      this.segmentLength = segmentLength;
     }
 
     public Segment(Reader<K, V> reader, boolean preserve) {
@@ -157,7 +166,9 @@
 
     private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
-        reader = new Reader<K, V>(conf, fs, file, codec, readsCounter);
+        FSDataInputStream in = fs.open(file);
+        in.seek(segmentOffset);
+        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
       }
     }
 
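
For readers skimming the patch, the idea can be sketched outside Hadoop. The following is only an illustration under stated assumptions, not the committed code: LazySegment is a hypothetical class, and it uses the JDK's RandomAccessFile in place of Hadoop's FileSystem and FSDataInputStream. The constructor records just the file, offset, and length; the file is opened and positioned only when init() runs, so a list of many segments holds no descriptors until each one is actually read.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a merge segment; this is not Hadoop's Merger.Segment. */
final class LazySegment implements Closeable {
  private final Path file;
  private final long offset;
  private final long length;
  private RandomAccessFile in;   // stays null until init() is called

  LazySegment(Path file, long offset, long length) {
    // Bookkeeping only: no file descriptor is consumed here.
    this.file = file;
    this.offset = offset;
    this.length = length;
  }

  boolean isOpen() {
    return in != null;
  }

  /** Opens the file and seeks to the segment start on first use. */
  void init() throws IOException {
    if (in == null) {
      in = new RandomAccessFile(file.toFile(), "r");
      in.seek(offset);
    }
  }

  /** Reads the whole segment, opening the file lazily if needed. */
  byte[] readAll() throws IOException {
    init();
    in.seek(offset);
    byte[] buf = new byte[(int) length];   // fine for a small demo segment
    in.readFully(buf);
    return buf;
  }

  @Override
  public void close() throws IOException {
    if (in != null) {
      in.close();
      in = null;
    }
  }

  public static void main(String[] args) throws IOException {
    Path spill = Files.createTempFile("spill", ".out");
    Files.write(spill, "aaaaBBBBcccc".getBytes(StandardCharsets.US_ASCII));

    // Building the segment list opens nothing.
    List<LazySegment> segments = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      segments.add(new LazySegment(spill, 4L * i, 4));
    }

    // Each segment is opened only when the loop reaches it, then closed.
    for (LazySegment s : segments) {
      System.out.println(new String(s.readAll(), StandardCharsets.US_ASCII));
      s.close();
    }
    Files.delete(spill);
  }
}
```

In this sketch at most one descriptor is open at a time. How many are open at once in a real merge depends on the merge factor, which the sketch does not model.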