Return-Path:
Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org
Received: (qmail 73779 invoked from network); 7 Feb 2007 18:37:03 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2)
  by minotaur.apache.org with SMTP; 7 Feb 2007 18:37:03 -0000
Received: (qmail 89747 invoked by uid 500); 7 Feb 2007 18:37:10 -0000
Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org
Received: (qmail 89660 invoked by uid 500); 7 Feb 2007 18:37:09 -0000
Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: hadoop-dev@lucene.apache.org
Delivered-To: mailing list hadoop-commits@lucene.apache.org
Received: (qmail 89651 invoked by uid 99); 7 Feb 2007 18:37:09 -0000
Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133)
  by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Feb 2007 10:37:09 -0800
X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3)
  by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Feb 2007 10:37:02 -0800
Received: by eris.apache.org (Postfix, from userid 65534)
  id E3F101A981A; Wed, 7 Feb 2007 10:36:41 -0800 (PST)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r504640 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskTracker.java
Date: Wed, 07 Feb 2007 18:36:41 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20070207183641.E3F101A981A@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: cutting
Date: Wed Feb 7 10:36:40 2007
New Revision: 504640

URL: http://svn.apache.org/viewvc?view=rev&rev=504640
Log:
HADOOP-984.  Fix a bug in shuffle error handling introduced by HADOOP-331.  Contributed by Arun.
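The shape of the fix, per the log and the diff below, is that both shuffle streams now start as null, are opened inside the try, and are closed in the finally block only if they were actually opened, so a failure while reading the index file still reaches the catch block that re-informs the job tracker. A minimal self-contained sketch of that shape (not Hadoop code; `serve`, `failOnIndex`, and the byte-array streams are stand-ins for the servlet, the failure mode, and `fileSys.open`):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ShuffleCloseSketch {

    /** Returns true if both streams were opened and closed cleanly. */
    static boolean serve(boolean failOnIndex) {
        InputStream indexIn = null;      // index-file stream, opened inside try
        InputStream mapOutputIn = null;  // map-output stream, opened inside try
        try {
            if (failOnIndex) {
                // simulates fileSys.open(indexFileName) failing
                throw new IOException("simulated missing index file");
            }
            indexIn = new ByteArrayInputStream(new byte[16]);    // stand-in for the index file
            mapOutputIn = new ByteArrayInputStream(new byte[4]); // stand-in for the map output
            return true;
        } catch (IOException ie) {
            // here the real servlet would report the bad map output
            // back to the job tracker
            return false;
        } finally {
            // close only the streams that were actually opened,
            // so a failed open cannot cause a NullPointerException here
            try {
                if (indexIn != null) indexIn.close();
                if (mapOutputIn != null) mapOutputIn.close();
            } catch (IOException ignored) {
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(serve(false)); // true: both streams opened and closed
        System.out.println(serve(true));  // false: error path taken, close still safe
    }
}
```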
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=504640&r1=504639&r2=504640
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 7 10:36:40 2007
@@ -8,6 +8,10 @@
  2. HADOOP-917.  Fix a NullPointerException in SequenceFile's merger
     with large map outputs.  (omalley via cutting)
 
+ 3. HADOOP-984.  Fix a bug in shuffle error handling introduced by
+    HADOOP-331.  If a map output is unavailable, the job tracker is
+    once more informed.  (Arun C Murthy via cutting)
+
 
 Release 0.11.0 - 2007-02-02

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=504640&r1=504639&r2=504640
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb 7 10:36:40 2007
@@ -1526,30 +1526,47 @@
         JobConf conf = (JobConf) context.getAttribute("conf");
         FileSystem fileSys =
           (FileSystem) context.getAttribute("local.file.system");
-        //open index file
+
+        // Index file
         Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
-        FSDataInputStream in = fileSys.open(indexFileName);
-        //seek to the correct offset for the given reduce
-        in.seek(reduce * 16);
-        
-        //read the offset and length of the partition data
-        long startOffset = in.readLong();
-        long partLength = in.readLong();
-        
-        in.close();
+        FSDataInputStream indexIn = null;
+
+        // Map-output file
         Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out");
-        
-        response.setContentLength((int) partLength);
-        
-        FSDataInputStream inStream = null;
+        FSDataInputStream mapOutputIn = null;
+
+        // true iff IOException was caused by attempt to access input
         boolean isInputException = true;
+
         try {
-          inStream = fileSys.open(mapOutputFileName);
-          inStream.seek(startOffset);
+          /**
+           * Read the index file to get the information about where
+           * the map-output for the given reducer is available.
+           */
+          //open index file
+          indexIn = fileSys.open(indexFileName);
+
+          //seek to the correct offset for the given reduce
+          indexIn.seek(reduce * 16);
+
+          //read the offset and length of the partition data
+          long startOffset = indexIn.readLong();
+          long partLength = indexIn.readLong();
+
+          //set the content-length header
+          response.setContentLength((int) partLength);
+
+          /**
+           * Read the data from the single map-output file and
+           * send it to the reducer.
+           */
+          //open the map-output file
+          mapOutputIn = fileSys.open(mapOutputFileName);
+          //seek to the correct offset for the reduce
+          mapOutputIn.seek(startOffset);
           try {
             int totalRead = 0;
-            int len = inStream.read(buffer, 0,
+            int len = mapOutputIn.read(buffer, 0,
                 partLength < MAX_BYTES_TO_READ
                 ? (int)partLength : MAX_BYTES_TO_READ);
             while (len > 0) {
@@ -1561,12 +1578,17 @@
             }
             totalRead += len;
             if (totalRead == partLength) break;
-            len = inStream.read(buffer, 0,
+            len = mapOutputIn.read(buffer, 0,
                 (partLength - totalRead) < MAX_BYTES_TO_READ
                 ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
           }
         } finally {
-          inStream.close();
+          if (indexIn != null) {
+            indexIn.close();
+          }
+          if (mapOutputIn != null) {
+            mapOutputIn.close();
+          }
         }
       } catch (IOException ie) {
         TaskTracker tracker =
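The diff reads the index file with a seek to `reduce * 16` followed by two `readLong` calls, which implies a layout of one 16-byte record per reduce partition: the offset where that partition's data begins in `file.out`, then its length in bytes. A minimal self-contained sketch of that layout, using in-memory streams in place of `FSDataInputStream` (class and method names here are illustrative, not Hadoop's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexRecordSketch {
    static final int RECORD_SIZE = 16; // two 8-byte longs per reduce partition

    /** Builds a fake index file: one (offset, length) record per partition. */
    static byte[] makeIndex(long[] lengths) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            long offset = 0;
            for (long len : lengths) {
                out.writeLong(offset); // where this partition starts in file.out
                out.writeLong(len);    // how many bytes it holds
                offset += len;
            }
            out.close();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns {startOffset, partLength} for the given reduce partition. */
    static long[] readRecord(byte[] indexBytes, int reduce) {
        try {
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(indexBytes));
            in.skipBytes(reduce * RECORD_SIZE); // same role as indexIn.seek(reduce * 16)
            long startOffset = in.readLong();   // offset of the partition data
            long partLength = in.readLong();    // length of the partition data
            in.close();
            return new long[] { startOffset, partLength };
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Three partitions of 100, 250, and 75 bytes; look up the middle one.
        byte[] idx = makeIndex(new long[] { 100, 250, 75 });
        long[] rec = readRecord(idx, 1);
        System.out.println("offset=" + rec[0] + " length=" + rec[1]); // offset=100 length=250
    }
}
```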