Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 760FA18E21 for ; Sat, 22 Aug 2015 01:18:52 +0000 (UTC) Received: (qmail 10528 invoked by uid 500); 22 Aug 2015 01:18:52 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 10440 invoked by uid 500); 22 Aug 2015 01:18:52 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 8913 invoked by uid 99); 22 Aug 2015 01:18:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 01:18:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9A1DDE1144; Sat, 22 Aug 2015 01:18:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 01:19:18 -0000 Message-Id: In-Reply-To: <874e92d7e5584fc6929ed08fa7b87588@git.apache.org> References: <874e92d7e5584fc6929ed08fa7b87588@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/50] [abbrv] tez git commit: TEZ-2657. Add tests for client side changes - specifying plugins, etc. (sseth) TEZ-2657. Add tests for client side changes - specifying plugins, etc. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/52f0e5d2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/52f0e5d2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/52f0e5d2 Branch: refs/heads/TEZ-2003 Commit: 52f0e5d2a0ec36c781f4f0cca63d2150426add9d Parents: 9ee4d5d Author: Siddharth Seth Authored: Wed Jul 29 18:26:01 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:14:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 15 +- .../org/apache/tez/client/TezClientUtils.java | 38 +--- .../apache/tez/dag/api/DagTypeConverters.java | 67 +++++-- .../java/org/apache/tez/dag/api/Vertex.java | 41 ++++ .../api/ServicePluginsDescriptor.java | 36 ++++ tez-api/src/main/proto/DAGApiRecords.proto | 2 +- .../org/apache/tez/client/TestTezClient.java | 111 +++++++++-- .../apache/tez/client/TestTezClientUtils.java | 16 +- .../org/apache/tez/dag/api/TestDAGPlan.java | 63 +++++- .../tez/dag/api/TestDagTypeConverters.java | 196 ++++++++++++++++++- .../org/apache/tez/dag/app/DAGAppMaster.java | 4 +- 12 files changed, 507 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 9d72d92..9b3967a 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -40,5 +40,6 @@ ALL CHANGES: TEZ-2652. Cleanup the way services are specified for an AM and vertices. TEZ-2653. Change service contexts to expose a user specified payload instead of the AM configuration. TEZ-2441. Add tests for TezTaskRunner2. + TEZ-2657. Add tests for client side changes - specifying plugins, etc. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index ee6280a..27f0a81 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -96,13 +96,16 @@ public class TezClient { @VisibleForTesting static final String NO_CLUSTER_DIAGNOSTICS_MSG = "No cluster diagnostics found."; - private final String clientName; + @VisibleForTesting + final String clientName; private ApplicationId sessionAppId; private ApplicationId lastSubmittedAppId; - private AMConfiguration amConfig; + @VisibleForTesting + final AMConfiguration amConfig; private FrameworkClient frameworkClient; private String diagnostics; - private boolean isSession; + @VisibleForTesting + final boolean isSession; private boolean sessionStarted = false; private boolean sessionStopped = false; /** Tokens which will be required for all DAGs submitted to this session. */ @@ -114,8 +117,10 @@ public class TezClient { private JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private final Map additionalLocalResources = Maps.newHashMap(); - private final TezApiVersionInfo apiVersionInfo; - private final ServicePluginsDescriptor servicePluginsDescriptor; + @VisibleForTesting + final TezApiVersionInfo apiVersionInfo; + @VisibleForTesting + final ServicePluginsDescriptor servicePluginsDescriptor; private HistoryACLPolicyManager historyACLPolicyManager; private int preWarmDAGCounter = 0; http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 9cf1f3f..6086fa1 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -39,9 +39,7 @@ import java.util.Map.Entry; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; -import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; -import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -778,47 +776,13 @@ public class TezClientUtils { } AMPluginDescriptorProto pluginDescriptorProto = - createAMServicePluginDescriptorProto(servicePluginsDescriptor); + DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); builder.setAmPluginDescriptor(pluginDescriptorProto); return builder.build(); } - static AMPluginDescriptorProto createAMServicePluginDescriptorProto( - ServicePluginsDescriptor servicePluginsDescriptor) { - AMPluginDescriptorProto.Builder pluginDescriptorBuilder = - AMPluginDescriptorProto.newBuilder(); - if (servicePluginsDescriptor != null) { - pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled()); - pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled()); - - if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null && - servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) { - List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( - servicePluginsDescriptor.getTaskSchedulerDescriptors()); - pluginDescriptorBuilder.addAllTaskScedulers(namedEntityProtos); - } - - if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null && - servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) { - List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( - servicePluginsDescriptor.getContainerLauncherDescriptors()); - pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos); - } - - if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null && - servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) { - List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( - servicePluginsDescriptor.getTaskCommunicatorDescriptors()); - pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos); - } - - } else { - pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false); - } - return pluginDescriptorBuilder.build(); - } /** * Helper function to create a YARN LocalResource http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 2e0d417..61e4d33 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -52,9 +52,11 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; @@ -74,14 +76,13 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto; import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.ByteString.Output; -import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; -import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; -import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; @Private public class DagTypeConverters { @@ -732,13 +733,13 @@ public class DagTypeConverters { return payload.getPayload(); } - public static DAGProtos.VertexExecutionContextProto convertToProto( - Vertex.VertexExecutionContext context) { + public static VertexExecutionContextProto convertToProto( + VertexExecutionContext context) { if (context == null) { return null; } else { - DAGProtos.VertexExecutionContextProto.Builder builder = - DAGProtos.VertexExecutionContextProto.newBuilder(); + VertexExecutionContextProto.Builder builder = + VertexExecutionContextProto.newBuilder(); builder.setExecuteInAm(context.shouldExecuteInAm()); builder.setExecuteInContainers(context.shouldExecuteInContainers()); if (context.getTaskSchedulerName() != null) { @@ -754,26 +755,26 @@ public class DagTypeConverters { } } - public static Vertex.VertexExecutionContext convertFromProto( - DAGProtos.VertexExecutionContextProto proto) { + public static VertexExecutionContext convertFromProto( + VertexExecutionContextProto proto) { if (proto == null) { return null; } else { if (proto.getExecuteInAm()) { - Vertex.VertexExecutionContext context = - Vertex.VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm()); + VertexExecutionContext context = + VertexExecutionContext.createExecuteInAm(proto.getExecuteInAm()); return context; } else if (proto.getExecuteInContainers()) { - Vertex.VertexExecutionContext context = - Vertex.VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers()); + VertexExecutionContext context = + VertexExecutionContext.createExecuteInContainers(proto.getExecuteInContainers()); return context; } else { String taskScheduler = proto.hasTaskSchedulerName() ? proto.getTaskSchedulerName() : null; String containerLauncher = proto.hasContainerLauncherName() ? proto.getContainerLauncherName() : null; String taskComm = proto.hasTaskCommName() ? proto.getTaskCommName() : null; - Vertex.VertexExecutionContext context = - Vertex.VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm); + VertexExecutionContext context = + VertexExecutionContext.create(taskScheduler, containerLauncher, taskComm); return context; } } @@ -800,4 +801,40 @@ public class DagTypeConverters { return builder.build(); } + public static AMPluginDescriptorProto convertServicePluginDescriptoToProto( + ServicePluginsDescriptor servicePluginsDescriptor) { + AMPluginDescriptorProto.Builder pluginDescriptorBuilder = + AMPluginDescriptorProto.newBuilder(); + if (servicePluginsDescriptor != null) { + + pluginDescriptorBuilder.setContainersEnabled(servicePluginsDescriptor.areContainersEnabled()); + pluginDescriptorBuilder.setUberEnabled(servicePluginsDescriptor.isUberEnabled()); + + if (servicePluginsDescriptor.getTaskSchedulerDescriptors() != null && + servicePluginsDescriptor.getTaskSchedulerDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getTaskSchedulerDescriptors()); + pluginDescriptorBuilder.addAllTaskSchedulers(namedEntityProtos); + } + + if (servicePluginsDescriptor.getContainerLauncherDescriptors() != null && + servicePluginsDescriptor.getContainerLauncherDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getContainerLauncherDescriptors()); + pluginDescriptorBuilder.addAllContainerLaunchers(namedEntityProtos); + } + + if (servicePluginsDescriptor.getTaskCommunicatorDescriptors() != null && + servicePluginsDescriptor.getTaskCommunicatorDescriptors().length > 0) { + List namedEntityProtos = DagTypeConverters.convertNamedEntityCollectionToProto( + servicePluginsDescriptor.getTaskCommunicatorDescriptors()); + pluginDescriptorBuilder.addAllTaskCommunicators(namedEntityProtos); + } + + } else { + pluginDescriptorBuilder.setContainersEnabled(true).setUberEnabled(false); + } + return pluginDescriptorBuilder.build(); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index 34124b2..8953ae1 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -511,6 +511,47 @@ public class Vertex { ", taskCommName='" + taskCommName + '\'' + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VertexExecutionContext that = (VertexExecutionContext) o; + + if (executeInAm != that.executeInAm) { + return false; + } + if (executeInContainers != that.executeInContainers) { + return false; + } + if (taskSchedulerName != null ? !taskSchedulerName.equals(that.taskSchedulerName) : + that.taskSchedulerName != null) { + return false; + } + if (containerLauncherName != null ? + !containerLauncherName.equals(that.containerLauncherName) : + that.containerLauncherName != null) { + return false; + } + return !(taskCommName != null ? !taskCommName.equals(that.taskCommName) : + that.taskCommName != null); + + } + + @Override + public int hashCode() { + int result = (executeInAm ? 1 : 0); + result = 31 * result + (executeInContainers ? 1 : 0); + result = 31 * result + (taskSchedulerName != null ? taskSchedulerName.hashCode() : 0); + result = 31 * result + (containerLauncherName != null ? containerLauncherName.hashCode() : 0); + result = 31 * result + (taskCommName != null ? taskCommName.hashCode() : 0); + return result; + } } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java index 8df102a..2e4fc46 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java @@ -46,6 +46,15 @@ public class ServicePluginsDescriptor { this.taskCommunicatorDescriptors = taskCommunicatorDescriptors; } + /** + * Create a service plugin descriptor with the provided plugins. Regular containers will also be enabled + * when using this method. + * + * @param taskSchedulerDescriptor the task scheduler plugin descriptors + * @param containerLauncherDescriptors the container launcher plugin descriptors + * @param taskCommunicatorDescriptors the task communicator plugin descriptors + * @return + */ public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor, ContainerLauncherDescriptor[] containerLauncherDescriptors, TaskCommunicatorDescriptor[] taskCommunicatorDescriptors) { @@ -53,6 +62,15 @@ public class ServicePluginsDescriptor { containerLauncherDescriptors, taskCommunicatorDescriptors); } + /** + * Create a service plugin descriptor with the provided plugins. Also allows specification of whether + * in-AM execution is enabled. Container execution is enabled by default. + * @param enableUber whether to enable execution in the AM or not + * @param taskSchedulerDescriptor the task scheduler plugin descriptors + * @param containerLauncherDescriptors the container launcher plugin descriptors + * @param taskCommunicatorDescriptors the task communicator plugin descriptors + * @return + */ public static ServicePluginsDescriptor create(boolean enableUber, TaskSchedulerDescriptor[] taskSchedulerDescriptor, ContainerLauncherDescriptor[] containerLauncherDescriptors, @@ -61,6 +79,17 @@ public class ServicePluginsDescriptor { containerLauncherDescriptors, taskCommunicatorDescriptors); } + /** + * Create a service plugin descriptor with the provided plugins. Also allows specification of whether + * container execution and in-AM execution will be enabled. + * + * @param enableContainers whether to enable execution in containers + * @param enableUber whether to enable execution in the AM or not + * @param taskSchedulerDescriptor the task scheduler plugin descriptors + * @param containerLauncherDescriptors the container launcher plugin descriptors + * @param taskCommunicatorDescriptors the task communicator plugin descriptors + * @return + */ public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber, TaskSchedulerDescriptor[] taskSchedulerDescriptor, ContainerLauncherDescriptor[] containerLauncherDescriptors, @@ -69,6 +98,13 @@ public class ServicePluginsDescriptor { containerLauncherDescriptors, taskCommunicatorDescriptors); } + /** + * Create a service plugin descriptor which may have in-AM execution of tasks enabled. Container + * execution is enabled by default + * + * @param enableUber whether to enable execution in the AM or not + * @return + */ public static ServicePluginsDescriptor create(boolean enableUber) { return new ServicePluginsDescriptor(true, enableUber, null, null, null); } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index ebe3259..193f7b8 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -180,7 +180,7 @@ message TezNamedEntityDescriptorProto { message AMPluginDescriptorProto { optional bool containers_enabled = 1 [default = true]; optional bool uber_enabled = 2 [default = false]; - repeated TezNamedEntityDescriptorProto task_scedulers = 3; + repeated TezNamedEntityDescriptorProto task_schedulers = 3; repeated TezNamedEntityDescriptorProto container_launchers = 4; repeated TezNamedEntityDescriptorProto task_communicators = 5; } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 92d3cd2..66b273a 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -27,6 +27,11 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -65,6 +70,7 @@ import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRespo import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezAppMasterStatusProto; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Test; @@ -165,11 +171,11 @@ public class TestTezClient { verify(client.mockYarnClient, times(1)).submitApplication(captor.capture()); ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size()); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( lrName1)); } else { verify(client.mockYarnClient, times(0)).submitApplication(captor.capture()); @@ -184,7 +190,7 @@ public class TestTezClient { DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG); DAGClient dagClient = client.submitDAG(dag); - Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString())); + assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString())); if (isSession) { verify(client.mockYarnClient, times(1)).submitApplication(captor.capture()); @@ -193,13 +199,13 @@ public class TestTezClient { verify(client.mockYarnClient, times(1)).submitApplication(captor.capture()); ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size()); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_PLAN_BINARY_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( lrName1)); } @@ -223,7 +229,7 @@ public class TestTezClient { if (isSession) { // same app master verify(client.mockYarnClient, times(1)).submitApplication(captor.capture()); - Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString())); + assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString())); // additional resource is sent ArgumentCaptor captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class); verify(client.sessionAmProxy, times(2)).submitDAG((RpcController)any(), captor1.capture()); @@ -232,20 +238,20 @@ public class TestTezClient { Assert.assertEquals(lrName2, proto.getAdditionalAmResources().getLocalResources(0).getName()); } else { // new app master - Assert.assertTrue(dagClient.getExecutionContext().contains(appId2.toString())); + assertTrue(dagClient.getExecutionContext().contains(appId2.toString())); verify(client.mockYarnClient, times(2)).submitApplication(captor.capture()); // additional resource is added ApplicationSubmissionContext context = captor.getValue(); Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size()); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_BINARY_CONF_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( TezConstants.TEZ_PB_PLAN_BINARY_NAME)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( lrName1)); - Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( + assertTrue(context.getAMContainerSpec().getLocalResources().containsKey( lrName2)); } @@ -275,7 +281,7 @@ public class TestTezClient { ArgumentCaptor captor1 = ArgumentCaptor.forClass(SubmitDAGRequestProto.class); verify(client.sessionAmProxy, times(1)).submitDAG((RpcController)any(), captor1.capture()); SubmitDAGRequestProto proto = captor1.getValue(); - Assert.assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)); + assertTrue(proto.getDAGPlan().getName().startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)); client.stop(); } @@ -360,7 +366,7 @@ public class TestTezClient { client.waitTillReady(); fail(); } catch (SessionNotRunning e) { - Assert.assertTrue(e.getMessage().contains(msg)); + assertTrue(e.getMessage().contains(msg)); } client.stop(); } @@ -375,7 +381,7 @@ public class TestTezClient { client.waitTillReady(); fail(); } catch (SessionNotRunning e) { - Assert.assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG)); + assertTrue(e.getMessage().contains(TezClient.NO_CLUSTER_DIAGNOSTICS_MSG)); } client.stop(); } @@ -400,7 +406,7 @@ public class TestTezClient { client.submitDAG(dag); fail(); } catch (SessionNotRunning e) { - Assert.assertTrue(e.getMessage().contains(msg)); + assertTrue(e.getMessage().contains(msg)); } client.stop(); } @@ -429,4 +435,71 @@ public class TestTezClient { } } + @Test(timeout = 5000) + public void testClientBuilder() { + TezConfiguration tezConfWitSession = new TezConfiguration(); + tezConfWitSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true); + + TezConfiguration tezConfNoSession = new TezConfiguration(); + tezConfNoSession.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false); + + AMConfiguration amConf; + TezClient tezClient; + Credentials credentials = new Credentials(); + Map localResourceMap = new HashMap<>(); + localResourceMap.put("testResource", mock(LocalResource.class)); + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true); + + // Session mode via conf + tezClient = TezClient.newBuilder("client", tezConfWitSession).build(); + assertTrue(tezClient.isSession); + assertNull(tezClient.servicePluginsDescriptor); + assertNotNull(tezClient.apiVersionInfo); + amConf = tezClient.amConfig; + assertNotNull(amConf); + assertEquals(0, amConf.getAMLocalResources().size()); + assertNull(amConf.getCredentials()); + assertTrue( + amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false)); + + // Non-Session mode via conf + tezClient = TezClient.newBuilder("client", tezConfNoSession).build(); + assertFalse(tezClient.isSession); + assertNull(tezClient.servicePluginsDescriptor); + assertNotNull(tezClient.apiVersionInfo); + amConf = tezClient.amConfig; + assertNotNull(amConf); + assertEquals(0, amConf.getAMLocalResources().size()); + assertNull(amConf.getCredentials()); + assertFalse( + amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true)); + + // no-session via config. API explicit session. + tezClient = TezClient.newBuilder("client", tezConfNoSession).setIsSession(true).build(); + assertTrue(tezClient.isSession); + assertNull(tezClient.servicePluginsDescriptor); + assertNotNull(tezClient.apiVersionInfo); + amConf = tezClient.amConfig; + assertNotNull(amConf); + assertEquals(0, amConf.getAMLocalResources().size()); + assertNull(amConf.getCredentials()); + assertTrue( + amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false)); + + // Plugins, credentials, local resources + tezClient = TezClient.newBuilder("client", tezConfWitSession).setCredentials(credentials) + .setLocalResources(localResourceMap).setServicePluginDescriptor(servicePluginsDescriptor) + .build(); + assertTrue(tezClient.isSession); + assertEquals(servicePluginsDescriptor, tezClient.servicePluginsDescriptor); + assertNotNull(tezClient.apiVersionInfo); + amConf = tezClient.amConfig; + assertNotNull(amConf); + assertEquals(1, amConf.getAMLocalResources().size()); + assertEquals(localResourceMap, amConf.getAMLocalResources()); + assertEquals(credentials, amConf.getCredentials()); + assertTrue( + amConf.getTezConfiguration().getBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false)); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java index 8946ef0..8f40bbd 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClientUtils.java @@ -70,6 +70,7 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.junit.Assert; import org.junit.Test; /** @@ -500,7 +501,8 @@ public class TestTezClientUtils { Assert.assertNotNull(javaOpts); Assert.assertTrue(javaOpts.contains("-D" + TezConstants.TEZ_ROOT_LOGGER_NAME + "=FOOBAR") && javaOpts.contains(TezConstants.TEZ_CONTAINER_LOG4J_PROPERTIES_FILE) - && javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator")); + && + javaOpts.contains("-Dlog4j.configuratorClass=org.apache.tez.common.TezLog4jConfigurator")); } @Test (timeout = 5000) @@ -677,6 +679,16 @@ public class TestTezClientUtils { Assert.assertTrue(resourceNames.contains("dir2-f.txt")); } - // TODO TEZ-2003 Add test to validate ServicePluginDescriptor propagation + @Test(timeout = 5000) + public void testServiceDescriptorSerializationForAM() { + Configuration conf = new Configuration(false); + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true); + + ConfigurationProto confProto = TezClientUtils.createFinalConfProtoForApp(conf, null, + servicePluginsDescriptor); + + assertTrue(confProto.hasAmPluginDescriptor()); + assertTrue(confProto.getAmPluginDescriptor().getUberEnabled()); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index fccbb08..cd42109 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -37,11 +37,14 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; +import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.junit.Assert; import org.junit.Rule; @@ -131,7 +134,8 @@ public class TestDAGPlan { EdgeManagerPluginDescriptor emDesc = edgeProperty.getEdgeManagerDescriptor(); Assert.assertNotNull(emDesc); Assert.assertEquals("emClass", emDesc.getClassName()); - Assert.assertTrue(Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray())); + Assert.assertTrue( + Arrays.equals("emPayload".getBytes(), emDesc.getUserPayload().deepCopyAsArray())); } @Test(timeout = 5000) @@ -311,4 +315,61 @@ public class TestDAGPlan { assertNotNull(fetchedCredentials.getToken(new Text("Token1"))); assertNotNull(fetchedCredentials.getToken(new Text("Token2"))); } + + @Test(timeout = 5000) + public void testServiceDescriptorPropagation() { + DAG dag = DAG.create("testDag"); + ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1"). + setUserPayload(UserPayload.create(ByteBuffer.wrap("processor1Bytes".getBytes()))); + ProcessorDescriptor pd2 = ProcessorDescriptor.create("processor2"). + setUserPayload(UserPayload.create(ByteBuffer.wrap("processor2Bytes".getBytes()))); + + VertexExecutionContext defaultExecutionContext = + VertexExecutionContext.create("plugin", "plugin", "plugin"); + VertexExecutionContext v1Context = VertexExecutionContext.createExecuteInAm(true); + + + Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context); + Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1)); + v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap()) + .addTaskLocalFiles(new HashMap()); + v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap()) + .addTaskLocalFiles(new HashMap()); + + InputDescriptor inputDescriptor = InputDescriptor.create("input"). + setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes()))); + OutputDescriptor outputDescriptor = OutputDescriptor.create("output"). + setUserPayload(UserPayload.create(ByteBuffer.wrap("outputBytes".getBytes()))); + Edge edge = Edge.create(v1, v2, EdgeProperty.create( + DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor)); + + dag.addVertex(v1).addVertex(v2).addEdge(edge); + dag.setExecutionContext(defaultExecutionContext); + + DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true); + + assertEquals(2, dagProto.getVertexCount()); + assertEquals(1, dagProto.getEdgeCount()); + + assertTrue(dagProto.hasDefaultExecutionContext()); + VertexExecutionContextProto defaultContextProto = dagProto.getDefaultExecutionContext(); + assertFalse(defaultContextProto.getExecuteInContainers()); + assertFalse(defaultContextProto.getExecuteInAm()); + assertEquals("plugin", defaultContextProto.getTaskSchedulerName()); + assertEquals("plugin", defaultContextProto.getContainerLauncherName()); + assertEquals("plugin", defaultContextProto.getTaskCommName()); + + VertexPlan v1Proto = dagProto.getVertex(0); + assertTrue(v1Proto.hasExecutionContext()); + VertexExecutionContextProto v1ContextProto = v1Proto.getExecutionContext(); + assertFalse(v1ContextProto.getExecuteInContainers()); + assertTrue(v1ContextProto.getExecuteInAm()); + assertFalse(v1ContextProto.hasTaskSchedulerName()); + assertFalse(v1ContextProto.hasContainerLauncherName()); + assertFalse(v1ContextProto.hasTaskCommName()); + + VertexPlan v2Proto = dagProto.getVertex(1); + assertFalse(v2Proto.hasExecutionContext()); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index 51b179a..e37f849 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ -18,15 +18,32 @@ package org.apache.tez.dag.api; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.dag.api.Vertex.VertexExecutionContext; +import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; +import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; import org.junit.Assert; import org.junit.Test; @@ -43,7 +60,7 @@ public class TestDagTypeConverters { DagTypeConverters.convertToDAGPlan(entityDescriptor); Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion()); Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray()); - Assert.assertTrue(proto.hasHistoryText()); + assertTrue(proto.hasHistoryText()); Assert.assertNotEquals(historytext, proto.getHistoryText()); Assert.assertEquals(historytext, new String( TezCommonUtils.decompressByteStringToByteArray(proto.getHistoryText()))); @@ -89,4 +106,181 @@ public class TestDagTypeConverters { Assert.assertEquals(2311, lr2UrlDeserialized.getPort()); } + + @Test(timeout = 5000) + public void testVertexExecutionContextTranslation() { + VertexExecutionContext originalContext; + VertexExecutionContextProto contextProto; + VertexExecutionContext retrievedContext; + + + // Uber + originalContext = VertexExecutionContext.createExecuteInAm(true); + contextProto = DagTypeConverters.convertToProto(originalContext); + retrievedContext = DagTypeConverters.convertFromProto(contextProto); + assertEquals(originalContext, retrievedContext); + + // Regular containers + originalContext = VertexExecutionContext.createExecuteInContainers(true); + contextProto = DagTypeConverters.convertToProto(originalContext); + retrievedContext = DagTypeConverters.convertFromProto(contextProto); + assertEquals(originalContext, retrievedContext); + + // Custom + originalContext = VertexExecutionContext.create("plugin", "plugin", "plugin"); + contextProto = DagTypeConverters.convertToProto(originalContext); + retrievedContext = DagTypeConverters.convertFromProto(contextProto); + assertEquals(originalContext, retrievedContext); + } + + + static final String testScheduler = "testScheduler"; + static final String testLauncher = "testLauncher"; + static final String testComm = "testComm"; + static final String classSuffix = "_class"; + + @Test(timeout = 5000) + public void testServiceDescriptorTranslation() { + + + TaskSchedulerDescriptor[] taskSchedulers; + ContainerLauncherDescriptor[] containerLaunchers; + TaskCommunicatorDescriptor[] taskComms; + + ServicePluginsDescriptor servicePluginsDescriptor; + AMPluginDescriptorProto proto; + + // Uber-execution + servicePluginsDescriptor = ServicePluginsDescriptor.create(true); + proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + assertTrue(proto.hasUberEnabled()); + assertTrue(proto.hasContainersEnabled()); + assertTrue(proto.getUberEnabled()); + assertTrue(proto.getContainersEnabled()); + assertEquals(0, proto.getTaskSchedulersCount()); + assertEquals(0, proto.getContainerLaunchersCount()); + assertEquals(0, proto.getTaskCommunicatorsCount()); + + // Single plugin set specified. One with a payload. + taskSchedulers = createTaskScheduelrs(1, false); + containerLaunchers = createContainerLaunchers(1, false); + taskComms = createTaskCommunicators(1, true); + + servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers, + taskComms); + proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + assertTrue(proto.hasUberEnabled()); + assertTrue(proto.hasContainersEnabled()); + assertFalse(proto.getUberEnabled()); + assertTrue(proto.getContainersEnabled()); + verifyPlugins(proto.getTaskSchedulersList(), 1, testScheduler, false); + verifyPlugins(proto.getContainerLaunchersList(), 1, testLauncher, false); + verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true); + + + // Multiple plugin set specified. All with a payload + taskSchedulers = createTaskScheduelrs(3, true); + containerLaunchers = createContainerLaunchers(3, true); + taskComms = createTaskCommunicators(3, true); + + servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers, + taskComms); + proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + assertTrue(proto.hasUberEnabled()); + assertTrue(proto.hasContainersEnabled()); + assertFalse(proto.getUberEnabled()); + assertTrue(proto.getContainersEnabled()); + verifyPlugins(proto.getTaskSchedulersList(), 3, testScheduler, true); + verifyPlugins(proto.getContainerLaunchersList(), 3, testLauncher, true); + verifyPlugins(proto.getTaskCommunicatorsList(), 3, testComm, true); + + // Single plugin set specified. One with a payload. No container execution. Uber enabled. + taskSchedulers = createTaskScheduelrs(1, false); + containerLaunchers = createContainerLaunchers(1, false); + taskComms = createTaskCommunicators(1, true); + + servicePluginsDescriptor = ServicePluginsDescriptor.create(false, true, taskSchedulers, containerLaunchers, + taskComms); + proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor); + assertTrue(proto.hasUberEnabled()); + assertTrue(proto.hasContainersEnabled()); + assertTrue(proto.getUberEnabled()); + assertFalse(proto.getContainersEnabled()); + verifyPlugins(proto.getTaskSchedulersList(), 1, testScheduler, false); + verifyPlugins(proto.getContainerLaunchersList(), 1, testLauncher, false); + verifyPlugins(proto.getTaskCommunicatorsList(), 1, testComm, true); + } + + private void verifyPlugins(List entities, int expectedCount, + String baseString, boolean hasPayload) { + assertEquals(expectedCount, entities.size()); + for (int i = 0; i < expectedCount; i++) { + assertEquals(indexedEntity(baseString, i), entities.get(i).getName()); + TezEntityDescriptorProto subEntityProto = entities.get(i).getEntityDescriptor(); + assertEquals(append(indexedEntity(baseString, i), classSuffix), + subEntityProto.getClassName()); + assertEquals(hasPayload, subEntityProto.hasTezUserPayload()); + if (hasPayload) { + UserPayload userPayload = + UserPayload + .create(subEntityProto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(), + subEntityProto.getTezUserPayload().getVersion()); + ByteBuffer bb = userPayload.getPayload(); + assertNotNull(bb); + assertEquals(i, bb.getInt()); + } + } + } + + private TaskSchedulerDescriptor[] createTaskScheduelrs(int count, boolean withUserPayload) { + TaskSchedulerDescriptor[] descriptors = new TaskSchedulerDescriptor[count]; + for (int i = 0; i < count; i++) { + descriptors[i] = TaskSchedulerDescriptor.create(indexedEntity(testScheduler, i), + append(indexedEntity(testScheduler, i), classSuffix)); + if (withUserPayload) { + descriptors[i].setUserPayload(createPayload(i)); + } + } + return descriptors; + } + + private ContainerLauncherDescriptor[] createContainerLaunchers(int count, + boolean withUserPayload) { + ContainerLauncherDescriptor[] descriptors = new ContainerLauncherDescriptor[count]; + for (int i = 0; i < count; i++) { + descriptors[i] = ContainerLauncherDescriptor.create(indexedEntity(testLauncher, i), + append(indexedEntity(testLauncher, i), classSuffix)); + if (withUserPayload) { + descriptors[i].setUserPayload(createPayload(i)); + } + } + return descriptors; + } + + private TaskCommunicatorDescriptor[] createTaskCommunicators(int count, boolean withUserPayload) { + TaskCommunicatorDescriptor[] descriptors = new TaskCommunicatorDescriptor[count]; + for (int i = 0; i < count; i++) { + descriptors[i] = TaskCommunicatorDescriptor.create(indexedEntity(testComm, i), + append(indexedEntity(testComm, i), classSuffix)); + if (withUserPayload) { + descriptors[i].setUserPayload(createPayload(i)); + } + } + return descriptors; + } + + private static UserPayload createPayload(int i) { + ByteBuffer bb = ByteBuffer.allocate(4); + bb.putInt(0, i); + UserPayload payload = UserPayload.create(bb); + return payload; + } + + private String indexedEntity(String name, int index) { + return name + index; + } + + private String append(String s1, String s2) { + return s1 + s2; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/52f0e5d2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 52621bd..53e15e8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -409,9 +409,9 @@ public class DAGAppMaster extends AbstractService { } taskSchedulerDescriptors = parsePlugin(taskSchedulers, - (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskScedulersCount() == 0 ? + (amPluginDescriptorProto == null || amPluginDescriptorProto.getTaskSchedulersCount() == 0 ? null : - amPluginDescriptorProto.getTaskScedulersList()), + amPluginDescriptorProto.getTaskSchedulersList()), tezYarnEnabled, uberEnabled); containerLauncherDescriptors = parsePlugin(containerLaunchers,