Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BCCF4102BA for ; Wed, 14 Aug 2013 21:07:06 +0000 (UTC) Received: (qmail 62800 invoked by uid 500); 14 Aug 2013 21:07:06 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 62771 invoked by uid 500); 14 Aug 2013 21:07:06 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 62764 invoked by uid 99); 14 Aug 2013 21:07:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Aug 2013 21:07:06 +0000 X-ASF-Spam-Status: No, hits=-2002.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 14 Aug 2013 21:07:03 +0000 Received: (qmail 62619 invoked by uid 99); 14 Aug 2013 21:06:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Aug 2013 21:06:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 08185836115; Wed, 14 Aug 2013 21:06:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-354. Make user payload an optional field for {Input, Output, Processor}Descriptor. Contributed by Gunther Hagleitner Date: Wed, 14 Aug 2013 21:06:42 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master a9b6ab16b -> 15a411e71 TEZ-354. Make user payload an optional field for {Input,Output,Processor}Descriptor. Contributed by Gunther Hagleitner Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/15a411e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/15a411e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/15a411e7 Branch: refs/heads/master Commit: 15a411e716b49fa33101ea12e00f42451fa95726 Parents: a9b6ab1 Author: Siddharth Seth Authored: Wed Aug 14 14:05:49 2013 -0700 Committer: Siddharth Seth Committed: Wed Aug 14 14:05:49 2013 -0700 ---------------------------------------------------------------------- .../apache/tez/dag/api/DagTypeConverters.java | 6 +- .../org/apache/tez/dag/api/InputDescriptor.java | 13 ++- .../apache/tez/dag/api/OutputDescriptor.java | 11 +- .../apache/tez/dag/api/ProcessorDescriptor.java | 10 +- .../apache/tez/dag/api/TezEntityDescriptor.java | 14 ++- .../org/apache/tez/dag/api/TestDAGPlan.java | 16 +-- .../org/apache/tez/dag/api/TestDAGVerify.java | 106 +++++++++---------- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +- .../tez/dag/app/dag/impl/TestTaskImpl.java | 2 +- .../dag/app/dag/impl/TestVertexScheduler.java | 12 +-- .../tez/dag/app/rm/TestContainerReuse.java | 2 +- .../tez/mapreduce/examples/MRRSleepJob.java | 12 +-- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 26 ++--- .../apache/hadoop/mapred/LocalJobRunnerTez.java | 4 +- .../tez/mapreduce/processor/MapUtils.java | 2 +- .../processor/reduce/TestReduceProcessor.java | 2 +- .../org/apache/tez/mapreduce/YARNRunner.java | 8 +- 17 files changed, 133 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 392558b..143186d 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -243,7 +243,7 @@ public class DagTypeConverters { if (proto.hasUserPayload()) { bb = proto.getUserPayload().toByteArray(); } - return new InputDescriptor(className, bb); + return new InputDescriptor(className).setUserPayload(bb); } public static OutputDescriptor convertOutputDescriptorFromDAGPlan( @@ -253,7 +253,7 @@ public class DagTypeConverters { if (proto.hasUserPayload()) { bb = proto.getUserPayload().toByteArray(); } - return new OutputDescriptor(className, bb); + return new OutputDescriptor(className).setUserPayload(bb); } public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan( @@ -263,6 +263,6 @@ public class DagTypeConverters { if (proto.hasUserPayload()) { bb = proto.getUserPayload().toByteArray(); } - return new ProcessorDescriptor(className, bb); + return new ProcessorDescriptor(className).setUserPayload(bb); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java index 829ff1e..dea9001 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java @@ -19,9 +19,14 @@ package org.apache.tez.dag.api; public class InputDescriptor extends TezEntityDescriptor { - - // TODO Fix dependencies so that this can be specified as a class. - public InputDescriptor(String inputClassName, byte[] userPayload) { - super(inputClassName, userPayload); + + public InputDescriptor(String inputClassName) { + super(inputClassName); + } + + @Override + public InputDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java index afa7586..16fb9b1 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java @@ -20,8 +20,13 @@ package org.apache.tez.dag.api; public class OutputDescriptor extends TezEntityDescriptor { - // TODO Fix dependencies so that this can be specified as a class. - public OutputDescriptor(String outputClassName, byte[] userPayload) { - super(outputClassName, userPayload); + public OutputDescriptor(String outputClassName) { + super(outputClassName); + } + + @Override + public OutputDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java index 0d04214..092147d 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java @@ -20,8 +20,12 @@ package org.apache.tez.dag.api; public class ProcessorDescriptor extends TezEntityDescriptor { - // TODO Fix dependencies so that this can be specified as a class. - public ProcessorDescriptor(String processorClassName, byte[] userPayload) { - super(processorClassName, userPayload); + public ProcessorDescriptor(String processorClassName) { + super(processorClassName); + } + + public ProcessorDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java index e78a5a1..9d4b2c4 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java @@ -20,18 +20,22 @@ package org.apache.tez.dag.api; public abstract class TezEntityDescriptor { - private byte[] userPayload; + protected byte[] userPayload; private String className; - - public TezEntityDescriptor(String className, byte[] userPayload) { - this.userPayload = userPayload; + + public TezEntityDescriptor(String className) { this.className = className; } - + public byte[] getUserPayload() { return this.userPayload; } + public TezEntityDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; + } + public String getClassName() { return this.className; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java index ce26b35..8dd0826 100644 --- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java +++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java @@ -95,10 +95,10 @@ public class TestDAGPlan { @Test public void testUserPayloadSerde() { DAG dag = new DAG("testDag"); - ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1", - "processor1Bytes".getBytes()); - ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2", - "processor2Bytes".getBytes()); + ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1"). + setUserPayload("processor1Bytes".getBytes()); + ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2"). + setUserPayload("processor2Bytes".getBytes()); Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1)); Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1)); v1.setJavaOpts("").setTaskEnvironment(new HashMap()) @@ -106,10 +106,10 @@ public class TestDAGPlan { v2.setJavaOpts("").setTaskEnvironment(new HashMap()) .setTaskLocalResources(new HashMap()); - InputDescriptor inputDescriptor = new InputDescriptor("input", - "inputBytes".getBytes()); - OutputDescriptor outputDescriptor = new OutputDescriptor("output", - "outputBytes".getBytes()); + InputDescriptor inputDescriptor = new InputDescriptor("input"). + setUserPayload("inputBytes".getBytes()); + OutputDescriptor outputDescriptor = new OutputDescriptor("output"). + setUserPayload("outputBytes".getBytes()); Edge edge = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, outputDescriptor, inputDescriptor)); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java index 2a7b8ff..e66df54 100644 --- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java +++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java @@ -38,15 +38,15 @@ public class TestDAGVerify { @Test public void testVerify1() { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor(dummyProcessorClassName, null), + new ProcessorDescriptor(dummyProcessorClassName), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor(dummyOutputClassName, null), - new InputDescriptor(dummyInputClassName, null))); + new OutputDescriptor(dummyOutputClassName), + new InputDescriptor(dummyInputClassName))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -63,33 +63,33 @@ public class TestDAGVerify { public void testCycle1() { IllegalStateException ex=null; Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v3 = new Vertex("v3", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v4 = new Vertex("v4", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -119,33 +119,33 @@ public class TestDAGVerify { public void testCycle2() { IllegalStateException ex=null; Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v3 = new Vertex("v3", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v4 = new Vertex("v4", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -170,10 +170,10 @@ public class TestDAGVerify { public void repeatedVertexName() { IllegalStateException ex=null; Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v1repeat = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); DAG dag = new DAG("testDag"); dag.addVertex(v1); @@ -197,22 +197,22 @@ public class TestDAGVerify { IllegalStateException ex=null; try { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v3 = new Vertex("v3", - new ProcessorDescriptor("ReduceProcessor", null), + new ProcessorDescriptor("ReduceProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.ONE_TO_ONE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -236,22 +236,22 @@ public class TestDAGVerify { @Test public void BinaryInputAllowed() { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v3 = new Vertex("v3", - new ProcessorDescriptor("ReduceProcessor", null), + new ProcessorDescriptor("ReduceProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -269,22 +269,22 @@ public class TestDAGVerify { IllegalStateException ex=null; try { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v2 = new Vertex("v2", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Vertex v3 = new Vertex("v3", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), dummyTaskCount, dummyTaskResource); Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor("dummy output class", null), - new InputDescriptor("dummy input class", null))); + new OutputDescriptor("dummy output class"), + new InputDescriptor("dummy input class"))); DAG dag = new DAG("testDag"); dag.addVertex(v1); dag.addVertex(v2); @@ -322,7 +322,7 @@ public class TestDAGVerify { public void testInvalidVertexConstruction() { try { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), 0, dummyTaskResource); Assert.fail("Expected exception for 0 parallelism"); } catch (IllegalArgumentException e) { @@ -330,7 +330,7 @@ public class TestDAGVerify { } try { Vertex v1 = new Vertex("v1", - new ProcessorDescriptor("MapProcessor", null), + new ProcessorDescriptor("MapProcessor"), 1, null); Assert.fail("Expected exception for 0 parallelism"); } catch (IllegalArgumentException e) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index d9210d9..2cc00dc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -93,7 +93,7 @@ public class TestTaskAttempt { private static final ProcessorDescriptor MAP_PROCESSOR_DESC = new ProcessorDescriptor( - "org.apache.tez.mapreduce.processor.map.MapProcessor", null); + "org.apache.tez.mapreduce.processor.map.MapProcessor"); static public class StubbedFS extends RawLocalFileSystem { @Override http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 922c0a6..412fcb5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -114,7 +114,7 @@ public class TestTaskImpl { javaOpts = ""; leafVertex = false; mapProcDesc = new ProcessorDescriptor( - "org.apache.tez.mapreduce.processor.map.MapProcessor", null); + "org.apache.tez.mapreduce.processor.map.MapProcessor"); mockTask = new MockTaskImpl(vertexId, partition, dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java index 9102897..a2248ca 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java @@ -52,22 +52,22 @@ public class TestVertexScheduler { TezVertexID mockSrcVertexId1 = new TezVertexID(dagId, 1); EdgeProperty eProp1 = new EdgeProperty( EdgeProperty.ConnectionPattern.BIPARTITE, - EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null), - new InputDescriptor("in", null)); + EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"), + new InputDescriptor("in")); when(mockSrcVertex1.getVertexId()).thenReturn(mockSrcVertexId1); Vertex mockSrcVertex2 = mock(Vertex.class); TezVertexID mockSrcVertexId2 = new TezVertexID(dagId, 2); EdgeProperty eProp2 = new EdgeProperty( EdgeProperty.ConnectionPattern.BIPARTITE, - EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null), - new InputDescriptor("in", null)); + EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"), + new InputDescriptor("in")); when(mockSrcVertex2.getVertexId()).thenReturn(mockSrcVertexId2); Vertex mockSrcVertex3 = mock(Vertex.class); TezVertexID mockSrcVertexId3 = new TezVertexID(dagId, 3); EdgeProperty eProp3 = new EdgeProperty( EdgeProperty.ConnectionPattern.ONE_TO_ALL, - EdgeProperty.SourceType.STABLE, new OutputDescriptor("out", null), - new InputDescriptor("in", null)); + EdgeProperty.SourceType.STABLE, new OutputDescriptor("out"), + new InputDescriptor("in")); when(mockSrcVertex3.getVertexId()).thenReturn(mockSrcVertexId3); Vertex mockManagedVertex = mock(Vertex.class); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 056239b..0a75f20 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -193,7 +193,7 @@ public class TestContainerReuse { AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest( taID, capability, new HashMap(), new TezEngineTaskContext(taID, "user", "jobName", "vertexName", - new ProcessorDescriptor("processorClassName", null), + new ProcessorDescriptor("processorClassName"), Collections.singletonList(new InputSpec("vertexName", 1, "inputClassName")), Collections.singletonList(new OutputSpec( "vertexName", 1, "outputClassName"))), ta, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 62cc96c..525dcc9 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -532,7 +532,7 @@ public class MRRSleepJob extends Configured implements Tool { List vertices = new ArrayList(); Vertex mapVertex = new Vertex("map", new ProcessorDescriptor( - MapProcessor.class.getName(), + MapProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(mapStageConf)), numMapper, MRHelpers.getMapResource(mapStageConf)); @@ -555,8 +555,8 @@ public class MRRSleepJob extends Configured implements Tool { Configuration iconf = intermediateReduceStageConfs[i]; Vertex ivertex = new Vertex("ireduce" + (i+1), - new ProcessorDescriptor(ReduceProcessor.class.getName(), - MRHelpers.createUserPayloadFromConf(iconf)), + new ProcessorDescriptor(ReduceProcessor.class.getName()). + setUserPayload(MRHelpers.createUserPayloadFromConf(iconf)), numIReducer, MRHelpers.getReduceResource(iconf)); ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iconf)); @@ -571,7 +571,7 @@ public class MRRSleepJob extends Configured implements Tool { Vertex finalReduceVertex = null; if (numReducer > 0) { finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor( - ReduceProcessor.class.getName(), + ReduceProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(finalReduceConf)), numReducer, MRHelpers.getReduceResource(finalReduceConf)); @@ -591,9 +591,9 @@ public class MRRSleepJob extends Configured implements Tool { vertices.get(i), new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor( - OnFileSortedOutput.class.getName(), null), + OnFileSortedOutput.class.getName()), new InputDescriptor( - ShuffledMergedInput.class.getName(), null)))); + ShuffledMergedInput.class.getName())))); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 67be44a..83dc927 100644 --- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -269,23 +269,23 @@ public class TestMRRJobsDAGApi { DAG dag = new DAG("testMRRSleepJobDagSubmit"); Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor( - MapProcessor.class.getName(), + MapProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(stage1Conf)), inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1)); Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor( - ReduceProcessor.class.getName(), + ReduceProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(stage2Conf)), 1, Resource.newInstance(256, 1)); Vertex stage11Vertex = new Vertex("map1", new ProcessorDescriptor( - MapProcessor.class.getName(), + MapProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(stage1Conf)), inputSplitInfo1.getNumTasks(), Resource.newInstance(256, 1)); Vertex stage22Vertex = new Vertex("ireduce1", new ProcessorDescriptor( - ReduceProcessor.class.getName(), + ReduceProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(stage22Conf)), 2, Resource.newInstance(256, 1)); Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor( - ReduceProcessor.class.getName(), + ReduceProcessor.class.getName()).setUserPayload( MRHelpers.createUserPayloadFromConf(stage3Conf)), 1, Resource.newInstance(256, 1)); @@ -359,20 +359,20 @@ public class TestMRRJobsDAGApi { Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor( - OnFileSortedOutput.class.getName(), null), new InputDescriptor( - ShuffledMergedInput.class.getName(), null))); + OnFileSortedOutput.class.getName()), new InputDescriptor( + ShuffledMergedInput.class.getName()))); Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor( - OnFileSortedOutput.class.getName(), null), new InputDescriptor( - ShuffledMergedInput.class.getName(), null))); + OnFileSortedOutput.class.getName()), new InputDescriptor( + ShuffledMergedInput.class.getName()))); Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor( - OnFileSortedOutput.class.getName(), null), new InputDescriptor( - ShuffledMergedInput.class.getName(), null))); + OnFileSortedOutput.class.getName()), new InputDescriptor( + ShuffledMergedInput.class.getName()))); Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor( - OnFileSortedOutput.class.getName(), null), new InputDescriptor( - ShuffledMergedInput.class.getName(), null))); + OnFileSortedOutput.class.getName()), new InputDescriptor( + ShuffledMergedInput.class.getName()))); dag.addEdge(edge1); dag.addEdge(edge11); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java index 41e88a2..30ba8f8 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java @@ -246,7 +246,7 @@ public class LocalJobRunnerTez implements ClientProtocol { mapIds.add(mapId); // FIXME invalid task context ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor( - MapProcessor.class.getName(), null); + MapProcessor.class.getName()); TezEngineTaskContext taskContext = new TezEngineTaskContext( tezMapId, user, localConf.getJobName(), "TODO_vertexName", @@ -451,7 +451,7 @@ public class LocalJobRunnerTez implements ClientProtocol { setupChildMapredLocalDirs(reduceId, user, localConf); // FIXME invalid task context ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor( - ReduceProcessor.class.getName(), null); + ReduceProcessor.class.getName()); TezEngineTaskContext taskContext = new TezEngineTaskContext( IDConverter.fromMRTaskAttemptId(reduceId), user, localConf.getJobName(), "TODO_vertexName", http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index f2f0c18..2bc327c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -183,7 +183,7 @@ public class MapUtils { InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput); ProcessorDescriptor mapProcessorDesc = new ProcessorDescriptor( - MapProcessor.class.getName(), null); + MapProcessor.class.getName()); writeSplitFiles(fs, jobConf, split); TezEngineTaskContext taskContext = new TezEngineTaskContext( TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0), "testuser", http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index 1ae3539..7562265 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -134,7 +134,7 @@ public class TestReduceProcessor { "localized-resources").toUri().toString()); FileOutputFormat.setOutputPath(reduceConf, new Path(workDir, "output")); ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor( - ReduceProcessor.class.getName(), null); + ReduceProcessor.class.getName()); // Now run a reduce TezEngineTaskContext taskContext = new TezEngineTaskContext( TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0), "testUser", http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/15a411e7/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 75a79c3..40a9563 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -400,8 +400,8 @@ public class YARNRunner implements ClientProtocol { Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf) : MRHelpers.getReduceResource(stageConf); - Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor( - processorName, MRHelpers.createUserPayloadFromConf(stageConf)), + Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName). + setUserPayload(MRHelpers.createUserPayloadFromConf(stageConf)), numTasks, taskResource); Map taskEnv = new HashMap(); @@ -461,8 +461,8 @@ public class YARNRunner implements ClientProtocol { if (i > 0) { EdgeProperty edgeProperty = new EdgeProperty( ConnectionPattern.BIPARTITE, SourceType.STABLE, - new OutputDescriptor(OnFileSortedOutput.class.getName(), null), - new InputDescriptor(ShuffledMergedInput.class.getName(), null)); + new OutputDescriptor(OnFileSortedOutput.class.getName()), + new InputDescriptor(ShuffledMergedInput.class.getName())); Edge edge = null; edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);