hadoop-common-commits mailing list archives

From oz...@apache.org
Subject hadoop git commit: MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.
Date Tue, 12 May 2015 15:29:57 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c31c6fbda -> fb5b0ebb4


MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in
one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.

(cherry picked from commit f4e2b3cc0b1f4e49c306bc09a9dddd0495225bb2)
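
For readers of the patch below, the interleaving being guarded against is: a fetcher that reports a successful copy removes the host's entry from the hostFailures map, so a copyFailed() call for the same host, triggered by a failure recorded earlier in another thread, can find no entry and throw an NPE when it dereferences hostFailures.get(hostname). The following is a minimal, hypothetical Java sketch of that sequence and of the null guard the patch adds; the class and method names only loosely mirror ShuffleSchedulerImpl and none of the surrounding shuffle machinery is modelled.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the MAPREDUCE-6361 interleaving; it is not
// the real ShuffleSchedulerImpl, only an illustration of the sequence the new
// test drives: hostFailed() -> copySucceeded() -> copyFailed() on one host.
class HostFailureSketch {
  // ShuffleSchedulerImpl keeps hostname -> IntWritable; a plain Integer is
  // enough for the sketch.
  private final Map<String, Integer> hostFailures = new HashMap<>();
  private static final int MAX_HOST_FAILURES = 5;

  synchronized void hostFailed(String hostname) {
    hostFailures.merge(hostname, 1, Integer::sum);
  }

  synchronized void copySucceeded(String hostname) {
    // A successful fetch from the host clears its failure history ...
    hostFailures.remove(hostname);
  }

  synchronized void copyFailed(String hostname) {
    // ... so when copyFailed() later runs for a failure recorded before that
    // success, the entry can already be gone. The pre-patch code did
    //   hostFailures.get(hostname).get() > getMaxHostFailures()
    // and dereferenced null. The patch re-seeds the entry first.
    Integer count = hostFailures.get(hostname);
    if (count == null) {
      hostFailures.put(hostname, 1);
      count = 1;
    }
    boolean hostFail = count > MAX_HOST_FAILURES;
    System.out.println(hostname + " over failure limit: " + hostFail);
  }

  public static void main(String[] args) {
    HostFailureSketch s = new HostFailureSketch();
    s.hostFailed("host1");      // fetcher A records a fetch failure on host1
    s.copySucceeded("host1");   // fetcher B succeeds and removes host1's entry
    s.copyFailed("host1");      // fetcher A: NPE here without the guard
  }
}

The TestShuffleScheduler case added below drives exactly this hostFailed -> copySucceeded -> copyFailed sequence against a single host to verify that no NPE is thrown.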


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb5b0ebb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb5b0ebb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb5b0ebb

Branch: refs/heads/branch-2
Commit: fb5b0ebb459cc8812084090a7ce7ac29e2ad147c
Parents: c31c6fb
Author: Tsuyoshi Ozawa <ozawa@apache.org>
Authored: Wed May 13 00:28:17 2015 +0900
Committer: Tsuyoshi Ozawa <ozawa@apache.org>
Committed: Wed May 13 00:28:36 2015 +0900

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  4 ++
 .../task/reduce/ShuffleSchedulerImpl.java       | 14 +++-
 .../task/reduce/TestShuffleScheduler.java       | 70 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb5b0ebb/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 50a5e5a..df8df39 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -159,6 +159,10 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir, 
     introducing compile error (Arshad Mohammad via vinayakumarb)
 
+    MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
+    copySucceeded() in one thread and copyFailed() in another thread on the
+    same host. (Junping Du via ozawa)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb5b0ebb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index 8317672..ff0bb4f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   }
   
   private void updateStatus() {
-    updateStatus(null);	
+    updateStatus(null);
   }
 
   public synchronized void hostFailed(String hostname) {
@@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
+    IntWritable hostFailedNum = hostFailures.get(hostname);
+    // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another
+    // thread with copySucceeded.
+    // In this case, add back hostname to hostFailures to get rid of NPE issue.
+    if (hostFailedNum == null) {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
     //report failure if already retried maxHostFailures times
-    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
-    
+    boolean hostFail = hostFailures.get(hostname).get() >
+        getMaxHostFailures() ? true : false;
+
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb5b0ebb/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 6ac2320..654b748 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -213,6 +213,76 @@ public class TestShuffleScheduler {
     Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID failedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 0), 0);
+
+    TaskAttemptID succeedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 1), 1);
+
+    // handle output fetch failure for failedAttemptID, part I
+    scheduler.hostFailed(host1.getHostName());
+
+    // handle output fetch succeed for succeedAttemptID
+    long bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+    // handle output fetch failure for failedAttemptID, part II
+    // for MAPREDUCE-6361: verify no NPE exception get thrown out
+    scheduler.copyFailed(failedAttemptID, host1, true, false);
+  }
+
   private static String copyMessage(int attemptNo, double rate1, double rate2) {
     int attemptZero = attemptNo - 1;
     return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"

