hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject svn commit: r1455742 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src...
Date Tue, 12 Mar 2013 22:56:57 GMT
Author: jlowe
Date: Tue Mar 12 22:56:57 2013
New Revision: 1455742

URL: http://svn.apache.org/r1455742
Log:
svn merge -c 1455740 FIXES: MAPREDUCE-5060. Fetch failures that time out only count against
the first map task. Contributed by Robert Joseph Evans

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1455742&r1=1455741&r2=1455742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Mar 12 22:56:57
2013
@@ -633,6 +633,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
     Prakash via tgraves)
 
+    MAPREDUCE-5060. Fetch failures that time out only count against the first
+    map task (Robert Joseph Evans via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1455742&r1=1455741&r2=1455742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
Tue Mar 12 22:56:57 2013
@@ -221,7 +221,6 @@ class Fetcher<K,V> extends Thread {
     
     // Construct the url and connect
     DataInputStream input;
-    boolean connectSucceeded = false;
     
     try {
       URL url = getMapOutputURL(host, maps);
@@ -237,7 +236,6 @@ class Fetcher<K,V> extends Thread {
       // set the read timeout
       connection.setReadTimeout(readTimeout);
       connect(connection, connectionTimeout);
-      connectSucceeded = true;
       input = new DataInputStream(connection.getInputStream());
 
       // Validate response code
@@ -265,18 +263,10 @@ class Fetcher<K,V> extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
-      if (!connectSucceeded) {
-        for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
-        }
-      } else {
-        // If we got a read error at this stage, it implies there was a problem
-        // with the first map, typically lost map. So, penalize only that map
-        // and add the rest
-        TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
+      for(TaskAttemptID left: remaining) {
+        scheduler.copyFailed(left, host, false, connectExcpt);
       }
-      
+     
       // Add back all the remaining maps, WITHOUT marking them as failed
       for(TaskAttemptID left: remaining) {
         scheduler.putBackKnownMapOutput(host, left);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1455742&r1=1455741&r2=1455742&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Tue Mar 12 22:56:57 2013
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
 
@@ -71,6 +72,54 @@ public class TestFetcher {
   }
   
   @SuppressWarnings("unchecked")
+  @Test(timeout=30000)
+  public void testCopyFromHostConnectionTimeout() throws Exception {
+    LOG.info("testCopyFromHostConnectionTimeout");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    when(connection.getInputStream()).thenThrow(
+        new SocketTimeoutException("This is a fake timeout :)"));
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    
+    underTest.copyFromHost(host);
+    
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    
+    verify(allErrs).increment(1);
+    verify(ss).copyFailed(map1ID, host, false, false);
+    verify(ss).copyFailed(map2ID, host, false, false);
+    
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
+  
+  @SuppressWarnings("unchecked")
   @Test
   public void testCopyFromHostBogusHeader() throws Exception {
     LOG.info("testCopyFromHostBogusHeader");



Mime
View raw message