tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3792. RootInputVertexManager doesn't drain queued source task completed events (Eric Badger via jeagles)
Date Fri, 14 Jul 2017 15:42:27 GMT
Repository: tez
Updated Branches:
  refs/heads/master 07d9146f8 -> 6777707fb


TEZ-3792. RootInputVertexManager doesn't drain queued source task completed events (Eric Badger
via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6777707f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6777707f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6777707f

Branch: refs/heads/master
Commit: 6777707fb2a93ea8a8ffa05f290d9555e77d4dc6
Parents: 07d9146
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri Jul 14 10:42:23 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri Jul 14 10:42:23 2017 -0500

----------------------------------------------------------------------
 .../app/dag/impl/RootInputVertexManager.java    |  5 +++
 .../dag/impl/TestRootInputVertexManager.java    | 33 ++++++++++++++++++++
 .../TestShuffleVertexManagerBase.java           | 29 ++++++++++++++++-
 3 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index 3205983..38eba0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -171,6 +171,11 @@ public class RootInputVertexManager extends VertexManagerPlugin {
             + srcVertex + " as it has " + numTasks + " tasks");
       }
     }
+    if (completions != null) {
+      for (TaskAttemptIdentifier attempt : completions) {
+        onSourceTaskCompleted(attempt);
+      }
+    }
     onVertexStartedDone.set(true);
     // track the tasks in this vertex
     updatePendingTasks();

http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
index 50bac69..16a97d4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -491,6 +492,38 @@ public class TestRootInputVertexManager {
     Assert.assertEquals(manager.numSourceTasksCompleted, 7);
   }
 
+  @Test
+  public void testTezDrainCompletionsOnVertexStart() throws IOException {
+    Configuration conf = new Configuration();
+    RootInputVertexManager manager = null;
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.BROADCAST,
+        EdgeProperty.DataSourceType.PERSISTED,
+        EdgeProperty.SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    VertexManagerPluginContext mockContext =
+        mock(VertexManagerPluginContext.class);
+    when(mockContext.getVertexStatistics(any(String.class)))
+        .thenReturn(mock(VertexStatistics.class));
+    when(mockContext.getInputVertexEdgeProperties())
+        .thenReturn(mockInputVertices);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+
+    // check initialization
+    manager = createRootInputVertexManager(conf, mockContext, 0.1f, 0.1f);
+    Assert.assertEquals(0, manager.numSourceTasksCompleted);
+    manager.onVertexStarted(Collections.singletonList(
+      createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    Assert.assertEquals(1, manager.numSourceTasksCompleted);
+  }
+
 
   static RootInputVertexManager createRootInputVertexManager(
       Configuration conf, VertexManagerPluginContext context, Float min,

http://git-wip-us.apache.org/repos/asf/tez/blob/6777707f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
index 2e97381..96f46d6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.java
@@ -44,6 +44,7 @@ import org.junit.runners.Parameterized;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1106,7 +1107,33 @@ public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils
     Assert.assertEquals(0, scheduledTasks.size());
     verify(mockContext).doneReconfiguringVertex();
   }
-  
+
+
+  @Test(timeout=5000)
+  public void testTezDrainCompletionsOnVertexStart() throws IOException {
+    Configuration conf = new Configuration();
+    ShuffleVertexManagerBase manager;
+
+    final String mockSrcVertexId1 = "Vertex1";
+    final String mockSrcVertexId2 = "Vertex2";
+    final String mockSrcVertexId3 = "Vertex3";
+    final String mockManagedVertexId = "Vertex4";
+
+    final List<Integer> scheduledTasks = Lists.newLinkedList();
+
+    final VertexManagerPluginContext mockContext = createVertexManagerContext(
+      mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
+      mockManagedVertexId, 4, scheduledTasks, null);
+
+    //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path
on receiving first event itself.
+    manager = createManager(conf, mockContext, 0.01f, 0.75f);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    manager.onVertexStarted(Collections.singletonList(
+      TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+
+  }
+
   private ShuffleVertexManagerBase createManager(Configuration conf,
       VertexManagerPluginContext context, Float min, Float max) {
     return createManager(this.shuffleVertexManagerClass, conf, context, true,


Mime
View raw message