tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)
Date Fri, 12 Jun 2015 06:05:52 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 a82da8ad4 -> 36579fa21


TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down. (hitesh)

(cherry picked from commit bb51e4ed65f704f1a03b8420c163d2ee0dffdb81)

Conflicts:
	CHANGES.txt

(cherry picked from commit cd5101b68793e7f7feb3e6af0485d53011657b7b)

Conflicts:
	CHANGES.txt
	tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/36579fa2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/36579fa2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/36579fa2

Branch: refs/heads/branch-0.6
Commit: 36579fa218cc13e603c16faec11b0b4b39867c2e
Parents: a82da8a
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Jun 11 22:55:29 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Jun 11 23:03:09 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 155 ++++++++++---------
 2 files changed, 84 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/36579fa2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4408aeb..01aa780 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.
   TEZ-2534. Error handling summary event when shutting down AM.
   TEZ-2511. Add exitCode to diagnostics when container fails.
   TEZ-2541. DAGClientImpl enable TimelineClient check is wrong.

http://git-wip-us.apache.org/repos/asf/tez/blob/36579fa2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cbb875f..a221662 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -60,6 +60,7 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -1067,50 +1068,58 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
-  public synchronized void shutdownTezAM() {
+  public void shutdownTezAM() {
     sessionStopped.set(true);
-    this.taskSchedulerEventHandler.setShouldUnregisterFlag();
-    if (currentDAG != null
-        && !currentDAG.isComplete()) {
-      //send a DAG_KILL message
-      LOG.info("Sending a kill event to the current DAG"
-          + ", dagId=" + currentDAG.getID());
-      sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
-    } else {
-      LOG.info("No current running DAG, shutting down the AM");
-      if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
-        state = DAGAppMasterState.SUCCEEDED;
+    synchronized (this) {
+      this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+      if (currentDAG != null
+          && !currentDAG.isComplete()) {
+        //send a DAG_KILL message
+        LOG.info("Sending a kill event to the current DAG"
+            + ", dagId=" + currentDAG.getID());
+        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+      } else {
+        LOG.info("No current running DAG, shutting down the AM");
+        if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+          state = DAGAppMasterState.SUCCEEDED;
+        }
+        shutdownHandler.shutdown();
       }
-      shutdownHandler.shutdown();
     }
   }
 
-  public synchronized String submitDAGToAppMaster(DAGPlan dagPlan,
+  public String submitDAGToAppMaster(DAGPlan dagPlan,
       Map<String, LocalResource> additionalResources) throws TezException {
-    if (this.versionMismatch) {
-      throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
-          + " incompatible with the client. " + versionMismatchDiagnostics);
-    }
-    if(currentDAG != null
-        && !state.equals(DAGAppMasterState.IDLE)) {
-      throw new TezException("App master already running a DAG");
-    }
-    if (state.equals(DAGAppMasterState.ERROR)
-        || sessionStopped.get()) {
-      throw new TezException("AM unable to accept new DAG submissions."
+    if (sessionStopped.get()) {
+      throw new SessionNotRunning("AM unable to accept new DAG submissions."
           + " In the process of shutting down");
     }
+    synchronized (this) {
+      if (this.versionMismatch) {
+        throw new TezException("Unable to accept DAG submissions as the ApplicationMaster is"
+            + " incompatible with the client. " + versionMismatchDiagnostics);
+      }
+      if (currentDAG != null
+          && !state.equals(DAGAppMasterState.IDLE)) {
+        throw new TezException("App master already running a DAG");
+      }
+      if (state.equals(DAGAppMasterState.ERROR)
+          || sessionStopped.get()) {
+        throw new SessionNotRunning("AM unable to accept new DAG submissions."
+            + " In the process of shutting down");
+      }
 
-    // RPC server runs in the context of the job user as it was started in
-    // the job user's UGI context
-    LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Invoked with additional local resources: " + additionalResources);
+      // RPC server runs in the context of the job user as it was started in
+      // the job user's UGI context
+      LOG.info("Starting DAG submitted via RPC: " + dagPlan.getName());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Invoked with additional local resources: " + additionalResources);
+      }
+      submittedDAGs.incrementAndGet();
+      startDAG(dagPlan, additionalResources);
+      return currentDAG.getID().toString();
     }
-    submittedDAGs.incrementAndGet();
-    startDAG(dagPlan, additionalResources);
-    return currentDAG.getID().toString();
   }
 
   @SuppressWarnings("unchecked")
@@ -1700,52 +1709,54 @@ public class DAGAppMaster extends AbstractService {
 
 
   @Override
-  public synchronized void serviceStop() throws Exception {
+  public void serviceStop() throws Exception {
     if (isSession) {
       sessionStopped.set(true);
     }
-    if (this.dagSubmissionTimer != null) {
-      this.dagSubmissionTimer.cancel();
-    }
-    stopServices();
+    synchronized (this) {
+      if (this.dagSubmissionTimer != null) {
+        this.dagSubmissionTimer.cancel();
+      }
+      stopServices();
 
-    // Given pre-emption, we should delete tez scratch dir only if unregister is
-    // successful
-    boolean deleteTezScratchData = this.amConf.getBoolean(
-        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
-        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
-          + deleteTezScratchData);
-    }
-    if (deleteTezScratchData && this.taskSchedulerEventHandler != null
-        && this.taskSchedulerEventHandler.hasUnregistered()) {
-      // Delete tez scratch data dir
-      if (this.tezSystemStagingDir != null) {
-        try {
-          this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
-              boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
-              if (!deletedStagingDir) {
-                LOG.warn("Failed to delete tez scratch data dir, path="
-                + tezSystemStagingDir);
-              } else {
-                LOG.info("Completed deletion of tez scratch data dir, path="
-                  + tezSystemStagingDir);
+      // Given pre-emption, we should delete tez scratch dir only if unregister is
+      // successful
+      boolean deleteTezScratchData = this.amConf.getBoolean(
+          TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE,
+          TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE_DEFAULT);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+            + deleteTezScratchData);
+      }
+      if (deleteTezScratchData && this.taskSchedulerEventHandler != null
+          && this.taskSchedulerEventHandler.hasUnregistered()) {
+        // Delete tez scratch data dir
+        if (this.tezSystemStagingDir != null) {
+          try {
+            this.appMasterUgi.doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                FileSystem fs = tezSystemStagingDir.getFileSystem(amConf);
+                boolean deletedStagingDir = fs.delete(tezSystemStagingDir, true);
+                if (!deletedStagingDir) {
+                  LOG.warn("Failed to delete tez scratch data dir, path="
+                      + tezSystemStagingDir);
+                } else {
+                  LOG.info("Completed deletion of tez scratch data dir, path="
+                      + tezSystemStagingDir);
+                }
+                return null;
               }
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          // Best effort to delete tez scratch data dir
-          LOG.warn("Failed to delete tez scratch data dir", e);
+            });
+          } catch (IOException e) {
+            // Best effort to delete tez scratch data dir
+            LOG.warn("Failed to delete tez scratch data dir", e);
+          }
         }
       }
-    }
 
-    super.serviceStop();
+      super.serviceStop();
+    }
   }
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {


Mime
View raw message