tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject svn commit: r1475938 - /incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
Date Thu, 25 Apr 2013 20:03:03 GMT
Author: bikas
Date: Thu Apr 25 20:03:03 2013
New Revision: 1475938

URL: http://svn.apache.org/r1475938
Log:
TEZ-78. DAG App Master does not exit on completion (bikas)

Modified:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java?rev=1475938&r1=1475937&r2=1475938&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java	(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java	Thu Apr 25 20:03:03 2013
@@ -112,6 +112,8 @@ public class TaskScheduler extends Abstr
   final int appHostPort;
   final String appTrackingUrl;
   
+  boolean isStopped = false; 
+  
   class CRCookie {
     Object task;
     Object appCookie;
@@ -190,12 +192,17 @@ public class TaskScheduler extends Abstr
     try {
       // TODO TEZ-36 dont unregister automatically after reboot sent by RM
       synchronized (this) {
+        isStopped = true;
         amRmClient.unregisterApplicationMaster(status.exitStatus, 
                                                status.exitMessage,
                                                status.postCompletionTrackingUrl);
-        amRmClient.stop();
-        super.stop();
       }
+      
+      // call client.stop() without lock client will attempt to stop the callback
+      // operation and at the same time the callback operation might be trying 
+      // to get our lock.
+      amRmClient.stop();
+      super.stop();
     } catch (YarnRemoteException e) {
       LOG.error("Exception while unregistering ", e);
       throw new YarnException(e);
@@ -205,6 +212,9 @@ public class TaskScheduler extends Abstr
   // AMRMClientAsync interface methods
   @Override
   public void onContainersCompleted(List<ContainerStatus> statuses) {
+    if(isStopped) {
+      return;
+    }
     Map<Object, ContainerStatus> appContainerStatus = 
                         new HashMap<Object, ContainerStatus>(statuses.size());
     synchronized (this) {
@@ -248,6 +258,9 @@ public class TaskScheduler extends Abstr
 
   @Override
   public void onContainersAllocated(List<Container> containers) {
+    if(isStopped) {
+      return;
+    }
     Map<ContainerRequest<CRCookie>, Container> appContainers = 
                   new HashMap<ContainerRequest<CRCookie>, Container>(containers.size());
     synchronized (this) {
@@ -292,12 +305,18 @@ public class TaskScheduler extends Abstr
 
   @Override
   public void onRebootRequest() {
+    if(isStopped) {
+      return;
+    }
     // upcall to app must be outside locks
     appClient.appRebootRequested();
   }
 
   @Override
   public void onNodesUpdated(List<NodeReport> updatedNodes) {
+    if(isStopped) {
+      return;
+    }
     // ignore bad nodes for now
     // upcall to app must be outside locks
     appClient.nodesUpdated(updatedNodes);
@@ -305,11 +324,17 @@ public class TaskScheduler extends Abstr
 
   @Override
   public float getProgress() {
+    if(isStopped) {
+      return 1;
+    }
     return appClient.getProgress();
   }
 
   @Override
   public void onError(Exception e) {
+    if(isStopped) {
+      return;
+    }
     appClient.onError(e);
   }
   



Mime
View raw message