Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 87315 invoked from network); 29 Sep 2008 10:19:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 29 Sep 2008 10:19:53 -0000 Received: (qmail 41841 invoked by uid 500); 29 Sep 2008 10:19:52 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 41754 invoked by uid 500); 29 Sep 2008 10:19:51 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 41745 invoked by uid 99); 29 Sep 2008 10:19:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2008 03:19:51 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2008 10:18:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 74EC423889C4; Mon, 29 Sep 2008 03:18:54 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r700043 - in /hadoop/core/trunk: CHANGES.txt src/core/org/apache/hadoop/net/ConnTimeoutException.java src/core/org/apache/hadoop/net/ReadTimeoutException.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java Date: Mon, 29 Sep 2008 10:18:54 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080929101854.74EC423889C4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ddas Date: Mon Sep 29 03:18:53 2008 New Revision: 700043 URL: http://svn.apache.org/viewvc?rev=700043&view=rev Log: HADOOP-3327. Reverted the patch since it was affecting the performance of applications on regular clusters (without bad trackers). Removed: hadoop/core/trunk/src/core/org/apache/hadoop/net/ConnTimeoutException.java hadoop/core/trunk/src/core/org/apache/hadoop/net/ReadTimeoutException.java Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700043&r1=700042&r2=700043&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Mon Sep 29 03:18:53 2008 @@ -289,10 +289,6 @@ HADOOP-3756. Minor. Remove unused dfs.client.buffer.dir from hadoop-default.xml. (rangadi) - HADOOP-3327. Treats connection and read timeouts differently in the - shuffle and the backoff logic is dependent on the type of timeout. - (Jothi Padmanabhan via ddas) - HADOOP-3747. Adds counter suport for MultipleOutputs. (Alejandro Abdelnur via ddas) Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=700043&r1=700042&r2=700043&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Sep 29 03:18:53 2008 @@ -74,8 +74,6 @@ import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.net.ConnTimeoutException; -import org.apache.hadoop.net.ReadTimeoutException; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -96,12 +94,6 @@ private int numMaps; private ReduceCopier reduceCopier; - private static enum CopyOutputErrorType { - NO_ERROR, - CONNECTION_ERROR, - READ_ERROR - } - private CompressionCodec codec; @@ -110,7 +102,6 @@ setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with } - private Progress copyPhase; private Progress sortPhase; private Progress reducePhase; @@ -637,15 +628,9 @@ Set fetchFailedMaps = new TreeSet(); /** - * A map of taskId -> no. of failed fetches in connect - */ - Map mapTaskToConnectFailedFetchesMap = - new HashMap(); - - /** - * A map of taskId -> no. of failed fetches in read + * A map of taskId -> no. of failed fetches */ - Map mapTaskToReadFailedFetchesMap = + Map mapTaskToFailedFetchesMap = new HashMap(); /** @@ -665,7 +650,6 @@ Collections.synchronizedList(new LinkedList()); - /** * This class contains the methods that should be used for metrics-reporting * the specific metrics for shuffle. This class actually reports the @@ -728,8 +712,7 @@ /** Represents the result of an attempt to copy a map output */ private class CopyResult { - - // the map output location against which a copy attempt was made + // the map output location against which a copy attempt was made private final MapOutputLocation loc; // the size of the file copied, -1 if the transfer failed @@ -737,14 +720,10 @@ //a flag signifying whether a copy result is obsolete private static final int OBSOLETE = -2; - - CopyOutputErrorType errorType; - - CopyResult(MapOutputLocation loc, long size, - CopyOutputErrorType errorType) { + + CopyResult(MapOutputLocation loc, long size) { this.loc = loc; this.size = size; - this.errorType = errorType; } public boolean getSuccess() { return size >= 0; } @@ -753,9 +732,6 @@ } public long getSize() { return size; } public String getHost() { return loc.getHost(); } - public CopyOutputErrorType getErrorType() { - return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR); - } public MapOutputLocation getLocation() { return loc; } } @@ -1026,6 +1002,19 @@ } /** + * Fail the current file that we are fetching + * @return were we currently fetching? + */ + public synchronized boolean fail() { + if (currentLocation != null) { + finish(-1); + return true; + } else { + return false; + } + } + + /** * Get the current map output location. */ public synchronized MapOutputLocation getLocation() { @@ -1036,12 +1025,11 @@ currentLocation = loc; } - private synchronized void finish(long size, - CopyOutputErrorType errorType) { + private synchronized void finish(long size) { if (currentLocation != null) { LOG.debug(getName() + " finishing " + currentLocation + " =" + size); synchronized (copyResults) { - copyResults.add(new CopyResult(currentLocation, size, errorType)); + copyResults.add(new CopyResult(currentLocation, size)); copyResults.notify(); } currentLocation = null; @@ -1057,7 +1045,6 @@ try { MapOutputLocation loc = null; long size = -1; - CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR; synchronized (scheduledCopies) { while (scheduledCopies.isEmpty()) { @@ -1079,18 +1066,9 @@ // Reset size = -1; - - // Identify the error type - if (e.getClass() == ConnTimeoutException.class) { - errorType = CopyOutputErrorType.CONNECTION_ERROR; - } - else if (e.getClass() == ReadTimeoutException.class) { - errorType = CopyOutputErrorType.READ_ERROR; - } - } finally { shuffleClientMetrics.threadFree(); - finish(size, errorType); + finish(size); } } catch (InterruptedException e) { return; // ALL DONE @@ -1276,17 +1254,26 @@ connection.setReadTimeout(readTimeout); // set the connect timeout to the unit-connect-timeout connection.setConnectTimeout(unit); + while (true) { + try { + return connection.getInputStream(); + } catch (IOException ioe) { + // update the total remaining connect-timeout + connectionTimeout -= unit; - try { - connection.connect(); - } catch (IOException ioe) { - throw new ConnTimeoutException("Connection Timed out"); - } + // throw an exception if we have waited for timeout amount of time + // note that the updated value if timeout is used here + if (connectionTimeout == 0) { + throw ioe; + } - try { - return connection.getInputStream(); - } catch (IOException ioe) { - throw new ReadTimeoutException("Read Timed out"); + // reset the connect timeout for the last try + if (connectionTimeout < unit) { + unit = connectionTimeout; + // reset the connect time out for the final connect + connection.setConnectTimeout(unit); + } + } } } @@ -1857,61 +1844,33 @@ cr.getHost()); } else { retryFetches.add(cr.getLocation()); - - CopyOutputErrorType errorType = cr.getErrorType(); // note the failed-fetch TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId(); TaskID mapId = cr.getLocation().getTaskId(); totalFailures++; - - Integer noFailedFetches = 0; - - Integer noReadFailedFetches = - mapTaskToReadFailedFetchesMap.get(mapTaskId); - - if (noReadFailedFetches == null) noReadFailedFetches = 0; - - Integer noConnectFailedFetches = - mapTaskToConnectFailedFetchesMap.get(mapTaskId); - - if (noConnectFailedFetches == null) noConnectFailedFetches = 0; - - if (errorType == CopyOutputErrorType.READ_ERROR) { - noReadFailedFetches ++; - mapTaskToReadFailedFetchesMap.put (mapTaskId, - noReadFailedFetches); + Integer noFailedFetches = + mapTaskToFailedFetchesMap.get(mapTaskId); + noFailedFetches = + (noFailedFetches == null) ? 1 : (noFailedFetches + 1); + mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches); + LOG.info("Task " + getTaskID() + ": Failed fetch #" + + noFailedFetches + " from " + mapTaskId); + + // did the fetch fail too many times? + // using a hybrid technique for notifying the jobtracker. + // a. the first notification is sent after max-retries + // b. subsequent notifications are sent after 2 retries. + if ((noFailedFetches >= maxFetchRetriesPerMap) + && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) { synchronized (ReduceTask.this) { taskStatus.addFetchFailedMap(mapTaskId); LOG.info("Failed to fetch map-output from " + mapTaskId + - " Got a Read Time out," + - " reporting to the JobTracker"); - } - } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) { - noConnectFailedFetches ++; - mapTaskToConnectFailedFetchesMap.put ( - mapTaskId, noConnectFailedFetches); - - LOG.info("Task " + getTaskID() + ": Failed fetch #" - + noConnectFailedFetches + " from " + mapTaskId); - - if ((noConnectFailedFetches >= maxFetchRetriesPerMap) && - ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2) - == 0) { - synchronized (ReduceTask.this) { - taskStatus.addFetchFailedMap(mapTaskId); - LOG.info("Failed to fetch map-output from " + mapTaskId - + " even after MAX_FETCH_RETRIES_PER_MAP" - + " (connect) retries... " - + " reporting to the JobTracker"); - } + " even after MAX_FETCH_RETRIES_PER_MAP retries... " + + " reporting to the JobTracker"); } } - - noFailedFetches = noConnectFailedFetches + - noReadFailedFetches; - // note unique failed-fetch maps if (noFailedFetches == maxFetchRetriesPerMap) { fetchFailedMaps.add(mapId); @@ -1960,32 +1919,22 @@ } } - if (errorType == CopyOutputErrorType.CONNECTION_ERROR) { - // back off exponentially until num_retries <= max_retries - // back off by max_backoff/2 on subsequent failed attempts - currentTime = System.currentTimeMillis(); - int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap + // back off exponentially until num_retries <= max_retries + // back off by max_backoff/2 on subsequent failed attempts + currentTime = System.currentTimeMillis(); + int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap ? BACKOFF_INIT * (1 << (noFailedFetches - 1)) : (this.maxBackoff * 1000 / 2); - penaltyBox.put(cr.getHost(), currentTime + currentBackOff); - LOG.warn(reduceTask.getTaskID() + " adding host " + + penaltyBox.put(cr.getHost(), currentTime + currentBackOff); + LOG.warn(reduceTask.getTaskID() + " adding host " + cr.getHost() + " to penalty box, next contact in " + (currentBackOff/1000) + " seconds"); - } else if (errorType == CopyOutputErrorType.READ_ERROR) { - int backOff = Math.max(maxMapRuntime/2, - (this.maxBackoff * 1000)); - penaltyBox.put(cr.getHost(), currentTime + backOff); - LOG.warn(reduceTask.getTaskID() + " adding host " + - cr.getHost() + " to penalty box, next contact in " + - (backOff/1000) + " seconds"); - } - - } // Fetch Failure + } uniqueHosts.remove(cr.getHost()); numInFlight--; - } // while (numInFlight > 0) - } // while (copiedMaps < numMaps) + } + } // all done, inform the copiers to exit synchronized (copiers) {