tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2242. Refactor ShuffleVertexManager code (bikas) (cherry picked from commit 505febd63e6c2fdaf882540d5c55f75fb30b7190)
Date Fri, 27 Mar 2015 18:50:37 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 05bfdbec5 -> ff7883e06


TEZ-2242. Refactor ShuffleVertexManager code (bikas)
(cherry picked from commit 505febd63e6c2fdaf882540d5c55f75fb30b7190)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ff7883e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ff7883e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ff7883e0

Branch: refs/heads/branch-0.6
Commit: ff7883e062928e550568af151e5bd14a9de2cb92
Parents: 05bfdbe
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Mar 27 11:46:00 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Mar 27 11:48:31 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     | 68 ++++++++++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 27 ++++----
 3 files changed, 65 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c849b2e..ee8115f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2242. Refactor ShuffleVertexManager code
   TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false.
   TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018
   TEZ-2064. SessionNotRunning Exception not thrown is all cases

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 05f94c5..a356829 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -141,6 +141,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   @VisibleForTesting
   int bipartiteSources = 0;
   long completedSourceTasksOutputSize = 0;
+  List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
 
   class SourceVertexInfo {
     EdgeProperty edgeProperty;
@@ -327,7 +328,26 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   
   @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  public synchronized void onVertexStarted(Map<String, List<Integer>> completions)
{
+    // examine edges after vertex started because until then these may not have been defined
+    Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
+    for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
+      srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
+      // TODO what if derived class has already called this
+      getContext().registerForVertexStateUpdates(entry.getKey(),
+          EnumSet.of(VertexState.CONFIGURED));
+      if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+        bipartiteSources++;
+      }
+    }
+    if(bipartiteSources == 0) {
+      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
+    }
+    for (VertexStateUpdate stateUpdate : pendingStateUpdates) {
+      handleVertexStateUpdate(stateUpdate);
+    }
+    pendingStateUpdates.clear();
+    
     // track the tasks in this vertex
     updatePendingTasks();
     updateSourceTaskCount();
@@ -349,7 +369,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
+  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId)
{
     updateSourceTaskCount();
     SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
 
@@ -369,7 +389,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
   
   @Override
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
     // TODO handle duplicates from retries
     if (enableAutoParallelism) {
       // save output size
@@ -679,19 +699,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
     
-    Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
-    for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
-      srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
-      getContext().registerForVertexStateUpdates(entry.getKey(),
-          EnumSet.of(VertexState.CONFIGURED));
-      if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-        bipartiteSources++;
-      }
-    }
-    if(bipartiteSources == 0) {
-      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
-    }
-    
     if (enableAutoParallelism) {
       getContext().vertexReconfigurationPlanned();
     }
@@ -700,8 +707,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   }
 
-  @Override
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+  private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) {
     Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED,
         "Received incorrect state notification : " + stateUpdate.getVertexState() + " for
vertex: "
             + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
@@ -717,7 +723,31 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
   
   @Override
-  public void onRootVertexInitialized(String inputName,
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    if (stateUpdate.getVertexState() == VertexState.CONFIGURED) {
+      // we will not register for updates until our vertex starts.
+      // derived classes can make other update requests for other states that we should
+      // ignore. However that will not be allowed until the state change notified supports
+      // multiple registers for the same vertex
+      if (onVertexStartedDone.get()) {
+        // normally this if check will always be true because we register after vertex
+        // start.
+        handleVertexStateUpdate(stateUpdate);
+      } else {
+        // normally this code will not trigger since we are the ones who register for
+        // the configured states updates and that will happen after vertex starts.
+        // So this code will only trigger if a derived class also registers for updates
+        // for the same vertices but multiple registers to the same vertex is currently
+        // not supported by the state change notifier code. This is just future proofing
+        // when that is supported
+        // vertex not started yet. So edge info may not have been defined correctly yet.
+        pendingStateUpdates.add(stateUpdate);
+      }
+    }
+  }
+  
+  @Override
+  public synchronized void onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) {
     // Not allowing this for now. Nothing to do.
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7883e0/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 76c0aa6..4d9302e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -212,17 +212,16 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
-    verify(mockContext, times(2)).vertexReconfigurationPlanned();
-    Assert.assertTrue(manager.bipartiteSources == 2);
-    
-    
     // source vertices have 0 tasks.
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
 
-    // check waiting for notification before scheduling
     manager.onVertexStarted(null);
+    verify(mockContext, times(2)).vertexReconfigurationPlanned();
+    Assert.assertTrue(manager.bipartiteSources == 2);
+    
+    // check waiting for notification before scheduling
     Assert.assertFalse(manager.pendingTasks.isEmpty());
     // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -235,12 +234,14 @@ public class TestShuffleVertexManager {
     // check scheduling only after onVertexStarted
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
-    Assert.assertTrue(manager.bipartiteSources == 2);
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
+    // normally this event will not come before onVertexStarted() is called
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
+    verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after
start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
+    // trigger start and processing of pending notification events
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 2);
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
@@ -500,6 +501,7 @@ public class TestShuffleVertexManager {
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
       manager = createManager(conf, mockContext, 0.1f, 0.1f);
+      manager.onVertexStarted(null);
       Assert.assertFalse(true);
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -511,6 +513,7 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
+    manager.onVertexStarted(null);
     Assert.assertTrue(manager.bipartiteSources == 2);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
@@ -791,7 +794,6 @@ public class TestShuffleVertexManager {
 
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
-    Assert.assertTrue(manager.bipartiteSources == 3);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
     doAnswer(new Answer() {
@@ -806,6 +808,7 @@ public class TestShuffleVertexManager {
       }}).when(mockContext_R2).scheduleVertexTasks(anyList());
 
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 3);
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
 
@@ -915,10 +918,6 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
 
-    // check initialization
-    manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    Assert.assertTrue(manager.bipartiteSources == 1);
-
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
@@ -931,7 +930,11 @@ public class TestShuffleVertexManager {
         return null;
       }}).when(mockContext).scheduleVertexTasks(anyList());
 
+    // check initialization
+    manager = createManager(conf, mockContext, 0.001f, 0.001f);
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 1);
+
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
 


Mime
View raw message