From: tgraves@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r1363456 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Date: Thu, 19 Jul 2012 18:22:53 -0000
Message-Id: <20120719182253.E413623889CB@eris.apache.org>

Author: tgraves
Date: Thu Jul 19 18:22:53 2012
New Revision: 1363456

URL: http://svn.apache.org/viewvc?rev=1363456&view=rev
Log:
merge -r 1363454:1363455 from branch-2. FIXES: MAPREDUCE-4423

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
      - copied unchanged from r1363455, hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1363456&r1=1363455&r2=1363456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Jul 19 18:22:53 2012
@@ -343,6 +343,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't
     init. (Jason Lowe via daryn)
 
+    MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
+    via tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1363456&r1=1363455&r2=1363456&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 19 18:22:53 2012
@@ -49,7 +49,8 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
 class Fetcher<K,V> extends Thread {
 
   private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -175,13 +176,18 @@ class Fetcher<K,V> extends Thread {
     }
   }
 
+  @VisibleForTesting
+  protected HttpURLConnection openConnection(URL url) throws IOException {
+    return (HttpURLConnection)url.openConnection();
+  }
+
   /**
    * The crux of the matter...
    * 
   * @param host {@link MapHost} from which we need to
   *              shuffle available map-outputs.
    */
-  private void copyFromHost(MapHost host) throws IOException {
+  protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
     List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
@@ -191,9 +197,11 @@ class Fetcher<K,V> extends Thread {
       return;
     }
 
-    LOG.debug("Fetcher " + id + " going to fetch from " + host);
-    for (TaskAttemptID tmp: maps) {
-      LOG.debug(tmp);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host);
+      for (TaskAttemptID tmp: maps) {
+        LOG.debug(tmp);
+      }
     }
 
     // List of maps to be fetched yet
@@ -205,7 +213,7 @@ class Fetcher<K,V> extends Thread {
 
     try {
       URL url = getMapOutputURL(host, maps);
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+      HttpURLConnection connection = openConnection(url);
 
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -266,17 +274,24 @@ class Fetcher<K,V> extends Thread {
 
     try {
       // Loop through available map-outputs and fetch them
-      // On any error, good becomes false and we exit after putting back
-      // the remaining maps to the yet_to_be_fetched list
-      boolean good = true;
-      while (!remaining.isEmpty() && good) {
-        good = copyMapOutput(host, input, remaining);
+      // On any error, faildTasks is not null and we exit
+      // after putting back the remaining maps to the
+      // yet_to_be_fetched list and marking the failed tasks.
+      TaskAttemptID[] failedTasks = null;
+      while (!remaining.isEmpty() && failedTasks == null) {
+        failedTasks = copyMapOutput(host, input, remaining);
+      }
+
+      if(failedTasks != null) {
+        for(TaskAttemptID left: failedTasks) {
+          scheduler.copyFailed(left, host, true);
+        }
       }
 
       IOUtils.cleanup(LOG, input);
 
       // Sanity check
-      if (good && !remaining.isEmpty()) {
+      if (failedTasks == null && !remaining.isEmpty()) {
         throw new IOException("server didn't return all expected map outputs: "
             + remaining.size() + " left.");
       }
@@ -285,10 +300,9 @@ class Fetcher<K,V> extends Thread {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
-
-  }
+  }
 
-  private boolean copyMapOutput(MapHost host,
+  private TaskAttemptID[] copyMapOutput(MapHost host,
                                 DataInputStream input,
                                 Set<TaskAttemptID> remaining) {
     MapOutput<K,V> mapOutput = null;
@@ -310,14 +324,15 @@
     } catch (IllegalArgumentException e) {
       badIdErrs.increment(1);
       LOG.warn("Invalid map id ", e);
-      return false;
+      //Don't know which one was bad, so consider all of them as bad
+      return remaining.toArray(new TaskAttemptID[remaining.size()]);
     }
 
     // Do some basic sanity verification
     if (!verifySanity(compressedLength, decompressedLength, forReduce,
         remaining, mapId)) {
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
     LOG.debug("header: " + mapId + ", len: " + compressedLength +
@@ -329,7 +344,7 @@ class Fetcher<K,V> extends Thread {
     // Check if we can shuffle *now* ...
     if (mapOutput.getType() == Type.WAIT) {
       LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-      return false;
+      return new TaskAttemptID[] {mapId};
     }
 
     // Go!
@@ -351,14 +366,18 @@ class Fetcher<K,V> extends Thread {
       // Note successful shuffle
      remaining.remove(mapId);
      metrics.successFetch();
-      return true;
+      return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
       if (mapId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" +
                  mapId + " decomp: " +
                  decompressedLength + ", " + compressedLength, ioe);
-        return false;
+        if(mapId == null) {
+          return remaining.toArray(new TaskAttemptID[remaining.size()]);
+        } else {
+          return new TaskAttemptID[] {mapId};
+        }
       }
 
       LOG.info("Failed to shuffle output of " + mapId +
@@ -366,9 +385,8 @@ class Fetcher<K,V> extends Thread {
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
-      scheduler.copyFailed(mapId, host, true);
       metrics.failedFetch();
-      return new TaskAttemptID[] {mapId};
+      return new TaskAttemptID[] {mapId};
     }
   }
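A note on the new protected openConnection(URL) method: it exists to give tests a seam, so a subclass can hand back a canned HttpURLConnection instead of opening a real socket, which is what makes the added TestFetcher.java workable. The sketch below is NOT the TestFetcher added by this commit (its contents are not included in this mail); it is a minimal, self-contained illustration of the same override pattern, using hypothetical class names (OutputFetcher, StubbedOutputFetcher) rather than Hadoop classes.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical production-side class exposing the same kind of protected
// connection factory that the patch adds to Fetcher.
class OutputFetcher {

  // The overridable seam, mirroring Fetcher#openConnection in the diff above.
  protected HttpURLConnection openConnection(URL url) throws IOException {
    return (HttpURLConnection) url.openConnection();
  }

  // Reads the full response body through the seam.
  public String fetch(URL url) throws IOException {
    HttpURLConnection connection = openConnection(url);
    try (InputStream in = connection.getInputStream()) {
      return new String(in.readAllBytes());
    }
  }
}

// Test-only subclass: overrides the seam to return a canned connection,
// so no socket is ever opened.
class StubbedOutputFetcher extends OutputFetcher {

  private final byte[] payload;

  StubbedOutputFetcher(byte[] payload) {
    this.payload = payload;
  }

  @Override
  protected HttpURLConnection openConnection(URL url) {
    return new HttpURLConnection(url) {
      @Override public InputStream getInputStream() {
        return new ByteArrayInputStream(payload);
      }
      @Override public void connect() { /* nothing to connect to */ }
      @Override public void disconnect() { /* nothing to tear down */ }
      @Override public boolean usingProxy() { return false; }
    };
  }

  public static void main(String[] args) throws IOException {
    OutputFetcher fetcher = new StubbedOutputFetcher("map output bytes".getBytes());
    // Prints the canned payload; the URL is never contacted.
    System.out.println(fetcher.fetch(new URL("http://example.invalid/mapOutput")));
  }
}

Overriding a single protected factory method keeps the production path unchanged (the real implementation still calls url.openConnection()) while letting a test drive the copy logic against controlled input. Presumably the real TestFetcher exercises copyFromHost through a similar override or mock; the file referenced under Added: above is the authoritative version.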