Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E871F17E47 for ; Wed, 29 Oct 2014 21:23:09 +0000 (UTC) Received: (qmail 86354 invoked by uid 500); 29 Oct 2014 21:23:06 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 86313 invoked by uid 500); 29 Oct 2014 21:23:06 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 86304 invoked by uid 99); 29 Oct 2014 21:23:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Oct 2014 21:23:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 88DD791A1FE; Wed, 29 Oct 2014 21:23:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <9ad5f583d2f04013ac02138b80775f59@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1666. UserPayload should be null if the payload is not specified. (sseth) (cherry picked from commit 4e69bed5c7dbc68d17f3d4648838e5acfc52c3eb) Date: Wed, 29 Oct 2014 21:23:06 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.5 7d469cb5d -> a2ec913a0 TEZ-1666. UserPayload should be null if the payload is not specified. (sseth) (cherry picked from commit 4e69bed5c7dbc68d17f3d4648838e5acfc52c3eb) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a2ec913a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a2ec913a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a2ec913a Branch: refs/heads/branch-0.5 Commit: a2ec913a08919905227829b708e63f2614815118 Parents: 7d469cb Author: Siddharth Seth Authored: Wed Oct 29 14:22:03 2014 -0700 Committer: Siddharth Seth Committed: Wed Oct 29 14:22:57 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/tez/dag/api/DagTypeConverters.java | 56 ++-- tez-api/src/main/proto/DAGApiRecords.proto | 8 +- .../org/apache/tez/dag/api/TestDAGPlan.java | 18 +- .../tez/dag/api/TestDagTypeConverters.java | 4 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 11 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 117 ++++---- .../tez/dag/app/dag/impl/TestVertexImpl.java | 268 ++++++++++--------- .../TestHistoryEventsProtoConversion.java | 3 +- .../runtime/api/impl/TezInputContextImpl.java | 2 +- .../api/impl/TezMergedInputContextImpl.java | 2 +- .../runtime/api/impl/TezOutputContextImpl.java | 2 +- .../api/impl/TezProcessorContextImpl.java | 2 +- .../examples/BroadcastAndOneToOneExample.java | 3 +- 15 files changed, 275 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6de608f..3c6d948 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,9 @@ Apache Tez Change Log Release 0.5.2: Unreleased INCOMPATIBLE CHANGES + TEZ-1666. UserPayload should be null if the payload is not specified. + 0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664). + context.getUserPayload can now return null, apps may need to add defensive code. ALL CHANGES: TEZ-1620. Wait for application finish before stopping MiniTezCluster http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 179f3cc..17807d3 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -306,9 +306,13 @@ public class DagTypeConverters { builder.setClassName(descriptor.getClassName()); UserPayload userPayload = descriptor.getUserPayload(); - if (userPayload != null && userPayload.hasPayload()) { - builder.setUserPayload(ByteString.copyFrom(descriptor.getUserPayload().getPayload())); - builder.setVersion(userPayload.getVersion()); + if (userPayload != null) { + DAGProtos.TezUserPayloadProto.Builder payloadBuilder = DAGProtos.TezUserPayloadProto.newBuilder(); + if (userPayload.hasPayload()) { + payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload())); + payloadBuilder.setVersion(userPayload.getVersion()); + } + builder.setTezUserPayload(payloadBuilder.build()); } if (descriptor.getHistoryText() != null) { try { @@ -348,62 +352,84 @@ public class DagTypeConverters { private static UserPayload convertTezUserPayloadFromDAGPlan( TezEntityDescriptorProto proto) { UserPayload userPayload = null; - if (proto.hasUserPayload()) { - userPayload = - UserPayload.create(proto.getUserPayload().asReadOnlyByteBuffer(), proto.getVersion()); - } else { - userPayload = UserPayload.create(null, -1); + if (proto.hasTezUserPayload()) { + if (proto.getTezUserPayload().hasUserPayload()) { + userPayload = + UserPayload.create(proto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(), proto.getTezUserPayload().getVersion()); + } else { + userPayload = UserPayload.create(null); + } } return userPayload; } + private static void setUserPayload(EntityDescriptor entity, UserPayload payload) { + if (payload != null) { + entity.setUserPayload(payload); + } + } + public static InputDescriptor convertInputDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return InputDescriptor.create(className).setUserPayload(payload); + InputDescriptor id = InputDescriptor.create(className); + setUserPayload(id, payload); + return id; } public static OutputDescriptor convertOutputDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return OutputDescriptor.create(className).setUserPayload(payload); + OutputDescriptor od = OutputDescriptor.create(className); + setUserPayload(od, payload); + return od; } public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return InputInitializerDescriptor.create(className).setUserPayload(payload); + InputInitializerDescriptor iid = InputInitializerDescriptor.create(className); + setUserPayload(iid, payload); + return iid; } public static OutputCommitterDescriptor convertOutputCommitterDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return OutputCommitterDescriptor.create(className).setUserPayload(payload); + OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(className); + setUserPayload(ocd, payload); + return ocd; } public static VertexManagerPluginDescriptor convertVertexManagerPluginDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return VertexManagerPluginDescriptor.create(className).setUserPayload(payload); + VertexManagerPluginDescriptor vmpd = VertexManagerPluginDescriptor.create(className); + setUserPayload(vmpd, payload); + return vmpd; } public static EdgeManagerPluginDescriptor convertEdgeManagerPluginDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return EdgeManagerPluginDescriptor.create(className).setUserPayload(payload); + EdgeManagerPluginDescriptor empd = EdgeManagerPluginDescriptor.create(className); + setUserPayload(empd, payload); + return empd; } public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); UserPayload payload = convertTezUserPayloadFromDAGPlan(proto); - return ProcessorDescriptor.create(className).setUserPayload(payload); + ProcessorDescriptor pd = ProcessorDescriptor.create(className); + setUserPayload(pd, payload); + return pd; } public static TezAppMasterStatus convertTezSessionStatusFromProto( http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index 04aa791..177faba 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -112,9 +112,13 @@ message PlanTaskConfiguration { message TezEntityDescriptorProto { optional string class_name = 1; - optional bytes user_payload = 2; + optional TezUserPayloadProto tez_user_payload = 2; optional bytes history_text = 3; - optional int32 version = 4; +} + +message TezUserPayloadProto { + optional bytes user_payload = 1; + optional int32 version = 2; } message RootInputLeafOutputProto { http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index 8cbd611..fccbb08 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -167,19 +167,19 @@ public class TestDAGPlan { VertexPlan v2Proto = dagProto.getVertex(1); EdgePlan edgeProto = dagProto.getEdge(0); - assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor() + assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName()); - assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor() + assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName()); - assertEquals("inputBytes", new String(edgeProto.getEdgeDestination() + assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("input", edgeProto.getEdgeDestination().getClassName()); - assertEquals("outputBytes", new String(edgeProto.getEdgeSource() + assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("output", edgeProto.getEdgeSource().getClassName()); @@ -235,8 +235,8 @@ public class TestDAGPlan { EdgePlan edgeProto = dagProto.getEdge(0); // either v1 or v2 will be on top based on topological order - String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getUserPayload().toByteArray()); - String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getUserPayload().toByteArray()); + String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray()); + String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray()); assertTrue(v1ProtoPayload.equals("processor1Bytes") || v1ProtoPayload.equals("processor3Bytes")); assertTrue(v2ProtoPayload.equals("processor1Bytes") || v2ProtoPayload.equals("processor3Bytes")); assertTrue(v1Proto.getProcessorDescriptor().getClassName().equals("processor1") || @@ -244,15 +244,15 @@ public class TestDAGPlan { assertTrue(v2Proto.getProcessorDescriptor().getClassName().equals("processor1") || v2Proto.getProcessorDescriptor().getClassName().equals("processor3")); - assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor() + assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("processor2", v3Proto.getProcessorDescriptor().getClassName()); - assertEquals("inputBytes", new String(edgeProto.getEdgeDestination() + assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("input", edgeProto.getEdgeDestination().getClassName()); - assertEquals("outputBytes", new String(edgeProto.getEdgeSource() + assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload() .getUserPayload().toByteArray())); assertEquals("output", edgeProto.getEdgeSource().getClassName()); http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java index 9a1cb07..13347bb 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java @@ -37,8 +37,8 @@ public class TestDagTypeConverters { .setHistoryText(historytext); TezEntityDescriptorProto proto = DagTypeConverters.convertToDAGPlan(entityDescriptor); - Assert.assertEquals(payload.getVersion(), proto.getVersion()); - Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getUserPayload().toByteArray()); + Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion()); + Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray()); Assert.assertTrue(proto.hasHistoryText()); Assert.assertNotEquals(historytext, proto.getHistoryText()); Assert.assertEquals(historytext, new String( http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index 57d742f..360a839 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -115,25 +115,22 @@ public class Edge { private void createEdgeManager() { switch (edgeProperty.getDataMovementType()) { case ONE_TO_ONE: - edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null)); + edgeManagerContext = new EdgeManagerPluginContextImpl(null); edgeManager = new OneToOneEdgeManager(edgeManagerContext); break; case BROADCAST: - edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null)); + edgeManagerContext = new EdgeManagerPluginContextImpl(null); edgeManager = new BroadcastEdgeManager(edgeManagerContext); break; case SCATTER_GATHER: - edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null)); + edgeManagerContext = new EdgeManagerPluginContextImpl(null); edgeManager = new ScatterGatherEdgeManager(edgeManagerContext); break; case CUSTOM: if (edgeProperty.getEdgeManagerDescriptor() != null) { UserPayload payload = null; - if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null && - edgeProperty.getEdgeManagerDescriptor().getUserPayload().hasPayload()) { + if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) { payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload(); - } else { - payload = UserPayload.create(null); } edgeManagerContext = new EdgeManagerPluginContextImpl(payload); String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName(); http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 50e8b0c..b8e99d4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -69,7 +69,6 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; @@ -2032,15 +2031,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Setting vertexManager to RootInputVertexManager for " + logIdentifier); vertexManager = new VertexManager( - VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()) - .setUserPayload(UserPayload.create(null)), + VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()), this, appContext); } else if (hasOneToOne && !hasCustom) { LOG.info("Setting vertexManager to InputReadyVertexManager for " + logIdentifier); vertexManager = new VertexManager( - VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()) - .setUserPayload(UserPayload.create(null)), + VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()), this, appContext); } else if (hasBipartite && !hasCustom) { LOG.info("Setting vertexManager to ShuffleVertexManager for " @@ -2053,8 +2050,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Setting vertexManager to ImmediateStartVertexManager for " + logIdentifier); vertexManager = new VertexManager( - VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()) - .setUserPayload(UserPayload.create(null)), + VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()), this, appContext); } } http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index dc90a6f..af48c49 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -650,65 +650,67 @@ public class TestDAGImpl { .setName("testverteximpl") .addVertex( VertexPlan.newBuilder() - .setName("vertex1") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host1") - .addRack("rack1") - .build() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host1") + .addRack("rack1") + .build() ) - .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(1) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x1.y1") - .build() - ) - .addOutEdgeId("e1") - .build() - ) - .addVertex( - VertexPlan.newBuilder() - .setName("vertex2") - .setType(PlanVertexType.NORMAL) - .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x2.y2")) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host2") - .addRack("rack2") - .build() + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() ) - .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(2) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("foo") - .setTaskModule("x2.y2") + .addOutEdgeId("e1") .build() - ) - .addInEdgeId("e1") - .build() - ) + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .setProcessorDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName("x2.y2")) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host2") + .addRack("rack2") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("foo") + .setTaskModule("x2.y2") + .build() + ) + .addInEdgeId("e1") + .build() + ) .addEdge( - EdgePlan.newBuilder() - .setEdgeManager(TezEntityDescriptorProto.newBuilder() - .setClassName(CustomizedEdgeManager.class.getName()) - .setUserPayload(ByteString.copyFromUtf8(exLocation.name())) - ) - .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) - .setInputVertexName("vertex1") - .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1")) - .setOutputVertexName("vertex2") - .setDataMovementType(PlanEdgeDataMovementType.CUSTOM) - .setId("e1") - .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) - .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) - .build() - ) + EdgePlan.newBuilder() + .setEdgeManager(TezEntityDescriptorProto.newBuilder() + .setClassName(CustomizedEdgeManager.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFromUtf8(exLocation.name()))) + ) + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) + .setInputVertexName("vertex1") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1")) + .setOutputVertexName("vertex2") + .setDataMovementType(PlanEdgeDataMovementType.CUSTOM) + .setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) .build(); return dag; } @@ -1135,14 +1137,15 @@ public class TestDAGImpl { TezEntityDescriptorProto .newBuilder() .setClassName(CountingOutputCommitter.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() .setUserPayload( ByteString .copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig( - true, false, false).toUserPayload())).build()) + true, false, false).toUserPayload())).build())) .setName("output3") .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output.class") - ) + ) .build()); badVertex.setAdditionalOutputs(outputs); http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 72dcd92..55ee05f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -219,7 +219,7 @@ public class TestVertexImpl { @Override public void initialize() throws IOException { - if (getContext().getUserPayload().hasPayload()) { + if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) { CountingOutputCommitterConfig conf = new CountingOutputCommitterConfig(getContext().getUserPayload()); this.throwError = conf.throwError; @@ -419,8 +419,9 @@ public class TestVertexImpl { ) .addOutEdgeId("e1") .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() - .setClassName(VertexManagerWithException.class.getName()) - .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))) + .setClassName(VertexManagerWithException.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))) .build() ) .addVertex( @@ -438,8 +439,9 @@ public class TestVertexImpl { ) .addInEdgeId("e1") .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() - .setClassName(VertexManagerWithException.class.getName()) - .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))) + .setClassName(VertexManagerWithException.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))) .build() ) .addEdge( @@ -706,24 +708,24 @@ public class TestVertexImpl { .setControllerDescriptor( TezEntityDescriptorProto.newBuilder().setClassName( initializerClassName)) - .setName("input1") - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("InputClazz") - .build() - ).build() + .setName("input1") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("InputClazz") + .build() + ).build() ) .setTaskConfig( PlanTaskConfiguration.newBuilder() - .setNumTasks(-1) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x1.y1") - .build() + .setNumTasks(-1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() ) .addOutEdgeId("e1") - .build() + .build() ) .addVertex( VertexPlan.newBuilder() @@ -737,11 +739,11 @@ public class TestVertexImpl { .setName("input2") .setIODescriptor( TezEntityDescriptorProto.newBuilder() - .setClassName("InputClazz") - .build() + .setClassName("InputClazz") + .build() ) .build() - ) + ) .setTaskConfig( PlanTaskConfiguration.newBuilder() .setNumTasks(-1) @@ -752,7 +754,7 @@ public class TestVertexImpl { .build() ) .addInEdgeId("e1") - .build() + .build() ) .addVertex( VertexPlan.newBuilder() @@ -766,8 +768,8 @@ public class TestVertexImpl { .setName("input3") .setIODescriptor( TezEntityDescriptorProto.newBuilder() - .setClassName("InputClazz") - .build() + .setClassName("InputClazz") + .build() ) .build() ) @@ -782,40 +784,42 @@ public class TestVertexImpl { ) .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() .setClassName(RootInputSpecUpdaterVertexManager.class.getName()) - .setUserPayload(ByteString.copyFrom(new byte[] {0}))) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom(new byte[]{0})))) .build() ) .addVertex( - VertexPlan.newBuilder() - .setName("vertex4") - .setType(PlanVertexType.NORMAL) - .addInputs( - RootInputLeafOutputProto.newBuilder() - .setControllerDescriptor( - TezEntityDescriptorProto.newBuilder().setClassName( - initializerClassName)) - .setName("input4") - .setIODescriptor( - TezEntityDescriptorProto.newBuilder() - .setClassName("InputClazz") - .build() + VertexPlan.newBuilder() + .setName("vertex4") + .setType(PlanVertexType.NORMAL) + .addInputs( + RootInputLeafOutputProto.newBuilder() + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + initializerClassName)) + .setName("input4") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("InputClazz") + .build() + ) + .build() ) - .build() - ) - .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(-1) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x3.y3") + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(-1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x3.y3") + .build() + ) + .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() + .setClassName(RootInputSpecUpdaterVertexManager.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom(new byte[]{1})))) .build() ) - .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() - .setClassName(RootInputSpecUpdaterVertexManager.class.getName()) - .setUserPayload(ByteString.copyFrom(new byte[] {1}))) - .build() - ) .addEdge( EdgePlan.newBuilder() .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) @@ -827,7 +831,7 @@ public class TestVertexImpl { .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) .build() - ) + ) .build(); return dag; } @@ -1404,71 +1408,71 @@ public class TestVertexImpl { .build() ) .addVertex( - VertexPlan.newBuilder() - .setName("vertex2") - .setType(PlanVertexType.NORMAL) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host2") - .addRack("rack2") - .build() - ) - .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(2) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x2.y2") + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host2") + .addRack("rack2") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x2.y2") + .build() + ) + .addOutEdgeId("e2") .build() - ) - .addOutEdgeId("e2") - .build() ) .addVertex( - VertexPlan.newBuilder() - .setName("vertex3") - .setType(PlanVertexType.NORMAL) - .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")) - .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host3") - .addRack("rack3") - .build() - ) - .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(2) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("foo") - .setTaskModule("x3.y3") + VertexPlan.newBuilder() + .setName("vertex3") + .setType(PlanVertexType.NORMAL) + .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host3") + .addRack("rack3") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("foo") + .setTaskModule("x3.y3") + .build() + ) + .addInEdgeId("e1") + .addInEdgeId("e2") + .addOutEdgeId("e3") + .addOutEdgeId("e4") .build() - ) - .addInEdgeId("e1") - .addInEdgeId("e2") - .addOutEdgeId("e3") - .addOutEdgeId("e4") - .build() ) .addVertex( VertexPlan.newBuilder() .setName("vertex4") .setType(PlanVertexType.NORMAL) .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host4") - .addRack("rack4") - .build() + PlanTaskLocationHint.newBuilder() + .addHost("host4") + .addRack("rack4") + .build() ) .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(2) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x4.y4") - .build() + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x4.y4") + .build() ) .addInEdgeId("e3") .addOutEdgeId("e5") @@ -1479,19 +1483,19 @@ public class TestVertexImpl { .setName("vertex5") .setType(PlanVertexType.NORMAL) .addTaskLocationHint( - PlanTaskLocationHint.newBuilder() - .addHost("host5") - .addRack("rack5") - .build() + PlanTaskLocationHint.newBuilder() + .addHost("host5") + .addRack("rack5") + .build() ) .setTaskConfig( - PlanTaskConfiguration.newBuilder() - .setNumTasks(2) - .setVirtualCores(4) - .setMemoryMb(1024) - .setJavaOpts("") - .setTaskModule("x5.y5") - .build() + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x5.y5") + .build() ) .addInEdgeId("e4") .addOutEdgeId("e6") @@ -1575,9 +1579,10 @@ public class TestVertexImpl { .setDataMovementType(PlanEdgeDataMovementType.CUSTOM) .setEdgeManager( TezEntityDescriptorProto.newBuilder() - .setClassName(EdgeManagerForTest.class.getName()) - .setUserPayload(ByteString.copyFrom(edgePayload)) - .build()) + .setClassName(EdgeManagerForTest.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload((ByteString.copyFrom(edgePayload)))) + .build()) .setId("e4") .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) @@ -2833,9 +2838,11 @@ public class TestVertexImpl { outputs.add(RootInputLeafOutputProto.newBuilder() .setControllerDescriptor( TezEntityDescriptorProto.newBuilder().setClassName( - CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( - new CountingOutputCommitter.CountingOutputCommitterConfig() - .toUserPayload())).build()) + CountingOutputCommitter.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom( + new CountingOutputCommitter.CountingOutputCommitterConfig() + .toUserPayload())).build())) .setName("output_v2") .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output.class")) @@ -2946,9 +2953,11 @@ public class TestVertexImpl { outputs.add(RootInputLeafOutputProto.newBuilder() .setControllerDescriptor( TezEntityDescriptorProto.newBuilder().setClassName( - CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( + CountingOutputCommitter.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom( new CountingOutputCommitter.CountingOutputCommitterConfig( - true, true, false).toUserPayload())).build()) + true, true, false).toUserPayload())).build())) .setName("output_v2") .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output.class")) @@ -2990,9 +2999,11 @@ public class TestVertexImpl { outputs.add(RootInputLeafOutputProto.newBuilder() .setControllerDescriptor( TezEntityDescriptorProto.newBuilder().setClassName( - CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom( + CountingOutputCommitter.class.getName()) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom( new CountingOutputCommitter.CountingOutputCommitterConfig( - true, true, true).toUserPayload())).build()) + true, true, true).toUserPayload())).build())) .setName("output_v2") .setIODescriptor( TezEntityDescriptorProto.newBuilder().setClassName("output.class")) @@ -4162,7 +4173,8 @@ public class TestVertexImpl { .setEdgeManager( TezEntityDescriptorProto.newBuilder() .setClassName(EdgeManagerForTest.class.getName()) - .setUserPayload(ByteString.copyFrom(edgePayload)) + .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder() + .setUserPayload(ByteString.copyFrom(edgePayload))) .build()) .setOutputVertexName("M5") .setDataMovementType(PlanEdgeDataMovementType.CUSTOM) http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 903b4fe..ad508b6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -325,8 +325,7 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getSourceEdgeManagers().size()); Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(), deserializedEvent.getSourceEdgeManagers().get("foo").getClassName()); - Assert.assertNotNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload()); - Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload().getPayload()); + Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload()); Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(), deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName()); Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(), http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index 420c477..62311e9 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -76,7 +76,7 @@ public class TezInputContextImpl extends TezTaskContextImpl checkNotNull(sourceVertexName, "sourceVertexName is null"); checkNotNull(inputs, "input map is null"); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); - this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload; + this.userPayload = userPayload; this.inputIndex = inputIndex; this.sourceVertexName = sourceVertexName; this.sourceInfo = new EventMetaData( http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java index 693bde8..e71a0bc 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java @@ -46,7 +46,7 @@ public class TezMergedInputContextImpl implements MergedInputContext { checkNotNull(inputReadyTracker, "inputReadyTracker is null"); this.groupInputName = groupInputName; this.groupInputsMap = groupInputsMap; - this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload; + this.userPayload = userPayload; this.inputReadyTracker = inputReadyTracker; this.workDirs = workDirs; } http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 94df6aa..7b53075 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -69,7 +69,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl auxServiceEnv, memDist, outputDescriptor, objectRegistry); checkNotNull(outputIndex, "outputIndex is null"); checkNotNull(destinationVertexName, "destinationVertexName is null"); - this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload; + this.userPayload = userPayload; this.outputIndex = outputIndex; this.destinationVertexName = destinationVertexName; this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT, http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index 330c4c9..9b4dc6a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -62,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce counters, runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, processorDescriptor, objectRegistry); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); - this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload; + this.userPayload = userPayload; this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR, taskVertexName, "", taskAttemptID); this.inputReadyTracker = inputReadyTracker; http://git-wip-us.apache.org/repos/asf/tez/blob/a2ec913a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index e8a137c..42b31f1 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -72,7 +72,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { .next(); KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter(); kvWriter.write(word, new IntWritable(getContext().getTaskIndex())); - ByteBuffer userPayload = getContext().getUserPayload().getPayload(); + ByteBuffer userPayload = + getContext().getUserPayload() == null ? null : getContext().getUserPayload().getPayload(); if (userPayload != null) { boolean doLocalityCheck = getContext().getUserPayload().getPayload().get(0) > 0 ? true : false; if (doLocalityCheck) {