Return-Path:
Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org
Received: (qmail 43763 invoked from network); 1 Jun 2007 21:16:11 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2)
  by minotaur.apache.org with SMTP; 1 Jun 2007 21:16:11 -0000
Received: (qmail 51201 invoked by uid 500); 1 Jun 2007 21:16:15 -0000
Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org
Received: (qmail 51132 invoked by uid 500); 1 Jun 2007 21:16:15 -0000
Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: hadoop-dev@lucene.apache.org
Delivered-To: mailing list hadoop-commits@lucene.apache.org
Received: (qmail 51122 invoked by uid 99); 1 Jun 2007 21:16:15 -0000
Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133)
  by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2007 14:16:15 -0700
X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME
X-Spam-Check-By: apache.org
Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3)
  by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2007 14:16:10 -0700
Received: by eris.apache.org (Postfix, from userid 65534)
  id 98AEC1A981C; Fri, 1 Jun 2007 14:15:49 -0700 (PDT)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r543619 - in /lucene/hadoop/branches/branch-0.13: CHANGES.txt src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
Date: Fri, 01 Jun 2007 21:15:49 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20070601211549.98AEC1A981C@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: cutting
Date: Fri Jun  1 14:15:48 2007
New Revision: 543619

URL: http://svn.apache.org/viewvc?view=rev&rev=543619
Log:
Merge -r 543606:543607 from trunk to 0.13 branch, with manual changes to
resolve conflicts.  Fixes: HADOOP-1431.

Modified:
    lucene/hadoop/branches/branch-0.13/CHANGES.txt
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Fri Jun  1 14:15:48 2007
@@ -436,6 +436,10 @@
 130. HADOOP-1332. Fix so that TaskTracker exits reliably during unit
      tests on Windows. (omalley via cutting)
 
+131. HADOOP-1431. Fix so that sort progress reporting during map runs
+     only while sorting, so that stuck maps are correctly terminated.
+     (Devaraj Das and Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun  1 14:15:48 2007
@@ -163,6 +163,7 @@
         throws IOException {
 
         setProgress(getProgress());
+        reportProgress(umbilical);
         long beforePos = getPos();
         boolean ret = rawIn.next(key, value);
         if (ret) {
@@ -178,22 +179,16 @@
       }
     };
 
-    Thread sortProgress = createProgressThread(umbilical);
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      sortProgress.start();
       runner.run(in, collector, reporter);
       collector.flush();
     } finally {
       //close
       in.close();                               // close input
       collector.close();
-      sortProgress.interrupt();
-      try {
-        sortProgress.join();
-      } catch (InterruptedException ie){ }
     }
     done(umbilical);
   }
@@ -220,6 +215,7 @@
     };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
     sortProgress.setDaemon(true);
+    sortProgress.start();
     return sortProgress;
   }
 
@@ -381,10 +377,20 @@
       for (int i = 0; i < partitions; i++)
         totalMem += sortImpl[i].getMemoryUtilized();
       if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
-        sortAndSpillToDisk();
-        keyValBuffer.reset();
-        for (int i = 0; i < partitions; i++)
-          sortImpl[i].close();
+
+        // Start the progress thread
+        Thread progress = createProgressThread(umbilical);
+
+        try {
+          sortAndSpillToDisk();
+          keyValBuffer.reset();
+          for (int i = 0; i < partitions; i++) {
+            sortImpl[i].close();
+          }
+        } finally {
+          // Stop the progress thread
+          progress.interrupt();
+        }
       }
     }
   }
@@ -602,13 +608,22 @@
     }
 
     public void flush() throws IOException {
-      //check whether the length of the key/value buffer is 0. If not, then
-      //we need to spill that to disk. Note that we reset the key/val buffer
-      //upon each spill (so a length > 0 means that we have not spilled yet)
-      if (keyValBuffer.getLength() > 0) {
-        sortAndSpillToDisk();
+
+      // Start the progress thread
+      Thread progress = createProgressThread(umbilical);
+
+      try {
+        //check whether the length of the key/value buffer is 0. If not, then
+        //we need to spill that to disk. Note that we reset the key/val buffer
+        //upon each spill (so a length > 0 means that we have not spilled yet)
+        if (keyValBuffer.getLength() > 0) {
+          sortAndSpillToDisk();
+        }
+        mergeParts();
+      } finally {
+        // Stop the progress thread
+        progress.interrupt();
      }
-      mergeParts();
    }
   }
 }

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=543619&r1=543618&r2=543619
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun  1 14:15:48 2007
@@ -37,6 +37,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
 
   private int numMaps;
-  private boolean sortComplete;
+  AtomicBoolean sortComplete = new AtomicBoolean(false);
   private ReduceCopier reduceCopier;
 
   {
@@ -283,7 +284,7 @@
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
         public void run() {
-          while (!sortComplete) {
+          while (!sortComplete.get()) {
             try {
               reportProgress(umbilical);
               Thread.sleep(PROGRESS_INTERVAL);
@@ -298,6 +299,7 @@
           }
         }
       };
+    sortProgress.setDaemon(true);
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
     Path tempDir = new Path(getTaskId());
@@ -317,7 +319,7 @@
                              !conf.getKeepFailedTaskFiles()); // sort
 
       } finally {
-        sortComplete = true;
+        sortComplete.set(true);
       }
 
       sortPhase.complete();                         // sort is complete
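
For readers skimming the diff, the pattern this change applies is: create, daemonize, and start the sort-progress heartbeat thread only around the sort/spill work, interrupt it in a finally block when that work ends, and (on the reduce side) flip an AtomicBoolean so the heartbeat loop reliably observes completion. The sketch below is an illustrative, self-contained reduction of that pattern, not Hadoop code; the class and member names (ScopedSortProgress, sortAndSpillToDisk, reportProgress, PROGRESS_INTERVAL) are stand-ins for the real MapTask/ReduceTask members.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only -- names are stand-ins, not the Hadoop 0.13 classes.
public class ScopedSortProgress {

  private static final long PROGRESS_INTERVAL = 1000;   // heartbeat period, ms
  private final AtomicBoolean sortComplete = new AtomicBoolean(false);

  // Stand-in for the task's reportProgress(umbilical) call.
  private void reportProgress() {
    System.out.println("sort progress heartbeat");
  }

  // Stand-in for the expensive sort-and-spill step.
  private void sortAndSpillToDisk() throws InterruptedException {
    Thread.sleep(3000);
  }

  // Heartbeats run only between start() here and the finally block in spill().
  private Thread createProgressThread() {
    Thread sortProgress = new Thread() {
      public void run() {
        while (!sortComplete.get()) {
          try {
            reportProgress();
            Thread.sleep(PROGRESS_INTERVAL);
          } catch (InterruptedException e) {
            return;                      // interrupted by spill()'s finally
          }
        }
      }
    };
    sortProgress.setName("Sort progress reporter (demo)");
    sortProgress.setDaemon(true);        // never keeps the JVM alive
    sortProgress.start();                // started only when a sort begins
    return sortProgress;
  }

  public void spill() throws InterruptedException {
    Thread progress = createProgressThread();
    try {
      sortAndSpillToDisk();
    } finally {
      sortComplete.set(true);            // loop exits even without an interrupt
      progress.interrupt();              // wake the reporter if it is sleeping
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new ScopedSortProgress().spill();
  }
}

Because the reporter no longer runs for the whole lifetime of the task, a map whose user code hangs before any sort stops emitting heartbeats and can be timed out and killed by the framework, which is the behavior the CHANGES.txt entry describes.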