tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-545. Fix TestVertexImpl test failure related to committer. (hitesh)
Date Wed, 09 Oct 2013 22:53:54 GMT
Updated Branches:
  refs/heads/master e2e9247e4 -> 751920750


TEZ-545. Fix TestVertexImpl test failure related to committer. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/75192075
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/75192075
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/75192075

Branch: refs/heads/master
Commit: 751920750e8c3bcab1d17917f51630a62feaa531
Parents: e2e9247
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Oct 9 15:53:34 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Oct 9 15:53:34 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   6 -
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   9 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 226 ++++++++++---------
 3 files changed, 126 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/75192075/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index e677c0f..6bc151c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -947,12 +947,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           dag.credentials, dag.clock,
           dag.taskHeartbeatHandler, dag.appContext,
           vertexLocationHint);
-      if (vertexPlan.getInputsCount() > 0) {
-        v.setAdditionalInputs(vertexPlan.getInputsList());
-      }
-      if (vertexPlan.getOutputsCount() > 0) {
-        v.setAdditionalOutputs(vertexPlan.getOutputsList());
-      }
       return v;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/75192075/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 97034d4..5f54ad5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -477,6 +477,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     this.containerContext = new ContainerContext(this.localResources,
         this.credentials, this.environment, this.javaOpts);
+
+    if (vertexPlan.getInputsCount() > 0) {
+      setAdditionalInputs(vertexPlan.getInputsList());
+    }
+    if (vertexPlan.getOutputsCount() > 0) {
+      setAdditionalOutputs(vertexPlan.getOutputsList());
+    }
+
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -1740,6 +1748,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs) {
+    LOG.info("setting additional outputs for vertex " + this.vertexName);
     this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size());
     for (RootInputLeafOutputProto output : outputs) {
       

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/75192075/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 8d37c2e..1ab9c1d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.committer.NullVertexOutputCommitter;
 import org.apache.tez.dag.api.committer.VertexContext;
 import org.apache.tez.dag.api.committer.VertexOutputCommitter;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -236,217 +237,224 @@ public class TestVertexImpl {
     DAGPlan dag = DAGPlan.newBuilder()
         .setName("testverteximpl")
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex1")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host1")
                 .addRack("rack1")
                 .build()
-                )
+            )
             .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
+              PlanTaskConfiguration.newBuilder()
                 .setNumTasks(1)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("")
                 .setTaskModule("x1.y1")
                 .build()
-                )
+            )
             .addOutEdgeId("e1")
             .build()
-            )
+        )
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex2")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host2")
                 .addRack("rack2")
                 .build()
-                )
+            )
             .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
+              PlanTaskConfiguration.newBuilder()
                 .setNumTasks(2)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("")
                 .setTaskModule("x2.y2")
                 .build()
-                )
+            )
             .addOutEdgeId("e2")
             .build()
-            )
+        )
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex3")
             .setType(PlanVertexType.NORMAL)
             .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host3")
                 .addRack("rack3")
                 .build()
-                )
+            )
             .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
+              PlanTaskConfiguration.newBuilder()
                 .setNumTasks(2)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("foo")
                 .setTaskModule("x3.y3")
                 .build()
-                )
+            )
             .addInEdgeId("e1")
             .addInEdgeId("e2")
             .addOutEdgeId("e3")
             .addOutEdgeId("e4")
             .build()
-            )
+        )
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex4")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host4")
                 .addRack("rack4")
                 .build()
-                )
+            )
             .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
+              PlanTaskConfiguration.newBuilder()
                 .setNumTasks(2)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("")
                 .setTaskModule("x4.y4")
                 .build()
-                )
+            )
             .addInEdgeId("e3")
             .addOutEdgeId("e5")
             .build()
-            )
+        )
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex5")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host5")
                 .addRack("rack5")
                 .build()
-                )
+            )
             .setTaskConfig(
-                PlanTaskConfiguration.newBuilder()
+              PlanTaskConfiguration.newBuilder()
                 .setNumTasks(2)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("")
                 .setTaskModule("x5.y5")
                 .build()
-                )
+            )
             .addInEdgeId("e4")
             .addOutEdgeId("e6")
             .build()
-            )
+        )
         .addVertex(
-            VertexPlan.newBuilder()
+          VertexPlan.newBuilder()
             .setName("vertex6")
             .setType(PlanVertexType.NORMAL)
             .addTaskLocationHint(
-                PlanTaskLocationHint.newBuilder()
+              PlanTaskLocationHint.newBuilder()
                 .addHost("host6")
                 .addRack("rack6")
                 .build()
-                )
+            )
+            .addOutputs(
+                DAGProtos.RootInputLeafOutputProto.newBuilder()
+                    .setEntityDescriptor(
+                      TezEntityDescriptorProto.newBuilder().setClassName("output").build()
+                    )
+                    .setName("outputx")
+            )
             .setTaskConfig(
                 PlanTaskConfiguration.newBuilder()
-                .setNumTasks(2)
-                .setVirtualCores(4)
-                .setMemoryMb(1024)
-                .setJavaOpts("")
-                .setTaskModule("x6.y6")
-                .build()
+                    .setNumTasks(2)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x6.y6")
+                    .build()
                 )
-            .addInEdgeId("e5")
-            .addInEdgeId("e6")
-            .build()
+                .addInEdgeId("e5")
+                .addInEdgeId("e6")
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
-            .setInputVertexName("vertex1")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
-            .setOutputVertexName("vertex3")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e1")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
-            .setInputVertexName("vertex2")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
-            .setOutputVertexName("vertex3")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e2")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
+                .setInputVertexName("vertex2")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex3")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e2")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
-            .setInputVertexName("vertex3")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
-            .setOutputVertexName("vertex4")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e3")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
+                .setInputVertexName("vertex3")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
+                .setOutputVertexName("vertex4")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e3")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
-            .setInputVertexName("vertex3")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
-            .setOutputVertexName("vertex5")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e4")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
+                .setInputVertexName("vertex3")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
+                .setOutputVertexName("vertex5")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e4")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
-            .setInputVertexName("vertex4")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
-            .setOutputVertexName("vertex6")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e5")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
+                .setInputVertexName("vertex4")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
+                .setOutputVertexName("vertex6")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e5")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .addEdge(
-            EdgePlan.newBuilder()
-            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
-            .setInputVertexName("vertex5")
-            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
-            .setOutputVertexName("vertex6")
-            .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-            .setId("e6")
-            .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-            .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-            .build()
+            .addEdge(
+              EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
+                .setInputVertexName("vertex5")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
+                .setOutputVertexName("vertex6")
+                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
+                .setId("e6")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
             )
-        .build();
+            .build();
 
     return dag;
   }


Mime
View raw message