tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe) (cherry picked from commit 7221d386a4fbc1f32aae1854bd25defb4c6d557a)
Date Thu, 28 Apr 2016 14:00:09 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 2917f4571 -> 0ba1e97db


TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe)
(cherry picked from commit 7221d386a4fbc1f32aae1854bd25defb4c6d557a)

Conflicts:

	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0ba1e97d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0ba1e97d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0ba1e97d

Branch: refs/heads/branch-0.7
Commit: 0ba1e97dbf7c5348ca5142d38da2fa82feb3be97
Parents: 2917f45
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Apr 28 13:59:41 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Apr 28 13:59:41 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     |  7 +--
 .../vertexmanager/TestShuffleVertexManager.java | 52 ++++++++++++++++++++
 3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 129e3cd..36d3d55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin. 
   TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization
relative to Inputs/Outputs

http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index b11ed3f..3b2b669 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -909,10 +909,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       // vertex not started yet
       return;
     }
-    int numPendingTasks = pendingTasks.size();
-    if (numPendingTasks == 0) {
-      return;
-    }
 
     if (!sourceVerticesScheduled && !canScheduleTasks()) {
       if (LOG.isDebugEnabled()) {
@@ -922,7 +918,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return;
     }
 
-    if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks && numPendingTasks
> 0) {
+    int numPendingTasks = pendingTasks.size();
+    if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
           " remaining tasks for vertex: " + getContext().getVertexName());

http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9c21aed..df1f080 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -1281,6 +1281,58 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
+
+  @Test
+  public void testZeroTasksSendsConfigured() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        true);
+    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
1000L);
+    ShuffleVertexManager manager = null;
+
+    HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
+    String r1 = "R1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    final String mockManagedVertexId = "R2";
+    mockInputVertices.put(r1, eProp1);
+
+    final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
+
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
+    // check initialization
+    manager = createManager(conf, mockContext, 0.001f, 0.001f);
+
+    final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        scheduledTasks.clear();
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
+          scheduledTasks.add(task.getTaskIndex());
+        }
+        return null;
+      }}).when(mockContext).scheduleTasks(anyList());
+
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertEquals(1, manager.bipartiteSources);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(0, scheduledTasks.size());
+    verify(mockContext).doneReconfiguringVertex();
+  }
   
   public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId)
{
     VertexIdentifier mockVertex = mock(VertexIdentifier.class);


Mime
View raw message