tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2987. TestVertexImpl.testTez2684 fails (bikas) (cherry picked from commit c9741492056758e3cac1a3d34cd1c7cbd7d9ddd4)
Date Mon, 14 Dec 2015 03:56:37 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 f5c87c1ef -> c033a7b0a


TEZ-2987. TestVertexImpl.testTez2684 fails (bikas)
(cherry picked from commit c9741492056758e3cac1a3d34cd1c7cbd7d9ddd4)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c033a7b0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c033a7b0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c033a7b0

Branch: refs/heads/branch-0.7
Commit: c033a7b0af166e885893713df935a3677c7cddc5
Parents: f5c87c1
Author: Bikas Saha <bikas@apache.org>
Authored: Sun Dec 13 19:48:17 2015 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Sun Dec 13 19:56:20 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  4 ++--
 .../vertexmanager/ShuffleVertexManager.java     | 17 ++++++++++++++++
 .../vertexmanager/TestShuffleVertexManager.java | 21 +++++++++++++++++++-
 4 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 50d0b67..09e4666 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
 ALL CHANGES
   TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats
should be initialized.
   TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes
from the source.
+  TEZ-2987. TestVertexImpl.testTez2684 fails
   TEZ-2995. Timeline primary filter should only be on callerId and not type.
   TEZ-2599. Dont send obsoleted data movement events to tasks
   TEZ-2943. Change shuffle vertex manager to use per vertex data for auto

http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fe540e8..f0a8625 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -5930,11 +5930,11 @@ public class TestVertexImpl {
 
     //Send VertexManagerEvent
     long[] sizes = new long[]{(100 * 1000l * 1000l)};
-    Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "C");
+    Event vmEvent = getVertexManagerEvent(sizes, 1060000000, "B");
 
     TezTaskAttemptID taId = TezTaskAttemptID.getInstance(
         TezTaskID.getInstance(vC.getVertexId(), 1), 1);
-    EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "C", "C",
taId);
+    EventMetaData sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT, "B", "C",
taId);
     TezEvent tezEvent = new TezEvent(vmEvent, sourceInfo);
     dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vC.getVertexId(),
         Lists.newArrayList(tezEvent)));

http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index c88c7a2..410ad73 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -145,6 +145,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   int numBipartiteSourceTasksCompleted = 0;
   int numVertexManagerEventsReceived = 0;
   List<PendingTaskInfo> pendingTasks = Lists.newLinkedList();
+  List<VertexManagerEvent> pendingVMEvents = Lists.newLinkedList();
   int totalTasksToSchedule = 0;
   private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
   
@@ -501,10 +502,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
     if(bipartiteSources == 0) {
       throw new TezUncheckedException("Atleast 1 bipartite source should exist");
     }
+
     for (VertexStateUpdate stateUpdate : pendingStateUpdates) {
       handleVertexStateUpdate(stateUpdate);
     }
     pendingStateUpdates.clear();
+
+    for (VertexManagerEvent vmEvent : pendingVMEvents) {
+      handleVertexManagerEvent(vmEvent);
+    }
+    pendingVMEvents.clear();
     
     // track the tasks in this vertex
     updatePendingTasks();
@@ -567,6 +574,16 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   @Override
   public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    if (onVertexStartedDone.get()) {
+      // internal data structures have been initialized - so handle the events directly
+      handleVertexManagerEvent(vmEvent);
+    } else {
+      // save this event for processing after vertex starts
+      pendingVMEvents.add(vmEvent);
+    }
+  }
+
+  private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
     // currently events from multiple attempts of the same task can be ignored because
     // their output will be the same. However, with pipelined events that may not hold.
     TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();

http://git-wip-us.apache.org/repos/asf/tez/blob/c033a7b0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9d53ebc..9c21aed 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -307,6 +307,25 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
 
     /**
+     * Test vmEvent and vertexStatusUpdate before started
+     */
+    scheduledTasks.clear();
+    //{5,9,12,18} in bitmap
+    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+
+    manager = createManager(conf, mockContext, 0.01f, 0.75f);
+    Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
+    Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+
+    TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1,
taId1));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(0, manager.numVertexManagerEventsReceived); // nothing happens
+    manager.onVertexStarted(emptyCompletions); // now the processing happens
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    /**
      * Test partition stats
      */
     scheduledTasks.clear();
@@ -320,7 +339,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
-    TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+    taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
     vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1,
taId1));
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);


Mime
View raw message