tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2534. Error handling summary event when shutting down AM (zjffdu)
Date Wed, 10 Jun 2015 01:06:39 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 3d900ad0d -> a542ad3e1


TEZ-2534. Error handling summary event when shutting down AM (zjffdu)

(cherry picked from commit 26c3c793456323539373e026170e9902c508a815)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a542ad3e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a542ad3e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a542ad3e

Branch: refs/heads/branch-0.7
Commit: a542ad3e16e7ddf06a12024472f8e48f4f5b7760
Parents: 3d900ad
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Wed Jun 10 09:02:06 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Wed Jun 10 09:06:26 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/history/recovery/RecoveryService.java   | 37 ++++++++++++--------
 2 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a542ad3e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ea961f..452b4a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -211,6 +211,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2534. Error handling summary event when shutting down AM.
   TEZ-2511. Add exitCode to diagnostics when container fails.
   TEZ-2489. Disable warn log for Timeline ACL error when tez.allow.disabled.timeline-domains set to true.
   TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.

http://git-wip-us.apache.org/repos/asf/tez/blob/a542ad3e/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 04a925f..8a96c76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -208,22 +208,24 @@ public class RecoveryService extends AbstractService {
       }
     }
 
-    if (summaryStream != null) {
-      try {
-        LOG.info("Closing Summary Stream");
-        summaryStream.hflush();
-        summaryStream.close();
-      } catch (IOException ioe) {
-        LOG.warn("Error when closing summary stream", ioe);
+    synchronized (lock) {
+      if (summaryStream != null) {
+        try {
+          LOG.info("Closing Summary Stream");
+          summaryStream.hflush();
+          summaryStream.close();
+        } catch (IOException ioe) {
+          LOG.warn("Error when closing summary stream", ioe);
+        }
       }
-    }
-    for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
-      try {
-        LOG.info("Closing Output Stream for DAG " + entry.getKey());
-        entry.getValue().hflush();
-        entry.getValue().close();
-      } catch (IOException ioe) {
-        LOG.warn("Error when closing output stream", ioe);
+      for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
+        try {
+          LOG.info("Closing Output Stream for DAG " + entry.getKey());
+          entry.getValue().hflush();
+          entry.getValue().close();
+        } catch (IOException ioe) {
+          LOG.warn("Error when closing output stream", ioe);
+        }
       }
     }
   }
@@ -280,6 +282,11 @@ public class RecoveryService extends AbstractService {
 
     if (event.getHistoryEvent() instanceof SummaryEvent) {
       synchronized (lock) {
+        if (stopped.get()) {
+          LOG.warn("Igoring event as service stopped, eventType"
+              + event.getHistoryEvent().getEventType());
+          return;
+        }
         try {
           SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
           handleSummaryEvent(dagId, eventType, summaryEvent);


Mime
View raw message