hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject hadoop git commit: MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe.
Date Thu, 02 Apr 2015 18:57:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 96649c38f -> eccb7d46e


MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed
by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eccb7d46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eccb7d46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eccb7d46

Branch: refs/heads/trunk
Commit: eccb7d46efbf07abcc6a01bd5e7d682f6815b824
Parents: 96649c3
Author: Junping Du <junping_du@apache.org>
Authored: Thu Apr 2 12:13:03 2015 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Thu Apr 2 12:13:03 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../hadoop/mapreduce/task/reduce/Fetcher.java   | 73 ++++++++++----------
 .../mapreduce/task/reduce/TestFetcher.java      | 33 +++++++++
 3 files changed, 74 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eccb7d46/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f5d2d1a..160c49c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -513,6 +513,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6285. ClientServiceDelegate should not retry upon
     AuthenticationException. (Jonathan Eagles via ozawa)
 
+    MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal 
+    to a reducer. (Jason Lowe via junping_du)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eccb7d46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 3f40853..d867e4b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -258,6 +258,39 @@ class Fetcher<K,V> extends Thread {
     closeConnection();
   }
 
+  private DataInputStream openShuffleUrl(MapHost host,
+      Set<TaskAttemptID> remaining, URL url) {
+    DataInputStream input = null;
+
+    try {
+      setupConnectionsWithRetry(host, remaining, url);
+      if (stopped) {
+        abortConnect(host, remaining);
+      } else {
+        input = new DataInputStream(connection.getInputStream());
+      }
+    } catch (IOException ie) {
+      boolean connectExcpt = ie instanceof ConnectException;
+      ioErrs.increment(1);
+      LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
+               " map outputs", ie);
+
+      // If connect did not succeed, just mark all the maps as failed,
+      // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
+      for(TaskAttemptID left: remaining) {
+        scheduler.copyFailed(left, host, false, connectExcpt);
+      }
+
+      // Add back all the remaining maps, WITHOUT marking them as failed
+      for(TaskAttemptID left: remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
+    }
+
+    return input;
+  }
+
   /**
    * The crux of the matter...
    * 
@@ -286,38 +319,12 @@ class Fetcher<K,V> extends Thread {
     Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
     
     // Construct the url and connect
-    DataInputStream input = null;
     URL url = getMapOutputURL(host, maps);
-    try {
-      setupConnectionsWithRetry(host, remaining, url);
-      
-      if (stopped) {
-        abortConnect(host, remaining);
-        return;
-      }
-    } catch (IOException ie) {
-      boolean connectExcpt = ie instanceof ConnectException;
-      ioErrs.increment(1);
-      LOG.warn("Failed to connect to " + host + " with " + remaining.size() + 
-               " map outputs", ie);
-
-      // If connect did not succeed, just mark all the maps as failed,
-      // indirectly penalizing the host
-      scheduler.hostFailed(host.getHostName());
-      for(TaskAttemptID left: remaining) {
-        scheduler.copyFailed(left, host, false, connectExcpt);
-      }
-     
-      // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TaskAttemptID left: remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
-      
+    DataInputStream input = openShuffleUrl(host, remaining, url);
+    if (input == null) {
       return;
     }
     
-    input = new DataInputStream(connection.getInputStream());
-    
     try {
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
@@ -333,14 +340,10 @@ class Fetcher<K,V> extends Thread {
           connection.disconnect();
           // Get map output from remaining tasks only.
           url = getMapOutputURL(host, remaining);
-          
-          // Connect with retry as expecting host's recovery take sometime.
-          setupConnectionsWithRetry(host, remaining, url);
-          if (stopped) {
-            abortConnect(host, remaining);
+          input = openShuffleUrl(host, remaining, url);
+          if (input == null) {
             return;
           }
-          input = new DataInputStream(connection.getInputStream());
         }
       }
       
@@ -591,7 +594,7 @@ class Fetcher<K,V> extends Thread {
     // Retry is not timeout, let's do retry with throwing an exception.
     if (currentTime - retryStartTime < this.fetchRetryTimeout) {
       LOG.warn("Shuffle output from " + host.getHostName() +
-          " failed, retry it.");
+          " failed, retry it.", ioe);
       throw ioe;
     } else {
       // timeout, prepare to be failed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eccb7d46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 929c0ae..723df17 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -388,6 +388,39 @@ public class TestFetcher {
                                    anyBoolean(), anyBoolean());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryThenTimeout() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200)
+      .thenThrow(new SocketTimeoutException("forced timeout"));
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, false, false);
+  }
+
   @Test
   public void testCopyFromHostExtraBytes() throws Exception {
     Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,


Mime
View raw message