tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [12/50] [abbrv] tez git commit: TEZ-2669. Propagation of errors from plugins to the AM for error reporting. Contributed by Hitesh Shah and Siddharth Seth.
Date Wed, 20 Jan 2016 17:05:38 GMT
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java
new file mode 100644
index 0000000..14c19b5
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.test.MiniTezCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test fixture that starts a MiniDFSCluster, a MiniTezCluster and a MiniTezTestServiceCluster,
+ * builds the configuration used to submit jobs against them, and optionally manages a shared
+ * session-mode TezClient.
+ */
+public class ExternalTezServiceTestHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalTezServiceTestHelper.class);
+
+  private volatile MiniTezCluster tezCluster;
+  private volatile MiniDFSCluster dfsCluster;
+  private volatile MiniTezTestServiceCluster tezTestServiceCluster;
+
+  // Never reassigned after construction, so final (rather than volatile) is sufficient.
+  private final Configuration clusterConf = new Configuration();
+  private volatile Configuration confForJobs;
+
+  private volatile FileSystem remoteFs;
+
+  private volatile TezClient sharedTezClient;
+
+  /**
+   * Starts all mini clusters and prepares {@link #getConfForJobs()}.
+   *
+   * Current usage: create the helper, call {@link #setupSharedTezClient} during setup
+   * (beforeClass), and invoke {@link #tearDownAll()} when done (afterClass). Alternately, tear
+   * down the shared TezClient independently via {@link #shutdownSharedTezClient()}.
+   *
+   * If any startup step fails, every cluster that was already started is stopped before the
+   * exception propagates, because the caller never obtains an instance on which to call
+   * {@link #tearDownAll()}.
+   *
+   * @param testRootDir base directory used for the MiniDFSCluster's on-disk state
+   * @throws IOException if the staging directory cannot be created on the remote FileSystem
+   */
+  public ExternalTezServiceTestHelper(String testRootDir) throws
+      IOException {
+    try {
+      startClusters(testRootDir);
+    } catch (IOException | RuntimeException e) {
+      try {
+        stopAll();
+      } catch (Exception cleanupFailure) {
+        // Keep the startup failure as the primary exception; attach the cleanup failure to it.
+        e.addSuppressed(cleanupFailure);
+      }
+      throw e;
+    }
+  }
+
+  // Starts DFS, Tez and the external test-service clusters in order, then builds confForJobs.
+  private void startClusters(String testRootDir) throws IOException {
+    try {
+      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testRootDir);
+      dfsCluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+      LOG.info("MiniDFSCluster started");
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezCluster.init(conf);
+    tezCluster.start();
+    LOG.info("MiniTezCluster started");
+
+    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
+      clusterConf.set(entry.getKey(), entry.getValue());
+    }
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    tezTestServiceCluster = MiniTezTestServiceCluster
+        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    // This is currently configured to push tasks into the Service, and then use the standard RPC
+    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
+  }
+
+  /**
+   * Creates, starts and waits for a session-mode TezClient built from {@link #getConfForJobs()}
+   * and the given service plugins. The client is then available via {@link #getSharedTezClient()}.
+   */
+  public void setupSharedTezClient(ServicePluginsDescriptor servicePluginsDescriptor) throws
+      IOException, TezException, InterruptedException {
+    // Create a session to use for all tests.
+    TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
+
+    sharedTezClient = TezClient
+        .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
+        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+
+    sharedTezClient.start();
+    LOG.info("Shared TezSession started");
+    sharedTezClient.waitTillReady();
+    LOG.info("Shared TezSession ready for submission");
+  }
+
+  /**
+   * Stops the shared TezClient (if any) and every mini cluster started by this helper.
+   *
+   * Each component is stopped even if stopping an earlier one fails. If several fail, the
+   * exception from the last failing component propagates.
+   */
+  public void tearDownAll() throws IOException, TezException {
+    stopAll();
+  }
+
+  // Non-overridable implementation of tearDownAll(), so the constructor can call it safely.
+  private void stopAll() throws IOException, TezException {
+    try {
+      if (sharedTezClient != null) {
+        sharedTezClient.stop();
+        sharedTezClient = null;
+      }
+    } finally {
+      try {
+        if (tezTestServiceCluster != null) {
+          tezTestServiceCluster.stop();
+          tezTestServiceCluster = null;
+        }
+      } finally {
+        try {
+          if (tezCluster != null) {
+            tezCluster.stop();
+            tezCluster = null;
+          }
+        } finally {
+          if (dfsCluster != null) {
+            dfsCluster.shutdown();
+            dfsCluster = null;
+          }
+        }
+      }
+    }
+  }
+
+  /** Stops the shared TezClient, if one is running, leaving the clusters up. */
+  public void shutdownSharedTezClient() throws IOException, TezException {
+    if (sharedTezClient != null) {
+      sharedTezClient.stop();
+      sharedTezClient = null;
+    }
+  }
+
+  /**
+   * Generates join input data with JoinDataGen (2 tasks) and runs HashJoinExample (2 reducers)
+   * on the shared TezClient, asserting that both complete with exit code 0.
+   *
+   * @param srcDataDir directory created on the remote FileSystem to hold the data
+   * @param dataPath1 first join input written by JoinDataGen
+   * @param dataPath2 second join input written by JoinDataGen
+   * @param expectedResultPath where JoinDataGen writes the expected join result
+   * @param outputPath where HashJoinExample writes the actual join result
+   * @throws NullPointerException if {@link #setupSharedTezClient} has not been called
+   */
+  public void setupHashJoinData(Path srcDataDir, Path dataPath1, Path dataPath2,
+                                Path expectedResultPath, Path outputPath) throws
+      Exception {
+    // Fail fast with a clear message rather than an NPE inside the example code.
+    TezClient tezClient = getSharedTezClient();
+    remoteFs.mkdirs(srcDataDir);
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    //   Generate join data - with 2 tasks.
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        expectedResultPath.toString(), "2"};
+    assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezClient));
+    //    Run the actual join - with 2 reducers
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", outputPath.toString()};
+    assertEquals(0, joinExample.run(tezConf, args, tezClient));
+    LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
+  }
+
+
+  public MiniTezCluster getTezCluster() {
+    return tezCluster;
+  }
+
+  public MiniDFSCluster getDfsCluster() {
+    return dfsCluster;
+  }
+
+  public MiniTezTestServiceCluster getTezTestServiceCluster() {
+    return tezTestServiceCluster;
+  }
+
+  public Configuration getClusterConf() {
+    return clusterConf;
+  }
+
+  public Configuration getConfForJobs() {
+    return confForJobs;
+  }
+
+  public FileSystem getRemoteFs() {
+    return remoteFs;
+  }
+
+  /**
+   * @return the shared TezClient
+   * @throws NullPointerException if it was never set up or has already been shut down
+   */
+  public TezClient getSharedTezClient() {
+    return Preconditions.checkNotNull(sharedTezClient,
+        "Shared TezClient is not available: it was not set up or has already been shut down");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 3701455..920534a 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -17,13 +17,8 @@ package org.apache.tez.tests;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
-import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -37,18 +32,13 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
 import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
 import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
-import org.apache.tez.examples.HashJoinExample;
-import org.apache.tez.examples.JoinDataGen;
 import org.apache.tez.examples.JoinValidateConfigured;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
-import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.service.impl.ContainerRunnerImpl;
 import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
 import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
-import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -61,17 +51,7 @@ public class TestExternalTezServices {
 
   private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
 
-  private static volatile MiniTezCluster tezCluster;
-  private static volatile MiniDFSCluster dfsCluster;
-  private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
-
-  private static volatile Configuration clusterConf = new Configuration();
-  private static volatile Configuration confForJobs;
-
-  private static volatile FileSystem remoteFs;
-  private static volatile FileSystem localFs;
-
-  private static volatile TezClient sharedTezClient;
+  private static ExternalTezServiceTestHelper extServiceTestHelper;
 
   private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName());
   private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
@@ -93,50 +73,8 @@ public class TestExternalTezServices {
   @BeforeClass
   public static void setup() throws Exception {
 
-    localFs = FileSystem.getLocal(clusterConf);
-
-    try {
-      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-      dfsCluster =
-          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
-      remoteFs = dfsCluster.getFileSystem();
-      LOG.info("MiniDFSCluster started");
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-
-    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
-    Configuration conf = new Configuration();
-    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
-    tezCluster.init(conf);
-    tezCluster.start();
-    LOG.info("MiniTezCluster started");
-
-    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
-    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
-      clusterConf.set(entry.getKey(), entry.getValue());
-    }
-    long jvmMax = Runtime.getRuntime().maxMemory();
-
-    tezTestServiceCluster = MiniTezTestServiceCluster
-        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)),
1);
-    tezTestServiceCluster.init(clusterConf);
-    tezTestServiceCluster.start();
-    LOG.info("MiniTezTestServer started");
-
-    confForJobs = new Configuration(clusterConf);
-    for (Map.Entry<String, String> entry : tezTestServiceCluster
-        .getClusterSpecificConfiguration()) {
-      confForJobs.set(entry.getKey(), entry.getValue());
-    }
-
-    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
-    remoteFs.mkdirs(stagingDirPath);
-    // This is currently configured to push tasks into the Service, and then use the standard
RPC
-    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
-    confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
-
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);
+    extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
 
     TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
         TaskSchedulerDescriptor
@@ -156,60 +94,21 @@ public class TestExternalTezServices {
     ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
true,
         taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
 
-    // Create a session to use for all tests.
-    TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
 
-    sharedTezClient = TezClient
-        .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
-        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+    extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
 
-    sharedTezClient.start();
-    LOG.info("Shared TezSession started");
-    sharedTezClient.waitTillReady();
-    LOG.info("Shared TezSession ready for submission");
 
     // Generate the join data set used for each run.
     // Can a timeout be enforced here ?
-    remoteFs.mkdirs(SRC_DATA_DIR);
     Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
     Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
-    TezConfiguration tezConf = new TezConfiguration(confForJobs);
-    //   Generate join data - with 2 tasks.
-    JoinDataGen dataGen = new JoinDataGen();
-    String[] dataGenArgs = new String[]{
-        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
-        HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
-    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
-    //    Run the actual join - with 2 reducers
-    HashJoinExample joinExample = new HashJoinExample();
-    String[] args = new String[]{
-        dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
-    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
-
-    LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
+    extServiceTestHelper
+        .setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH,
HASH_JOIN_OUTPUT_PATH);
   }
 
   @AfterClass
   public static void tearDown() throws IOException, TezException {
-    if (sharedTezClient != null) {
-      sharedTezClient.stop();
-      sharedTezClient = null;
-    }
-
-    if (tezTestServiceCluster != null) {
-      tezTestServiceCluster.stop();
-      tezTestServiceCluster = null;
-    }
-
-    if (tezCluster != null) {
-      tezCluster.stop();
-      tezCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-    // TODO Add cleanup code.
+    // The helper stays null if its constructor threw in setup(); skip teardown in that case
+    // instead of reporting a spurious NullPointerException alongside the real setup failure.
+    if (extServiceTestHelper != null) {
+      extServiceTestHelper.tearDownAll();
+    }
   }
 
 
@@ -297,7 +196,7 @@ public class TestExternalTezServices {
     v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
     dag.addVertex(v);
 
-    DAGClient dagClient = sharedTezClient.submitDAG(dag);
+    DAGClient dagClient = extServiceTestHelper.getSharedTezClient().submitDAG(dag);
     DAGStatus dagStatus = dagClient.waitForCompletion();
     assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
     assertEquals(1, dagStatus.getDAGProgress().getFailedTaskAttemptCount());
@@ -309,18 +208,18 @@ public class TestExternalTezServices {
                                VertexExecutionContext rhsContext,
                                VertexExecutionContext validateContext) throws
       Exception {
-    int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
+    int externalSubmissionCount = extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions();
 
-    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    TezConfiguration tezConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
     JoinValidateConfigured joinValidate =
         new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, rhsContext,
             validateContext, name);
     String[] validateArgs = new String[]{"-disableSplitGrouping",
         HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
-    assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
+    assertEquals(0, joinValidate.run(tezConf, validateArgs, extServiceTestHelper.getSharedTezClient()));
 
     // Ensure this was actually submitted to the external cluster
     assertEquals(extExpectedCount,
-        (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount));
+        (extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions() - externalSubmissionCount));
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
new file mode 100644
index 0000000..bfd3ed2
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
+import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestExternalTezServicesErrors {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
+
+  private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
+  private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail";
+
+  private static ExternalTezServiceTestHelper extServiceTestHelper;
+
+  private static ServicePluginsDescriptor servicePluginsDescriptor;
+
+  private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
+  private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
+  private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
+
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
+      Vertex.VertexExecutionContext.create(
+          EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL =
+      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME);
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL =
+      Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServicesErrors.class.getName()
+      + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+
+    extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
+
+    TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
+        TaskSchedulerDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
+            .setUserPayload(userPayload),
+        TaskSchedulerDescriptor.create(EXT_FAIL_ENTITY_NAME,
+            TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
+            userPayload)};
+
+    ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
+        ContainerLauncherDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
+            .setUserPayload(userPayload),
+        ContainerLauncherDescriptor.create(EXT_FAIL_ENTITY_NAME,
+            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload)};
+
+    TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
+        TaskCommunicatorDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
+            .setUserPayload(userPayload),
+        TaskCommunicatorDescriptor.create(EXT_FAIL_ENTITY_NAME,
+            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload)};
+
+    servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
+        taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
+
+    extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
+
+    // Generate the join data set used for each run.
+    // Can a timeout be enforced here ?
+    Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
+    Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
+    extServiceTestHelper
+        .setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH,
HASH_JOIN_OUTPUT_PATH);
+
+    extServiceTestHelper.shutdownSharedTezClient();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    extServiceTestHelper.tearDownAll();
+  }
+
+  @Test (timeout = 90000)
+  public void testContainerLauncherError() throws Exception {
+    testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL,
+        DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR);
+  }
+
+  @Test (timeout = 90000)
+  public void testTaskCommunicatorError() throws Exception {
+    testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL,
+        DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR);
+  }
+
+  @Test (timeout = 90000)
+  public void testTaskSchedulerError() throws Exception {
+    testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL,
+        DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR);
+  }
+
+  private void testServiceError(String methodName,
+                                Vertex.VertexExecutionContext lhsExecutionContext,
+                                DAGAppMasterEventType expectedEventType) throws
+      IOException, TezException, InterruptedException, YarnException {
+    TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
+    TezClient tezClient = TezClient
+        .newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
+            tezClientConf)
+        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
+
+    ApplicationId appId;
+    try {
+      tezClient.start();
+      LOG.info("TezSessionStarted for " + methodName);
+      tezClient.waitTillReady();
+      LOG.info("TezSession ready for submission for " + methodName);
+
+      JoinValidateConfigured joinValidate =
+          new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext,
+              EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+              EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest");
+
+      DAG dag = joinValidate
+          .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH,
+              HASH_JOIN_OUTPUT_PATH, 3);
+
+      DAGClient dagClient = tezClient.submitDAG(dag);
+
+      DAGStatus dagStatus =
+          dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+      assertEquals(DAGStatus.State.ERROR, dagStatus.getState());
+      boolean foundDiag = false;
+      for (String diag : dagStatus.getDiagnostics()) {
+        if (diag.contains("Service Error") && diag.contains(
+            expectedEventType.toString()) &&
+            diag.contains("Simulated Error")) {
+          foundDiag = true;
+        }
+      }
+      appId = tezClient.getAppMasterApplicationId();
+      assertTrue(foundDiag);
+    } finally {
+      tezClient.stop();
+    }
+    // Verify the state of the application.
+    if (appId != null) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      try {
+        yarnClient.init(tezClientConf);
+        yarnClient.start();
+
+        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        while (!EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED,
+            YarnApplicationState.KILLED).contains(appState)) {
+          Thread.sleep(200L);
+          appReport = yarnClient.getApplicationReport(appId);
+          appState = appReport.getYarnApplicationState();
+        }
+
+        // TODO Workaround for YARN-4554. AppReport does not provide diagnostics - need to
fetch them from ApplicationAttemptReport
+        ApplicationAttemptId appAttemptId = appReport.getCurrentApplicationAttemptId();
+        ApplicationAttemptReport appAttemptReport =
+            yarnClient.getApplicationAttemptReport(appAttemptId);
+        String diag = appAttemptReport.getDiagnostics();
+        assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
+        assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
+        assertTrue(diag.contains("Service Error") && diag.contains(
+            expectedEventType.toString()) &&
+            diag.contains("Simulated Error"));
+
+      } finally {
+        yarnClient.stop();
+      }
+    }
+  }
+
+}


Mime
View raw message