tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases (jlowe)
Date Tue, 06 Sep 2016 22:46:33 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 cbd4eacb0 -> ae24f9905


TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ae24f990
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ae24f990
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ae24f990

Branch: refs/heads/branch-0.7
Commit: ae24f9905d67539b9ce1a0b939a795a962ecb3d7
Parents: cbd4eac
Author: Jason Lowe <jlowe@apache.org>
Authored: Tue Sep 6 22:46:00 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Tue Sep 6 22:46:00 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 32 ++++++--
 .../apache/tez/dag/app/TestDAGAppMaster.java    | 82 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 65da496..4dc1c93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
   TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
   TEZ-3326. Display JVM system properties in AM and task logs.
   TEZ-3009. Errors that occur during container task acquisition are not logged.

http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 673f2fe..98e9355 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -211,6 +211,10 @@ public class DAGAppMaster extends AbstractService {
 
   private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
 
+  @VisibleForTesting
+  static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
+      + "Application cannot recover and continue properly as DAG recovery has been disabled";
+
   private Clock clock;
   private final boolean isSession;
   private long appsStartTime;
@@ -328,7 +332,7 @@ public class DAGAppMaster extends AbstractService {
     this.workingDirectory = workingDirectory;
     this.localDirs = localDirs;
     this.logDirs = logDirs;
-    this.shutdownHandler = new DAGAppMasterShutdownHandler();
+    this.shutdownHandler = createShutdownHandler();
     this.dagVersionInfo = new TezDagVersionInfo();
     this.clientVersion = clientVersion;
     this.maxAppAttempts = maxAppAttempts;
@@ -510,8 +514,7 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-    this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
-        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService);
+    this.taskSchedulerEventHandler = createTaskSchedulerManager();
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -596,6 +599,17 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @VisibleForTesting
+  protected DAGAppMasterShutdownHandler createShutdownHandler() {
+    return new DAGAppMasterShutdownHandler();
+  }
+
+  @VisibleForTesting
+  protected TaskSchedulerEventHandler createTaskSchedulerManager() {
+    return new TaskSchedulerEventHandler(context, clientRpcServer,
+        dispatcher.getEventHandler(), containerSignatureMatcher, webUIService);
+  }
+
+  @VisibleForTesting
   protected ContainerSignatureMatcher createContainerSignatureMatcher() {
     return new ContainerContextMatcher();
   }
@@ -1808,8 +1822,16 @@ public class DAGAppMaster extends AbstractService {
     startServices();
     super.serviceStart();
 
-    if (versionMismatch) {
-      // Short-circuit and return as no DAG should not be run
+    boolean invalidSession = false;
+    if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) {
+      String err = INVALID_SESSION_ERR_MSG;
+      LOG.error(err);
+      addDiagnostic(err);
+      this.state = DAGAppMasterState.ERROR;
+      invalidSession = true;
+    }
+    if (versionMismatch || invalidSession) {
+      // Short-circuit and return as no DAG should be run
       this.taskSchedulerEventHandler.setShouldUnregisterFlag();
       shutdownHandler.shutdown();
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/ae24f990/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 2390c79..8ee477e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -17,13 +17,20 @@ package org.apache.tez.dag.app;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -48,7 +55,9 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
 import org.junit.After;
 import org.junit.Before;
@@ -72,6 +81,32 @@ public class TestDAGAppMaster {
     FileUtil.fullyDelete(TEST_DIR);
   }
 
+  @Test(timeout = 20000)
+  public void testInvalidSession() throws Exception {
+    // AM should fail if not the first attempt and in session mode and
+    // DAG recovery is disabled, otherwise the app can succeed without
+    // finishing an in-progress DAG.
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+    DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3);
+    TezConfiguration conf = new TezConfiguration(false);
+    conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+    dam.init(conf);
+    dam.start();
+    verify(dam.mockScheduler).setShouldUnregisterFlag();
+    verify(dam.mockShutdown).shutdown();
+    List<String> diags = dam.getDiagnostics();
+    boolean found = false;
+    for (String diag : diags) {
+      if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue("Missing invalid session diagnostics", found);
+    dam.stop();
+  }
+
   @Test
   public void testDagCredentialsWithoutMerge() throws Exception {
     testDagCredentials(false);
@@ -233,4 +268,51 @@ public class TestDAGAppMaster {
       return new TestTokenIdentifier();
     }
   }
+
+  private static class DAGAppMasterForTest extends DAGAppMaster {
+    private DAGAppMasterShutdownHandler mockShutdown;
+    private TaskSchedulerEventHandler mockScheduler = mock(TaskSchedulerEventHandler.class);
+
+    public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) {
+      super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346,
+          new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(),
+          new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() },
+          new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname");
+    }
+
+    private static Credentials createCredentials() {
+      Credentials creds = new Credentials();
+      JobTokenSecretManager jtsm = new JobTokenSecretManager();
+      JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
+      Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm);
+      TokenCache.setSessionToken(token, creds);
+      return creds;
+    }
+
+    private static void stubSessionResources() throws IOException {
+      FileOutputStream out = new FileOutputStream(
+          new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+      PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance();
+      planProto.writeDelimitedTo(out);
+      out.close();
+    }
+
+    @Override
+    public synchronized void serviceInit(Configuration conf) throws Exception {
+      stubSessionResources();
+      conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
+      super.serviceInit(conf);
+    }
+
+    @Override
+    protected DAGAppMasterShutdownHandler createShutdownHandler() {
+      mockShutdown = mock(DAGAppMasterShutdownHandler.class);
+      return mockShutdown;
+    }
+
+    @Override
+    protected TaskSchedulerEventHandler createTaskSchedulerManager() {
+      return mockScheduler;
+    }
+  }
 }


Mime
View raw message