tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe)
Date Thu, 21 Jan 2016 20:28:45 GMT
Repository: tez
Updated Branches:
  refs/heads/master ca447ba5c -> 92def52ff


TEZ-3036. Tez AM can hang on startup with no indication of error (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92def52f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92def52f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92def52f

Branch: refs/heads/master
Commit: 92def52ff8b02eab7aae38170ca6c9b0caf83ef7
Parents: ca447ba
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Jan 21 20:28:17 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Jan 21 20:28:17 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 48 ++++++++++++++------
 2 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/92def52f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bec7dd4..4ffcf13 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3036. Tez AM can hang on startup with no indication of error
   TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-2594. Fix LICENSE for missing entries for full and minimal tarballs.
   TEZ-3053. Containers timeout if they do not receive a task within the container timeout interval.
@@ -319,6 +320,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3036. Tez AM can hang on startup with no indication of error
   TEZ-3052. Task internal error due to Invalid event: T_ATTEMPT_FAILED at FAILED
   TEZ-2937. Can Processor.close() be called after closing inputs and outputs?
   TEZ-3037. History URL should be set regardless of which history logging service is enabled.

http://git-wip-us.apache.org/repos/asf/tez/blob/92def52f/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 609a018..c16bdb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1732,7 +1732,18 @@ public class DAGAppMaster extends AbstractService {
         LOG.debug("Service dependency: " + dependency.getName() + " notify" +
                   " for service: " + service.getName());
       }
-      if (dependency.isInState(Service.STATE.STARTED)) {
+      Throwable dependencyError = dependency.getFailureCause();
+      if (dependencyError != null) {
+        synchronized(this) {
+          dependenciesFailed = true;
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Service: " + service.getName() + " will fail to start"
+                + " as dependent service " + dependency.getName()
+                + " failed to start: " + dependencyError);
+          }
+          this.notifyAll();
+        }
+      } else if (dependency.isInState(Service.STATE.STARTED)) {
         if(dependenciesStarted.incrementAndGet() == dependencies.size()) {
           synchronized(this) {
             if(LOG.isDebugEnabled()) {
@@ -1742,17 +1753,6 @@ public class DAGAppMaster extends AbstractService {
             this.notifyAll();
           }
         }
-      } else if (!service.isInState(Service.STATE.STARTED)
-          && dependency.getFailureState() != null) {
-        synchronized(this) {
-          dependenciesFailed = true;
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Service: " + service.getName() + " will fail to start"
-                + " as dependent service " + dependency.getName()
-                + " failed to start");
-          }
-          this.notifyAll();
-        }
       }
     }
 
@@ -1786,9 +1786,12 @@ public class DAGAppMaster extends AbstractService {
 
   private static class ServiceThread extends Thread {
     final ServiceWithDependency serviceWithDependency;
-    Throwable error = null;
-    public ServiceThread(ServiceWithDependency serviceWithDependency) {
+    final Map<Service, ServiceWithDependency> services;
+    volatile Throwable error = null;
+    public ServiceThread(ServiceWithDependency serviceWithDependency,
+        Map<Service, ServiceWithDependency> services) {
       this.serviceWithDependency = serviceWithDependency;
+      this.services = services;
       this.setName("ServiceThread:" + serviceWithDependency.service.getName());
     }
 
@@ -1800,7 +1803,14 @@ public class DAGAppMaster extends AbstractService {
       try {
         serviceWithDependency.start();
       } catch (Throwable t) {
+        // AbstractService does not notify listeners if something throws, so
+        // notify dependent services explicitly to prevent hanging.
+        // AbstractService only records fault causes for exceptions, not
+        // errors, so dependent services will proceed thinking startup
+        // succeeded if an error is thrown. The error will be noted when the
+        // main thread joins the ServiceThread.
         error = t;
+        notifyDependentServices();
       } finally {
         if(LOG.isDebugEnabled()) {
           LOG.debug("Service: " + serviceWithDependency.service.getName() +
@@ -1812,6 +1822,14 @@ public class DAGAppMaster extends AbstractService {
             + serviceWithDependency.service.getName());
       }
     }
+
+    private void notifyDependentServices() {
+      for (ServiceWithDependency otherSvc : services.values()) {
+        if (otherSvc.dependencies.contains(serviceWithDependency.service)) {
+          otherSvc.stateChanged(serviceWithDependency.service);
+        }
+      }
+    }
   }
 
   void startServices(){
@@ -1824,7 +1842,7 @@ public class DAGAppMaster extends AbstractService {
       for(ServiceWithDependency sd : services.values()) {
         // start the service. If this fails that service
         // will be stopped and an exception raised
-        ServiceThread st = new ServiceThread(sd);
+        ServiceThread st = new ServiceThread(sd, services);
         threads.add(st);
       }
 


Mime
View raw message