tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [35/50] [abbrv] tez git commit: TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth)
Date Sat, 22 Aug 2015 01:19:25 GMT
TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec5acd8a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec5acd8a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec5acd8a

Branch: refs/heads/TEZ-2003
Commit: ec5acd8a6243db277b4145228f34917811984cc6
Parents: 25a6a13
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jul 28 14:56:20 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Aug 21 18:14:40 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../java/org/apache/tez/client/TezClient.java   |  81 ++++++++-
 .../org/apache/tez/client/TezClientUtils.java   |  56 +++++-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  48 +++++-
 .../apache/tez/dag/api/DagTypeConverters.java   |  93 +++++++++-
 .../tez/dag/api/NamedEntityDescriptor.java      |  33 ++++
 .../apache/tez/dag/api/TezConfiguration.java    |  31 ----
 .../org/apache/tez/dag/api/TezConstants.java    |  11 +-
 .../java/org/apache/tez/dag/api/Vertex.java     | 110 +++++++++++-
 .../api/ContainerLauncherDescriptor.java        |  32 ++++
 .../api/ServicePluginsDescriptor.java           |  96 +++++++++++
 .../api/TaskCommunicatorDescriptor.java         |  33 ++++
 .../api/TaskSchedulerDescriptor.java            |  32 ++++
 tez-api/src/main/proto/DAGApiRecords.proto      |  25 +++
 .../apache/tez/client/TestTezClientUtils.java   |  12 +-
 .../org/apache/tez/common/TezUtilsInternal.java |  31 +++-
 .../java/org/apache/tez/client/LocalClient.java |   2 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 172 +++++++++++--------
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  37 ++--
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   9 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  39 ++++-
 .../dag/app/launcher/ContainerLauncherImpl.java |   6 +-
 .../app/launcher/ContainerLauncherRouter.java   |  40 +++--
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  47 ++---
 .../apache/tez/dag/app/MockDAGAppMaster.java    |   5 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |   5 +-
 .../app/rm/TestTaskSchedulerEventHandler.java   |   4 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   3 +-
 .../org/apache/tez/examples/JoinValidate.java   |  31 ++--
 .../TezTestServiceContainerLauncher.java        |   3 +-
 .../tez/examples/JoinValidateConfigured.java    |  40 +++--
 .../tez/tests/TestExternalTezServices.java      | 131 +++++++-------
 .../org/apache/tez/runtime/task/TezChild.java   |   5 +-
 34 files changed, 995 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index e57f76f..a201942 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -37,5 +37,6 @@ ALL CHANGES:
   TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
   TEZ-2005. Define basic interface for pluggable TaskScheduler.
   TEZ-2651. Pluggable services should not extend AbstractService.
+  TEZ-2652. Cleanup the way services are specified for an AM and vertices.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index f9d8973..ee6280a 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 
 import org.apache.tez.common.RPCUtil;
 import org.apache.tez.common.counters.Limits;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -112,8 +113,9 @@ public class TezClient {
   private static final long SLEEP_FOR_READY = 500;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
-  private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
-  private TezApiVersionInfo apiVersionInfo;
+  private final Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
+  private final TezApiVersionInfo apiVersionInfo;
+  private final ServicePluginsDescriptor servicePluginsDescriptor;
   private HistoryACLPolicyManager historyACLPolicyManager;
 
   private int preWarmDAGCounter = 0;
@@ -143,19 +145,45 @@ public class TezClient {
 
   @Private
   protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
+                      @Nullable Map<String, LocalResource> localResources,
+                      @Nullable Credentials credentials) {
+    this(name, tezConf, isSession, localResources, credentials, null);
+  }
+
+  @Private
+  protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
             @Nullable Map<String, LocalResource> localResources,
-            @Nullable Credentials credentials) {
+            @Nullable Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) {
     this.clientName = name;
     this.isSession = isSession;
     // Set in conf for local mode AM to figure out whether in session mode or not
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
     this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
     this.apiVersionInfo = new TezApiVersionInfo();
+    this.servicePluginsDescriptor = servicePluginsDescriptor;
     Limits.setConfiguration(tezConf);
 
     LOG.info("Tez Client Version: " + apiVersionInfo.toString());
   }
 
+
+  /**
+   * Create a new TezClientBuilder. This can be used to setup additional parameters
+   * like session mode, local resources, credentials, servicePlugins, etc.
+   * <p/>
+   * If session mode is not specified in the builder, this will be inferred from
+   * the provided TezConfiguration.
+   *
+   * @param name    Name of the client. Used for logging etc. This will also be used
+   *                as app master name is session mode
+   * @param tezConf Configuration for the framework
+   * @return An instance of {@link org.apache.tez.client.TezClient.TezClientBuilder}
+   * which can be used to construct the final TezClient.
+   */
+  public static TezClientBuilder newBuilder(String name, TezConfiguration tezConf) {
+    return new TezClientBuilder(name, tezConf);
+  }
+
   /**
    * Create a new TezClient. Session or non-session execution mode will be
    * inferred from configuration.
@@ -357,7 +385,7 @@ public class TezClient {
                 sessionAppId,
                 null, clientName, amConfig,
                 tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
-                historyACLPolicyManager);
+                historyACLPolicyManager, servicePluginsDescriptor);
   
         // Set Tez Sessions to not retry on AM crashes if recovery is disabled
         if (!amConfig.getTezConfiguration().getBoolean(
@@ -773,7 +801,8 @@ public class TezClient {
       ApplicationSubmissionContext appContext = TezClientUtils
           .createApplicationSubmissionContext( 
               appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
-              usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager);
+              usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager,
+              servicePluginsDescriptor);
       LOG.info("Submitting DAG to YARN"
           + ", applicationId=" + appId
           + ", dagName=" + dag.getName());
@@ -848,4 +877,46 @@ public class TezClient {
          append(SEPARATOR).
          append(tezDagIdFormat.get().format(1)).toString();
   }
+
+  @Public
+  public static class TezClientBuilder {
+    final String name;
+    final TezConfiguration tezConf;
+    boolean isSession;
+    private Map<String, LocalResource> localResourceMap;
+    private Credentials credentials;
+    ServicePluginsDescriptor servicePluginsDescriptor;
+
+    private TezClientBuilder(String name, TezConfiguration tezConf) {
+      this.name = name;
+      this.tezConf = tezConf;
+      isSession = tezConf.getBoolean(
+          TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
+    }
+
+    public TezClientBuilder setIsSession(boolean isSession) {
+      this.isSession = isSession;
+      return this;
+    }
+
+    public TezClientBuilder setLocalResources(Map<String, LocalResource> localResources) {
+      this.localResourceMap = localResources;
+      return this;
+    }
+
+    public TezClientBuilder setCredentials(Credentials credentials) {
+      this.credentials = credentials;
+      return this;
+    }
+
+    public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) {
+      this.servicePluginsDescriptor = servicePluginsDescriptor;
+      return this;
+    }
+
+    public TezClient build() {
+      return new TezClient(name, tezConf, isSession, localResourceMap, credentials,
+          servicePluginsDescriptor);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 8bfaa1f..9cf1f3f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -39,6 +39,10 @@ import java.util.Map.Entry;
 
 import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -405,6 +409,7 @@ public class TezClientUtils {
    * @param tezJarResources Resources to be used by the AM
    * @param sessionCreds the credential object which will be populated with session specific
    * @param historyACLPolicyManager
+   * @param servicePluginsDescriptor descriptor for services which may be running in the AM
    * @return an ApplicationSubmissionContext to launch a Tez AM
    * @throws IOException
    * @throws YarnException
@@ -415,7 +420,8 @@ public class TezClientUtils {
       ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
       Credentials sessionCreds, boolean tezLrsAsArchive,
-      TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager)
+      TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager,
+      ServicePluginsDescriptor servicePluginsDescriptor)
       throws IOException, YarnException {
 
     Preconditions.checkNotNull(sessionCreds);
@@ -551,7 +557,7 @@ public class TezClientUtils {
 
     // emit conf as PB file
     ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
-        aclConfigs);
+        aclConfigs, servicePluginsDescriptor);
     
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
@@ -752,12 +758,8 @@ public class TezClientUtils {
         + "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME);
   }
 
-  static ConfigurationProto createFinalConfProtoForApp(Configuration amConf) {
-    return createFinalConfProtoForApp(amConf, null);
-  }
-
   static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
-    Map<String, String> additionalConfigs) {
+    Map<String, String> additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) {
     assert amConf != null;
     ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
     for (Entry<String, String> entry : amConf) {
@@ -774,9 +776,49 @@ public class TezClientUtils {
         builder.addConfKeyValues(kvp);
       }
     }
+
+    AMPluginDescriptorProto pluginDescriptorProto =
+        createAMServicePluginDescriptorProto(servicePluginsDescriptor);
+    builder.setAmPluginDescriptor(pluginDescriptorProto);
+
     return builder.build();
   }
 
+  static AMPluginDescriptorProto createAMServicePluginDescriptorProto(
+      ServicePluginsDescriptor servicePluginsDescriptor) {
+    AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
+        AMPluginDescriptorProto.newBuilder();
+    if (servicePluginsDescriptor != null) {
+
+      pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled());
+      pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled());
+
+      if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null &&
+          servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getTaskSchedulerDescriptors());
+        pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos);
+      }
+
+      if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null &&
+          servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getContainerLauncherDescriptors());
+        pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos);
+      }
+
+      if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null &&
+          servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) {
+        List<TezNamedEntityDescriptorProto> namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto(
+            servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+        pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos);
+      }
+
+    } else {
+      pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false);
+    }
+    return pluginDescriptorBuilder.build();
+  }
 
   /**
    * Helper function to create a YARN LocalResource

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 8ee1682..fce9522 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -32,6 +32,9 @@ import java.util.Stack;
 
 import org.apache.commons.collections4.BidiMap;
 import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -60,9 +63,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -92,6 +93,7 @@ public class DAG {
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   String dagInfo;
   private Map<String,String> dagConf = new HashMap<String, String>();
+  private VertexExecutionContext defaultExecutionContext;
 
   private Stack<String> topologicalVertexStack = new Stack<String>();
 
@@ -335,6 +337,26 @@ public class DAG {
     return this;
   }
 
+  /**
+   * Sets the default execution context for the DAG. This can be overridden at a per Vertex level.
+   * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)}
+   *
+   * @param vertexExecutionContext the default execution context for the DAG
+   *
+   * @return
+   */
+  @Public
+  @InterfaceStability.Unstable
+  public synchronized DAG setExecutionContext(VertexExecutionContext vertexExecutionContext) {
+    this.defaultExecutionContext = vertexExecutionContext;
+    return this;
+  }
+
+  @Private
+  VertexExecutionContext getDefaultExecutionContext() {
+    return this.defaultExecutionContext;
+  }
+
   @Private
   @VisibleForTesting
   public Map<String,String> getDagConf() {
@@ -707,7 +729,15 @@ public class DAG {
     if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
       dagBuilder.setDagInfo(this.dagInfo);
     }
-    
+
+    // Setup default execution context.
+    VertexExecutionContext defaultContext = getDefaultExecutionContext();
+    if (defaultContext != null) {
+      DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(
+          defaultContext);
+      dagBuilder.setDefaultExecutionContext(contextProto);
+    }
+
     if (!vertexGroups.isEmpty()) {
       for (VertexGroup av : vertexGroups) {
         GroupInfo groupInfo = av.getGroupInfo();
@@ -800,7 +830,17 @@ public class DAG {
       vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
-        .convertToDAGPlan(vertex.getProcessorDescriptor()));
+          .convertToDAGPlan(vertex.getProcessorDescriptor()));
+
+      // Vertex ExecutionContext setup
+      VertexExecutionContext execContext = vertex.getVertexExecutionContext();
+      if (execContext != null) {
+        DAGProtos.VertexExecutionContextProto contextProto =
+            DagTypeConverters.convertToProto(execContext);
+        vertexBuilder.setExecutionContext(contextProto);
+      }
+      // End of VertexExecutionContext setup.
+
       if (vertex.getInputs().size() > 0) {
         for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) {
           vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 8b1d553..2e0d417 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -73,11 +73,15 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString.Output;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 
 @Private
 public class DagTypeConverters {
@@ -399,6 +403,8 @@ public class DagTypeConverters {
     return userPayload;
   }
 
+
+
   private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
     if (payload != null) {
       entity.setUserPayload(payload);
@@ -423,6 +429,15 @@ public class DagTypeConverters {
     return od;
   }
 
+  public static NamedEntityDescriptor convertNamedDescriptorFromProto(TezNamedEntityDescriptorProto proto) {
+    String name = proto.getName();
+    String className = proto.getEntityDescriptor().getClassName();
+    UserPayload payload = convertTezUserPayloadFromDAGPlan(proto.getEntityDescriptor());
+    NamedEntityDescriptor descriptor = new NamedEntityDescriptor(name, className);
+    setUserPayload(descriptor, payload);
+    return descriptor;
+  }
+
   public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
@@ -550,11 +565,11 @@ public class DagTypeConverters {
   public static LocalResource convertPlanLocalResourceToLocalResource(
       PlanLocalResource plr) {
     return LocalResource.newInstance(
-      ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
-      DagTypeConverters.convertFromDAGPlan(plr.getType()),
-      DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
-      plr.getSize(), plr.getTimeStamp(),
-      plr.hasPattern() ? plr.getPattern() : null);
+        ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
+        DagTypeConverters.convertFromDAGPlan(plr.getType()),
+        DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
+        plr.getSize(), plr.getTimeStamp(),
+        plr.hasPattern() ? plr.getPattern() : null);
   }
 
   public static TezCounters convertTezCountersFromProto(TezCountersProto proto) {
@@ -717,4 +732,72 @@ public class DagTypeConverters {
     return payload.getPayload();
   }
 
+  public static DAGProtos.VertexExecutionContextProto convertToProto(
+      Vertex.VertexExecutionContext context) {
+    if (context == null) {
+      return null;
+    } else {
+      DAGProtos.VertexExecutionContextProto.Builder builder =
+          DAGProtos.VertexExecutionContextProto.newBuilder();
+      builder.setExecuteInAm(context.shouldExecuteInAm());
+      builder.setExecuteInContainers(context.shouldExecuteInContainers());
+      if (context.getTaskSchedulerName() != null) {
+        builder.setTaskSchedulerName(context.getTaskSchedulerName());
+      }
+      if (context.getContainerLauncherName() != null) {
+        builder.setContainerLauncherName(context.getContainerLauncherName());
+      }
+      if (context.getTaskCommName() != null) {
+        builder.setTaskCommName(context.getTaskCommName());
+      }
+      return builder.build();
+    }
+  }
+
+  public static Vertex.VertexExecutionContext convertFromProto(
+      DAGProtos.VertexExecutionContextProto proto) {
+    if (proto == null) {
+      return null;
+    } else {
+      if (proto.getExecuteInAm()) {
+        Vertex.VertexExecutionContext context =
+            Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm());
+        return context;
+      } else if (proto.getExecuteInContainers()) {
+        Vertex.VertexExecutionContext context =
+            Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers());
+        return context;
+      } else {
+        String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null;
+        String containerLauncher =
+            proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null;
+        String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null;
+        Vertex.VertexExecutionContext context =
+            Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm);
+        return context;
+      }
+    }
+  }
+
+  public static List<TezNamedEntityDescriptorProto> convertNamedEntityCollectionToProto(
+      NamedEntityDescriptor[] namedEntityDescriptors) {
+    List<TezNamedEntityDescriptorProto> list =
+        Lists.newArrayListWithCapacity(namedEntityDescriptors.length);
+    for (NamedEntityDescriptor namedEntity : namedEntityDescriptors) {
+      TezNamedEntityDescriptorProto namedEntityProto = convertNamedEntityToProto(namedEntity);
+      list.add(namedEntityProto);
+    }
+    return list;
+  }
+
+  public static TezNamedEntityDescriptorProto convertNamedEntityToProto(
+      NamedEntityDescriptor namedEntityDescriptor) {
+    TezNamedEntityDescriptorProto.Builder builder = TezNamedEntityDescriptorProto.newBuilder();
+    builder.setName(namedEntityDescriptor.getEntityName());
+    DAGProtos.TezEntityDescriptorProto entityProto =
+        DagTypeConverters.convertToDAGPlan(namedEntityDescriptor);
+    builder.setEntityDescriptor(entityProto);
+    return builder.build();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
new file mode 100644
index 0000000..bad0d10
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+public class NamedEntityDescriptor<T extends EntityDescriptor<T>> extends EntityDescriptor<T> {
+  private final String entityName;
+
+  @InterfaceAudience.Private
+  public NamedEntityDescriptor(String entityName, String className) {
+    super(className);
+    Preconditions.checkArgument(entityName != null, "EntityName must be specified");
+    this.entityName = entityName;
+  }
+
+  public String getEntityName() {
+    return entityName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 39a4c77..3b7378a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1215,37 +1215,6 @@ public class TezConfiguration extends Configuration {
       + "tez-ui.webservice.enable";
   public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
 
-  /** defaults container-launcher for the specific vertex */
-  @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name";
-  /** defaults task-scheduler for the specific vertex */
-  @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name";
-  /** defaults task-communicator for the specific vertex */
-  @ConfigurationScope(Scope.VERTEX)
-  public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name";
-
-  /** Comma separated list of named container-launcher classes running in the AM.
-   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
-   * e.g. Tez, ExtService:org.apache.ExtLauncherClasss
-   * */
-  @ConfigurationScope(Scope.AM)
-  public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers";
-
-  /** Comma separated list of task-schedulers classes running in the AM.
-   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
-   * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss
-   */
-  @ConfigurationScope(Scope.AM)
-  public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers";
-
-  /** Comma separated list of task-communicators classes running in the AM.
-   * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez
-   * e.g. Tez, ExtService:org.apache.ExtTaskCommClass
-   * */
-   @ConfigurationScope(Scope.AM)
-  public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators";
-
   // TODO only validate property here, value can also be validated if necessary
   public static void validateProperty(String property, Scope usedScope) {
     Scope validScope = PropertyScope.get(property);

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 3b07c59..6e1cb2d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -102,7 +102,14 @@ public class TezConstants {
   /// Version-related Environment variables
   public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION";
 
+  private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn";
+  private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber";
 
-  public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez";
-  public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal";
+  public static String getTezYarnServicePluginName() {
+    return TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS;
+  }
+
+  public static String getTezUberServicePluginName() {
+    return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 0ed4bd8..34124b2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -28,11 +28,11 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.VertexGroup.GroupInfo;
-import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 
 import com.google.common.base.Preconditions;
@@ -57,6 +57,7 @@ public class Vertex {
   private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
   private Map<String, String> taskEnvironment = new HashMap<String, String>();
   private Map<String, String> vertexConf = new HashMap<String, String>();
+  private VertexExecutionContext vertexExecutionContext;
   private final Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
                       = new HashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>();
   private final Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs
@@ -410,6 +411,108 @@ public class Vertex {
     return this;
   }
 
+  /**
+   * Sets the execution context for this Vertex - i.e. the Task Scheduler, ContainerLauncher and
+   * TaskCommunicator to be used. Also whether the vertex will be executed within the AM.
+   * If partially specified, the default components in Tez will be used - which may or may not work
+   * with the custom context.
+   *
+   * @param vertexExecutionContext the execution context for the vertex.
+   *
+   * @return
+   */
+  public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) {
+    this.vertexExecutionContext = vertexExecutionContext;
+    return this;
+  }
+
+  @Public
+  @InterfaceStability.Unstable
+  public static class VertexExecutionContext {
+    final boolean executeInAm;
+    final boolean executeInContainers;
+    final String taskSchedulerName;
+    final String containerLauncherName;
+    final String taskCommName;
+
+    public static VertexExecutionContext createExecuteInAm(boolean executeInAm) {
+      return new VertexExecutionContext(executeInAm, false);
+    }
+
+    public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) {
+      return new VertexExecutionContext(false, executeInContainers);
+    }
+
+    public static VertexExecutionContext create(String taskSchedulerName, String containerLauncherName,
+                                                String taskCommName) {
+      return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName);
+    }
+
+    private VertexExecutionContext(boolean executeInAm, boolean executeInContainers) {
+      this(executeInAm, executeInContainers, null, null, null);
+    }
+
+    private VertexExecutionContext(String taskSchedulerName, String containerLauncherName,
+                                  String taskCommName) {
+      this(false, false, taskSchedulerName, containerLauncherName, taskCommName);
+    }
+
+    private VertexExecutionContext(boolean executeInAm, boolean executeInContainers, String taskSchedulerName, String containerLauncherName,
+                      String taskCommName) {
+      if (executeInAm || executeInContainers) {
+        Preconditions.checkState(!(executeInAm && executeInContainers),
+            "executeInContainers and executeInAM are mutually exclusive");
+        Preconditions.checkState(
+            taskSchedulerName == null && containerLauncherName == null && taskCommName == null,
+            "Uber (in-AM) or container execution cannot be enabled with a custom plugins. TaskScheduler=" +
+                taskSchedulerName + ", ContainerLauncher=" + containerLauncherName +
+                ", TaskCommunicator=" + taskCommName);
+      }
+      if (taskSchedulerName != null || containerLauncherName != null || taskCommName != null) {
+        Preconditions.checkState(executeInAm == false && executeInContainers == false,
+            "Uber (in-AM) and container execution cannot be enabled with a custom plugins. TaskScheduler=" +
+                taskSchedulerName + ", ContainerLauncher=" + containerLauncherName +
+                ", TaskCommunicator=" + taskCommName);
+      }
+      this.executeInAm = executeInAm;
+      this.executeInContainers = executeInContainers;
+      this.taskSchedulerName = taskSchedulerName;
+      this.containerLauncherName = containerLauncherName;
+      this.taskCommName = taskCommName;
+    }
+
+    public boolean shouldExecuteInAm() {
+      return executeInAm;
+    }
+
+    public boolean shouldExecuteInContainers() {
+      return executeInContainers;
+    }
+
+    public String getTaskSchedulerName() {
+      return taskSchedulerName;
+    }
+
+    public String getContainerLauncherName() {
+      return containerLauncherName;
+    }
+
+    public String getTaskCommName() {
+      return taskCommName;
+    }
+
+    @Override
+    public String toString() {
+      return "VertexExecutionContext{" +
+          "executeInAm=" + executeInAm +
+          ", executeInContainers=" + executeInContainers +
+          ", taskSchedulerName='" + taskSchedulerName + '\'' +
+          ", containerLauncherName='" + containerLauncherName + '\'' +
+          ", taskCommName='" + taskCommName + '\'' +
+          '}';
+    }
+  }
+
   @Override
   public String toString() {
     return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
@@ -475,6 +578,11 @@ public class Vertex {
     return dataSinks;
   }
 
+  @Private
+  public VertexExecutionContext getVertexExecutionContext() {
+    return this.vertexExecutionContext;
+  }
+
   List<Edge> getInputEdges() {
     return inputEdges;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
new file mode 100644
index 0000000..ff3c90e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLauncherDescriptor extends NamedEntityDescriptor<ContainerLauncherDescriptor> {
+
+  private ContainerLauncherDescriptor(String containerLauncherName, String containerLauncherClassname) {
+    super(containerLauncherName, containerLauncherClassname);
+  }
+
+  public static ContainerLauncherDescriptor create(String containerLauncherName, String containerLauncherClassname) {
+    return new ContainerLauncherDescriptor(containerLauncherName, containerLauncherClassname);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
new file mode 100644
index 0000000..8df102a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ServicePluginsDescriptor {
+
+  private final boolean enableContainers;
+  private final boolean enableUber;
+
+  private TaskSchedulerDescriptor[] taskSchedulerDescriptors;
+  private ContainerLauncherDescriptor[] containerLauncherDescriptors;
+  private TaskCommunicatorDescriptor[] taskCommunicatorDescriptors;
+
+  private ServicePluginsDescriptor(boolean enableContainers, boolean enableUber,
+                                   TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+                                   ContainerLauncherDescriptor[] containerLauncherDescriptors,
+                                   TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    this.enableContainers = enableContainers;
+    this.enableUber = enableUber;
+    Preconditions.checkArgument(taskSchedulerDescriptors == null || taskSchedulerDescriptors.length > 0,
+        "TaskSchedulerDescriptors should either not be specified or at least 1 should be provided");
+    this.taskSchedulerDescriptors = taskSchedulerDescriptor;
+    Preconditions.checkArgument(containerLauncherDescriptors == null || containerLauncherDescriptors.length > 0,
+        "ContainerLauncherDescriptor should either not be specified or at least 1 should be provided");
+    this.containerLauncherDescriptors = containerLauncherDescriptors;
+    Preconditions.checkArgument(taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.length > 0,
+        "TaskCommunicatorDescriptors should either not be specified or at least 1 should be provided");
+    this.taskCommunicatorDescriptors = taskCommunicatorDescriptors;
+  }
+
+  public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+                                                ContainerLauncherDescriptor[] containerLauncherDescriptors,
+                                                TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(true, false, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  public static ServicePluginsDescriptor create(boolean enableUber,
+                                                TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+                                                ContainerLauncherDescriptor[] containerLauncherDescriptors,
+                                                TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(true, enableUber, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber,
+                                                TaskSchedulerDescriptor[] taskSchedulerDescriptor,
+                                                ContainerLauncherDescriptor[] containerLauncherDescriptors,
+                                                TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) {
+    return new ServicePluginsDescriptor(enableContainers, enableUber, taskSchedulerDescriptor,
+        containerLauncherDescriptors, taskCommunicatorDescriptors);
+  }
+
+  public static ServicePluginsDescriptor create(boolean enableUber) {
+    return new ServicePluginsDescriptor(true, enableUber, null, null, null);
+  }
+
+
+  public boolean areContainersEnabled() {
+    return enableContainers;
+  }
+
+  public boolean isUberEnabled() {
+    return enableUber;
+  }
+
+  public TaskSchedulerDescriptor[] getTaskSchedulerDescriptors() {
+    return taskSchedulerDescriptors;
+  }
+
+  public ContainerLauncherDescriptor[] getContainerLauncherDescriptors() {
+    return containerLauncherDescriptors;
+  }
+
+  public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() {
+    return taskCommunicatorDescriptors;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
new file mode 100644
index 0000000..57ac385
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TaskCommunicatorDescriptor extends NamedEntityDescriptor<TaskCommunicatorDescriptor> {
+
+
+  private TaskCommunicatorDescriptor(String taskCommName, String taskCommClassname) {
+    super(taskCommName, taskCommClassname);
+  }
+
+  public static TaskCommunicatorDescriptor create(String taskCommName, String taskCommClassname) {
+    return new TaskCommunicatorDescriptor(taskCommName, taskCommClassname);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
new file mode 100644
index 0000000..12e0919
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TaskSchedulerDescriptor extends NamedEntityDescriptor<TaskSchedulerDescriptor> {
+
+  private TaskSchedulerDescriptor(String taskSchedulerName, String schedulerClassname) {
+    super(taskSchedulerName, schedulerClassname);
+  }
+
+  public static TaskSchedulerDescriptor create(String taskSchedulerName, String schedulerClassName) {
+    return new TaskSchedulerDescriptor(taskSchedulerName, schedulerClassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 959d4e6..ebe3259 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -127,6 +127,14 @@ message RootInputLeafOutputProto {
   optional TezEntityDescriptorProto controller_descriptor = 3;
 }
 
+message VertexExecutionContextProto {
+  optional bool execute_in_am = 1;
+  optional bool execute_in_containers = 2;
+  optional string task_scheduler_name = 3;
+  optional string container_launcher_name = 4;
+  optional string task_comm_name = 5;
+}
+
 message VertexPlan {
   required string name = 1;
   required PlanVertexType type = 2;
@@ -139,6 +147,7 @@ message VertexPlan {
   repeated RootInputLeafOutputProto outputs = 9;
   optional TezEntityDescriptorProto vertex_manager_plugin = 10;
   optional ConfigurationProto vertexConf = 11;
+  optional VertexExecutionContextProto execution_context = 12;
 }
 
 message PlanEdgeProperty {
@@ -162,8 +171,23 @@ message EdgePlan {
   optional TezEntityDescriptorProto edge_manager = 9;
 }
 
+message TezNamedEntityDescriptorProto {
+  optional string name = 1;
+  optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+
+message AMPluginDescriptorProto {
+  optional bool containers_enabled = 1 [default = true];
+  optional bool uber_enabled = 2 [default = false];
+  repeated TezNamedEntityDescriptorProto task_scedulers = 3;
+  repeated TezNamedEntityDescriptorProto container_launchers = 4;
+  repeated TezNamedEntityDescriptorProto task_communicators = 5;
+}
+
 message ConfigurationProto {
   repeated PlanKeyValuePair confKeyValues = 1;
+  optional AMPluginDescriptorProto am_plugin_descriptor = 2;
 }
 
 message DAGPlan {
@@ -175,6 +199,7 @@ message DAGPlan {
   repeated PlanVertexGroupInfo vertex_groups = 6;
   repeated PlanLocalResource local_resource = 7;
   optional string dag_info = 8;
+  optional VertexExecutionContextProto default_execution_context = 9;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
index 2d4e005..8946ef0 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java
@@ -261,7 +261,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class));
+            mock(HistoryACLPolicyManager.class), null);
 
     ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec();
     Map<String, ByteBuffer> amServiceData = amClc.getServiceData();
@@ -294,7 +294,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class));
+            mock(HistoryACLPolicyManager.class), null);
 
     List<String> expectedCommands = new LinkedList<String>();
     expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -334,7 +334,7 @@ public class TestTezClientUtils {
     ApplicationSubmissionContext appSubmissionContext =
         TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf,
             new HashMap<String, LocalResource>(), credentials, false, new TezApiVersionInfo(),
-            mock(HistoryACLPolicyManager.class));
+            mock(HistoryACLPolicyManager.class), null);
 
     List<String> expectedCommands = new LinkedList<String>();
     expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator");
@@ -516,7 +516,7 @@ public class TestTezClientUtils {
     expected.put("property1", val1);
     expected.put("property2", expVal2);
 
-    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null);
+    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String v = expected.remove(kvPair.getKey());
@@ -620,7 +620,7 @@ public class TestTezClientUtils {
       srcConf.set(entry.getKey(), entry.getValue());
     }
 
-    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf);
+    ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null);
 
     for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) {
       String val = confMap.remove(kvPair.getKey());
@@ -677,4 +677,6 @@ public class TestTezClientUtils {
     Assert.assertTrue(resourceNames.contains("dir2-f.txt"));
   }
 
+  // TODO TEZ-2003 Add test to validate ServicePluginDescriptor propagation
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 532e83c..1fb7ff9 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -57,7 +57,7 @@ public class TezUtilsInternal {
 
   private static final Logger LOG = LoggerFactory.getLogger(TezUtilsInternal.class);
 
-  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+  public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
       IOException {
     FileInputStream confPBBinaryStream = null;
     ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
@@ -72,14 +72,41 @@ public class TezUtilsInternal {
     }
 
     ConfigurationProto confProto = confProtoBuilder.build();
+    return confProto;
+  }
 
-    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+  public static void addUserSpecifiedTezConfiguration(Configuration conf,
+                                                      List<PlanKeyValuePair> kvPairList) {
     if (kvPairList != null && !kvPairList.isEmpty()) {
       for (PlanKeyValuePair kvPair : kvPairList) {
         conf.set(kvPair.getKey(), kvPair.getValue());
       }
     }
   }
+//
+//  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+//      IOException {
+//    FileInputStream confPBBinaryStream = null;
+//    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+//    try {
+//      confPBBinaryStream =
+//          new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME));
+//      confProtoBuilder.mergeFrom(confPBBinaryStream);
+//    } finally {
+//      if (confPBBinaryStream != null) {
+//        confPBBinaryStream.close();
+//      }
+//    }
+//
+//    ConfigurationProto confProto = confProtoBuilder.build();
+//
+//    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+//    if (kvPairList != null && !kvPairList.isEmpty()) {
+//      for (PlanKeyValuePair kvPair : kvPairList) {
+//        conf.set(kvPair.getKey(), kvPair.getValue());
+//      }
+//    }
+//  }
 
 
   public static byte[] compressBytes(byte[] inBytes) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 1bb2002..508f817 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -341,7 +341,7 @@ public class LocalClient extends FrameworkClient {
       String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) {
     return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
         new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs,
-        versionInfo.getVersion(), 1, credentials, jobUserName);
+        versionInfo.getVersion(), 1, credentials, jobUserName, null);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e4d3f8b..4f75891 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -61,7 +62,11 @@ import com.google.common.collect.HashBiMap;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -221,6 +226,7 @@ public class DAGAppMaster extends AbstractService {
   private final String workingDirectory;
   private final String[] localDirs;
   private final String[] logDirs;
+  private final AMPluginDescriptorProto amPluginDescriptorProto;
   private ContainerSignatureMatcher containerSignatureMatcher;
   private AMContainerMap containers;
   private AMNodeTracker nodes;
@@ -312,7 +318,7 @@ public class DAGAppMaster extends AbstractService {
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
       Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
       String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts,
-      Credentials credentials, String jobUserName) {
+      Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
     super(DAGAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -332,6 +338,7 @@ public class DAGAppMaster extends AbstractService {
     this.clientVersion = clientVersion;
     this.maxAppAttempts = maxAppAttempts;
     this.amCredentials = credentials;
+    this.amPluginDescriptorProto = pluginDescriptorProto;
     this.appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
     this.appMasterUgi.addCredentials(amCredentials);
@@ -380,28 +387,47 @@ public class DAGAppMaster extends AbstractService {
     this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
 
-    String tezDefaultClassIdentifier =
-        isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
-            TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+    List<NamedEntityDescriptor> taskSchedulerDescriptors;
+    List<NamedEntityDescriptor> containerLauncherDescriptors;
+    List<NamedEntityDescriptor> taskCommunicatorDescriptors;
+    boolean tezYarnEnabled = true;
+    boolean uberEnabled = false;
 
-    String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers,
-        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
-            tezDefaultClassIdentifier),
-        TezConfiguration.TEZ_AM_TASK_SCHEDULERS);
+    if (!isLocal) {
+      if (amPluginDescriptorProto == null) {
+        tezYarnEnabled = true;
+        uberEnabled = false;
+      } else {
+        tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled();
+        uberEnabled = amPluginDescriptorProto.getUberEnabled();
+      }
+    } else {
+      tezYarnEnabled = false;
+      uberEnabled = true;
+    }
+
+    taskSchedulerDescriptors = parsePlugin(taskSchedulers,
+        (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ?
+            null :
+            amPluginDescriptorProto.getTaskScedulersList()),
+        tezYarnEnabled, uberEnabled);
 
-    String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers,
-        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
-            tezDefaultClassIdentifier),
-        TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS);
+    containerLauncherDescriptors = parsePlugin(containerLaunchers,
+        (amPluginDescriptorProto == null ||
+            amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null :
+            amPluginDescriptorProto.getContainerLaunchersList()),
+        tezYarnEnabled, uberEnabled);
 
-    String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators,
-        conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
-            tezDefaultClassIdentifier),
-        TezConfiguration.TEZ_AM_TASK_COMMUNICATORS);
+    taskCommunicatorDescriptors = parsePlugin(taskCommunicators,
+        (amPluginDescriptorProto == null ||
+            amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null :
+            amPluginDescriptorProto.getTaskCommunicatorsList()),
+        tezYarnEnabled, uberEnabled);
 
-    LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers"));
-    LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers"));
-    LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators"));
+
+    LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers"));
+    LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers"));
+    LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators"));
 
     boolean disableVersionCheck = conf.getBoolean(
         TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK,
@@ -468,7 +494,7 @@ public class DAGAppMaster extends AbstractService {
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
-        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal);
+        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal);
     addIfService(taskAttemptListener, true);
 
     containerSignatureMatcher = createContainerSignatureMatcher();
@@ -516,7 +542,7 @@ public class DAGAppMaster extends AbstractService {
 
     this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
-        taskSchedulerClassIdentifiers, isLocal);
+        taskSchedulerDescriptors, isLocal);
     addIfService(taskSchedulerEventHandler, true);
 
     if (enableWebUIService()) {
@@ -534,7 +560,7 @@ public class DAGAppMaster extends AbstractService {
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
-    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal);
+    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal);
     addIfService(containerLauncherRouter, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
 
@@ -1044,11 +1070,11 @@ public class DAGAppMaster extends AbstractService {
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
                                                           TaskHeartbeatHandler thh,
                                                           ContainerHeartbeatHandler chh,
-                                                          String[] taskCommunicatorClasses,
+                                                          List<NamedEntityDescriptor> entityDescriptors,
                                                           boolean isLocal) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpTezDag(context, thh, chh,
-            taskCommunicatorClasses, amConf, isLocal);
+            entityDescriptors, amConf, isLocal);
     return lis;
   }
 
@@ -1070,11 +1096,11 @@ public class DAGAppMaster extends AbstractService {
   }
 
   protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
-                                                                  String[] containerLauncherClasses,
+                                                                  List<NamedEntityDescriptor> containerLauncherDescriptors,
                                                                   boolean isLocal) throws
       UnknownHostException {
     return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
-        containerLauncherClasses, isLocal);
+        containerLauncherDescriptors, isLocal);
   }
 
   public ApplicationId getAppID() {
@@ -2140,7 +2166,16 @@ public class DAGAppMaster extends AbstractService {
 
       // TODO Does this really need to be a YarnConfiguration ?
       Configuration conf = new Configuration(new YarnConfiguration());
-      TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), conf);
+
+      ConfigurationProto confProto =
+          TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()));
+      TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
+
+      AMPluginDescriptorProto amPluginDescriptorProto = null;
+      if (confProto.hasAmPluginDescriptor()) {
+        amPluginDescriptorProto = confProto.getAmPluginDescriptor();
+      }
+
       UserGroupInformation.setConfiguration(conf);
       Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
 
@@ -2152,7 +2187,7 @@ public class DAGAppMaster extends AbstractService {
               System.getenv(Environment.PWD.name()),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())),
               TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())),
-              clientVersion, maxAppAttempts, credentials, jobUserName);
+              clientVersion, maxAppAttempts, credentials, jobUserName, amPluginDescriptorProto);
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
@@ -2258,7 +2293,7 @@ public class DAGAppMaster extends AbstractService {
 
     LOG.info("Running DAG: " + dagPlan.getName());
     String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
-    System.err.println(timeStamp + " Running Dag: "+ newDAG.getID());
+    System.err.println(timeStamp + " Running Dag: " + newDAG.getID());
     System.out.println(timeStamp + " Running Dag: "+ newDAG.getID());
     // Job name is the same as the app name until we support multiple dags
     // for an app later
@@ -2364,60 +2399,51 @@ public class DAGAppMaster extends AbstractService {
         TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT);
   }
 
-  // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
-  private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
-                                   String context) {
-    // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API.
-    Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
-        "Plugin strings should not be null or empty: " + context);
-
-    String[] classNames = new String[pluginStrings.length];
+  private static List<NamedEntityDescriptor> parsePlugin(
+      BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos,
+      boolean tezYarnEnabled, boolean uberEnabled) {
 
     int index = 0;
-    for (String pluginString : pluginStrings) {
-
-      String className;
-      String identifierString;
-
-      Preconditions.checkState(pluginString != null && !pluginString.isEmpty(),
-          "Plugin string: " + pluginString + " should not be null or empty");
-      if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
-          pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
-        // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames.
-        // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation,
-        // and avoid reflection.
-        identifierString = pluginString;
-        className = pluginString;
-      } else {
-        String[] parts = pluginString.split(":");
-        Preconditions.checkState(
-            parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null &&
-                !parts[1].isEmpty(),
-            "Invalid configuration string for " + context + ": " + pluginString);
-        Preconditions.checkState(
-            !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) &&
-                !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT),
-            "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " +
-                TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " +
-                pluginString);
-        identifierString = parts[0];
-        className = parts[1];
-      }
-      pluginMap.put(identifierString, index);
-      classNames[index] = className;
+
+    List<NamedEntityDescriptor> resultList = new LinkedList<>();
+
+    if (tezYarnEnabled) {
+      // Default classnames will be populated by individual components
+      NamedEntityDescriptor r = new NamedEntityDescriptor(
+          TezConstants.getTezYarnServicePluginName(), null);
+      resultList.add(r);
+      pluginMap.put(TezConstants.getTezYarnServicePluginName(), index);
+      index++;
+    }
+
+    if (uberEnabled) {
+      // Default classnames will be populated by individual components
+      NamedEntityDescriptor r = new NamedEntityDescriptor(
+          TezConstants.getTezUberServicePluginName(), null);
+      resultList.add(r);
+      pluginMap.put(TezConstants.getTezUberServicePluginName(), index);
       index++;
     }
-    return classNames;
+
+    if (namedEntityDescriptorProtos != null) {
+      for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) {
+        resultList.add(DagTypeConverters
+            .convertNamedDescriptorFromProto(namedEntityDescriptorProto));
+        pluginMap.put(resultList.get(index).getEntityName(), index);
+        index++;
+      }
+    }
+    return resultList;
   }
 
-  String buildPluginComponentLog(String[] classIdentifiers, BiMap<String, Integer> map,
+  String buildPluginComponentLog(List<NamedEntityDescriptor> namedEntityDescriptors, BiMap<String, Integer> map,
                                  String component) {
     StringBuilder sb = new StringBuilder();
     sb.append("AM Level configured ").append(component).append(": ");
-    for (int i = 0; i < classIdentifiers.length; i++) {
+    for (int i = 0; i < namedEntityDescriptors.size(); i++) {
       sb.append("[").append(i).append(":").append(map.inverse().get(i))
-          .append(":").append(classIdentifiers[i]).append("]");
-      if (i != classIdentifiers.length - 1) {
+          .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]");
+      if (i != namedEntityDescriptors.size() - 1) {
         sb.append(",");
       }
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 599c208..1e34184 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -27,8 +27,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -46,7 +49,6 @@ import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -100,28 +102,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   public TaskAttemptListenerImpTezDag(AppContext context,
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
-                                      String [] taskCommunicatorClassIdentifiers,
+                                      List<NamedEntityDescriptor> taskCommunicatorDescriptors,
                                       Configuration conf,
                                       boolean isPureLocalMode) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
     this.taskHeartbeatHandler = thh;
     this.containerHeartbeatHandler = chh;
-    if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
+    if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) {
       if (isPureLocalMode) {
-        taskCommunicatorClassIdentifiers =
-            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+        taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+            TezConstants.getTezUberServicePluginName(), null));
       } else {
-        taskCommunicatorClassIdentifiers =
-            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+        taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor(
+            TezConstants.getTezYarnServicePluginName(), null));
       }
     }
-    this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
-    this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
-    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length];
-    for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
+    this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+    this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
+    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
+    for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
       taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
-      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
+      taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
       taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
@@ -143,17 +145,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
-  private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) {
-    if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+  private TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, int taskCommIndex) {
+    if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
       LOG.info("Using Default Task Communicator");
       return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
-    } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+    } else if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
       LOG.info("Using Default Local Task Communicator");
       return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
     } else {
-      LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
+      // TODO TEZ-2003. Use the payload
+      LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName());
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
-          .getClazz(taskCommClassIdentifier);
+          .getClazz(taskCommDescriptor.getClassName());
       try {
         Constructor<? extends TaskCommunicator> ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
         ctor.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 458362f..335239e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -98,4 +98,6 @@ public interface DAG {
 
   StateChangeNotifier getStateChangeNotifier();
 
+  org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ec2ef66..25518b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -717,6 +717,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
+    if (jobPlan.hasDefaultExecutionContext()) {
+      return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
   public TezCounters getAllCounters() {
 
     readLock.lock();

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bdab984..2e8f218 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -995,14 +995,37 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
 
     String tezDefaultComponentName =
-        isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT :
-            TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
-    String taskSchedulerName =
-        vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName);
-    String taskCommName = vertexConf
-        .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName);
-    String containerLauncherName = vertexConf
-        .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName);
+        isLocal ? TezConstants.getTezUberServicePluginName() :
+            TezConstants.getTezYarnServicePluginName();
+
+    org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
+    if (vertexPlan.hasExecutionContext()) {
+      execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
+      LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
+    } else if (execContext != null) {
+      LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
+    }
+    if (execContext != null) {
+      if (execContext.shouldExecuteInAm()) {
+        tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
+      }
+    }
+
+    String taskSchedulerName = tezDefaultComponentName;
+    String containerLauncherName = tezDefaultComponentName;
+    String taskCommName = tezDefaultComponentName;
+
+    if (execContext != null) {
+      if (execContext.getTaskSchedulerName() != null) {
+        taskSchedulerName = execContext.getTaskSchedulerName();
+      }
+      if (execContext.getContainerLauncherName() != null) {
+        containerLauncherName = execContext.getContainerLauncherName();
+      }
+      if (execContext.getTaskCommName() != null) {
+        taskCommName = execContext.getTaskCommName();
+      }
+    }
 
     LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName +
         ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName);

http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 34c7bc0..cba5c80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,11 +30,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
 import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.apache.tez.dag.api.TezConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -269,8 +269,8 @@ public class ContainerLauncherImpl extends ContainerLauncher {
 
             // nodes where containers will run at *this* point of time. This is
             // *not* the cluster size and doesn't need to be.
-            int numNodes = getContext().getNumNodes(
-                TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+            int numNodes =
+                getContext().getNumNodes(TezConstants.getTezYarnServicePluginName());
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
             if (poolSize < idealPoolSize) {


Mime
View raw message