tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject tez git commit: TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe)
Date Thu, 28 Apr 2016 13:56:16 GMT
Repository: tez
Updated Branches:
  refs/heads/master f8e014876 -> 7221d386a


TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7221d386
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7221d386
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7221d386

Branch: refs/heads/master
Commit: 7221d386a4fbc1f32aae1854bd25defb4c6d557a
Parents: f8e0148
Author: Jason Lowe <jlowe@apache.org>
Authored: Thu Apr 28 13:55:49 2016 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Thu Apr 28 13:55:49 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../vertexmanager/ShuffleVertexManager.java     |  7 +--
 .../vertexmanager/TestShuffleVertexManager.java | 52 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 324ca38..3979725 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
   TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
   TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
@@ -25,6 +26,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.
   TEZ-3226. Tez UI 2: All DAGs UX improvements.
@@ -462,6 +464,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.
   TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
   TEZ-3202. Reduce the memory need for jobs with high number of segments

http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 47fc60f..aee8b6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -909,10 +909,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       // vertex not started yet
       return;
     }
-    int numPendingTasks = pendingTasks.size();
-    if (numPendingTasks == 0) {
-      return;
-    }
 
     if (!sourceVerticesScheduled && !canScheduleTasks()) {
       if (LOG.isDebugEnabled()) {
@@ -922,7 +918,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
       return;
     }
 
-    if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks && numPendingTasks
> 0) {
+    int numPendingTasks = pendingTasks.size();
+    if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
       LOG.info("All source tasks assigned. " +
           "Ramping up " + numPendingTasks + 
           " remaining tasks for vertex: " + getContext().getVertexName());

http://git-wip-us.apache.org/repos/asf/tez/blob/7221d386/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9c21aed..df1f080 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -1281,6 +1281,58 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
+
+  @Test
+  public void testZeroTasksSendsConfigured() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        true);
+    conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
1000L);
+    ShuffleVertexManager manager = null;
+
+    HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
+    String r1 = "R1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.SCATTER_GATHER,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    final String mockManagedVertexId = "R2";
+    mockInputVertices.put(r1, eProp1);
+
+    final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
+
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
+    // check initialization
+    manager = createManager(conf, mockContext, 0.001f, 0.001f);
+
+    final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+    doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        scheduledTasks.clear();
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
+          scheduledTasks.add(task.getTaskIndex());
+        }
+        return null;
+      }}).when(mockContext).scheduleTasks(anyList());
+
+    manager.onVertexStarted(emptyCompletions);
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+    Assert.assertEquals(1, manager.bipartiteSources);
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
+    Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(0, scheduledTasks.size());
+    verify(mockContext).doneReconfiguringVertex();
+  }
   
   public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId)
{
     VertexIdentifier mockVertex = mock(VertexIdentifier.class);


Mime
View raw message