tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases (jlowe) (cherry picked from commit 91a397b0baf57d4a09f64233a4dd7df7f8019c2c)
Date Tue, 06 Sep 2016 22:44:58 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 b6b04de46 -> f86ed0d49


TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
(jlowe)
(cherry picked from commit 91a397b0baf57d4a09f64233a4dd7df7f8019c2c)

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f86ed0d4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f86ed0d4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f86ed0d4

Branch: refs/heads/branch-0.8
Commit: f86ed0d495ae8b9f1d13c153a11ed06c61d8d4b6
Parents: b6b04de
Author: Jason Lowe <jlowe@apache.org>
Authored: Tue Sep 6 22:44:36 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Tue Sep 6 22:44:36 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 37 +++++++--
 .../apache/tez/dag/app/TestDAGAppMaster.java    | 81 +++++++++++++++++++-
 3 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 20b2870..fbb305d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
   TEZ-3326. Display JVM system properties in AM and task logs.
   TEZ-3009. Errors that occur during container task acquisition are not logged.
   TEZ-2852. TestVertexImpl fails due to race in AsyncDispatcher.
@@ -503,6 +504,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases
   TEZ-3009. Errors that occur during container task acquisition are not logged.
   TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent.
   TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).

http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cc07fb7..de19fa3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -226,6 +226,10 @@ public class DAGAppMaster extends AbstractService {
 
   private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
 
+  @VisibleForTesting
+  static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
+      + "Application cannot recover and continue properly as DAG recovery has been disabled";
+
   private Clock clock;
   private final boolean isSession;
   private long appsStartTime;
@@ -350,7 +354,7 @@ public class DAGAppMaster extends AbstractService {
     this.workingDirectory = workingDirectory;
     this.localDirs = localDirs;
     this.logDirs = logDirs;
-    this.shutdownHandler = new DAGAppMasterShutdownHandler();
+    this.shutdownHandler = createShutdownHandler();
     this.dagVersionInfo = new TezDagVersionInfo();
     this.clientVersion = clientVersion;
     this.maxAppAttempts = maxAppAttempts;
@@ -566,11 +570,7 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-
-
-    this.taskSchedulerManager = new TaskSchedulerManager(context,
-        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerDescriptors, isLocal);
+    this.taskSchedulerManager = createTaskSchedulerManager(taskSchedulerDescriptors);
     addIfService(taskSchedulerManager, true);
 
     if (enableWebUIService()) {
@@ -644,6 +644,19 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @VisibleForTesting
+  protected DAGAppMasterShutdownHandler createShutdownHandler() {
+    return new DAGAppMasterShutdownHandler();
+  }
+
+  @VisibleForTesting
+  protected TaskSchedulerManager createTaskSchedulerManager(
+      List<NamedEntityDescriptor> taskSchedulerDescriptors) {
+    return new TaskSchedulerManager(context,
+        clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
+        taskSchedulerDescriptors, isLocal);
+  }
+
+  @VisibleForTesting
   protected ContainerSignatureMatcher createContainerSignatureMatcher() {
     return new ContainerContextMatcher();
   }
@@ -1974,8 +1987,16 @@ public class DAGAppMaster extends AbstractService {
     startServices();
     super.serviceStart();
 
-    if (versionMismatch) {
-      // Short-circuit and return as no DAG should not be run
+    boolean invalidSession = false;
+    if (isSession && !recoveryEnabled && appAttemptID.getAttemptId() > 1) {
+      String err = INVALID_SESSION_ERR_MSG;
+      LOG.error(err);
+      addDiagnostic(err);
+      this.state = DAGAppMasterState.ERROR;
+      invalidSession = true;
+    }
+    if (versionMismatch || invalidSession) {
+      // Short-circuit and return as no DAG should be run
       this.taskSchedulerManager.setShouldUnregisterFlag();
       shutdownHandler.shutdown();
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/f86ed0d4/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 3ea5ba4..56d1f96 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -18,15 +18,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -50,7 +52,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.TezCommonUtils;
@@ -66,9 +67,11 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezUserPayloadProto;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.junit.After;
 import org.junit.Assert;
@@ -99,6 +102,32 @@ public class TestDAGAppMaster {
     FileUtil.fullyDelete(TEST_DIR);
   }
 
+  @Test(timeout = 20000)
+  public void testInvalidSession() throws Exception {
+    // AM should fail if not the first attempt and in session mode and
+    // DAG recovery is disabled, otherwise the app can succeed without
+    // finishing an in-progress DAG.
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+    DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true, 3);
+    TezConfiguration conf = new TezConfiguration(false);
+    conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false);
+    dam.init(conf);
+    dam.start();
+    verify(dam.mockScheduler).setShouldUnregisterFlag();
+    verify(dam.mockShutdown).shutdown();
+    List<String> diags = dam.getDiagnostics();
+    boolean found = false;
+    for (String diag : diags) {
+      if (diag.contains(DAGAppMaster.INVALID_SESSION_ERR_MSG)) {
+        found = true;
+        break;
+      }
+    }
+    Assert.assertTrue("Missing invalid session diagnostics", found);
+    dam.stop();
+  }
+
   @Test(timeout = 5000)
   public void testPluginParsing() throws IOException {
     BiMap<String, Integer> pluginMap = HashBiMap.create();
@@ -510,4 +539,52 @@ public class TestDAGAppMaster {
       return new TestTokenIdentifier();
     }
   }
+
+  private static class DAGAppMasterForTest extends DAGAppMaster {
+    private DAGAppMasterShutdownHandler mockShutdown;
+    private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
+
+    public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession, int maxAttempts) {
+      super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346,
+          new SystemClock(), 0, isSession, TEST_DIR.getAbsolutePath(),
+          new String[] { TEST_DIR.getAbsolutePath() }, new String[] { TEST_DIR.getAbsolutePath() },
+          new TezDagVersionInfo().getVersion(), maxAttempts, createCredentials(), "jobname", null);
+    }
+
+    private static Credentials createCredentials() {
+      Credentials creds = new Credentials();
+      JobTokenSecretManager jtsm = new JobTokenSecretManager();
+      JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
+      Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(jtid, jtsm);
+      TokenCache.setSessionToken(token, creds);
+      return creds;
+    }
+
+    private static void stubSessionResources() throws IOException {
+      FileOutputStream out = new FileOutputStream(
+          new File(TEST_DIR, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+      PlanLocalResourcesProto planProto = PlanLocalResourcesProto.getDefaultInstance();
+      planProto.writeDelimitedTo(out);
+      out.close();
+    }
+
+    @Override
+    public synchronized void serviceInit(Configuration conf) throws Exception {
+      stubSessionResources();
+      conf.setBoolean(TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE, false);
+      super.serviceInit(conf);
+    }
+
+    @Override
+    protected DAGAppMasterShutdownHandler createShutdownHandler() {
+      mockShutdown = mock(DAGAppMasterShutdownHandler.class);
+      return mockShutdown;
+    }
+
+    @Override
+    protected TaskSchedulerManager createTaskSchedulerManager(
+        List<NamedEntityDescriptor> taskSchedulerDescriptors) {
+      return mockScheduler;
+    }
+  }
 }


Mime
View raw message