tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From haris...@apache.org
Subject tez git commit: TEZ-3007. Use AppFinalState.ENDED when unregistering with the RM in session mode (harishjp)
Date Wed, 24 May 2017 06:13:05 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 2e483ef2c -> 851b6541b


TEZ-3007. Use AppFinalState.ENDED when unregistering with the RM in session mode (harishjp)

(cherry picked from commit 8c4407798ead6b771df6c8eb47bc0775ceaf67b5)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/851b6541
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/851b6541
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/851b6541

Branch: refs/heads/branch-0.8
Commit: 851b6541b5c44ec3a9dc1951b1c4e8497d591a56
Parents: 2e483ef
Author: Harish JP <harishjp@gmail.com>
Authored: Wed May 24 11:39:20 2017 +0530
Committer: Harish JP <harishjp@gmail.com>
Committed: Wed May 24 11:42:25 2017 +0530

----------------------------------------------------------------------
 .../apache/tez/hadoop/shim/HadoopShim28.java    | 16 ++++++
 .../tez/hadoop/shim/TestHadoopShim28.java       | 60 ++++++++++++++++++++
 .../org/apache/tez/hadoop/shim/HadoopShim.java  |  7 ++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  2 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    | 10 +++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  3 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    | 13 +++--
 7 files changed, 101 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java
----------------------------------------------------------------------
diff --git a/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java
b/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java
index 0c599e4..5504c02 100644
--- a/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java
+++ b/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
 public class HadoopShim28 extends HadoopShim {
@@ -49,4 +50,19 @@ public class HadoopShim28 extends HadoopShim {
     }
     return supportedTypes;
   }
+
+  @Override
+  public FinalApplicationStatus applyFinalApplicationStatusCorrection(FinalApplicationStatus
orig,
+      boolean isSessionMode, boolean isError) {
+    switch (orig) {
+      case FAILED:
+        // App is failed if dag failed in non-session mode or there was an error.
+        return (!isSessionMode || isError) ?
+            FinalApplicationStatus.FAILED : FinalApplicationStatus.ENDED;
+      case SUCCEEDED:
+        return isSessionMode ? FinalApplicationStatus.ENDED : FinalApplicationStatus.SUCCEEDED;
+      default:
+        return orig;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java
----------------------------------------------------------------------
diff --git a/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java
b/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java
new file mode 100644
index 0000000..c7d8c63
--- /dev/null
+++ b/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.hadoop.shim;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHadoopShim28 {
+
+  @Test
+  public void testApplyFinalApplicationStatusCorrection() {
+    HadoopShim shim = new HadoopShim28();
+    // Session mode success/failure, change to ended
+    Assert.assertEquals(FinalApplicationStatus.ENDED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.SUCCEEDED, true,
false));
+    Assert.assertEquals(FinalApplicationStatus.ENDED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, true, false));
+
+    // Non-session mode success/failure, retain success/failure
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.SUCCEEDED, false,
false));
+    Assert.assertEquals(FinalApplicationStatus.FAILED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, false,
false));
+
+    // Session and non-session mode error, retain failed.
+    Assert.assertEquals(FinalApplicationStatus.FAILED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, true, true));
+    Assert.assertEquals(FinalApplicationStatus.FAILED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, false,
true));
+
+    // Session and non-session mode killed is killed.
+    Assert.assertEquals(FinalApplicationStatus.KILLED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.KILLED, true, false));
+    Assert.assertEquals(FinalApplicationStatus.KILLED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.KILLED, false,
false));
+
+    // Session and non-session mode undefined is undefined.
+    Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.UNDEFINED, true,
false));
+    Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+        shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.UNDEFINED, false,
false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java
----------------------------------------------------------------------
diff --git a/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java b/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java
index 7314c2d..47da74a 100644
--- a/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java
+++ b/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java
@@ -23,8 +23,7 @@ import java.util.Set;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 
 @Public
 @Unstable
@@ -57,4 +56,8 @@ public abstract class HadoopShim {
     return null;
   }
 
+  public FinalApplicationStatus applyFinalApplicationStatusCorrection(FinalApplicationStatus
orig,
+      boolean isSessionMode, boolean isError) {
+    return orig;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7ad6405..cbf7353 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -651,7 +651,7 @@ public class DAGAppMaster extends AbstractService {
       List<NamedEntityDescriptor> taskSchedulerDescriptors) {
     return new TaskSchedulerManager(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerDescriptors, isLocal);
+        taskSchedulerDescriptors, isLocal, hadoopShim);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index e68c9b8..d0d69d0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -89,6 +89,8 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
 import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
 import org.apache.tez.dag.app.web.WebUIService;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.hadoop.shim.HadoopShim;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 
 import com.google.common.base.Preconditions;
 
@@ -132,6 +134,7 @@ public class TaskSchedulerManager extends AbstractService implements
   // Custom AppIds to avoid container conflicts if there's multiple sources
   private final long SCHEDULER_APP_ID_BASE = 111101111;
   private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+  private final HadoopShim hadoopShim;
 
   BlockingQueue<AMSchedulerEvent> eventQueue
                               = new LinkedBlockingQueue<AMSchedulerEvent>();
@@ -160,6 +163,7 @@ public class TaskSchedulerManager extends AbstractService implements
     this.webUI = null;
     this.historyUrl = null;
     this.isLocalMode = false;
+    this.hadoopShim = new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim();
   }
 
   /**
@@ -179,7 +183,8 @@ public class TaskSchedulerManager extends AbstractService implements
                               ContainerSignatureMatcher containerSignatureMatcher,
                               WebUIService webUI,
                               List<NamedEntityDescriptor> schedulerDescriptors,
-                              boolean isLocalMode) {
+                              boolean isLocalMode,
+                              HadoopShim hadoopShim) {
     super(TaskSchedulerManager.class.getName());
     Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
         "TaskSchedulerDescriptors must be specified");
@@ -190,6 +195,7 @@ public class TaskSchedulerManager extends AbstractService implements
     this.webUI = webUI;
     this.historyUrl = getHistoryUrl();
     this.isLocalMode = isLocalMode;
+    this.hadoopShim = hadoopShim;
     this.appCallbackExecutor = createAppCallbackExecutorService();
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
@@ -798,6 +804,8 @@ public class TaskSchedulerManager extends AbstractService implements
       } else {
         finishState = FinalApplicationStatus.UNDEFINED;
       }
+      finishState = hadoopShim.applyFinalApplicationStatusCorrection(finishState,
+          dagAppMaster.isSession(), appMasterState == DAGAppMasterState.ERROR);
       List<String> diagnostics = dagAppMaster.getDiagnostics();
       if(diagnostics != null) {
         for (String s : diagnostics) {

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index d8170e3..2cabb27 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -70,6 +70,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -144,7 +145,7 @@ class TestTaskSchedulerHelpers {
                                        UserPayload defaultPayload) {
       super(appContext, null, eventHandler, containerSignatureMatcher, null,
           Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)),
-          false);
+          false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
       this.defaultPayload = defaultPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 791bb7f..8539d48 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -94,6 +94,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.hadoop.shim.HadoopShimsLoader;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
@@ -130,7 +131,8 @@ public class TestTaskSchedulerManager {
                                     ContainerSignatureMatcher containerSignatureMatcher,
                                     WebUIService webUI) {
       super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
-          Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false);
+          Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false,
+          new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
     }
 
     @Override
@@ -574,7 +576,8 @@ public class TestTaskSchedulerManager {
 
     TaskSchedulerManager taskSchedulerManager =
         new TaskSchedulerManager(appContext, dagClientServer, eventHandler,
-            mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false)
{
+            mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false,
+            new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()) {
           @Override
           TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext)
{
             // Avoid wrapping in threads
@@ -638,8 +641,7 @@ public class TestTaskSchedulerManager {
     doReturn(address).when(mockClientService).getBindAddress();
     TaskSchedulerManager taskSchedulerManager =
         new TaskSchedulerManager(taskScheduler, appContext, mock(ContainerSignatureMatcher.class),
-            mockClientService,
-            Executors.newFixedThreadPool(1)) {
+            mockClientService, Executors.newFixedThreadPool(1)) {
           @Override
           protected void instantiateSchedulers(String host, int port, String trackingUrl,
                                                AppContext appContext) throws TezException
{
@@ -726,7 +728,8 @@ public class TestTaskSchedulerManager {
                                          List<NamedEntityDescriptor> schedulerDescriptors,
                                          boolean isPureLocalMode) {
       super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
-          schedulerDescriptors, isPureLocalMode);
+          schedulerDescriptors, isPureLocalMode,
+          new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
       yarnTaskScheduler = mock(TaskScheduler.class);
       uberTaskScheduler = mock(TaskScheduler.class);
     }


Mime
View raw message