tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-371. Time out AM after a certain period if no DAG submitted. (hitesh)
Date Tue, 01 Oct 2013 23:53:07 GMT
Updated Branches:
  refs/heads/master 072dc8467 -> 095ffeff4


TEZ-371. Time out AM after a certain period if no DAG submitted. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/095ffeff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/095ffeff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/095ffeff

Branch: refs/heads/master
Commit: 095ffeff4662dd094170b34ccb271c01d7320013
Parents: 072dc84
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Oct 1 16:52:45 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Oct 1 16:52:45 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 75 ++++++++++++++++----
 .../apache/tez/dag/app/rm/TaskScheduler.java    |  4 +-
 2 files changed, 63 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/095ffeff/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 88d1a30..85ebbfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -30,6 +30,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -183,6 +185,9 @@ public class DAGAppMaster extends AbstractService {
   private UserGroupInformation currentUser; // Will be setup during init
 
   private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+  private long sessionTimeoutInterval;
+  private long lastDAGCompletionTime;
+  private Timer dagSubmissionTimer;
 
   // DAG Counter
   private final AtomicInteger dagCounter = new AtomicInteger();
@@ -294,6 +299,10 @@ public class DAGAppMaster extends AbstractService {
     addIfService(historyEventHandler, true);
     dispatcher.register(HistoryEventType.class, historyEventHandler);
 
+    this.sessionTimeoutInterval = 1000 * amConf.getInt(
+            TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
+            TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
+
     initServices(conf);
     super.serviceInit(conf);
   }
@@ -334,6 +343,7 @@ public class DAGAppMaster extends AbstractService {
         LOG.info("DAG completed, dagId="
             + finishEvt.getDAGId().toString()
             + ", dagState=" + finishEvt.getDAGState());
+        lastDAGCompletionTime = clock.getTime();
         switch(finishEvt.getDAGState()) {
         case SUCCEEDED:
           successfulDAGs.incrementAndGet();
@@ -634,6 +644,23 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
+  synchronized void shutdownTezAM() {
+    sessionStopped.set(true);
+    if (currentDAG != null
+        && !currentDAG.isComplete()) {
+      //send a DAG_KILL message
+      LOG.info("Sending a kill event to the current DAG"
+          + ", dagId=" + currentDAG.getID());
+      sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+    } else {
+      LOG.info("No current running DAG, shutting down the AM");
+      if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+        state = DAGAppMasterState.SUCCEEDED;
+      }
+      shutdownHandler.shutdown();
+    }
+  }
+
   synchronized String submitDAGToAppMaster(DAGPlan dagPlan)
       throws TezException  {
     if(currentDAG != null
@@ -645,7 +672,7 @@ public class DAGAppMaster extends AbstractService {
       throw new TezException("AM unable to accept new DAG submissions."
           + " In the process of shutting down");
     }
-    
+
     // RPC server runs in the context of the job user as it was started in
     // the job user's UGI context
     LOG.info("Starting DAG submitted via RPC");
@@ -721,20 +748,7 @@ public class DAGAppMaster extends AbstractService {
 
     public synchronized void shutdownAM() {
       LOG.info("Received message to shutdown AM");
-      if (currentDAG != null
-          && !currentDAG.isComplete()) {
-        //send a DAG_KILL message
-        LOG.info("Sending a kill event to the current DAG"
-            + ", dagId=" + currentDAG.getID());
-        sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
-        sessionStopped.set(true);
-      } else {
-        LOG.info("No current running DAG, shutting down the AM");
-        if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
-          state = DAGAppMasterState.SUCCEEDED;
-        }
-        shutdownHandler.shutdown();
-      }
+      shutdownTezAM();
     }
   }
 
@@ -1047,15 +1061,30 @@ public class DAGAppMaster extends AbstractService {
     dispatcher.getEventHandler().handle(
         new DAGHistoryEvent(startEvent));
 
+    this.lastDAGCompletionTime = clock.getTime();
+
     if (!isSession) {
       startDAG();
     } else {
       LOG.info("In Session mode. Waiting for DAG over RPC");
+      this.dagSubmissionTimer = new Timer(true);
+      this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
+        @Override
+        public void run() {
+          checkAndHandleSessionTimeout();
+        }
+      }, sessionTimeoutInterval, sessionTimeoutInterval/10);
     }
   }
 
   @Override
   public void serviceStop() throws Exception {
+    if (isSession) {
+      sessionStopped.set(true);
+    }
+    if (this.dagSubmissionTimer != null) {
+      this.dagSubmissionTimer.cancel();
+    }
     stopServices();
     super.serviceStop();
   }
@@ -1114,6 +1143,22 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  private synchronized void checkAndHandleSessionTimeout() {
+    if (this.state.equals(DAGAppMasterState.RUNNING)
+        || sessionStopped.get()) {
+      // DAG running or session already completed, cannot timeout session
+      return;
+    }
+    long currentTime = clock.getTime();
+    if (currentTime < (lastDAGCompletionTime + sessionTimeoutInterval)) {
+      return;
+    }
+    LOG.info("Session timed out"
+        + ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms"
+        + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms");
+    shutdownTezAM();
+  }
+
   public static void main(String[] args) {
     try {
       Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/095ffeff/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 3e93d3e..97541b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -602,8 +602,10 @@ public class TaskScheduler extends AbstractService
   synchronized void preemptIfNeeded() {
     Resource freeResources = Resources.subtract(totalResources,
         allocatedResources);
-    LOG.info("Allocated resource memory: " + allocatedResources.getMemory() +
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
              " cpu:" + allocatedResources.getVirtualCores());
+    }
     assert freeResources.getMemory() >= 0;
 
     CookieContainerRequest highestPriRequest = null;


Mime
View raw message