tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3108. Add support for external services to local mode. (sseth)
Date Wed, 30 Mar 2016 17:21:56 GMT
Repository: tez
Updated Branches:
  refs/heads/master c547dcc80 -> 9c1d8ceed


TEZ-3108. Add support for external services to local mode. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c1d8cee
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c1d8cee
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c1d8cee

Branch: refs/heads/master
Commit: 9c1d8ceed9eb31a3631e30dc24e41843ab00e8fc
Parents: c547dcc
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Mar 30 10:21:31 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Mar 30 10:21:31 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 docs/src/site/markdown/localmode.md             |   9 +
 .../java/org/apache/tez/client/LocalClient.java |  43 +++-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  11 +-
 .../app/launcher/ContainerLauncherManager.java  |   8 +-
 .../app/launcher/LocalContainerLauncher.java    |  12 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  17 +-
 .../tez/tests/TestExtServicesWithLocalMode.java | 206 +++++++++++++++++++
 8 files changed, 282 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f9d0166..830d7fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3108. Add support for external services to local mode.
   TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster.
   TEZ-2967. Vertex start time should be that of first task start time in UI
   TEZ-3175. Add tez client submit host

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/docs/src/site/markdown/localmode.md
----------------------------------------------------------------------
diff --git a/docs/src/site/markdown/localmode.md b/docs/src/site/markdown/localmode.md
index ae546a1..26968da 100644
--- a/docs/src/site/markdown/localmode.md
+++ b/docs/src/site/markdown/localmode.md
@@ -105,3 +105,12 @@ Potential pitfalls when moving from Local Mode to a real cluster
 -   The ObjectRegistry will work within a single task, when running in
     Local Mode. The behaviour would be different on a real cluster,
     where it would work across tasks which share the same container.
+
+Local Mode with External Services
+
+-   When running in local mode, work that would normally run in launched
+    containers is executed within the same process instead.
+-   Vertices configured to run in external services are not affected: Tez
+    still uses those external services to execute their tasks. When local mode
+    is used with such a configuration, make sure the external services are
+    running before submitting work.

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index b225523..474f4ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -18,11 +18,13 @@
 
 package org.apache.tez.client;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -44,9 +46,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClientHandler;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.app.DAGAppMasterState;
@@ -333,16 +338,44 @@ public class LocalClient extends FrameworkClient {
 
     return thread;
   }
-  
+
   // this can be overridden by test code to create a mock app
   @VisibleForTesting
   protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
-      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime, boolean isSession, String userDir,
-      String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName)
{
+                                            ContainerId cId, String currentHost, int nmPort,
+                                            int nmHttpPort,
+                                            Clock clock, long appSubmitTime, boolean isSession,
+                                            String userDir,
+                                            String[] localDirs, String[] logDirs,
+                                            Credentials credentials, String jobUserName)
throws
+      IOException {
+
+    // Read in additional information about external services
+    AMPluginDescriptorProto amPluginDescriptorProto =
+        getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString());
+
+
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
-        versionInfo.getVersion(), 1, credentials, jobUserName, null);
+        versionInfo.getVersion(), 1, credentials, jobUserName, amPluginDescriptorProto);
+  }
+
+  /**
+   * Looks up the user-specified Tez configuration stored under the Tez system staging
+   * directory for the given application and pulls the AM plugin descriptor out of it.
+   *
+   * @param conf configuration used to resolve the Tez system staging directory
+   * @param applicationIdString string form of the application id
+   * @return the AM plugin descriptor, or {@code null} if the stored configuration has none
+   * @throws IOException propagated from reading the stored user configuration
+   */
+  private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf,
+                                                          String applicationIdString) throws
+      IOException {
+    // Keep only the path component; the filesystem qualifier (scheme/authority) is dropped.
+    String stagingDir = TezCommonUtils.getTezSystemStagingPath(conf, applicationIdString)
+        .toUri()
+        .getPath();
+    DAGProtos.ConfigurationProto userTezConf =
+        TezUtilsInternal.readUserSpecifiedTezConfiguration(stagingDir);
+    return userTezConf.hasAmPluginDescriptor() ? userTezConf.getAmPluginDescriptor() : null;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 484ae77..7d28497 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -435,7 +435,6 @@ public class DAGAppMaster extends AbstractService {
         isLocal, defaultPayload);
 
 
-
     LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
     LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
     LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
@@ -2638,8 +2637,14 @@ public class DAGAppMaster extends AbstractService {
                                           UserPayload defaultPayload,
                                           BiMap<String, Integer> schedulerPluginMap)
{
     if (isLocal) {
-      Preconditions.checkState(descriptors.size() == 1 &&
-          descriptors.get(0).getEntityName().equals(TezConstants.getTezUberServicePluginName()));
+      boolean foundUberServiceName = false;
+      for (NamedEntityDescriptor descriptor : descriptors) {
+        if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName()))
{
+          foundUberServiceName = true;
+          break;
+        }
+      }
+      Preconditions.checkState(foundUberServiceName);
     } else {
       boolean foundYarn = false;
       for (int i = 0; i < descriptors.size(); i++) {

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 250afd8..1f5151b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -69,7 +69,7 @@ public class ContainerLauncherManager extends AbstractService
                                   TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                   String workingDirectory,
                                   List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                  boolean isPureLocalMode) throws TezException {
+                                  boolean isLocalMode) throws TezException {
     super(ContainerLauncherManager.class.getName());
 
     this.isIncompleteCtor = false;
@@ -88,7 +88,7 @@ public class ContainerLauncherManager extends AbstractService
           new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface,
userPayload, i);
       containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i),
context,
-          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i,
isPureLocalMode));
+          containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i,
isLocalMode));
       containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher());
     }
   }
@@ -145,7 +145,7 @@ public class ContainerLauncherManager extends AbstractService
                                                 AppContext context,
                                                 TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                                 String workingDirectory,
-                                                boolean isPureLocalMode) {
+                                                boolean isLocalMode) {
     LOG.info("Creating LocalContainerLauncher");
     // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
     // extensive internals which are only available at runtime. Will likely require
@@ -154,7 +154,7 @@ public class ContainerLauncherManager extends AbstractService
       return
           new LocalContainerLauncher(containerLauncherContext, context,
               taskCommunicatorManagerInterface,
-              workingDirectory, isPureLocalMode);
+              workingDirectory, isLocalMode);
     } catch (UnknownHostException e) {
       throw new TezUncheckedException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index b737fda..1e9d1e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -88,7 +88,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
   private final Map<String, String> localEnv;
   private final ExecutionContext executionContext;
   private final int numExecutors;
-  private final boolean isPureLocalMode;
+  private final boolean isLocalMode;
 
   private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
       runningContainers =
@@ -108,7 +108,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
                                 AppContext context,
                                 TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                 String workingDirectory,
-                                boolean isPureLocalMode) throws UnknownHostException {
+                                boolean isLocalMode) throws UnknownHostException {
     // TODO Post TEZ-2003. Most of this information is dynamic and only available after the
AM
     // starts up. It's not possible to set these up via a static payload.
     // Will need some kind of mechanism to dynamically crate payloads / bind to parameters
@@ -117,8 +117,8 @@ public class LocalContainerLauncher extends ContainerLauncher {
     this.context = context;
     this.tal = taskCommunicatorManagerInterface;
     this.workingDirectory = workingDirectory;
-    this.isPureLocalMode = isPureLocalMode;
-    if (isPureLocalMode) {
+    this.isLocalMode = isLocalMode;
+    if (isLocalMode) {
       localEnv = Maps.newHashMap();
       AuxiliaryServiceHelper.setServiceDataIntoEnv(
           ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
@@ -127,7 +127,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     }
 
     // Check if the hostname is set in the environment before overriding it.
-    String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
+    String host = isLocalMode ? InetAddress.getLocalHost().getHostName() :
         System.getenv(Environment.NM_HOST.name());
     executionContext = new ExecutionContextImpl(host);
 
@@ -347,7 +347,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     Map<String, String> containerEnv = new HashMap<String, String>();
     containerEnv.putAll(localEnv);
     // Use the user from env if it's available.
-    String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
+    String user = isLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
     containerEnv.put(Environment.USER.name(), user);
 
     long memAvailable;

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 5317440..16f9a28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -124,7 +124,7 @@ public class TaskSchedulerManager extends AbstractService implements
   @VisibleForTesting
   final ExecutorService appCallbackExecutor;
 
-  private final boolean isPureLocalMode;
+  private final boolean isLocalMode;
   // If running in non local-only mode, the YARN task scheduler will always run to take care
of
   // registration with YARN and heartbeats to YARN.
   // Splitting registration and heartbeats is not straight-forward due to the taskScheduler
being
@@ -159,7 +159,7 @@ public class TaskSchedulerManager extends AbstractService implements
     this.taskSchedulerDescriptors = null;
     this.webUI = null;
     this.historyUrl = null;
-    this.isPureLocalMode = false;
+    this.isLocalMode = false;
   }
 
   /**
@@ -171,7 +171,7 @@ public class TaskSchedulerManager extends AbstractService implements
    * @param webUI
    * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes
will not have the class names populated.
    *                         An empty list defaults to using the YarnTaskScheduler as the
only source.
-   * @param isPureLocalMode whether the AM is running in local mode
+   * @param isLocalMode whether the AM is running in local mode
    */
   @SuppressWarnings("rawtypes")
   public TaskSchedulerManager(AppContext appContext,
@@ -179,7 +179,7 @@ public class TaskSchedulerManager extends AbstractService implements
                               ContainerSignatureMatcher containerSignatureMatcher,
                               WebUIService webUI,
                               List<NamedEntityDescriptor> schedulerDescriptors,
-                              boolean isPureLocalMode) {
+                              boolean isLocalMode) {
     super(TaskSchedulerManager.class.getName());
     Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
         "TaskSchedulerDescriptors must be specified");
@@ -189,7 +189,7 @@ public class TaskSchedulerManager extends AbstractService implements
     this.containerSignatureMatcher = containerSignatureMatcher;
     this.webUI = webUI;
     this.historyUrl = getHistoryUrl();
-    this.isPureLocalMode = isPureLocalMode;
+    this.isLocalMode = isLocalMode;
     this.appCallbackExecutor = createAppCallbackExecutorService();
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
@@ -579,8 +579,11 @@ public class TaskSchedulerManager extends AbstractService implements
     int j = 0;
     for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
       long customAppIdIdentifier;
-      if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
-          TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the
appId.
+      if ((isLocalMode && taskSchedulerDescriptors[i].getEntityName()
+          .equals(TezConstants.getTezUberServicePluginName()) ||
+          taskSchedulerDescriptors[i].getEntityName()
+              .equals(TezConstants.getTezYarnServicePluginName()))) {
+        // Use the provided appId instead of constructing one for containers.
         customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);

http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java
b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java
new file mode 100644
index 0000000..3d8c087
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinValidateConfigured;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that local mode can still route vertices to external services.
+ *
+ * <p>A {@link MiniTezTestServiceCluster} is started once for the class. Jobs run with
+ * {@link TezConfiguration#TEZ_LOCAL_MODE} enabled and with service plugins that target that
+ * cluster. Vertices without an execution context run in-process, while vertices using
+ * {@link #EXECUTION_CONTEXT_EXT_SERVICE_PUSH} are expected to be submitted to the cluster.
+ */
+public class TestExtServicesWithLocalMode {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestExtServicesWithLocalMode.class);
+
+  private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
+
+  private static final String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestExtServicesWithLocalMode.class.getName()
+          + "-tmpDir";
+
+  private static final Path SRC_DATA_DIR = new Path(TEST_ROOT_DIR + Path.SEPARATOR + "data");
+  // Shared by the data generation in test1() and the validation in runJoinValidate(), so the
+  // validator always reads exactly what the generator and the hash join wrote.
+  private static final Path HASH_JOIN_EXPECTED_RESULT_PATH =
+      new Path(SRC_DATA_DIR, "expectedOutputPath");
+  private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
+
+  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
+      Vertex.VertexExecutionContext.create(
+          EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+  private static volatile Configuration clusterConf = new Configuration();
+  private static volatile FileSystem localFs;
+  private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
+
+  private static volatile Configuration confForJobs;
+
+  /**
+   * Starts the external test-service cluster. Then builds the job configuration: the base
+   * configuration, plus the cluster-specific settings, with local mode enabled.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    localFs = FileSystem.getLocal(clusterConf).getRaw();
+    long jvmMax = Runtime.getRuntime().maxMemory();
+    tezTestServiceCluster = MiniTezTestServiceCluster.create(
+        TestExtServicesWithLocalMode.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+    confForJobs.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+  }
+
+  /**
+   * Stops the external cluster and removes the test root directory. Cleanup runs even if
+   * stopping the cluster fails.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    try {
+      if (tezTestServiceCluster != null) {
+        tezTestServiceCluster.stop();
+        tezTestServiceCluster = null;
+      }
+    } finally {
+      // localFs is only assigned in setup(); if that failed early there is nothing to delete.
+      if (localFs != null) {
+        Path testRootDirPath = localFs.makeQualified(new Path(TEST_ROOT_DIR));
+        LOG.info("Cleaning up path: {}", testRootDirPath);
+        localFs.delete(testRootDirPath, true);
+      }
+    }
+  }
+
+
+  /**
+   * Generates join input, runs a hash join in local mode and then validates the result with
+   * different placements of vertices. Each placement is in-process or in the external service.
+   */
+  @Test(timeout = 30000)
+  public void test1() throws Exception {
+
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs);
+
+    TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
+        TaskSchedulerDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
+            .setUserPayload(userPayload)};
+
+    ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
+        ContainerLauncherDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
+            .setUserPayload(userPayload)};
+
+    TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
+        TaskCommunicatorDescriptor
+            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
+            .setUserPayload(userPayload)};
+
+    ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, false,
+        taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
+
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    TezClient tezClient = TezClient.newBuilder("test1", tezConf).setIsSession(true)
+        .setServicePluginDescriptor(servicePluginsDescriptor).build();
+    try {
+      tezClient.start();
+
+      Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
+      Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
+
+      // Write the expected result to the same path that runJoinValidate() reads from.
+      JoinDataGen dataGen = new JoinDataGen();
+      String[] dataGenArgs = new String[]{
+          dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+          HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
+
+      assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezClient));
+
+      HashJoinExample joinExample = new HashJoinExample();
+      String[] args = new String[]{
+          dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
+      assertEquals(0, joinExample.run(tezConf, args, tezClient));
+      LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
+
+      // Nothing so far requested an external execution context.
+      assertEquals(0, tezTestServiceCluster.getNumSubmissions());
+
+      // ext can consume from ext.
+      runJoinValidate(tezClient, "allInExt", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+          EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
+      LOG.info("Completed allInExt");
+
+      // uber can consume from uber.
+      runJoinValidate(tezClient, "noneInExt", 0, null, null, null);
+      LOG.info("Completed noneInExt");
+
+      // uber can consume from ext
+      runJoinValidate(tezClient, "lhsInExt", 2, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, null, null);
+      LOG.info("Completed lhsInExt");
+
+      // ext cannot consume from uber in this mode since there's no shuffle handler working,
+      // and the local data transfer semantics may not match.
+
+    } finally {
+      tezClient.stop();
+    }
+
+  }
+
+  /**
+   * Runs {@link JoinValidateConfigured} with the given per-vertex execution contexts. Then
+   * asserts how many new submissions the external test-service cluster received.
+   *
+   * @param tezClient running session client used to submit the DAG
+   * @param name DAG name
+   * @param extExpectedCount expected increase in the external cluster's submission count
+   * @param lhsContext execution context for the LHS vertex; {@code null} runs it in-process
+   * @param rhsContext execution context for the RHS vertex; {@code null} runs it in-process
+   * @param validateContext execution context for the validate vertex; {@code null} runs it
+   *                        in-process
+   */
+  private void runJoinValidate(TezClient tezClient, String name, int extExpectedCount,
+                               Vertex.VertexExecutionContext lhsContext,
+                               Vertex.VertexExecutionContext rhsContext,
+                               Vertex.VertexExecutionContext validateContext) throws
+      Exception {
+    int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    JoinValidateConfigured joinValidate =
+        new JoinValidateConfigured(null, lhsContext, rhsContext,
+            validateContext, name);
+    String[] validateArgs = new String[]{"-disableSplitGrouping",
+        HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
+    assertEquals(0, joinValidate.run(tezConf, validateArgs, tezClient));
+
+    // Ensure this was actually submitted to the external cluster
+    assertEquals(extExpectedCount,
+        (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount));
+  }
+}


Mime
View raw message