tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [27/50] [abbrv] tez git commit: TEZ-2261. Should add diagnostics in DAGAppMaster when recovery error happens (zjffdu)
Date Fri, 24 Apr 2015 00:26:21 GMT
TEZ-2261. Should add diagnostics in DAGAppMaster when recovery error happens (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe11c5e6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe11c5e6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe11c5e6

Branch: refs/heads/TEZ-2003
Commit: fe11c5e67334f1a08e2b586b68ee4840d8b62763
Parents: d91eb28
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Thu Apr 23 09:46:54 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Thu Apr 23 09:46:54 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 35 +++++++++++++++-----
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 20 ++++++++++-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 28 ++++++++++++++++
 4 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c32830e..af39092 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2261. Should add diagnostics in DAGAppMaster when recovery error happens
   TEZ-2340. TestRecoveryParser fails
   TEZ-2345. Tez UI: Enable cell level loading in all DAGs table
   TEZ-2330. Create reconfigureVertex() API for input based initialization

http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3dd9e4c..8a914f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -236,6 +236,7 @@ public class DAGAppMaster extends AbstractService {
   private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
   private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>();
   private final int maxAppAttempts;
+  private final List<String> diagnostics = new ArrayList<String>();
 
   private boolean isLocal = false; //Local mode flag
 
@@ -383,6 +384,7 @@ public class DAGAppMaster extends AbstractService {
       versionMismatchDiagnostics = "Incompatible versions found"
           + ", clientVersion=" + clientVersion
           + ", AMVersion=" + dagVersionInfo.getVersion();
+      addDiagnostic(versionMismatchDiagnostics);
       if (disableVersionCheck) {
         LOG.warn("Ignoring client-AM version mismatch as check disabled. "
             + versionMismatchDiagnostics);
@@ -491,7 +493,7 @@ public class DAGAppMaster extends AbstractService {
     addIfService(containerLauncher, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
 
-    historyEventHandler = new HistoryEventHandler(context);
+    historyEventHandler = createHistoryEventHandler(context);
     addIfService(historyEventHandler, true);
 
     this.sessionTimeoutInterval = 1000 * amConf.getInt(
@@ -563,6 +565,11 @@ public class DAGAppMaster extends AbstractService {
     return new AsyncDispatcher("Central");
   }
 
+  @VisibleForTesting
+  protected HistoryEventHandler createHistoryEventHandler(AppContext appContext) {
+    return new HistoryEventHandler(appContext);
+  }
+
   /**
    * Exit call. Just in a function call to enable testing.
    */
@@ -624,8 +631,10 @@ public class DAGAppMaster extends AbstractService {
         lastDAGCompletionTime = clock.getTime();
         _updateLoggers(currentDAG, "_post");
         if (this.historyEventHandler.hasRecoveryFailed()) {
-          LOG.warn("Recovery had a fatal error, shutting down session after" +
-              " DAG completion");
+          String recoveryErrorMsg = "Recovery had a fatal error, shutting down session after" +
+              " DAG completion";
+          LOG.warn(recoveryErrorMsg);
+          addDiagnostic(recoveryErrorMsg);
           sessionStopped.set(true);
         }
         switch(finishEvt.getDAGState()) {
@@ -1071,22 +1080,32 @@ public class DAGAppMaster extends AbstractService {
     return state;
   }
 
+  private void addDiagnostic(String diag) {
+    synchronized (diagnostics) {
+      diagnostics.add(diag);
+    }
+  }
+
   public List<String> getDiagnostics() {
-    if (versionMismatch) {
-      return Collections.singletonList(versionMismatchDiagnostics);
+    // always create new diagnostics to return
+    // This is to avoid the case that this method is called multiple times and diagnostics is accumulated.
+    List<String> diagResult = new ArrayList<String>();
+    synchronized (diagnostics) {
+      diagResult.addAll(this.diagnostics);
     }
+
     if (!isSession) {
       if(currentDAG != null) {
-        return currentDAG.getDiagnostics();
+        diagResult.addAll(currentDAG.getDiagnostics());
       }
     } else {
-      return Collections.singletonList("Session stats:"
+      diagResult.add("Session stats:"
           + "submittedDAGs=" + submittedDAGs.get()
           + ", successfulDAGs=" + successfulDAGs.get()
           + ", failedDAGs=" + failedDAGs.get()
           + ", killedDAGs=" + killedDAGs.get());
     }
-    return null;
+    return diagResult;
   }
 
   public float getProgress() {

http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 92dfdb5..18286b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
@@ -92,6 +93,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   boolean initFailFlag;
   boolean startFailFlag;
   boolean sendDMEvents;
+  boolean recoveryFatalError = false;
   CountersDelegate countersDelegate;
   StatisticsDelegate statsDelegate;
   long launcherSleepTime = 1;
@@ -453,6 +455,17 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
   }
   
+  public class MockHistoryEventHandler extends HistoryEventHandler {
+
+    public MockHistoryEventHandler(AppContext context) {
+      super(context);
+    }
+
+    @Override
+    public boolean hasRecoveryFailed() {
+      return recoveryFatalError;
+    }
+  }
 
   public class MockDAGAppMasterShutdownHandler extends DAGAppMasterShutdownHandler {
     public AtomicInteger shutdownInvoked = new AtomicInteger(0);
@@ -498,7 +511,12 @@ public class MockDAGAppMaster extends DAGAppMaster {
       throws UnknownHostException {
     return containerLauncher;
   }
-  
+
+  @Override
+  protected HistoryEventHandler createHistoryEventHandler(AppContext appContext) {
+    return new MockHistoryEventHandler(appContext);
+  }
+
   public MockContainerLauncher getContainerLauncher() {
     return containerLauncher;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fe11c5e6/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index db1d632..33dd18d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -887,4 +887,32 @@ public class TestMockDAGAppMaster {
       }
     }
   }
+
+  @Test(timeout = 5000)
+  public void testDAGFinishedRecoveryError() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    mockApp.recoveryFatalError = true;
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(true);
+
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    dag.addVertex(vA);
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    dagClient.waitForCompletion();
+    while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(DAGState.SUCCEEDED, mockApp.getContext().getCurrentDAG().getState());
+    Assert.assertEquals(DAGAppMasterState.FAILED, mockApp.getState());
+    Assert.assertTrue(StringUtils.join(mockApp.getDiagnostics(),",")
+        .contains("Recovery had a fatal error, shutting down session after" +
+              " DAG completion"));
+  }
 }


Mime
View raw message