tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kshu...@apache.org
Subject tez git commit: TEZ-3817. DAGs can hang after more than one uncaught Exception during doTransition. (kshukla)
Date Mon, 23 Apr 2018 22:06:01 GMT
Repository: tez
Updated Branches:
  refs/heads/master 24b872a7f -> 2e66f3cb2


TEZ-3817. DAGs can hang after more than one uncaught Exception during doTransition. (kshukla)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e66f3cb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e66f3cb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e66f3cb

Branch: refs/heads/master
Commit: 2e66f3cb2ef082889551f6a0830c7014317d9680
Parents: 24b872a
Author: Kuhu Shukla <kshukla@yahoo-inc.com>
Authored: Mon Apr 23 16:52:55 2018 -0500
Committer: Kuhu Shukla <kshukla@yahoo-inc.com>
Committed: Mon Apr 23 16:52:55 2018 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 63 ++++++++++----------
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 48 +++++++++++++++
 2 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2e66f3cb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0a775a6..ecd8d17 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -1388,41 +1388,44 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
   
   private DAGState finished(DAGState finalState) {
-    if (finishTime == 0) {
-      setFinishTime();
-    }
-    entityUpdateTracker.stop();
-
-    boolean recoveryError = false;
-
-    // update cpu time counters before finishing the dag
-    updateCpuCounters();
-    TezCounters counters = null;
+    boolean dagError = false;
     try {
-      counters = getAllCounters();
-    } catch (LimitExceededException e) {
-      addDiagnostic("Counters limit exceeded: " + e.getMessage());
-      finalState = DAGState.FAILED;
-    }
+      if (finishTime == 0) {
+        setFinishTime();
+      }
+      entityUpdateTracker.stop();
 
-    try {
-      if (finalState == DAGState.SUCCEEDED) {
-        logJobHistoryFinishedEvent(counters);
-      } else {
-        logJobHistoryUnsuccesfulEvent(finalState, counters);
+      // update cpu time counters before finishing the dag
+      updateCpuCounters();
+      TezCounters counters = null;
+      try {
+        counters = getAllCounters();
+      } catch (LimitExceededException e) {
+        addDiagnostic("Counters limit exceeded: " + e.getMessage());
+        finalState = DAGState.FAILED;
       }
-    } catch (IOException e) {
-      LOG.warn("Failed to persist recovery event for DAG completion"
-          + ", dagId=" + dagId
-          + ", finalState=" + finalState);
-      recoveryError = true;
-    }
 
-    if (finalState != DAGState.SUCCEEDED) {
-      abortOutputs();
-    }
+      try {
+        if (finalState == DAGState.SUCCEEDED) {
+          logJobHistoryFinishedEvent(counters);
+        } else {
+          logJobHistoryUnsuccesfulEvent(finalState, counters);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to persist recovery event for DAG completion"
+            + ", dagId=" + dagId
+            + ", finalState=" + finalState, e);
+        dagError = true;
+      }
 
-    if (recoveryError) {
+      if (finalState != DAGState.SUCCEEDED) {
+        abortOutputs();
+      }
+    } catch (Exception e) {
+      dagError = true;
+      LOG.warn("Encountered exception while DAG finish", e);
+    }
+    if (dagError) {
       eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR));
     } else {
       eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));

http://git-wip-us.apache.org/repos/asf/tez/blob/2e66f3cb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 966b464..c0506de 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -111,6 +112,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
+import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
@@ -140,6 +142,7 @@ import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.state.StateMachineTez;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1929,6 +1932,51 @@ public class TestDAGImpl {
     Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
   }
 
+  @Test (timeout = 5000L)
+  @SuppressWarnings("unchecked")
+  public void testDAGHang() throws Exception {
+    conf.setBoolean(
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    dag = Mockito.spy(new DAGImpl(dagId, conf, dagPlan,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
+        fsTokens, clock, "user", thh, appContext));
+    StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> spyStateMachine =
+        Mockito.spy(new StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl>(
+            dag.stateMachineFactory.make(dag), dag));
+    when(dag.getStateMachine()).thenReturn(spyStateMachine);
+    dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
+    doReturn(dag).when(appContext).getCurrentDAG();
+    DAGImpl.OutputKey outputKey = Mockito.mock(DAGImpl.OutputKey.class);
+    ListenableFuture future = Mockito.mock(ListenableFuture.class);
+    dag.commitFutures.put(outputKey, future);
+    initDAG(dag);
+    startDAG(dag);
+    dispatcher.await();
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
+        TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(DAGState.COMMITTING, dag.getState());
+    DAGEventCommitCompleted dagEvent = new DAGEventCommitCompleted(
+        dagId, outputKey, false , new RuntimeException("test"));
+    doThrow(new RuntimeException("test")).when(
+        dag).logJobHistoryUnsuccesfulEvent(any(DAGState.class), any(TezCounters.class));
+    dag.handle(dagEvent);
+    dispatcher.await();
+    Assert.assertTrue("DAG did not terminate!", dag.getInternalState() == DAGState.FAILED);
+  }
+
   @Test(timeout = 5000)
   public void testDAGKillVertexSuccessAfterTerminated() {
     _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);


Mime
View raw message