tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1247. Allow DAG.verify() to be called multiple times (Jeff Zhang via bikas)
Date Wed, 23 Jul 2014 02:22:38 GMT
Repository: tez
Updated Branches:
  refs/heads/master da9bec239 -> 06bd38302


TEZ-1247. Allow DAG.verify() to be called multiple times (Jeff Zhang via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06bd3830
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06bd3830
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06bd3830

Branch: refs/heads/master
Commit: 06bd3830295d8b6324989594ac0ae53a4599a4e9
Parents: da9bec2
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Jul 22 19:22:29 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Jul 22 19:22:29 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   | 45 +++++++-------------
 .../org/apache/tez/dag/api/VertexGroup.java     | 11 ++++-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 14 ++++--
 3 files changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/06bd3830/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 30793c2..79c474a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -58,6 +58,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -202,39 +203,25 @@ public class DAG {
     VertexGroup av = edge.getInputVertexGroup();
     av.addOutputVertex(edge.getOutputVertex(), edge);
     groupInputEdges.add(edge);
-    return this;
-  }
-  
-  public String getName() {
-    return this.name;
-  }
-  
-  private void processEdgesAndGroups() throws IllegalStateException {
-    // process all VertexGroups by transferring outgoing connections to the members
     
-    // add edges between VertexGroup members and destination vertices
+    // add new edge between members of VertexGroup and destVertex of the GroupInputEdge
     List<Edge> newEdges = Lists.newLinkedList();
-    for (GroupInputEdge e : groupInputEdges) {
-      Vertex  dstVertex = e.getOutputVertex();
-      VertexGroup uv = e.getInputVertexGroup();
-      for (Vertex member : uv.getMembers()) {
-        newEdges.add(new Edge(member, dstVertex, e.getEdgeProperty()));
-      }
-      dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo());
+    Vertex dstVertex = edge.getOutputVertex();
+    VertexGroup uv = edge.getInputVertexGroup();
+    for (Vertex member : uv.getMembers()) {
+      newEdges.add(new Edge(member, dstVertex, edge.getEdgeProperty()));
     }
+    dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo());
     
     for (Edge e : newEdges) {
       addEdge(e);
     }
     
-    // add outputs to VertexGroup members
-    for(VertexGroup av : vertexGroups) {
-      for (RootInputLeafOutput<OutputDescriptor> output : av.getOutputs()) {
-        for (Vertex member : av.getMembers()) {
-          member.addAdditionalOutput(output);
-        }
-      }
-    }
+    return this;
+  }
+  
+  public String getName() {
+    return this.name;
   }
   
   void checkAndInferOneToOneParallelism() {
@@ -324,16 +311,16 @@ public class DAG {
   //   In short term, the supported DAGs are limited. Call with restricted=true for these
verifications.
   //   Illegal:
   //     - any vertex with more than one input or output edge. (n-ary input, n-ary merge)
-  public void verify() throws IllegalStateException {
+  @VisibleForTesting
+  void verify() throws IllegalStateException {
     verify(true);
   }
 
-  public void verify(boolean restricted) throws IllegalStateException {
+  @VisibleForTesting
+  void verify(boolean restricted) throws IllegalStateException {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
-
-    processEdgesAndGroups();
     
     // check for valid vertices, duplicate vertex names,
     // and prepare for cycle detection

http://git-wip-us.apache.org/repos/asf/tez/blob/06bd3830/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index 48b0873..3baa778 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -96,9 +96,16 @@ public class VertexGroup {
    */
   public VertexGroup addOutput(String outputName, OutputDescriptor outputDescriptor,
       Class<? extends OutputCommitter> outputCommitterClazz) {
-    outputs.add(new RootInputLeafOutput<OutputDescriptor>(outputName,
-        outputDescriptor, outputCommitterClazz));
+    RootInputLeafOutput<OutputDescriptor> leafOutput = new RootInputLeafOutput<OutputDescriptor>(outputName,
+        outputDescriptor, outputCommitterClazz);
+    outputs.add(leafOutput);
     this.groupInfo.outputs.add(outputName);
+    
+    // also add output to its members
+    for (Vertex member : getMembers()) {
+      member.addAdditionalOutput(leafOutput);
+    }
+    
     return this;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/06bd3830/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index e0d0551..6e65599 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -629,6 +629,9 @@ public class TestDAGVerify {
     dag.addEdge(e1);
     dag.addEdge(e2);
     dag.verify();
+    for (int i = 0; i< 10;++i){
+      dag.verify();  // should be OK when called multiple times
+    }
     
     Assert.assertEquals(2, v1.getOutputVertices().size());
     Assert.assertEquals(2, v2.getOutputVertices().size());
@@ -685,8 +688,10 @@ public class TestDAGVerify {
     dag.addVertex(v5);
     dag.addEdge(e1);
     dag.addEdge(e2);
-    dag.verify();
-
+    for (int i = 0; i< 10;++i){
+      dag.verify(); // should be OK when called multiple times
+    }
+    
     // for the first Group v1 and v2 should get connected to v4 and also have 1 output
     // for the second Group v2 and v3 should get connected to v5
     // the Group place holders should disappear
@@ -765,7 +770,10 @@ public class TestDAGVerify {
     dag.addVertex(v5);
     dag.addEdge(e1);
     dag.addEdge(e2);
-    dag.verify();
+    for (int i = 0; i< 10;++i){
+      dag.verify();  // should be OK when called multiple times
+    }
+    
     Assert.assertEquals(dummyTaskCount, v5.getParallelism());
   }
 


Mime
View raw message