tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [03/35] tez git commit: TEZ-2242. Refactor ShuffleVertexManager code (bikas)
Date Tue, 07 Apr 2015 20:12:21 GMT
TEZ-2242. Refactor ShuffleVertexManager code (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/505febd6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/505febd6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/505febd6

Branch: refs/heads/TEZ-2003
Commit: 505febd63e6c2fdaf882540d5c55f75fb30b7190
Parents: 17d2388
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Mar 27 11:46:00 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Mar 27 11:46:00 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../vertexmanager/ShuffleVertexManager.java     | 68 ++++++++++++++------
 .../vertexmanager/TestShuffleVertexManager.java | 27 ++++----
 3 files changed, 65 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 91653bf..3c859c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -91,6 +91,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2242. Refactor ShuffleVertexManager code
   TEZ-2205. Tez still tries to post to ATS when yarn.timeline-service.enabled=false.
   TEZ-2047. Build fails against hadoop-2.2 post TEZ-2018
   TEZ-2064. SessionNotRunning Exception not thrown is all cases

http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f923319..b6d69dc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -141,6 +141,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   @VisibleForTesting
   int bipartiteSources = 0;
   long completedSourceTasksOutputSize = 0;
+  List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList();
 
   static class SourceVertexInfo {
     EdgeProperty edgeProperty;
@@ -326,7 +327,26 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   
   @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+    // examine edges after vertex started because until then these may not have been defined
+    Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
+    for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
+      srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
+      // TODO what if derived class has already called this
+      getContext().registerForVertexStateUpdates(entry.getKey(),
+          EnumSet.of(VertexState.CONFIGURED));
+      if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+        bipartiteSources++;
+      }
+    }
+    if(bipartiteSources == 0) {
+      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
+    }
+    for (VertexStateUpdate stateUpdate : pendingStateUpdates) {
+      handleVertexStateUpdate(stateUpdate);
+    }
+    pendingStateUpdates.clear();
+    
     // track the tasks in this vertex
     updatePendingTasks();
     updateSourceTaskCount();
@@ -348,7 +368,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
+  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
     updateSourceTaskCount();
     SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
 
@@ -368,7 +388,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
   
   @Override
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
     // TODO handle duplicates from retries
     if (enableAutoParallelism) {
       // save output size
@@ -678,19 +698,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:"
         + minTaskParallelism);
     
-    Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties();
-    for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
-      srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue()));
-      getContext().registerForVertexStateUpdates(entry.getKey(),
-          EnumSet.of(VertexState.CONFIGURED));
-      if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
-        bipartiteSources++;
-      }
-    }
-    if(bipartiteSources == 0) {
-      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
-    }
-    
     if (enableAutoParallelism) {
       getContext().vertexReconfigurationPlanned();
     }
@@ -699,8 +706,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
 
   }
 
-  @Override
-  public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+  private void handleVertexStateUpdate(VertexStateUpdate stateUpdate) {
     Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.CONFIGURED,
         "Received incorrect state notification : " + stateUpdate.getVertexState() + " for vertex: "
             + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
@@ -716,7 +722,31 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
   }
   
   @Override
-  public void onRootVertexInitialized(String inputName,
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    if (stateUpdate.getVertexState() == VertexState.CONFIGURED) {
+      // we will not register for updates until our vertex starts.
+      // derived classes can make other update requests for other states that we should
+      // ignore. However that will not be allowed until the state change notified supports
+      // multiple registers for the same vertex
+      if (onVertexStartedDone.get()) {
+        // normally this if check will always be true because we register after vertex
+        // start.
+        handleVertexStateUpdate(stateUpdate);
+      } else {
+        // normally this code will not trigger since we are the ones who register for
+        // the configured states updates and that will happen after vertex starts.
+        // So this code will only trigger if a derived class also registers for updates
+        // for the same vertices but multiple registers to the same vertex is currently
+        // not supported by the state change notifier code. This is just future proofing
+        // when that is supported
+        // vertex not started yet. So edge info may not have been defined correctly yet.
+        pendingStateUpdates.add(stateUpdate);
+      }
+    }
+  }
+  
+  @Override
+  public synchronized void onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) {
     // Not allowing this for now. Nothing to do.
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/505febd6/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 76c0aa6..4d9302e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -212,17 +212,16 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
-    verify(mockContext, times(2)).vertexReconfigurationPlanned();
-    Assert.assertTrue(manager.bipartiteSources == 2);
-    
-    
     // source vertices have 0 tasks.
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
 
-    // check waiting for notification before scheduling
     manager.onVertexStarted(null);
+    verify(mockContext, times(2)).vertexReconfigurationPlanned();
+    Assert.assertTrue(manager.bipartiteSources == 2);
+    
+    // check waiting for notification before scheduling
     Assert.assertFalse(manager.pendingTasks.isEmpty());
     // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -235,12 +234,14 @@ public class TestShuffleVertexManager {
     // check scheduling only after onVertexStarted
     manager = createManager(conf, mockContext, 0.1f, 0.1f); // Tez notified of reconfig
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
-    Assert.assertTrue(manager.bipartiteSources == 2);
     // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling
+    // normally this event will not come before onVertexStarted() is called
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
+    verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
+    // trigger start and processing of pending notification events
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 2);
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 4); // all tasks scheduled
@@ -500,6 +501,7 @@ public class TestShuffleVertexManager {
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
       manager = createManager(conf, mockContext, 0.1f, 0.1f);
+      manager.onVertexStarted(null);
       Assert.assertFalse(true);
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -511,6 +513,7 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
+    manager.onVertexStarted(null);
     Assert.assertTrue(manager.bipartiteSources == 2);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
@@ -791,7 +794,6 @@ public class TestShuffleVertexManager {
 
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
-    Assert.assertTrue(manager.bipartiteSources == 3);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
     doAnswer(new Answer() {
@@ -806,6 +808,7 @@ public class TestShuffleVertexManager {
       }}).when(mockContext_R2).scheduleVertexTasks(anyList());
 
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 3);
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
 
@@ -915,10 +918,6 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
 
-    // check initialization
-    manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    Assert.assertTrue(manager.bipartiteSources == 1);
-
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
     doAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
@@ -931,7 +930,11 @@ public class TestShuffleVertexManager {
         return null;
       }}).when(mockContext).scheduleVertexTasks(anyList());
 
+    // check initialization
+    manager = createManager(conf, mockContext, 0.001f, 0.001f);
     manager.onVertexStarted(null);
+    Assert.assertTrue(manager.bipartiteSources == 1);
+
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
 


Mime
View raw message