tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh)
Date Tue, 14 Jun 2016 22:03:37 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 f478befcc -> 60c26c7b2


TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/60c26c7b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/60c26c7b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/60c26c7b

Branch: refs/heads/branch-0.7
Commit: 60c26c7b23e76dd80389c040cd74727c19217eca
Parents: f478bef
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Jun 14 15:02:38 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Jun 14 15:02:38 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   | 33 ++++++++++++--------
 .../org/apache/tez/client/TestTezClient.java    | 29 +++++++++++++++++
 .../java/org/apache/tez/dag/api/TestDAG.java    | 28 +++++++++++++++++
 4 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d70d97..880bfeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3278. Hide Swimlane from Tez UI

http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 96788c5..cbcf32b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -21,9 +21,11 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -97,8 +99,6 @@ public class DAG {
   CallerContext callerContext;
   private Map<String,String> dagConf = new HashMap<String, String>();
 
-  private Stack<String> topologicalVertexStack = new Stack<String>();
-
   private DAG(String name) {
     this.name = name;
   }
@@ -529,7 +529,7 @@ public class DAG {
   }
 
   @VisibleForTesting
-  void verify(boolean restricted) throws IllegalStateException {
+  Deque<String> verify(boolean restricted) throws IllegalStateException {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
@@ -632,8 +632,8 @@ public class DAG {
     // When additional inputs are supported, this can be chceked easily (and early)
     // within the addInput / addOutput call itself.
 
-    detectCycles(edgeMap, vertexMap);
-    
+    Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap);
+
     checkAndInferOneToOneParallelism();
 
     if (restricted) {
@@ -650,29 +650,36 @@ public class DAG {
         }
       }
     }
+
+    return topologicalVertexStack;
   }
 
   // Adaptation of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
-  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+  private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap,
+      Map<String, AnnotatedVertex> vertexMap)
     throws IllegalStateException {
+    Deque<String> topologicalVertexStack = new LinkedList<String>();
     Integer nextIndex = 0; // boxed integer so it is passed by reference.
     Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
     for (AnnotatedVertex av : vertexMap.values()) {
       if (av.index == -1) {
         assert stack.empty();
-        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
       }
     }
+    return topologicalVertexStack;
   }
 
   // part of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
   private void strongConnect(
-    AnnotatedVertex av,
-    Map<String, AnnotatedVertex> vertexMap,
-    Map<Vertex, List<Edge>> edgeMap,
-    Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException {
+      AnnotatedVertex av,
+      Map<String, AnnotatedVertex> vertexMap,
+      Map<Vertex, List<Edge>> edgeMap,
+      Stack<AnnotatedVertex> stack,
+      Integer nextIndex,
+      Deque<String> topologicalVertexStack) throws IllegalStateException {
     av.index = nextIndex;
     av.lowlink = nextIndex;
     nextIndex++;
@@ -684,7 +691,7 @@ public class DAG {
       for (Edge e : edgeMap.get(av.v)) {
         AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName());
         if (outVertex.index == -1) {
-          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
           av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
         } else if (outVertex.onstack) {
          // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
@@ -737,7 +744,7 @@ public class DAG {
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
       boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
       JavaOptsChecker javaOptsChecker) {
-    verify(true);
+    Deque<String> topologicalVertexStack = verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
     dagBuilder.setName(this.name);

http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index c84df29..08bb156 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -61,6 +61,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConfigurationConstants;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
@@ -509,4 +511,31 @@ public class TestTezClient {
       Assert.fail("Failed to retrieve local host information");
     }
   }
+
+  @Test(timeout = 5000)
+  public void testClientResubmit() throws Exception {
+    TezClientForTest client = configureAndCreateTezClient(null, true, null);
+    client.start();
+    Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+        LocalResource.newInstance(
+            URL.newInstance("file", "localhost", 0, "/test1"),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1,
+        Resource.newInstance(1, 1));
+    vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+    Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1,
+        Resource.newInstance(1, 1));
+    vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+    DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
+    for (int i = 0; i < 3; ++i) {
+      try {
+        client.submitDAG(dag);
+        Assert.fail("Expected TezUncheckedException here.");
+      } catch(TezUncheckedException ex) {
+        Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found"));
+      }
+    }
+    client.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/60c26c7b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index aeef846..ebee17d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -18,11 +18,19 @@
 
 package org.apache.tez.dag.api;
 
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -333,4 +341,24 @@ public class TestDAG {
 
   }
 
+  @Test
+  public void testRecreateDAG() {
+    Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+        LocalResource.newInstance(
+            URL.newInstance("file", "localhost", 0, "/test1"),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1,
+        Resource.newInstance(1, 1));
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
+        Resource.newInstance(1, 1));
+    DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
+
+    TezConfiguration tezConf = new TezConfiguration();
+    DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false);
+    for (int i = 0; i < 3; ++i) {
+        DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false);
+        Assert.assertEquals(dagPlan, firstPlan);
+    }
+  }
 }


Mime
View raw message