Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 466D71848F for ; Sat, 22 Aug 2015 07:26:02 +0000 (UTC) Received: (qmail 979 invoked by uid 500); 22 Aug 2015 07:26:02 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 857 invoked by uid 500); 22 Aug 2015 07:26:02 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 124 invoked by uid 99); 22 Aug 2015 07:26:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 84832E0570; Sat, 22 Aug 2015 07:26:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 07:26:31 -0000 Message-Id: In-Reply-To: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> References: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [32/50] [abbrv] tez git commit: TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth) TEZ-2652. Cleanup the way services are specified for an AM and vertices. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec5acd8a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec5acd8a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec5acd8a Branch: refs/heads/master Commit: ec5acd8a6243db277b4145228f34917811984cc6 Parents: 25a6a13 Author: Siddharth Seth Authored: Tue Jul 28 14:56:20 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:14:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 81 ++++++++- .../org/apache/tez/client/TezClientUtils.java | 56 +++++- .../main/java/org/apache/tez/dag/api/DAG.java | 48 +++++- .../apache/tez/dag/api/DagTypeConverters.java | 93 +++++++++- .../tez/dag/api/NamedEntityDescriptor.java | 33 ++++ .../apache/tez/dag/api/TezConfiguration.java | 31 ---- .../org/apache/tez/dag/api/TezConstants.java | 11 +- .../java/org/apache/tez/dag/api/Vertex.java | 110 +++++++++++- .../api/ContainerLauncherDescriptor.java | 32 ++++ .../api/ServicePluginsDescriptor.java | 96 +++++++++++ .../api/TaskCommunicatorDescriptor.java | 33 ++++ .../api/TaskSchedulerDescriptor.java | 32 ++++ tez-api/src/main/proto/DAGApiRecords.proto | 25 +++ .../apache/tez/client/TestTezClientUtils.java | 12 +- .../org/apache/tez/common/TezUtilsInternal.java | 31 +++- .../java/org/apache/tez/client/LocalClient.java | 2 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 172 +++++++++++-------- .../dag/app/TaskAttemptListenerImpTezDag.java | 37 ++-- .../java/org/apache/tez/dag/app/dag/DAG.java | 2 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 39 ++++- .../dag/app/launcher/ContainerLauncherImpl.java | 6 +- .../app/launcher/ContainerLauncherRouter.java | 40 +++-- .../dag/app/rm/TaskSchedulerEventHandler.java | 47 ++--- .../apache/tez/dag/app/MockDAGAppMaster.java | 5 +- .../app/TestTaskAttemptListenerImplTezDag.java | 5 +- .../app/rm/TestTaskSchedulerEventHandler.java | 4 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 3 +- .../org/apache/tez/examples/JoinValidate.java | 31 ++-- .../TezTestServiceContainerLauncher.java | 3 +- .../tez/examples/JoinValidateConfigured.java | 40 +++-- .../tez/tests/TestExternalTezServices.java | 131 +++++++------- .../org/apache/tez/runtime/task/TezChild.java | 5 +- 34 files changed, 995 insertions(+), 311 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index e57f76f..a201942 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -37,5 +37,6 @@ ALL CHANGES: TEZ-2004. Define basic interface for pluggable ContainerLaunchers. TEZ-2005. Define basic interface for pluggable TaskScheduler. TEZ-2651. Pluggable services should not extend AbstractService. + TEZ-2652. Cleanup the way services are specified for an AM and vertices. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index f9d8973..ee6280a 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import org.apache.tez.common.RPCUtil; import org.apache.tez.common.counters.Limits; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -112,8 +113,9 @@ public class TezClient { private static final long SLEEP_FOR_READY = 500; private JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); - private Map additionalLocalResources = Maps.newHashMap(); - private TezApiVersionInfo apiVersionInfo; + private final Map additionalLocalResources = Maps.newHashMap(); + private final TezApiVersionInfo apiVersionInfo; + private final ServicePluginsDescriptor servicePluginsDescriptor; private HistoryACLPolicyManager historyACLPolicyManager; private int preWarmDAGCounter = 0; @@ -143,19 +145,45 @@ public class TezClient { @Private protected TezClient(String name, TezConfiguration tezConf, boolean isSession, + @Nullable Map localResources, + @Nullable Credentials credentials) { + this(name, tezConf, isSession, localResources, credentials, null); + } + + @Private + protected TezClient(String name, TezConfiguration tezConf, boolean isSession, @Nullable Map localResources, - @Nullable Credentials credentials) { + @Nullable Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) { this.clientName = name; this.isSession = isSession; // Set in conf for local mode AM to figure out whether in session mode or not tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession); this.amConfig = new AMConfiguration(tezConf, localResources, credentials); this.apiVersionInfo = new TezApiVersionInfo(); + this.servicePluginsDescriptor = servicePluginsDescriptor; Limits.setConfiguration(tezConf); LOG.info("Tez Client Version: " + apiVersionInfo.toString()); } + + /** + * Create a new TezClientBuilder. This can be used to setup additional parameters + * like session mode, local resources, credentials, servicePlugins, etc. + *

+ * If session mode is not specified in the builder, this will be inferred from + * the provided TezConfiguration. + * + * @param name Name of the client. Used for logging etc. This will also be used + * as app master name is session mode + * @param tezConf Configuration for the framework + * @return An instance of {@link org.apache.tez.client.TezClient.TezClientBuilder} + * which can be used to construct the final TezClient. + */ + public static TezClientBuilder newBuilder(String name, TezConfiguration tezConf) { + return new TezClientBuilder(name, tezConf); + } + /** * Create a new TezClient. Session or non-session execution mode will be * inferred from configuration. @@ -357,7 +385,7 @@ public class TezClient { sessionAppId, null, clientName, amConfig, tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo, - historyACLPolicyManager); + historyACLPolicyManager, servicePluginsDescriptor); // Set Tez Sessions to not retry on AM crashes if recovery is disabled if (!amConfig.getTezConfiguration().getBoolean( @@ -773,7 +801,8 @@ public class TezClient { ApplicationSubmissionContext appContext = TezClientUtils .createApplicationSubmissionContext( appId, dag, dag.getName(), amConfig, tezJarResources, credentials, - usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager); + usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager, + servicePluginsDescriptor); LOG.info("Submitting DAG to YARN" + ", applicationId=" + appId + ", dagName=" + dag.getName()); @@ -848,4 +877,46 @@ public class TezClient { append(SEPARATOR). append(tezDagIdFormat.get().format(1)).toString(); } + + @Public + public static class TezClientBuilder { + final String name; + final TezConfiguration tezConf; + boolean isSession; + private Map localResourceMap; + private Credentials credentials; + ServicePluginsDescriptor servicePluginsDescriptor; + + private TezClientBuilder(String name, TezConfiguration tezConf) { + this.name = name; + this.tezConf = tezConf; + isSession = tezConf.getBoolean( + TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT); + } + + public TezClientBuilder setIsSession(boolean isSession) { + this.isSession = isSession; + return this; + } + + public TezClientBuilder setLocalResources(Map localResources) { + this.localResourceMap = localResources; + return this; + } + + public TezClientBuilder setCredentials(Credentials credentials) { + this.credentials = credentials; + return this; + } + + public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) { + this.servicePluginsDescriptor = servicePluginsDescriptor; + return this; + } + + public TezClient build() { + return new TezClient(name, tezConf, isSession, localResourceMap, credentials, + servicePluginsDescriptor); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 8bfaa1f..9cf1f3f 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -39,6 +39,10 @@ import java.util.Map.Entry; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -405,6 +409,7 @@ public class TezClientUtils { * @param tezJarResources Resources to be used by the AM * @param sessionCreds the credential object which will be populated with session specific * @param historyACLPolicyManager + * @param servicePluginsDescriptor descriptor for services which may be running in the AM * @return an ApplicationSubmissionContext to launch a Tez AM * @throws IOException * @throws YarnException @@ -415,7 +420,8 @@ public class TezClientUtils { ApplicationId appId, DAG dag, String amName, AMConfiguration amConfig, Map tezJarResources, Credentials sessionCreds, boolean tezLrsAsArchive, - TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager) + TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager, + ServicePluginsDescriptor servicePluginsDescriptor) throws IOException, YarnException { Preconditions.checkNotNull(sessionCreds); @@ -551,7 +557,7 @@ public class TezClientUtils { // emit conf as PB file ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(), - aclConfigs); + aclConfigs, servicePluginsDescriptor); FSDataOutputStream amConfPBOutBinaryStream = null; try { @@ -752,12 +758,8 @@ public class TezClientUtils { + "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME); } - static ConfigurationProto createFinalConfProtoForApp(Configuration amConf) { - return createFinalConfProtoForApp(amConf, null); - } - static ConfigurationProto createFinalConfProtoForApp(Configuration amConf, - Map additionalConfigs) { + Map additionalConfigs, ServicePluginsDescriptor servicePluginsDescriptor) { assert amConf != null; ConfigurationProto.Builder builder = ConfigurationProto.newBuilder(); for (Entry entry : amConf) { @@ -774,9 +776,49 @@ public class TezClientUtils { builder.addConfKeyValues(kvp); } } + + AMPluginDescriptorProto pluginDescriptorProto = + createAMServicePluginDescriptorProto(servicePluginsDescriptor); + builder.setAmPluginDescriptor(pluginDescriptorProto); + return builder.build(); } + static AMPluginDescriptorProto createAMServicePluginDescriptorProto( + ServicePluginsDescriptor servicePluginsDescriptor) { + AMPluginDescriptorProto.Builder pluginDescriptorBuilder = + AMPluginDescriptorProto.newBuilder(); + if (servicePluginsDescriptor != null) { + + pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled()); + pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled()); + + if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null && + servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getTaskSchedulerDescriptors()); + pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos); + } + + if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null && + servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getContainerLauncherDescriptors()); + pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos); + } + + if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null && + servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getTaskCommunicatorDescriptors()); + pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos); + } + + } else { + pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false); + } + return pluginDescriptorBuilder.build(); + } /** * Helper function to create a YARN LocalResource http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 8ee1682..fce9522 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -32,6 +32,9 @@ import java.util.Stack; import org.apache.commons.collections4.BidiMap; import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; +import org.apache.tez.dag.api.records.DAGProtos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -60,9 +63,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -92,6 +93,7 @@ public class DAG { Map commonTaskLocalFiles = Maps.newHashMap(); String dagInfo; private Map dagConf = new HashMap(); + private VertexExecutionContext defaultExecutionContext; private Stack topologicalVertexStack = new Stack(); @@ -335,6 +337,26 @@ public class DAG { return this; } + /** + * Sets the default execution context for the DAG. This can be overridden at a per Vertex level. + * See {@link org.apache.tez.dag.api.Vertex#setExecutionContext(VertexExecutionContext)} + * + * @param vertexExecutionContext the default execution context for the DAG + * + * @return + */ + @Public + @InterfaceStability.Unstable + public synchronized DAG setExecutionContext(VertexExecutionContext vertexExecutionContext) { + this.defaultExecutionContext = vertexExecutionContext; + return this; + } + + @Private + VertexExecutionContext getDefaultExecutionContext() { + return this.defaultExecutionContext; + } + @Private @VisibleForTesting public Map getDagConf() { @@ -707,7 +729,15 @@ public class DAG { if (this.dagInfo != null && !this.dagInfo.isEmpty()) { dagBuilder.setDagInfo(this.dagInfo); } - + + // Setup default execution context. + VertexExecutionContext defaultContext = getDefaultExecutionContext(); + if (defaultContext != null) { + DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto( + defaultContext); + dagBuilder.setDefaultExecutionContext(contextProto); + } + if (!vertexGroups.isEmpty()) { for (VertexGroup av : vertexGroups) { GroupInfo groupInfo = av.getGroupInfo(); @@ -800,7 +830,17 @@ public class DAG { vertexBuilder.setName(vertex.getName()); vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46. vertexBuilder.setProcessorDescriptor(DagTypeConverters - .convertToDAGPlan(vertex.getProcessorDescriptor())); + .convertToDAGPlan(vertex.getProcessorDescriptor())); + + // Vertex ExecutionContext setup + VertexExecutionContext execContext = vertex.getVertexExecutionContext(); + if (execContext != null) { + DAGProtos.VertexExecutionContextProto contextProto = + DagTypeConverters.convertToProto(execContext); + vertexBuilder.setExecutionContext(contextProto); + } + // End of VertexExecutionContext setup. + if (vertex.getInputs().size() > 0) { for (RootInputLeafOutput input : vertex.getInputs()) { vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input)); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 8b1d553..2e0d417 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -73,11 +73,15 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto; import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto; import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.ByteString.Output; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; @Private public class DagTypeConverters { @@ -399,6 +403,8 @@ public class DagTypeConverters { return userPayload; } + + private static void setUserPayload(EntityDescriptor entity, UserPayload payload) { if (payload != null) { entity.setUserPayload(payload); @@ -423,6 +429,15 @@ public class DagTypeConverters { return od; } + public static NamedEntityDescriptor convertNamedDescriptorFromProto(TezNamedEntityDescriptorProto proto) { + String name = proto.getName(); + String className = proto.getEntityDescriptor().getClassName(); + UserPayload payload = convertTezUserPayloadFromDAGPlan(proto.getEntityDescriptor()); + NamedEntityDescriptor descriptor = new NamedEntityDescriptor(name, className); + setUserPayload(descriptor, payload); + return descriptor; + } + public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); @@ -550,11 +565,11 @@ public class DagTypeConverters { public static LocalResource convertPlanLocalResourceToLocalResource( PlanLocalResource plr) { return LocalResource.newInstance( - ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())), - DagTypeConverters.convertFromDAGPlan(plr.getType()), - DagTypeConverters.convertFromDAGPlan(plr.getVisibility()), - plr.getSize(), plr.getTimeStamp(), - plr.hasPattern() ? plr.getPattern() : null); + ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())), + DagTypeConverters.convertFromDAGPlan(plr.getType()), + DagTypeConverters.convertFromDAGPlan(plr.getVisibility()), + plr.getSize(), plr.getTimeStamp(), + plr.hasPattern() ? plr.getPattern() : null); } public static TezCounters convertTezCountersFromProto(TezCountersProto proto) { @@ -717,4 +732,72 @@ public class DagTypeConverters { return payload.getPayload(); } + public static DAGProtos.VertexExecutionContextProto convertToProto( + Vertex.VertexExecutionContext context) { + if (context == null) { + return null; + } else { + DAGProtos.VertexExecutionContextProto.Builder builder = + DAGProtos.VertexExecutionContextProto.newBuilder(); + builder.setExecuteInAm(context.shouldExecuteInAm()); + builder.setExecuteInContainers(context.shouldExecuteInContainers()); + if (context.getTaskSchedulerName() != null) { + builder.setTaskSchedulerName(context.getTaskSchedulerName()); + } + if (context.getContainerLauncherName() != null) { + builder.setContainerLauncherName(context.getContainerLauncherName()); + } + if (context.getTaskCommName() != null) { + builder.setTaskCommName(context.getTaskCommName()); + } + return builder.build(); + } + } + + public static Vertex.VertexExecutionContext convertFromProto( + DAGProtos.VertexExecutionContextProto proto) { + if (proto == null) { + return null; + } else { + if (proto.getExecuteInAm()) { + Vertex.VertexExecutionContext context = + Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm()); + return context; + } else if (proto.getExecuteInContainers()) { + Vertex.VertexExecutionContext context = + Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers()); + return context; + } else { + String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null; + String containerLauncher = + proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null; + String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null; + Vertex.VertexExecutionContext context = + Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm); + return context; + } + } + } + + public static List convertNamedEntityCollectionToProto( + NamedEntityDescriptor[] namedEntityDescriptors) { + List list = + Lists.newArrayListWithCapacity(namedEntityDescriptors.length); + for (NamedEntityDescriptor namedEntity : namedEntityDescriptors) { + TezNamedEntityDescriptorProto namedEntityProto = convertNamedEntityToProto(namedEntity); + list.add(namedEntityProto); + } + return list; + } + + public static TezNamedEntityDescriptorProto convertNamedEntityToProto( + NamedEntityDescriptor namedEntityDescriptor) { + TezNamedEntityDescriptorProto.Builder builder = TezNamedEntityDescriptorProto.newBuilder(); + builder.setName(namedEntityDescriptor.getEntityName()); + DAGProtos.TezEntityDescriptorProto entityProto = + DagTypeConverters.convertToDAGPlan(namedEntityDescriptor); + builder.setEntityDescriptor(entityProto); + return builder.build(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java new file mode 100644 index 0000000..bad0d10 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; + +public class NamedEntityDescriptor> extends EntityDescriptor { + private final String entityName; + + @InterfaceAudience.Private + public NamedEntityDescriptor(String entityName, String className) { + super(className); + Preconditions.checkArgument(entityName != null, "EntityName must be specified"); + this.entityName = entityName; + } + + public String getEntityName() { + return entityName; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 39a4c77..3b7378a 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1215,37 +1215,6 @@ public class TezConfiguration extends Configuration { + "tez-ui.webservice.enable"; public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true; - /** defaults container-launcher for the specific vertex */ - @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME = TEZ_AM_PREFIX + "vertex.container-launcher.name"; - /** defaults task-scheduler for the specific vertex */ - @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_VERTEX_TASK_SCHEDULER_NAME = TEZ_AM_PREFIX + "vertex.task-scheduler.name"; - /** defaults task-communicator for the specific vertex */ - @ConfigurationScope(Scope.VERTEX) - public static final String TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME = TEZ_AM_PREFIX + "vertex.task-communicator.name"; - - /** Comma separated list of named container-launcher classes running in the AM. - * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez - * e.g. Tez, ExtService:org.apache.ExtLauncherClasss - * */ - @ConfigurationScope(Scope.AM) - public static final String TEZ_AM_CONTAINER_LAUNCHERS = TEZ_AM_PREFIX + "container-launchers"; - - /** Comma separated list of task-schedulers classes running in the AM. - * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez - * e.g. Tez, ExtService:org.apache.ExtSchedulerClasss - */ - @ConfigurationScope(Scope.AM) - public static final String TEZ_AM_TASK_SCHEDULERS = TEZ_AM_PREFIX + "task-schedulers"; - - /** Comma separated list of task-communicators classes running in the AM. - * The format for each entry is NAME:CLASSNAME, except for tez default which is specified as Tez - * e.g. Tez, ExtService:org.apache.ExtTaskCommClass - * */ - @ConfigurationScope(Scope.AM) - public static final String TEZ_AM_TASK_COMMUNICATORS = TEZ_AM_PREFIX + "task-communicators"; - // TODO only validate property here, value can also be validated if necessary public static void validateProperty(String property, Scope usedScope) { Scope validScope = PropertyScope.get(property); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java index 3b07c59..6e1cb2d 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -102,7 +102,14 @@ public class TezConstants { /// Version-related Environment variables public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION"; + private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn"; + private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber"; - public static final String TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT = "Tez"; - public static final String TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT = "TezLocal"; + public static String getTezYarnServicePluginName() { + return TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS; + } + + public static String getTezUberServicePluginName() { + return TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index 0ed4bd8..34124b2 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -28,11 +28,11 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.VertexGroup.GroupInfo; -import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.runtime.api.LogicalIOProcessor; import com.google.common.base.Preconditions; @@ -57,6 +57,7 @@ public class Vertex { private final Map taskLocalResources = new HashMap(); private Map taskEnvironment = new HashMap(); private Map vertexConf = new HashMap(); + private VertexExecutionContext vertexExecutionContext; private final Map> additionalInputs = new HashMap>(); private final Map> additionalOutputs @@ -410,6 +411,108 @@ public class Vertex { return this; } + /** + * Sets the execution context for this Vertex - i.e. the Task Scheduler, ContainerLauncher and + * TaskCommunicator to be used. Also whether the vertex will be executed within the AM. + * If partially specified, the default components in Tez will be used - which may or may not work + * with the custom context. + * + * @param vertexExecutionContext the execution context for the vertex. + * + * @return + */ + public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) { + this.vertexExecutionContext = vertexExecutionContext; + return this; + } + + @Public + @InterfaceStability.Unstable + public static class VertexExecutionContext { + final boolean executeInAm; + final boolean executeInContainers; + final String taskSchedulerName; + final String containerLauncherName; + final String taskCommName; + + public static VertexExecutionContext createExecuteInAm(boolean executeInAm) { + return new VertexExecutionContext(executeInAm, false); + } + + public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) { + return new VertexExecutionContext(false, executeInContainers); + } + + public static VertexExecutionContext create(String taskSchedulerName, String containerLauncherName, + String taskCommName) { + return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName); + } + + private VertexExecutionContext(boolean executeInAm, boolean executeInContainers) { + this(executeInAm, executeInContainers, null, null, null); + } + + private VertexExecutionContext(String taskSchedulerName, String containerLauncherName, + String taskCommName) { + this(false, false, taskSchedulerName, containerLauncherName, taskCommName); + } + + private VertexExecutionContext(boolean executeInAm, boolean executeInContainers, String taskSchedulerName, String containerLauncherName, + String taskCommName) { + if (executeInAm || executeInContainers) { + Preconditions.checkState(!(executeInAm && executeInContainers), + "executeInContainers and executeInAM are mutually exclusive"); + Preconditions.checkState( + taskSchedulerName == null && containerLauncherName == null && taskCommName == null, + "Uber (in-AM) or container execution cannot be enabled with a custom plugins. TaskScheduler=" + + taskSchedulerName + ", ContainerLauncher=" + containerLauncherName + + ", TaskCommunicator=" + taskCommName); + } + if (taskSchedulerName != null || containerLauncherName != null || taskCommName != null) { + Preconditions.checkState(executeInAm == false && executeInContainers == false, + "Uber (in-AM) and container execution cannot be enabled with a custom plugins. TaskScheduler=" + + taskSchedulerName + ", ContainerLauncher=" + containerLauncherName + + ", TaskCommunicator=" + taskCommName); + } + this.executeInAm = executeInAm; + this.executeInContainers = executeInContainers; + this.taskSchedulerName = taskSchedulerName; + this.containerLauncherName = containerLauncherName; + this.taskCommName = taskCommName; + } + + public boolean shouldExecuteInAm() { + return executeInAm; + } + + public boolean shouldExecuteInContainers() { + return executeInContainers; + } + + public String getTaskSchedulerName() { + return taskSchedulerName; + } + + public String getContainerLauncherName() { + return containerLauncherName; + } + + public String getTaskCommName() { + return taskCommName; + } + + @Override + public String toString() { + return "VertexExecutionContext{" + + "executeInAm=" + executeInAm + + ", executeInContainers=" + executeInContainers + + ", taskSchedulerName='" + taskSchedulerName + '\'' + + ", containerLauncherName='" + containerLauncherName + '\'' + + ", taskCommName='" + taskCommName + '\'' + + '}'; + } + } + @Override public String toString() { return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]"; @@ -475,6 +578,11 @@ public class Vertex { return dataSinks; } + @Private + public VertexExecutionContext getVertexExecutionContext() { + return this.vertexExecutionContext; + } + List getInputEdges() { return inputEdges; } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java new file mode 100644 index 0000000..ff3c90e --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherDescriptor.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.dag.api.NamedEntityDescriptor; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ContainerLauncherDescriptor extends NamedEntityDescriptor { + + private ContainerLauncherDescriptor(String containerLauncherName, String containerLauncherClassname) { + super(containerLauncherName, containerLauncherClassname); + } + + public static ContainerLauncherDescriptor create(String containerLauncherName, String containerLauncherClassname) { + return new ContainerLauncherDescriptor(containerLauncherName, containerLauncherClassname); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java new file mode 100644 index 0000000..8df102a --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java @@ -0,0 +1,96 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ServicePluginsDescriptor { + + private final boolean enableContainers; + private final boolean enableUber; + + private TaskSchedulerDescriptor[] taskSchedulerDescriptors; + private ContainerLauncherDescriptor[] containerLauncherDescriptors; + private TaskCommunicatorDescriptor[] taskCommunicatorDescriptors; + + private ServicePluginsDescriptor(boolean enableContainers, boolean enableUber, + TaskSchedulerDescriptor[] taskSchedulerDescriptor, + ContainerLauncherDescriptor[] containerLauncherDescriptors, + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) { + this.enableContainers = enableContainers; + this.enableUber = enableUber; + Preconditions.checkArgument(taskSchedulerDescriptors == null || taskSchedulerDescriptors.length > 0, + "TaskSchedulerDescriptors should either not be specified or at least 1 should be provided"); + this.taskSchedulerDescriptors = taskSchedulerDescriptor; + Preconditions.checkArgument(containerLauncherDescriptors == null || containerLauncherDescriptors.length > 0, + "ContainerLauncherDescriptor should either not be specified or at least 1 should be provided"); + this.containerLauncherDescriptors = containerLauncherDescriptors; + Preconditions.checkArgument(taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.length > 0, + "TaskCommunicatorDescriptors should either not be specified or at least 1 should be provided"); + this.taskCommunicatorDescriptors = taskCommunicatorDescriptors; + } + + public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor, + ContainerLauncherDescriptor[] containerLauncherDescriptors, + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) { + return new ServicePluginsDescriptor(true, false, taskSchedulerDescriptor, + containerLauncherDescriptors, taskCommunicatorDescriptors); + } + + public static ServicePluginsDescriptor create(boolean enableUber, + TaskSchedulerDescriptor[] taskSchedulerDescriptor, + ContainerLauncherDescriptor[] containerLauncherDescriptors, + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) { + return new ServicePluginsDescriptor(true, enableUber, taskSchedulerDescriptor, + containerLauncherDescriptors, taskCommunicatorDescriptors); + } + + public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber, + TaskSchedulerDescriptor[] taskSchedulerDescriptor, + ContainerLauncherDescriptor[] containerLauncherDescriptors, + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) { + return new ServicePluginsDescriptor(enableContainers, enableUber, taskSchedulerDescriptor, + containerLauncherDescriptors, taskCommunicatorDescriptors); + } + + public static ServicePluginsDescriptor create(boolean enableUber) { + return new ServicePluginsDescriptor(true, enableUber, null, null, null); + } + + + public boolean areContainersEnabled() { + return enableContainers; + } + + public boolean isUberEnabled() { + return enableUber; + } + + public TaskSchedulerDescriptor[] getTaskSchedulerDescriptors() { + return taskSchedulerDescriptors; + } + + public ContainerLauncherDescriptor[] getContainerLauncherDescriptors() { + return containerLauncherDescriptors; + } + + public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() { + return taskCommunicatorDescriptors; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java new file mode 100644 index 0000000..57ac385 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorDescriptor.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.dag.api.NamedEntityDescriptor; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TaskCommunicatorDescriptor extends NamedEntityDescriptor { + + + private TaskCommunicatorDescriptor(String taskCommName, String taskCommClassname) { + super(taskCommName, taskCommClassname); + } + + public static TaskCommunicatorDescriptor create(String taskCommName, String taskCommClassname) { + return new TaskCommunicatorDescriptor(taskCommName, taskCommClassname); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java new file mode 100644 index 0000000..12e0919 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerDescriptor.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.dag.api.NamedEntityDescriptor; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TaskSchedulerDescriptor extends NamedEntityDescriptor { + + private TaskSchedulerDescriptor(String taskSchedulerName, String schedulerClassname) { + super(taskSchedulerName, schedulerClassname); + } + + public static TaskSchedulerDescriptor create(String taskSchedulerName, String schedulerClassName) { + return new TaskSchedulerDescriptor(taskSchedulerName, schedulerClassName); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index 959d4e6..ebe3259 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -127,6 +127,14 @@ message RootInputLeafOutputProto { optional TezEntityDescriptorProto controller_descriptor = 3; } +message VertexExecutionContextProto { + optional bool execute_in_am = 1; + optional bool execute_in_containers = 2; + optional string task_scheduler_name = 3; + optional string container_launcher_name = 4; + optional string task_comm_name = 5; +} + message VertexPlan { required string name = 1; required PlanVertexType type = 2; @@ -139,6 +147,7 @@ message VertexPlan { repeated RootInputLeafOutputProto outputs = 9; optional TezEntityDescriptorProto vertex_manager_plugin = 10; optional ConfigurationProto vertexConf = 11; + optional VertexExecutionContextProto execution_context = 12; } message PlanEdgeProperty { @@ -162,8 +171,23 @@ message EdgePlan { optional TezEntityDescriptorProto edge_manager = 9; } +message TezNamedEntityDescriptorProto { + optional string name = 1; + optional TezEntityDescriptorProto entity_descriptor = 2; +} + + +message AMPluginDescriptorProto { + optional bool containers_enabled = 1 [default = true]; + optional bool uber_enabled = 2 [default = false]; + repeated TezNamedEntityDescriptorProto task_scedulers = 3; + repeated TezNamedEntityDescriptorProto container_launchers = 4; + repeated TezNamedEntityDescriptorProto task_communicators = 5; +} + message ConfigurationProto { repeated PlanKeyValuePair confKeyValues = 1; + optional AMPluginDescriptorProto am_plugin_descriptor = 2; } message DAGPlan { @@ -175,6 +199,7 @@ message DAGPlan { repeated PlanVertexGroupInfo vertex_groups = 6; repeated PlanLocalResource local_resource = 7; optional string dag_info = 8; + optional VertexExecutionContextProto default_execution_context = 9; } // DAG monitoring messages http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java index 2d4e005..8946ef0 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java @@ -261,7 +261,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class)); + mock(HistoryACLPolicyManager.class), null); ContainerLaunchContext amClc = appSubmissionContext.getAMContainerSpec(); Map amServiceData = amClc.getServiceData(); @@ -294,7 +294,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class)); + mock(HistoryACLPolicyManager.class), null); List expectedCommands = new LinkedList(); expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"); @@ -334,7 +334,7 @@ public class TestTezClientUtils { ApplicationSubmissionContext appSubmissionContext = TezClientUtils.createApplicationSubmissionContext(appId, dag, "amName", amConf, new HashMap(), credentials, false, new TezApiVersionInfo(), - mock(HistoryACLPolicyManager.class)); + mock(HistoryACLPolicyManager.class), null); List expectedCommands = new LinkedList(); expectedCommands.add("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator"); @@ -516,7 +516,7 @@ public class TestTezClientUtils { expected.put("property1", val1); expected.put("property2", expVal2); - ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null); + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, null); for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) { String v = expected.remove(kvPair.getKey()); @@ -620,7 +620,7 @@ public class TestTezClientUtils { srcConf.set(entry.getKey(), entry.getValue()); } - ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf); + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(srcConf, null, null); for (PlanKeyValuePair kvPair : confProto.getConfKeyValuesList()) { String val = confMap.remove(kvPair.getKey()); @@ -677,4 +677,6 @@ public class TestTezClientUtils { Assert.assertTrue(resourceNames.contains("dir2-f.txt")); } + // TODO TEZ-2003 Add test to validate ServicePluginDescriptor propagation + } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 532e83c..1fb7ff9 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -57,7 +57,7 @@ public class TezUtilsInternal { private static final Logger LOG = LoggerFactory.getLogger(TezUtilsInternal.class); - public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws + public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws IOException { FileInputStream confPBBinaryStream = null; ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); @@ -72,14 +72,41 @@ public class TezUtilsInternal { } ConfigurationProto confProto = confProtoBuilder.build(); + return confProto; + } - List kvPairList = confProto.getConfKeyValuesList(); + public static void addUserSpecifiedTezConfiguration(Configuration conf, + List kvPairList) { if (kvPairList != null && !kvPairList.isEmpty()) { for (PlanKeyValuePair kvPair : kvPairList) { conf.set(kvPair.getKey(), kvPair.getValue()); } } } +// +// public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws +// IOException { +// FileInputStream confPBBinaryStream = null; +// ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); +// try { +// confPBBinaryStream = +// new FileInputStream(new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME)); +// confProtoBuilder.mergeFrom(confPBBinaryStream); +// } finally { +// if (confPBBinaryStream != null) { +// confPBBinaryStream.close(); +// } +// } +// +// ConfigurationProto confProto = confProtoBuilder.build(); +// +// List kvPairList = confProto.getConfKeyValuesList(); +// if (kvPairList != null && !kvPairList.isEmpty()) { +// for (PlanKeyValuePair kvPair : kvPairList) { +// conf.set(kvPair.getKey(), kvPair.getValue()); +// } +// } +// } public static byte[] compressBytes(byte[] inBytes) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 1bb2002..508f817 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -341,7 +341,7 @@ public class LocalClient extends FrameworkClient { String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) { return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), 1, credentials, jobUserName); + versionInfo.getVersion(), 1, credentials, jobUserName, null); } } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index e4d3f8b..4f75891 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -61,7 +62,11 @@ import com.google.common.collect.HashBiMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup; import org.apache.tez.dag.history.events.DAGRecoveredEvent; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -221,6 +226,7 @@ public class DAGAppMaster extends AbstractService { private final String workingDirectory; private final String[] localDirs; private final String[] logDirs; + private final AMPluginDescriptorProto amPluginDescriptorProto; private ContainerSignatureMatcher containerSignatureMatcher; private AMContainerMap containers; private AMNodeTracker nodes; @@ -312,7 +318,7 @@ public class DAGAppMaster extends AbstractService { ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String [] localDirs, String[] logDirs, String clientVersion, int maxAppAttempts, - Credentials credentials, String jobUserName) { + Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { super(DAGAppMaster.class.getName()); this.clock = clock; this.startTime = clock.getTime(); @@ -332,6 +338,7 @@ public class DAGAppMaster extends AbstractService { this.clientVersion = clientVersion; this.maxAppAttempts = maxAppAttempts; this.amCredentials = credentials; + this.amPluginDescriptorProto = pluginDescriptorProto; this.appMasterUgi = UserGroupInformation .createRemoteUser(jobUserName); this.appMasterUgi.addCredentials(amCredentials); @@ -380,28 +387,47 @@ public class DAGAppMaster extends AbstractService { this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); - String tezDefaultClassIdentifier = - isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT : - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; + List taskSchedulerDescriptors; + List containerLauncherDescriptors; + List taskCommunicatorDescriptors; + boolean tezYarnEnabled = true; + boolean uberEnabled = false; - String[] taskSchedulerClassIdentifiers = parsePlugins(taskSchedulers, - conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, - tezDefaultClassIdentifier), - TezConfiguration.TEZ_AM_TASK_SCHEDULERS); + if (!isLocal) { + if (amPluginDescriptorProto == null) { + tezYarnEnabled = true; + uberEnabled = false; + } else { + tezYarnEnabled = amPluginDescriptorProto.getContainersEnabled(); + uberEnabled = amPluginDescriptorProto.getUberEnabled(); + } + } else { + tezYarnEnabled = false; + uberEnabled = true; + } + + taskSchedulerDescriptors = parsePlugin(taskSchedulers, + (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ? + null : + amPluginDescriptorProto.getTaskScedulersList()), + tezYarnEnabled, uberEnabled); - String[] containerLauncherClassIdentifiers = parsePlugins(containerLaunchers, - conf.getTrimmedStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, - tezDefaultClassIdentifier), - TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS); + containerLauncherDescriptors = parsePlugin(containerLaunchers, + (amPluginDescriptorProto == null || + amPluginDescriptorProto.getContainerLaunchersCount() == 0 ? null : + amPluginDescriptorProto.getContainerLaunchersList()), + tezYarnEnabled, uberEnabled); - String[] taskCommunicatorClassIdentifiers = parsePlugins(taskCommunicators, - conf.getTrimmedStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, - tezDefaultClassIdentifier), - TezConfiguration.TEZ_AM_TASK_COMMUNICATORS); + taskCommunicatorDescriptors = parsePlugin(taskCommunicators, + (amPluginDescriptorProto == null || + amPluginDescriptorProto.getTaskCommunicatorsCount() == 0 ? null : + amPluginDescriptorProto.getTaskCommunicatorsList()), + tezYarnEnabled, uberEnabled); - LOG.info(buildPluginComponentLog(taskSchedulerClassIdentifiers, taskSchedulers, "TaskSchedulers")); - LOG.info(buildPluginComponentLog(containerLauncherClassIdentifiers, containerLaunchers, "ContainerLaunchers")); - LOG.info(buildPluginComponentLog(taskCommunicatorClassIdentifiers, taskCommunicators, "TaskCommunicators")); + + LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers")); + LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers")); + LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators")); boolean disableVersionCheck = conf.getBoolean( TezConfiguration.TEZ_AM_DISABLE_CLIENT_VERSION_CHECK, @@ -468,7 +494,7 @@ public class DAGAppMaster extends AbstractService { //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, - taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal); + taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors, isLocal); addIfService(taskAttemptListener, true); containerSignatureMatcher = createContainerSignatureMatcher(); @@ -516,7 +542,7 @@ public class DAGAppMaster extends AbstractService { this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, - taskSchedulerClassIdentifiers, isLocal); + taskSchedulerDescriptors, isLocal); addIfService(taskSchedulerEventHandler, true); if (enableWebUIService()) { @@ -534,7 +560,7 @@ public class DAGAppMaster extends AbstractService { taskSchedulerEventHandler); addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); - this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal); + this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherDescriptors, isLocal); addIfService(containerLauncherRouter, true); dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); @@ -1044,11 +1070,11 @@ public class DAGAppMaster extends AbstractService { protected TaskAttemptListener createTaskAttemptListener(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - String[] taskCommunicatorClasses, + List entityDescriptors, boolean isLocal) { TaskAttemptListener lis = new TaskAttemptListenerImpTezDag(context, thh, chh, - taskCommunicatorClasses, amConf, isLocal); + entityDescriptors, amConf, isLocal); return lis; } @@ -1070,11 +1096,11 @@ public class DAGAppMaster extends AbstractService { } protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, - String[] containerLauncherClasses, + List containerLauncherDescriptors, boolean isLocal) throws UnknownHostException { return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, - containerLauncherClasses, isLocal); + containerLauncherDescriptors, isLocal); } public ApplicationId getAppID() { @@ -2140,7 +2166,16 @@ public class DAGAppMaster extends AbstractService { // TODO Does this really need to be a YarnConfiguration ? Configuration conf = new Configuration(new YarnConfiguration()); - TezUtilsInternal.addUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name()), conf); + + ConfigurationProto confProto = + TezUtilsInternal.readUserSpecifiedTezConfiguration(System.getenv(Environment.PWD.name())); + TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList()); + + AMPluginDescriptorProto amPluginDescriptorProto = null; + if (confProto.hasAmPluginDescriptor()) { + amPluginDescriptorProto = confProto.getAmPluginDescriptor(); + } + UserGroupInformation.setConfiguration(conf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); @@ -2152,7 +2187,7 @@ public class DAGAppMaster extends AbstractService { System.getenv(Environment.PWD.name()), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name())), TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOG_DIRS.name())), - clientVersion, maxAppAttempts, credentials, jobUserName); + clientVersion, maxAppAttempts, credentials, jobUserName, amPluginDescriptorProto); ShutdownHookManager.get().addShutdownHook( new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); @@ -2258,7 +2293,7 @@ public class DAGAppMaster extends AbstractService { LOG.info("Running DAG: " + dagPlan.getName()); String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); - System.err.println(timeStamp + " Running Dag: "+ newDAG.getID()); + System.err.println(timeStamp + " Running Dag: " + newDAG.getID()); System.out.println(timeStamp + " Running Dag: "+ newDAG.getID()); // Job name is the same as the app name until we support multiple dags // for an app later @@ -2364,60 +2399,51 @@ public class DAGAppMaster extends AbstractService { TezConfiguration.TEZ_AM_WEBSERVICE_ENABLE_DEFAULT); } - // Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT - private String[] parsePlugins(BiMap pluginMap, String[] pluginStrings, - String context) { - // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API. - Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0, - "Plugin strings should not be null or empty: " + context); - - String[] classNames = new String[pluginStrings.length]; + private static List parsePlugin( + BiMap pluginMap, List namedEntityDescriptorProtos, + boolean tezYarnEnabled, boolean uberEnabled) { int index = 0; - for (String pluginString : pluginStrings) { - - String className; - String identifierString; - - Preconditions.checkState(pluginString != null && !pluginString.isEmpty(), - "Plugin string: " + pluginString + " should not be null or empty"); - if (pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) || - pluginString.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { - // Kind of ugly, but Tez internal routing is encoded via a String instead of classnames. - // Individual components - TaskComm, Scheduler, Launcher deal with actual classname translation, - // and avoid reflection. - identifierString = pluginString; - className = pluginString; - } else { - String[] parts = pluginString.split(":"); - Preconditions.checkState( - parts.length == 2 && parts[0] != null && !parts[0].isEmpty() && parts[1] != null && - !parts[1].isEmpty(), - "Invalid configuration string for " + context + ": " + pluginString); - Preconditions.checkState( - !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) && - !parts[0].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT), - "Identifier cannot be " + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT + " or " + - TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT + " for " + - pluginString); - identifierString = parts[0]; - className = parts[1]; - } - pluginMap.put(identifierString, index); - classNames[index] = className; + + List resultList = new LinkedList<>(); + + if (tezYarnEnabled) { + // Default classnames will be populated by individual components + NamedEntityDescriptor r = new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), null); + resultList.add(r); + pluginMap.put(TezConstants.getTezYarnServicePluginName(), index); + index++; + } + + if (uberEnabled) { + // Default classnames will be populated by individual components + NamedEntityDescriptor r = new NamedEntityDescriptor( + TezConstants.getTezUberServicePluginName(), null); + resultList.add(r); + pluginMap.put(TezConstants.getTezUberServicePluginName(), index); index++; } - return classNames; + + if (namedEntityDescriptorProtos != null) { + for (TezNamedEntityDescriptorProto namedEntityDescriptorProto : namedEntityDescriptorProtos) { + resultList.add(DagTypeConverters + .convertNamedDescriptorFromProto(namedEntityDescriptorProto)); + pluginMap.put(resultList.get(index).getEntityName(), index); + index++; + } + } + return resultList; } - String buildPluginComponentLog(String[] classIdentifiers, BiMap map, + String buildPluginComponentLog(List namedEntityDescriptors, BiMap map, String component) { StringBuilder sb = new StringBuilder(); sb.append("AM Level configured ").append(component).append(": "); - for (int i = 0; i < classIdentifiers.length; i++) { + for (int i = 0; i < namedEntityDescriptors.size(); i++) { sb.append("[").append(i).append(":").append(map.inverse().get(i)) - .append(":").append(classIdentifiers[i]).append("]"); - if (i != classIdentifiers.length - 1) { + .append(":").append(namedEntityDescriptors.get(i).getClassName()).append("]"); + if (i != namedEntityDescriptors.size() - 1) { sb.append(","); } } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 599c208..1e34184 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -27,8 +27,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; @@ -46,7 +49,6 @@ import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskHeartbeatResponse; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -100,28 +102,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public TaskAttemptListenerImpTezDag(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - String [] taskCommunicatorClassIdentifiers, + List taskCommunicatorDescriptors, Configuration conf, boolean isPureLocalMode) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; this.taskHeartbeatHandler = thh; this.containerHeartbeatHandler = chh; - if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) { + if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) { if (isPureLocalMode) { - taskCommunicatorClassIdentifiers = - new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezUberServicePluginName(), null)); } else { - taskCommunicatorClassIdentifiers = - new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + taskCommunicatorDescriptors = Lists.newArrayList(new NamedEntityDescriptor( + TezConstants.getTezYarnServicePluginName(), null)); } } - this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; - this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length]; - this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length]; - for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { + this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()]; + this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()]; + this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()]; + for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) { taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i); - taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i); + taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i); taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); } // TODO TEZ-2118 Start using taskCommunicator indices properly @@ -143,17 +145,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } } - private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex) { - if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { + private TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, int taskCommIndex) { + if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { LOG.info("Using Default Task Communicator"); return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); - } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + } else if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) { LOG.info("Using Default Local Task Communicator"); return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]); } else { - LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier); + // TODO TEZ-2003. Use the payload + LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName()); Class taskCommClazz = (Class) ReflectionUtils - .getClazz(taskCommClassIdentifier); + .getClazz(taskCommDescriptor.getClassName()); try { Constructor ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class); ctor.setAccessible(true); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 458362f..335239e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -98,4 +98,6 @@ public interface DAG { StateChangeNotifier getStateChangeNotifier(); + org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ec2ef66..25518b0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -717,6 +717,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() { + if (jobPlan.hasDefaultExecutionContext()) { + return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext()); + } else { + return null; + } + } + + @Override public TezCounters getAllCounters() { readLock.lock(); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index bdab984..2e8f218 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -995,14 +995,37 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); String tezDefaultComponentName = - isLocal ? TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT : - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT; - String taskSchedulerName = - vertexConf.get(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, tezDefaultComponentName); - String taskCommName = vertexConf - .get(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, tezDefaultComponentName); - String containerLauncherName = vertexConf - .get(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, tezDefaultComponentName); + isLocal ? TezConstants.getTezUberServicePluginName() : + TezConstants.getTezYarnServicePluginName(); + + org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext(); + if (vertexPlan.hasExecutionContext()) { + execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext()); + LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName); + } else if (execContext != null) { + LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName); + } + if (execContext != null) { + if (execContext.shouldExecuteInAm()) { + tezDefaultComponentName = TezConstants.getTezUberServicePluginName(); + } + } + + String taskSchedulerName = tezDefaultComponentName; + String containerLauncherName = tezDefaultComponentName; + String taskCommName = tezDefaultComponentName; + + if (execContext != null) { + if (execContext.getTaskSchedulerName() != null) { + taskSchedulerName = execContext.getTaskSchedulerName(); + } + if (execContext.getContainerLauncherName() != null) { + containerLauncherName = execContext.getContainerLauncherName(); + } + if (execContext.getTaskCommName() != null) { + taskCommName = execContext.getTaskCommName(); + } + } LOG.info("Vertex: " + logIdentifier + " configured with TaskScheduler=" + taskSchedulerName + ", ContainerLauncher=" + containerLauncherName + ", TaskComm=" + taskCommName); http://git-wip-us.apache.org/repos/asf/tez/blob/ec5acd8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index 34c7bc0..cba5c80 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,11 +30,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; -import org.apache.tez.dag.api.TezConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -269,8 +269,8 @@ public class ContainerLauncherImpl extends ContainerLauncher { // nodes where containers will run at *this* point of time. This is // *not* the cluster size and doesn't need to be. - int numNodes = getContext().getNumNodes( - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + int numNodes = + getContext().getNumNodes(TezConstants.getTezYarnServicePluginName()); int idealPoolSize = Math.min(limitOnPoolSize, numNodes); if (poolSize < idealPoolSize) {