tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2232. Allow setParallelism to be called multiple times before tasks get scheduled (bikas)
Date Fri, 03 Apr 2015 19:37:02 GMT
Repository: tez
Updated Branches:
  refs/heads/master 09a96088c -> 5e2a55fb1


TEZ-2232. Allow setParallelism to be called multiple times before tasks get scheduled (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5e2a55fb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5e2a55fb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5e2a55fb

Branch: refs/heads/master
Commit: 5e2a55fb12334ad272dc4d6990f6e79e98f1e9b3
Parents: 09a9608
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Apr 3 12:36:49 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Apr 3 12:36:49 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 208 +++++++++++--------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 115 ++++++++--
 .../dag/app/dag/impl/TestVertexRecovery.java    |  62 ++++++
 4 files changed, 277 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a669147..8fad569 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
   TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar)
 
 ALL CHANGES:
+  TEZ-2232. Allow setParallelism to be called multiple times before tasks get
+  scheduled
   TEZ-2265. All inputs/outputs in a task share the same counter object
   TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang.
   TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late.

http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7f124b4..81e9bb9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -685,8 +685,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private final UserGroupInformation dagUgi;
 
-  private boolean parallelismSet = false;
-
   private AtomicBoolean committed = new AtomicBoolean(false);
   private AtomicBoolean aborted = new AtomicBoolean(false);
   private boolean commitVertexOutputs = false;
@@ -1121,19 +1119,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private void handleParallelismUpdate(int newParallelism,
       Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates) {
-    LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
-    Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
-        .iterator();
-    int i = 0;
-    while (iter.hasNext()) {
-      i++;
-      iter.next();
-      if (i <= newParallelism) {
-        continue;
-      }
-      iter.remove();
-    }
+      Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldParallelism) {
+    // initial parallelism must have been set by this time
+    // parallelism update is recorded in history only for change from an initialized value
+    Preconditions.checkArgument(oldParallelism != -1, getLogIdentifier());
+    if (oldParallelism < newParallelism) {
+      addTasks(newParallelism);
+    } else if (oldParallelism > newParallelism) {
+      removeTasks(newParallelism);
+    }
+    Preconditions.checkState(this.numTasks == newParallelism, getLogIdentifier());
     this.recoveredSourceEdgeManagers = sourceEdgeManagers;
     this.recoveredRootInputSpecUpdates = rootInputSpecUpdates;
   }
@@ -1168,17 +1163,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         }
         return recoveredState;
       case VERTEX_PARALLELISM_UPDATED:
+        // TODO TEZ-1019 this should flow through setParallelism method
         VertexParallelismUpdatedEvent updatedEvent =
             (VertexParallelismUpdatedEvent) historyEvent;
+        int oldNumTasks = numTasks;
+        int newNumTasks = updatedEvent.getNumTasks();
+        handleParallelismUpdate(newNumTasks, updatedEvent.getSourceEdgeManagers(),
+          updatedEvent.getRootInputSpecUpdates(), oldNumTasks);
+        Preconditions.checkState(this.numTasks == newNumTasks, getLogIdentifier());
         if (updatedEvent.getVertexLocationHint() != null) {
-          setTaskLocationHints(updatedEvent.getVertexLocationHint());
+          setVertexLocationHint(updatedEvent.getVertexLocationHint());
         }
-        int oldNumTasks = numTasks;
-        numTasks = updatedEvent.getNumTasks();
         stateChangeNotifier.stateChanged(vertexId,
             new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
-        handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(),
-          updatedEvent.getRootInputSpecUpdates());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Recovered state for vertex after parallelism updated event"
               + ", vertex=" + logIdentifier
@@ -1250,6 +1247,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertexLocationHint.getTaskLocationHints() != null &&
         !vertexLocationHint.getTaskLocationHints().isEmpty()) {
       List<TaskLocationHint> locHints = vertexLocationHint.getTaskLocationHints();
+      // TODO TEZ-2246 hints size must match num tasks
       taskLocationHints = locHints.toArray(new TaskLocationHint[locHints.size()]);
     }
   }
@@ -1348,14 +1346,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         writeLock.unlock();
       }
     }
-    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: "
-    + parallelism + " for vertex: " + logIdentifier);
-    setVertexLocationHint(vertexLocationHint);
+    Preconditions.checkArgument(parallelism >= 0, "Parallelism must be >=0. Value: " + parallelism
+        + " for vertex: " + logIdentifier);
     writeLock.lock();
 
     try {
-      if (parallelismSet == true) {
-        String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier;
+      // disallow changing things after a vertex has started
+      if (!tasksNotYetScheduled) {
+        String msg = "setParallelism cannot be called after scheduling tasks. Vertex: "
+            + getLogIdentifier();
         LOG.info(msg);
         throw new TezUncheckedException(msg);
       }
@@ -1364,13 +1363,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // vertex is fully defined. setParallelism has been called. VertexManager should have
         // informed us about this. Otherwise we would have notified listeners that we are fully
         // defined before we are actually fully defined
-        Preconditions.checkState(vertexToBeReconfiguredByManager, "Vertex is fully configured but still"
-            + " the reconfiguration API has been called. VertexManager must notify the framework using "
-            + " context.vertexReconfigurationPlanned() before re-configuring the vertex.");
+        Preconditions
+            .checkState(
+                vertexToBeReconfiguredByManager,
+                "Vertex is fully configured but still"
+                    + " the reconfiguration API has been called. VertexManager must notify the framework using "
+                    + " context.vertexReconfigurationPlanned() before re-configuring the vertex.");
       }
       
-      parallelismSet = true;
-
       // Input initializer/Vertex Manager/1-1 split expected to set parallelism.
       if (numTasks == -1) {
         if (getState() != VertexState.INITIALIZING) {
@@ -1414,6 +1414,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         stateChangeNotifier.stateChanged(vertexId,
             new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         this.createTasks();
+        setVertexLocationHint(vertexLocationHint);
         LOG.info("Vertex " + getLogIdentifier() +
             " parallelism set to " + parallelism);
         if (canInitVertex()) {
@@ -1427,55 +1428,39 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // for a vertex to start.
         Preconditions.checkState(rootInputSpecUpdates == null,
             "Root Input specs can only be updated when the vertex is configured with -1 tasks");
-        if (parallelism >= numTasks) {
-          // not that hard to support perhaps. but checking right now since there
-          // is no use case for it and checking may catch other bugs.
-          String msg = "Increasing parallelism is not supported, vertexId=" + logIdentifier;
-          LOG.warn(msg);
-          throw new TezUncheckedException(msg);
+ 
+        int oldNumTasks = numTasks;
+        
+        // start buffering incoming events so that we can re-route existing events
+        for (Edge edge : sourceVertices.values()) {
+          edge.startEventBuffering();
         }
+
         if (parallelism == numTasks) {
           LOG.info("setParallelism same as current value: " + parallelism +
               " for vertex: " + logIdentifier);
           Preconditions.checkArgument(sourceEdgeManagers != null,
               "Source edge managers or RootInputSpecs must be set when not changing parallelism");
         } else {
-          LOG.info(
-              "Resetting vertex location hints due to change in parallelism for vertex: " + logIdentifier);
+          LOG.info("Resetting vertex location hints due to change in parallelism for vertex: "
+              + logIdentifier);
           vertexLocationHint = null;
-        }
-
-        // start buffering incoming events so that we can re-route existing events
-        for (Edge edge : sourceVertices.values()) {
-          edge.startEventBuffering();
-        }
 
-        // assign to local variable of LinkedHashMap to make sure that changing
-        // type of task causes compile error. We depend on LinkedHashMap for order
-        LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
-        Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
-            .iterator();
-        int i = 0;
-        while (iter.hasNext()) {
-          i++;
-          Map.Entry<TezTaskID, Task> entry = iter.next();
-          Task task = entry.getValue();
-          if (task.getState() != TaskState.NEW) {
-            String msg = "All tasks must be in initial state when changing parallelism"
-                + " for vertex: " + getLogIdentifier();
-            LOG.warn(msg);
-            throw new TezUncheckedException(msg);
-          }
-          if (i <= parallelism) {
-            continue;
+          if (parallelism > numTasks) {
+            addTasks((parallelism));
+          } else if (parallelism < numTasks) {
+            removeTasks(parallelism);
           }
-          LOG.info("Removing task: " + entry.getKey());
-          iter.remove();
         }
-        LOG.info("Vertex " + logIdentifier +
-            " parallelism set to " + parallelism + " from " + numTasks);
-        int oldNumTasks = numTasks;
-        this.numTasks = parallelism;
+
+        Preconditions.checkState(this.numTasks == parallelism, getLogIdentifier());
+        
+        // set new vertex location hints
+        setVertexLocationHint(vertexLocationHint);
+        LOG.info("Vertex " + getLogIdentifier() + " parallelism set to " + parallelism + " from "
+            + numTasks);
+        
+        // notify listeners
         stateChangeNotifier.stateChanged(vertexId,
             new VertexStateUpdateParallelismUpdated(vertexName, numTasks, oldNumTasks));
         assert tasks.size() == numTasks;
@@ -1495,12 +1480,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
         }
 
-        VertexParallelismUpdatedEvent parallelismUpdatedEvent =
-            new VertexParallelismUpdatedEvent(vertexId, numTasks,
-                vertexLocationHint,
-                sourceEdgeManagers, rootInputSpecUpdates, oldNumTasks);
-        appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
-            parallelismUpdatedEvent));
+        // update history
+        VertexParallelismUpdatedEvent parallelismUpdatedEvent = new VertexParallelismUpdatedEvent(
+            vertexId, numTasks, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdates,
+            oldNumTasks);
+        appContext.getHistoryHandler().handle(
+            new DAGHistoryEvent(getDAGId(), parallelismUpdatedEvent));
 
         // stop buffering events
         for (Edge edge : sourceVertices.values()) {
@@ -2028,29 +2013,73 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  private TaskImpl createTask(int taskIndex) {
+    ContainerContext conContext = getContainerContext(taskIndex);
+    return new TaskImpl(this.getVertexId(), taskIndex,
+        this.eventHandler,
+        vertexConf,
+        this.taskAttemptListener,
+        this.clock,
+        this.taskHeartbeatHandler,
+        this.appContext,
+        (this.targetVertices != null ?
+          this.targetVertices.isEmpty() : true),
+        this.taskResource,
+        conContext,
+        this.stateChangeNotifier);
+  }
+  
   private void createTasks() {
     for (int i=0; i < this.numTasks; ++i) {
-      ContainerContext conContext = getContainerContext(i);
-      TaskImpl task =
-          new TaskImpl(this.getVertexId(), i,
-              this.eventHandler,
-              vertexConf,
-              this.taskAttemptListener,
-              this.clock,
-              this.taskHeartbeatHandler,
-              this.appContext,
-              (this.targetVertices != null ?
-                this.targetVertices.isEmpty() : true),
-              this.taskResource,
-              conContext,
-              this.stateChangeNotifier);
+      TaskImpl task = createTask(i);
       this.addTask(task);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Created task for vertex " + logIdentifier + ": " +
             task.getTaskId());
       }
     }
-
+  }
+  
+  private void addTasks(int newNumTasks) {
+    Preconditions.checkArgument(newNumTasks > this.numTasks, getLogIdentifier());
+    int initialNumTasks = this.numTasks;
+    for (int i = initialNumTasks; i < newNumTasks; ++i) {
+      TaskImpl task = createTask(i);
+      this.addTask(task);
+      this.numTasks++;
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Created task for vertex " + logIdentifier + ": " +
+            task.getTaskId());
+      }
+    }
+  }
+  
+  private void removeTasks(int newNumTasks) {
+    Preconditions.checkArgument(newNumTasks < this.numTasks, getLogIdentifier());
+    // assign to local variable of LinkedHashMap to make sure that changing
+    // type of task causes compile error. We depend on LinkedHashMap for order
+    LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
+    Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
+        .iterator();
+    // remove tasks from the end to maintain index numbers
+    int i = 0;
+    while (iter.hasNext()) {
+      i++;
+      Map.Entry<TezTaskID, Task> entry = iter.next();
+      Task task = entry.getValue();
+      if (task.getState() != TaskState.NEW) {
+        String msg = "All tasks must be in initial state when changing parallelism"
+            + " for vertex: " + getLogIdentifier();
+        LOG.warn(msg);
+        throw new TezUncheckedException(msg);
+      }
+      if (i <= newNumTasks) {
+        continue;
+      }
+      LOG.info("Removing task: " + entry.getKey());
+      iter.remove();
+      this.numTasks--;
+    }
   }
 
   private VertexState setupVertex() {
@@ -2709,6 +2738,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           boolean successSetParallelism ;
           try {
+            // recovering only edge manager
             vertex.setParallelism(0,
               null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true, false);
             successSetParallelism = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 35f35e6..e643a5b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2468,16 +2468,27 @@ public class TestVertexImpl {
       Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() cannot be called after initialize()"));
     }
   }
+
+  private void checkTasks(Vertex v, int numTasks) {
+    Assert.assertEquals(numTasks, v.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v.getTasks();
+    Assert.assertEquals(numTasks, tasks.size());
+    // check all indices
+    int i = 0;
+    // iteration maintains order due to linked hash map
+    for(Task task : tasks.values()) {
+      Assert.assertEquals(i, task.getTaskId().getId());
+      i++;
+    }
+  }
   
   @Test(timeout = 5000)
-  public void testVertexSetParallelism() throws Exception {
+  public void testVertexSetParallelismDecrease() throws Exception {
     VertexImpl v3 = vertices.get("vertex3");
     v3.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
     Assert.assertEquals(2, v3.getTotalTasks());
-    Map<TezTaskID, Task> tasks = v3.getTasks();
-    Assert.assertEquals(2, tasks.size());
-    TezTaskID firstTask = tasks.keySet().iterator().next();
+    Assert.assertEquals(2, v3.getTasks().size());
 
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
@@ -2492,15 +2503,56 @@ public class TestVertexImpl {
     v3.doneReconfiguringVertex();
     assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
         EdgeManagerForTest);
-    Assert.assertEquals(1, v3.getTotalTasks());
-    Assert.assertEquals(1, tasks.size());
-    // the last one is removed
-    assertTrue(tasks.keySet().iterator().next().equals(firstTask));
+    checkTasks(v3, 1);
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexSetParallelismIncrease() throws Exception {
+    VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Assert.assertEquals(2, v3.getTasks().size());
+
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+
+    EdgeManagerPluginDescriptor mockEdgeManagerDescriptor =
+        EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
+
+    Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
+        Collections.singletonMap(
+       v1.getName(), mockEdgeManagerDescriptor);
+    v3.setParallelism(10, null, edgeManagerDescriptors, null, true);
+    v3.doneReconfiguringVertex();
+    assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
+        EdgeManagerForTest);
+    checkTasks(v3, 10);
+  }
+  
+  @Test(timeout = 5000)
+  public void testVertexSetParallelismMultiple() throws Exception {
+    VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
+    Assert.assertEquals(2, tasks.size());
 
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+    v3.setParallelism(10, null, null, null, true);
+    checkTasks(v3, 10);
+    
+    v3.setParallelism(5, null, null, null, true);
+    checkTasks(v3, 5);
+    v3.doneReconfiguringVertex();
   }
 
   @Test(timeout = 5000)
-  public void testVertexSetParallelismIncreaseException() throws Exception {
+  public void testVertexSetParallelismMultipleFailAfterDone() throws Exception {
     VertexImpl v3 = vertices.get("vertex3");
     v3.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
@@ -2511,19 +2563,43 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
+    v3.setParallelism(10, null, null, null, true);
+    checkTasks(v3, 10);
+    v3.doneReconfiguringVertex();
 
-    // increase not supported
     try {
-      v3.setParallelism(100, null, null, null, true);
-      v3.doneReconfiguringVertex();
+      v3.setParallelism(5, null, null, null, true);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("Vertex is fully configured but still"));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexSetParallelismMultipleFailAfterSchedule() throws Exception {
+    VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
+    Assert.assertEquals(2, tasks.size());
+
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+    v3.setParallelism(10, null, null, null, true);
+    checkTasks(v3, 10);
+    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
+    try {
+      v3.setParallelism(5, null, null, null, true);
       Assert.fail();
     } catch (TezUncheckedException e) {
-      Assert.assertTrue(e.getMessage().contains("Increasing parallelism is not supported"));
+      Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
     }
   }
   
   @Test(timeout = 5000)
-  public void testVertexSetParallelismMultipleException() throws Exception {
+  public void testVertexSetParallelismFailAfterSchedule() throws Exception {
     VertexImpl v3 = vertices.get("vertex3");
     v3.vertexReconfigurationPlanned();
     initAllVertices(VertexState.INITED);
@@ -2534,18 +2610,15 @@ public class TestVertexImpl {
     VertexImpl v1 = vertices.get("vertex1");
     startVertex(vertices.get("vertex2"));
     startVertex(v1);
-    v3.setParallelism(1, null, null, null, true);
-
-    // multiple invocations not supported
+    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), null)));
     try {
-      v3.setParallelism(1, null, null, null, true);
+      v3.setParallelism(5, null, null, null, true);
       Assert.fail();
     } catch (TezUncheckedException e) {
-      Assert.assertTrue(e.getMessage().contains("Parallelism can only be set dynamically once per vertex"));
+      Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
     }
-    v3.doneReconfiguringVertex();
   }
-
+  
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexPendingTaskEvents() {

http://git-wip-us.apache.org/repos/asf/tez/blob/5e2a55fb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 3c658d4..24defea 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
@@ -362,6 +363,67 @@ public class TestVertexRecovery {
 
   }
 
+  @Test(timeout = 5000)
+  public void testRecovery_SetParallelism() {
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    int oldNumTasks = 10;
+    VertexState recoveredState = vertex1
+        .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1",
+            initRequestedTime, initedTime, oldNumTasks, "", null));
+    assertEquals(VertexState.INITED, recoveredState);
+    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
+        .getVertexId(), 5, null, null, null, oldNumTasks));
+    assertEquals(5, vertex1.getTotalTasks());
+    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
+        VertexState.SUCCEEDED));
+    dispatcher.await();
+    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
+    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
+    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
+    // recover its task
+    assertTaskRecoveredEventSent(vertex1);
+
+    // vertex3 is still in NEW, when the desiredState is
+    // Completed State, each vertex recovery by itself, not depend on its parent
+    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+    assertEquals(VertexState.NEW, vertex3.getState());
+    // no VertexEvent pass to downstream vertex
+    assertEquals(0, vertexEventHandler.getEvents().size());
+  }
+  
+  @Test(timeout = 5000)
+  public void testRecovery_SetParallelismMultiple() {
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    int oldNumTasks = 10;
+    VertexState recoveredState = vertex1
+        .restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(), "vertex1",
+            initRequestedTime, initedTime, oldNumTasks, "", null));
+    assertEquals(VertexState.INITED, recoveredState);
+    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
+        .getVertexId(), 5, null, null, null, oldNumTasks));
+    assertEquals(5, vertex1.getTotalTasks());
+    recoveredState = vertex1.restoreFromEvent(new VertexParallelismUpdatedEvent(vertex1
+        .getVertexId(), 7, null, null, null, 5));
+    assertEquals(7, vertex1.getTotalTasks());
+    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
+        VertexState.SUCCEEDED));
+    dispatcher.await();
+    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
+    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
+    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
+    // recover its task
+    assertTaskRecoveredEventSent(vertex1);
+
+    // vertex3 is still in NEW, when the desiredState is
+    // Completed State, each vertex recovery by itself, not depend on its parent
+    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+    assertEquals(VertexState.NEW, vertex3.getState());
+    // no VertexEvent pass to downstream vertex
+    assertEquals(0, vertexEventHandler.getEvents().size());
+
+  }
+
+
   /**
    * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
    * @throws IOException 


Mime
View raw message