hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas
Date Wed, 03 Jan 2018 17:16:21 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 50f635ec7 -> a316a4cc4


MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas

(cherry picked from commit fe35103591ece0209f8345aba5544313e45a073c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a316a4cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a316a4cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a316a4cc

Branch: refs/heads/branch-2.9
Commit: a316a4cc4765ecec99b5779ccfa6085adc63cce6
Parents: 50f635e
Author: Jason Lowe <jlowe@apache.org>
Authored: Wed Jan 3 11:01:38 2018 -0600
Committer: Jason Lowe <jlowe@apache.org>
Committed: Wed Jan 3 11:12:55 2018 -0600

----------------------------------------------------------------------
 .../hadoop/mapred/TaskAttemptListenerImpl.java  | 41 +++++++++++---------
 1 file changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a316a4cc/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 34bd15d..c54e11c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -523,33 +523,38 @@ public class TaskAttemptListenerImpl extends CompositeService
   private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID,
       TaskAttemptStatus taskAttemptStatus,
       AtomicReference<TaskAttemptStatus> lastStatusRef) {
-    boolean asyncUpdatedNeeded = false;
-    TaskAttemptStatus lastStatus = lastStatusRef.get();
-
-    if (lastStatus == null) {
-      lastStatusRef.set(taskAttemptStatus);
-      asyncUpdatedNeeded = true;
-    } else {
-      List<TaskAttemptId> oldFetchFailedMaps =
-          taskAttemptStatus.fetchFailedMaps;
-
-      // merge fetchFailedMaps from the previous update
-      if (lastStatus.fetchFailedMaps != null) {
+    List<TaskAttemptId> fetchFailedMaps = taskAttemptStatus.fetchFailedMaps;
+    TaskAttemptStatus lastStatus = null;
+    boolean done = false;
+    while (!done) {
+      lastStatus = lastStatusRef.get();
+      if (lastStatus != null && lastStatus.fetchFailedMaps != null) {
+        // merge fetchFailedMaps from the previous update
         if (taskAttemptStatus.fetchFailedMaps == null) {
           taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps;
         } else {
-          taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps =
+              new ArrayList<>(lastStatus.fetchFailedMaps.size() +
+                  fetchFailedMaps.size());
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              lastStatus.fetchFailedMaps);
+          taskAttemptStatus.fetchFailedMaps.addAll(
+              fetchFailedMaps);
         }
       }
 
-      if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) {
-        // update failed - async dispatcher has processed it in the meantime
-        taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps;
-        lastStatusRef.set(taskAttemptStatus);
-        asyncUpdatedNeeded = true;
+      // lastStatusRef may be changed by either the AsyncDispatcher when
+      // it processes the update, or by another IPC server handler
+      done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus);
+      if (!done) {
+        LOG.info("TaskAttempt " + yarnAttemptID +
+            ": lastStatusRef changed by another thread, retrying...");
+        // let's revert taskAttemptStatus.fetchFailedMaps
+        taskAttemptStatus.fetchFailedMaps = fetchFailedMaps;
       }
     }
 
+    boolean asyncUpdatedNeeded = (lastStatus == null);
     if (asyncUpdatedNeeded) {
       context.getEventHandler().handle(
           new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message