hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject hive git commit: HIVE-11393. LLAP: Fix API usage to work with evolving Tez APIs - TEZ-{2651, 2652, 2653}. (Siddharth Seth)
Date Tue, 28 Jul 2015 21:58:20 GMT
Repository: hive
Updated Branches:
  refs/heads/llap 6bdb903e4 -> ef454511d


HIVE-11393. LLAP: Fix API usage to work with evolving Tez APIs - TEZ-{2651,2652,2653}. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ef454511
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ef454511
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ef454511

Branch: refs/heads/llap
Commit: ef454511dd8614de0a5d30466d9a7c6bb2c3b10b
Parents: 6bdb903
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jul 28 14:57:52 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jul 28 14:57:52 2015 -0700

----------------------------------------------------------------------
 .../llap/tezplugins/LlapContainerLauncher.java  |  2 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   | 37 +++++++-------
 .../dag/app/rm/LlapTaskSchedulerService.java    |  9 +++-
 .../app/rm/TestLlapTaskSchedulerService.java    | 13 ++++-
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 39 ++++++++-------
 .../hive/ql/exec/tez/TezSessionState.java       | 51 +++++++++-----------
 6 files changed, 82 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
index 3f1f58f..07703a2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java
@@ -25,7 +25,7 @@ public class LlapContainerLauncher extends ContainerLauncher {
   private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class);
 
   public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(LlapContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index dc06c97..44fd7e3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -106,12 +106,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
 
     credentialMap = new ConcurrentHashMap<>();
-    sourceStateTracker = new SourceStateTracker(getTaskCommunicatorContext(), this);
+    sourceStateTracker = new SourceStateTracker(getContext(), this);
   }
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
+  public void initialize() throws Exception {
+    super.initialize();
+    Configuration conf = getConf();
     int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
         LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
     this.communicator = new TaskCommunicator(numThreads, conf);
@@ -124,14 +125,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   }
 
   @Override
-  public void serviceStart() {
-    super.serviceStart();
+  public void start() {
+    super.start();
     this.communicator.start();
   }
 
   @Override
-  public void serviceStop() {
-    super.serviceStop();
+  public void shutdown() {
+    super.shutdown();
     if (this.communicator != null) {
       this.communicator.stop();
     }
@@ -139,7 +140,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
   @Override
   protected void startRpcServer() {
-    Configuration conf = getConfig();
+    Configuration conf = getConf();
     try {
       JobTokenSecretManager jobTokenSecretManager =
           new JobTokenSecretManager();
@@ -232,7 +233,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
     // Have to register this up front right now. Otherwise, it's possible for the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
-    getTaskCommunicatorContext()
+    getContext()
         .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
     communicator.sendSubmitWork(requestProto, host, port,
         new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@@ -255,14 +256,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
                 LOG.info(
                     "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
                         containerId + ", Service Busy");
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
               } else {
                 // All others from the remote service cause the task to FAIL.
                 LOG.info(
                     "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
                         containerId, t);
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.toString());
               }
@@ -272,14 +273,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
                 LOG.info(
                     "Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
                         containerId + ", Communication Error");
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
                 // Anything else is a FAIL.
                 LOG.info(
                     "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
                         containerId, t);
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.getMessage());
               }
@@ -406,11 +407,11 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     builder.setAmPort(getAddress().getPort());
     Credentials taskCredentials = new Credentials();
     // Credentials can change across DAGs. Ideally construct only once per DAG.
-    taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+    taskCredentials.addAll(getContext().getCredentials());
 
     ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
     if (credentialsBinary == null) {
-      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialsBinary = serializeCredentials(getContext().getCredentials());
       credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
     } else {
       credentialsBinary = credentialsBinary.duplicate();
@@ -459,7 +460,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     @Override
     public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
       // TODO Unregister the task for state updates, which could in turn unregister the node.
-      getTaskCommunicatorContext().taskKilled(taskAttemptId,
+      getContext().taskKilled(taskAttemptId,
           TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
       entityTracker.unregisterTaskAttempt(taskAttemptId);
     }
@@ -598,8 +599,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
       if (biMap != null) {
         synchronized(biMap) {
           for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
-            getTaskCommunicatorContext().taskAlive(entry.getValue());
-            getTaskCommunicatorContext().containerAlive(entry.getKey());
+            getContext().taskAlive(entry.getValue());
+            getContext().containerAlive(entry.getKey());
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index b6ee3d8..f31c6a5 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -68,6 +68,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -159,7 +161,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) {
     super(taskSchedulerContext);
     this.clock = clock;
-    this.conf = taskSchedulerContext.getInitialConfiguration();
+    try {
+      this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e);
+    }
     this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
         taskSchedulerContext.getCustomClusterIdentifier());
     this.memoryPerInstance =

http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 245c140..3737e55 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.ControlledClock;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -301,7 +303,8 @@ public class TestLlapTaskSchedulerService {
 
       doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
       doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();
-      doReturn(conf).when(mockAppCallback).getInitialConfiguration();
+      UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+      doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
 
       ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
 
@@ -362,7 +365,13 @@ public class TestLlapTaskSchedulerService {
     public LlapTaskSchedulerServiceForTest(
         TaskSchedulerContext appClient, Clock clock) {
       super(appClient, clock);
-      this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
+      Configuration conf;
+      try {
+        conf = TezUtils.createConfFromUserPayload(appClient.getInitialUserPayload());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false);
     }
 
     protected void schedulePendingTasks() {

http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 45b092c..bd69744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -108,6 +108,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -625,15 +626,13 @@ public class DagUtils {
       procClassName = MergeFileTezProcessor.class.getName();
     }
 
-    String serviceName = findServiceName(mapWork);
+    VertexExecutionContext executionContext = createVertexExecutionContext(mapWork);
 
     map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
-        .setUserPayload(serializedConf), numTasks, getContainerResource(conf))
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName);
+        .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
 
     map.setTaskEnvironment(getContainerEnvironment(conf, true));
+    map.setExecutionContext(executionContext);
     map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -671,11 +670,17 @@ public class DagUtils {
     return conf;
   }
 
-  private String findServiceName(BaseWork work) {
-    String serviceName = TezSessionState.DEFAULT_SERVICE;
-    if (work.getLlapMode()) serviceName = TezSessionState.LLAP_SERVICE;
-    if (work.getUberMode()) serviceName = TezSessionState.LOCAL_SERVICE;
-    return serviceName;
+  private VertexExecutionContext createVertexExecutionContext(BaseWork work) {
+    VertexExecutionContext vertexExecutionContext = VertexExecutionContext.createExecuteInContainers(true);
+    if (work.getLlapMode()) {
+      vertexExecutionContext = VertexExecutionContext
+          .create(TezSessionState.LLAP_SERVICE, TezSessionState.LLAP_SERVICE,
+              TezSessionState.LLAP_SERVICE);
+    }
+    if (work.getUberMode()) {
+      vertexExecutionContext = VertexExecutionContext.createExecuteInAm(true);
+    }
+    return vertexExecutionContext;
   }
 
   /*
@@ -692,20 +697,18 @@ public class DagUtils {
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, reduceWork);
 
-    String serviceName = findServiceName(reduceWork);
+    VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork);
 
     // create the vertex
     Vertex reducer = Vertex.create(reduceWork.getName(),
         ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
-        setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
-            reduceWork.isAutoReduceParallelism()?
-	      reduceWork.getMaxReduceTasks():
-	      reduceWork.getNumReduceTasks(), getContainerResource(conf))
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, serviceName)
-      .setConf(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, serviceName);
+            setUserPayload(TezUtils.createUserPayloadFromConf(conf)),
+        reduceWork.isAutoReduceParallelism() ?
+            reduceWork.getMaxReduceTasks() :
+            reduceWork.getNumReduceTasks(), getContainerResource(conf));
 
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
+    reducer.setExecutionContext(vertexExecutionContext);
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

http://git-wip-us.apache.org/repos/asf/hive/blob/ef454511/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 34e8cc8..ac460b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -24,10 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,22 +44,23 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.optimizer.physical.LlapDecider.LlapMode;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.PreWarmVertex;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 
 /**
  * Holds session state related to Tez
@@ -71,14 +70,9 @@ public class TezSessionState {
   private static final Log LOG = LogFactory.getLog(TezSessionState.class.getName());
   private static final String TEZ_DIR = "_tez_session_dir";
   public static final String LLAP_SERVICE = "LLAP";
-  public static final String DEFAULT_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
-  public static final String LOCAL_SERVICE = TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT;
   private static final String LLAP_SCHEDULER = "org.apache.tez.dag.app.rm.LlapTaskSchedulerService";
   private static final String LLAP_LAUNCHER = "org.apache.hadoop.hive.llap.tezplugins.LlapContainerLauncher";
   private static final String LLAP_TASK_COMMUNICATOR = "org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator";
-  private static final String LLAP_SERVICE_SCHEDULER = LLAP_SERVICE + ":" + LLAP_SCHEDULER;
-  private static final String LLAP_SERVICE_LAUNCHER = LLAP_SERVICE + ":" + LLAP_LAUNCHER;
-  private static final String LLAP_SERVICE_TASK_COMMUNICATOR = LLAP_SERVICE + ":" + LLAP_TASK_COMMUNICATOR;
 
   private HiveConf conf;
   private Path tezScratchDir;
@@ -212,25 +206,23 @@ public class TezSessionState {
     // set up the staging directory to use
     tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
 
+    ServicePluginsDescriptor servicePluginsDescriptor;
+    UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
+
     if (llapMode) {
       // we need plugins to handle llap and uber mode
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE,
-          LLAP_SERVICE_SCHEDULER);
-
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE,
-          LOCAL_SERVICE, LLAP_SERVICE_LAUNCHER);
-
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE,
-          LOCAL_SERVICE, LLAP_SERVICE_TASK_COMMUNICATOR);
+      servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
+          new TaskSchedulerDescriptor[]{
+              TaskSchedulerDescriptor.create(LLAP_SERVICE, LLAP_SCHEDULER)
+                  .setUserPayload(servicePluginPayload)},
+          new ContainerLauncherDescriptor[]{
+              ContainerLauncherDescriptor.create(LLAP_SERVICE, LLAP_LAUNCHER)},
+          new TaskCommunicatorDescriptor[]{
+              TaskCommunicatorDescriptor.create(LLAP_SERVICE, LLAP_TASK_COMMUNICATOR)
+                  .setUserPayload(servicePluginPayload)});
     } else {
       // we need plugins to handle llap and uber mode
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, DEFAULT_SERVICE, LOCAL_SERVICE);
-
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, DEFAULT_SERVICE,
-          LOCAL_SERVICE);
-
-      tezConfig.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, DEFAULT_SERVICE,
-          LOCAL_SERVICE);
+      servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
     }
 
     // container prewarming. tell the am how many containers we need
@@ -242,8 +234,9 @@ public class TezSessionState {
       tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
     }
 
-    session = TezClient.create("HIVE-" + sessionId, tezConfig, true,
-        commonLocalResources, null);
+    session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig).setIsSession(true)
+        .setLocalResources(commonLocalResources)
+        .setServicePluginDescriptor(servicePluginsDescriptor).build();
 
     LOG.info("Opening new Tez Session (id: " + sessionId
         + ", scratch dir: " + tezScratchDir + ")");


Mime
View raw message