tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [2/2] git commit: TEZ-338. Determine reduce task parallelism (bikas)
Date Tue, 20 Aug 2013 00:10:32 GMT
TEZ-338. Determine reduce task parallelism (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e368ede8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e368ede8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e368ede8

Branch: refs/heads/master
Commit: e368ede8d49d95418804bf34ff37052a08239a24
Parents: 67c0c17
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Aug 19 17:08:08 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Aug 19 17:08:08 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |   7 +-
 .../java/org/apache/tez/common/TezUtils.java    |  24 ++
 .../apache/tez/dag/api/TezConfiguration.java    |  33 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   2 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   9 +-
 .../apache/tez/dag/app/dag/VertexScheduler.java |   8 +-
 .../impl/BipartiteSlowStartVertexScheduler.java | 188 -----------
 .../dag/impl/ImmediateStartVertexScheduler.java |   7 +-
 .../dag/app/dag/impl/ShuffleVertexManager.java  | 330 +++++++++++++++++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  22 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  70 +---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 151 ++++++---
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   1 -
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  45 ++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  36 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  64 +++-
 .../dag/app/dag/impl/TestVertexScheduler.java   | 292 +++++++++++++---
 .../tez/dag/app/rm/TestTaskScheduler.java       |   1 -
 .../TezDependentTaskCompletionEvent.java        |  48 ++-
 .../common/shuffle/impl/EventFetcher.java       |  33 +-
 .../tez/engine/common/shuffle/impl/Fetcher.java |  16 +-
 .../tez/engine/common/shuffle/impl/MapHost.java |  18 +-
 .../tez/engine/common/shuffle/impl/Shuffle.java |  58 +++-
 .../common/shuffle/impl/ShuffleScheduler.java   |  67 ++--
 .../mapreduce/examples/OrderedWordCount.java    |  14 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  17 +-
 26 files changed, 1069 insertions(+), 492 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index c0b0ae6..5a847f1 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -156,7 +156,17 @@ public class TezJobConfig {
   public static final String TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY =
       "tez.engine.shuffle.use.in-memory";
   public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
-  
+
+  /**
+   * Size of the partition range assigned to a single shuffle consumer task.
+   * ShuffleVertexManager writes it into per-task configuration when it
+   * reduces a vertex's parallelism.
+   */
+  @Private
+  public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE =
+      "tez.engine.shuffle.partition-range";
+  public static final int TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
+
   /**
    * 
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index c374754..d7774be 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -22,10 +22,15 @@ import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 
+import com.google.common.base.Preconditions;
+
 public class TezUtils {
   
   public static void addUserSpecifiedTezConfiguration(Configuration conf) 
@@ -51,4 +56,43 @@ public class TezUtils {
       }
     }
   }
+
+  /**
+   * Serializes a {@link Configuration} into a byte array that can be used as
+   * a user payload.
+   *
+   * @param conf the configuration to serialize; must not be null
+   * @return exactly the bytes written by {@link Configuration#write}
+   * @throws IOException if the configuration cannot be written
+   */
+  public static byte[] createUserPayloadFromConf(Configuration conf)
+      throws IOException {
+    Preconditions.checkNotNull(conf, "Configuration must be specified");
+    DataOutputBuffer dob = new DataOutputBuffer();
+    conf.write(dob);
+    // getData() exposes the whole backing buffer, which is usually larger than
+    // what was written; only the first getLength() bytes are valid.
+    return Arrays.copyOf(dob.getData(), dob.getLength());
+  }
+
+  /**
+   * Deserializes a {@link Configuration} from a payload created by
+   * {@link #createUserPayloadFromConf(Configuration)}.
+   *
+   * @param bb the serialized configuration; must not be null
+   * @return a new Configuration, created without default resources, holding
+   *         the deserialized entries
+   * @throws IOException if the bytes cannot be read as a Configuration
+   */
+  public static Configuration createConfFromUserPayload(byte[] bb)
+      throws IOException {
+    // TODO Avoid copy ?
+    Preconditions.checkNotNull(bb, "Bytes must be specified");
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(bb, 0, bb.length);
+    Configuration conf = new Configuration(false);
+    conf.readFields(dib);
+    return conf;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index d25145c..da91804 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -103,18 +103,40 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
 
   public static final String
-          TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MIN_SRC_FRACTION = TEZ_AM_PREFIX
-          + "slowstart-vertex-scheduler.min-src-fraction";
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = TEZ_AM_PREFIX
+          + "shuffle-vertex-manager.min-src-fraction";
   public static final float
-          TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
 
   public static final String
-          TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION = TEZ_AM_PREFIX
-          + "slowstart-vertex-scheduler.max-src-fraction";
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = TEZ_AM_PREFIX
+          + "shuffle-vertex-manager.max-src-fraction";
   public static final float
-          TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
+
+  /** Whether the shuffle vertex manager may lower a vertex's task count. */
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.enable.auto-parallel";
+  public static final boolean
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
+
+  /** Target input data size per task when auto-parallelism picks a task count. */
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.desired-task-input-size";
+  public static final long
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT =
+          1024*1024*100L;
 
+  /** Lower bound on the task count chosen by auto-parallelism. */
   public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.min-task-parallelism";
+  public static final int
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
+
+  public static final String
           TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
           + "slowstart-dag-scheduler.min-resource-fraction";
   public static final float
@@ -122,7 +144,7 @@ public class TezConfiguration extends Configuration {
 
   public static final String TEZ_AM_AGGRESSIVE_SCHEDULING = TEZ_AM_PREFIX +
       "aggressive.scheduling";
-  public static boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = true;
+  public static final boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = false;
 
   /**
    * The complete path to the serialized dag plan file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 1ba76fc..42bde6a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -179,7 +179,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     TezDependentTaskCompletionEvent[] events =
         context.getDAG().
             getVertex(taskAttemptID.getTaskID().getVertexID()).
-                getTaskAttemptCompletionEvents(fromEventIdx, maxEvents);
+                getTaskAttemptCompletionEvents(
+                    taskAttemptID, fromEventIdx, maxEvents);
 
     taskHeartbeatHandler.progressing(taskAttemptID);
     pingContainerHeartbeatHandler(taskAttemptID);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9c4d3b8..a33ab91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -27,9 +27,11 @@ import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
@@ -63,8 +65,23 @@ public interface Vertex extends Comparable<Vertex> {
   ProgressBuilder getVertexProgress();
   VertexStatusBuilder getVertexStatus();
   
-  TezDependentTaskCompletionEvent[]
-      getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+  /**
+   * Changes the number of tasks in this vertex.
+   *
+   * @param parallelism the new number of tasks
+   * @param taskUserPayloads user payloads for the tasks (ShuffleVertexManager passes one per task)
+   */
+  void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
+
+  /**
+   * Returns task completion events for the given task attempt.
+   *
+   * @param attemptId the attempt that requests the events
+   * @param fromEventId index of the first event to return
+   * @param maxEvents maximum number of events to return
+   */
+  TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
+      TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
   
   void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
   void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
@@ -80,6 +97,8 @@ public interface Vertex extends Comparable<Vertex> {
   void scheduleTasks(Collection<TezTaskID> taskIDs);
   Resource getTaskResource();
 
+  /** Returns the processor descriptor of this vertex. */
+  ProcessorDescriptor getProcessorDescriptor();
   public DAG getDAG();
   VertexTerminationCause getTerminationCause();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index df9c28e..7a85eb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -18,10 +18,27 @@
 
 package org.apache.tez.dag.app.dag;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
+// Rename to VertexManager TEZ-364
+/**
+ * Decides when the tasks of a managed vertex get scheduled.
+ */
 public interface VertexScheduler {
+  /** Configures this scheduler from the given configuration. */
+  void initialize(Configuration conf);
+
+  /** Called when the managed vertex starts. */
   void onVertexStarted();
-  void onVertexCompleted();
-  void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+
+  /**
+   * Called when a task of a source vertex completes.
+   *
+   * @param attemptId the source task attempt that completed
+   * @param event the completion event reported for that attempt
+   */
+  void onSourceTaskCompleted(TezTaskAttemptID attemptId,
+      TezDependentTaskCompletionEvent event);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
deleted file mode 100644
index 3f586cb..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BipartiteSlowStartVertexScheduler.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexScheduler;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-
-/**
- * Starts scheduling tasks when number of completed source tasks crosses 
- * <code>slowStartMinSrcCompletionFraction</code> and schedules all tasks 
- *  when <code>slowStartMaxSrcCompletionFraction</code> is reached
- */
-public class BipartiteSlowStartVertexScheduler implements VertexScheduler {
-  
-  private static final Log LOG = 
-                   LogFactory.getLog(BipartiteSlowStartVertexScheduler.class);
-
-  final Vertex managedVertex;
-  final float slowStartMinSrcCompletionFraction;
-  final float slowStartMaxSrcCompletionFraction;
-  
-  int numSourceTasks = 0;
-  int numSourceTasksCompleted = 0;
-  ArrayList<TezTaskID> pendingTasks;
-  int totalTasksToSchedule = 0;
-  HashMap<TezVertexID, Vertex> bipartiteSources = 
-                                            new HashMap<TezVertexID, Vertex>();
-  
-  public BipartiteSlowStartVertexScheduler(Vertex managedVertex,
-                                            float slowStartMinSrcCompletionFraction,
-                                            float slowStartMaxSrcCompletionFraction) {
-    this.managedVertex = managedVertex;
-    this.slowStartMinSrcCompletionFraction = slowStartMinSrcCompletionFraction;
-    this.slowStartMaxSrcCompletionFraction = slowStartMaxSrcCompletionFraction;
-    
-    if(slowStartMinSrcCompletionFraction <= 0 || 
-       slowStartMaxSrcCompletionFraction < slowStartMinSrcCompletionFraction) {
-      throw new IllegalArgumentException(
-          "Invalid values for slowStartMinSrcCompletionFraction" + 
-          "/slowStartMaxSrcCompletionFraction. Min cannot be <= 0 and " + 
-          "max cannot be < min.");
-    }
-    
-    Map<Vertex, EdgeProperty> inputs = managedVertex.getInputVertices();
-    for(Map.Entry<Vertex, EdgeProperty> entry : inputs.entrySet()) {
-      if(entry.getValue().getConnectionPattern() == ConnectionPattern.BIPARTITE) {
-        Vertex vertex = entry.getKey();
-        bipartiteSources.put(vertex.getVertexId(), vertex);
-      }
-    }
-    if(bipartiteSources.isEmpty()) {
-      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
-    }
-  }
-  
-  @Override
-  public void onVertexStarted() {
-    pendingTasks = new ArrayList<TezTaskID>(managedVertex.getTotalTasks());
-    // track the tasks in this vertex
-    pendingTasks.addAll(managedVertex.getTasks().keySet());
-    totalTasksToSchedule = pendingTasks.size();
-    
-    // track source vertices
-    for(Vertex vertex : bipartiteSources.values()) {
-      numSourceTasks += vertex.getTotalTasks();
-    }
-    
-    LOG.info("OnVertexStarted vertex: " + managedVertex.getVertexId() + 
-             " with " + numSourceTasks + " source tasks and " + 
-             totalTasksToSchedule + " pending tasks");
-    
-    schedulePendingTasks();
-  }
-
-  @Override
-  public void onVertexCompleted() {
-  }
-
-  @Override
-  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
-    TezVertexID vertexId = attemptId.getTaskID().getVertexID();
-    if(bipartiteSources.containsKey(vertexId)) {
-      ++numSourceTasksCompleted;
-      schedulePendingTasks();
-    }
-  }
-  
-  void schedulePendingTasks(int numTasksToSchedule) {
-    ArrayList<TezTaskID> scheduledTasks = new ArrayList<TezTaskID>(numTasksToSchedule);
-    while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) {
-      numTasksToSchedule--;
-      scheduledTasks.add(pendingTasks.get(0));
-      pendingTasks.remove(0);
-    }
-    managedVertex.scheduleTasks(scheduledTasks);
-  }
-  
-  void schedulePendingTasks() {    
-    int numPendingTasks = pendingTasks.size();
-    if (numPendingTasks == 0) {
-      return;
-    }
-    
-    if (numSourceTasksCompleted == numSourceTasks && numPendingTasks > 0) {
-      LOG.info("All source tasks assigned. " +
-          "Ramping up " + numPendingTasks + 
-          " remaining tasks for vertex: " + managedVertex.getName());
-      schedulePendingTasks(numPendingTasks);
-      return;
-    }
-
-    float completedSourceTaskFraction = 0f;
-    if (numSourceTasks != 0) { // support for 0 source tasks
-      completedSourceTaskFraction = (float)numSourceTasksCompleted/numSourceTasks;
-    } else {
-      completedSourceTaskFraction = 1;
-    }
-    
-    // start scheduling when source tasks completed fraction is more than min.
-    // linearly increase the number of scheduled tasks such that all tasks are 
-    // scheduled when source tasks completed fraction reaches max
-    float tasksFractionToSchedule = 1; 
-    float percentRange = slowStartMaxSrcCompletionFraction - 
-                          slowStartMinSrcCompletionFraction;
-    if (percentRange > 0) {
-      tasksFractionToSchedule = 
-            (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
-            percentRange;
-    } else {
-      // min and max are equal. schedule 100% on reaching min
-      if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
-        tasksFractionToSchedule = 0;
-      }
-    }
-    
-    if (tasksFractionToSchedule > 1) {
-      tasksFractionToSchedule = 1;
-    } else if (tasksFractionToSchedule < 0) {
-      tasksFractionToSchedule = 0;
-    }
-    
-    int numTasksToSchedule = 
-        ((int)(tasksFractionToSchedule * totalTasksToSchedule) - 
-         (totalTasksToSchedule - numPendingTasks));
-    
-    if (numTasksToSchedule > 0) {
-      // numTasksToSchedule can be -ve if numSourceTasksCompleted does not 
-      // does not increase monotonically
-      LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + 
-               managedVertex.getVertexId() + " with totalTasks: " + 
-               totalTasksToSchedule + ". " + numSourceTasksCompleted + 
-               " source tasks completed out of " + numSourceTasks + 
-               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
-               " min: " + slowStartMinSrcCompletionFraction + 
-               " max: " + slowStartMaxSrcCompletionFraction);
-      schedulePendingTasks(numTasksToSchedule);
-    }
-  }
-
-}
NOTE(review): the slow-start fraction logic deleted above reappears in ShuffleVertexManager.schedulePendingTasks(); ShuffleVertexManager takes over this class's role and adds optional auto-parallelism.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
index bd39115..8ee7e55 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
@@ -18,9 +18,11 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 /**
  * Starts all tasks immediately on vertex start
@@ -38,11 +40,14 @@ public class ImmediateStartVertexScheduler implements VertexScheduler {
   }
 
   @Override
-  public void onVertexCompleted() {
+  public void onSourceTaskCompleted(TezTaskAttemptID attemptId,
+      TezDependentTaskCompletionEvent event) {
+    // all tasks are started on vertex start, so source progress is ignored
   }
 
   @Override
-  public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
+  public void initialize(Configuration conf) {
+    // nothing to configure
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
new file mode 100644
index 0000000..e221483
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -0,0 +1,330 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexScheduler;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+/**
+ * Starts scheduling tasks once the completed fraction of bipartite source tasks
+ * crosses the min fraction, schedules all tasks at the max fraction, and can
+ * optionally lower parallelism from the output sizes of completed source tasks.
+ */
+public class ShuffleVertexManager implements VertexScheduler {
+  
+  private static final Log LOG = 
+                   LogFactory.getLog(ShuffleVertexManager.class);
+
+  final Vertex managedVertex;
+  float slowStartMinSrcCompletionFraction;
+  float slowStartMaxSrcCompletionFraction;
+  long desiredTaskInputDataSize = 1024*1024*100L;
+  int minTaskParallelism = 1;
+  boolean enableAutoParallelism = false;
+  boolean parallelismDetermined = false;
+  
+  int numSourceTasks = 0;
+  int numSourceTasksCompleted = 0;
+  ArrayList<TezTaskID> pendingTasks;
+  int totalTasksToSchedule = 0;
+  HashMap<TezVertexID, Vertex> bipartiteSources = 
+                                            new HashMap<TezVertexID, Vertex>();
+  
+  Set<TezTaskID> completedSourceTasks = new HashSet<TezTaskID>();
+  long completedSourceTasksOutputSize = 0;
+  
+  public ShuffleVertexManager(Vertex managedVertex) {
+    this.managedVertex = managedVertex;
+    for (Map.Entry<Vertex, EdgeProperty> input :
+        managedVertex.getInputVertices().entrySet()) {
+      if (input.getValue().getConnectionPattern() != ConnectionPattern.BIPARTITE) {
+        continue; // only bipartite inputs are tracked
+      }
+      Vertex source = input.getKey();
+      bipartiteSources.put(source.getVertexId(), source);
+    }
+    if (bipartiteSources.isEmpty()) {
+      throw new TezUncheckedException("Atleast 1 bipartite source should exist");
+    }
+    // source task counts are deliberately not cached: they may change at runtime
+  }
+  
+  @Override
+  public void onVertexStarted() {
+    // snapshot this vertex's tasks and the current task count of its sources
+    int expectedTasks = managedVertex.getTotalTasks();
+    pendingTasks = new ArrayList<TezTaskID>(expectedTasks);
+    updatePendingTasks();
+    updateSourceTaskCount();
+    LOG.info("OnVertexStarted vertex: " + managedVertex.getVertexId()
+        + " with " + numSourceTasks + " source tasks and "
+        + totalTasksToSchedule + " pending tasks");
+    // try right away: with 0 source tasks or a min fraction of 0 the tasks
+    // must start without waiting for any source completion
+    schedulePendingTasks();
+  }
+
+  @Override
+  public void onSourceTaskCompleted(TezTaskAttemptID srcAttemptId,
+      TezDependentTaskCompletionEvent event) {
+    // re-read source counts: source vertices may change their task count
+    updateSourceTaskCount();
+    TezTaskID srcTaskId = srcAttemptId.getTaskID();
+    if (!bipartiteSources.containsKey(srcTaskId.getVertexID())) {
+      return; // only completions of bipartite sources drive scheduling
+    }
+    // count each source task once, even if its completion is reported again
+    boolean firstCompletion = completedSourceTasks.add(srcTaskId);
+    if (firstCompletion) {
+      ++numSourceTasksCompleted;
+    }
+    if (firstCompletion && enableAutoParallelism) {
+      long sourceTaskOutputSize = event.getDataSize();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Source task: " + event.getTaskAttemptID()
+            + " finished with output size: " + sourceTaskOutputSize);
+      }
+      completedSourceTasksOutputSize += sourceTaskOutputSize;
+    }
+    schedulePendingTasks();
+  }
+  
+  void updatePendingTasks() {
+    // re-read the task set; setParallelism() may have replaced the tasks
+    pendingTasks.clear();
+    pendingTasks.addAll(managedVertex.getTasks().keySet());
+    totalTasksToSchedule = pendingTasks.size();
+  }
+
+  void updateSourceTaskCount() {
+    int total = 0; // current task count summed over all bipartite sources
+    for (Vertex source : bipartiteSources.values()) {
+      total += source.getTotalTasks();
+    }
+    numSourceTasks = total;
+  }
+  
+  void determineParallelismAndApply() {
+    if (numSourceTasksCompleted == 0 || desiredTaskInputDataSize <= 0) {
+      // no output sizes observed yet, or no usable per-task size target
+      return;
+    }
+    int currentParallelism = pendingTasks.size();
+    // extrapolate total source output from the source tasks completed so far
+    long expectedTotalSourceTasksOutputSize =
+        (numSourceTasks*completedSourceTasksOutputSize)/numSourceTasksCompleted;
+    // clamp before narrowing so a small size target cannot overflow the int,
+    // and never go below 1 so the range division below cannot divide by zero
+    int desiredTaskParallelism = (int) Math.min(Integer.MAX_VALUE,
+        expectedTotalSourceTasksOutputSize/desiredTaskInputDataSize);
+    desiredTaskParallelism = Math.max(desiredTaskParallelism,
+        Math.max(minTaskParallelism, 1));
+    if (desiredTaskParallelism >= currentParallelism) {
+      return;
+    }
+    // most shufflers will be assigned this range
+    int basePartitionRange = currentParallelism/desiredTaskParallelism;
+    if (basePartitionRange <= 1) {
+      // nothing to do if range is equal 1 partition. shuffler does it by default
+      return;
+    }
+    int numShufflersWithBaseRange = currentParallelism / basePartitionRange;
+    int remainderRangeForLastShuffler = currentParallelism % basePartitionRange;
+    int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
+          (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
+    if (finalTaskParallelism >= currentParallelism) {
+      return; // range grouping would not actually lower the task count
+    }
+    LOG.info("Reducing parallelism for vertex: " + managedVertex.getVertexId()
+        + " to " + finalTaskParallelism + " from " + pendingTasks.size()
+        + " . Expected output: " + expectedTotalSourceTasksOutputSize
+        + " based on actual output: " + completedSourceTasksOutputSize
+        + " from " + numSourceTasksCompleted + " completed source tasks. "
+        + " desiredTaskInputSize: " + desiredTaskInputDataSize);
+    // one user payload per new task, carrying the partition range it handles
+    List<byte[]> taskConfs = new ArrayList<byte[]>(finalTaskParallelism);
+    try {
+      Configuration taskConf = new Configuration(false);
+      taskConf.setInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE,
+          basePartitionRange);
+      // NOTE(review): tez-dag calling MRHelpers (tez-mapreduce) looks like a
+      // layering leak; consider TezUtils.createUserPayloadFromConf if the task
+      // side decodes payloads the same way.
+      for (int i = 0; i < numShufflersWithBaseRange; ++i) {
+        taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
+      }
+      if (finalTaskParallelism > numShufflersWithBaseRange) {
+        // the last task takes the leftover partitions
+        taskConf.setInt(TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE,
+            remainderRangeForLastShuffler);
+        taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
+      }
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    managedVertex.setParallelism(finalTaskParallelism, taskConfs);
+    // the vertex now has fewer tasks; re-read them
+    updatePendingTasks();
+  }
+  
+  void schedulePendingTasks(int numTasksToSchedule) {
+    // determine parallelism before scheduling the first time
+    // this is the latest we can wait before determining parallelism.
+    // currently this depends on task completion and so this is the best time
+    // to do this. This is the max time we have until we have to launch tasks 
+    // as specified by the user. If/When we move to some other method of 
+    // calculating parallelism or change parallelism while tasks are already
+    // running then we can create other parameters to trigger this calculation.
+    if (enableAutoParallelism && !parallelismDetermined) {
+      // do this once
+      parallelismDetermined = true;
+      determineParallelismAndApply();
+    }
+    // take the first N pending tasks in one step; calling remove(0) per task
+    // shifts the whole ArrayList each time (quadratic for large vertices)
+    int count = Math.min(Math.max(numTasksToSchedule, 0), pendingTasks.size());
+    List<TezTaskID> head = pendingTasks.subList(0, count);
+    ArrayList<TezTaskID> scheduledTasks = new ArrayList<TezTaskID>(head);
+    head.clear(); // removes the scheduled tasks from pendingTasks
+    managedVertex.scheduleTasks(scheduledTasks);
+  }
+  
+  /**
+   * Slow-start scheduling for the managed vertex. Releases pending tasks in
+   * proportion to the fraction of source tasks that have completed.
+   * <p>
+   * Nothing is released until the completed fraction reaches
+   * {@code slowStartMinSrcCompletionFraction}. After that, the number of
+   * released tasks grows linearly so that every task is released once the
+   * fraction reaches {@code slowStartMaxSrcCompletionFraction}. If min and
+   * max are equal, all tasks are released on reaching min. Once every source
+   * task has completed, all remaining pending tasks are released.
+   */
+  void schedulePendingTasks() {
+    int numPendingTasks = pendingTasks.size();
+    if (numPendingTasks == 0) {
+      return;
+    }
+
+    // numPendingTasks > 0 is guaranteed by the early return above.
+    if (numSourceTasksCompleted == numSourceTasks) {
+      LOG.info("All source tasks completed. " +
+          "Ramping up " + numPendingTasks +
+          " remaining tasks for vertex: " + managedVertex.getName());
+      schedulePendingTasks(numPendingTasks);
+      return;
+    }
+
+    // Guard the division (support for 0 source tasks): with no source tasks
+    // everything counts as completed.
+    float completedSourceTaskFraction = (numSourceTasks != 0)
+        ? (float) numSourceTasksCompleted / numSourceTasks
+        : 1f;
+
+    // Start scheduling when the completed fraction exceeds min, then increase
+    // linearly so that all tasks are scheduled when it reaches max.
+    float tasksFractionToSchedule = 1;
+    float percentRange = slowStartMaxSrcCompletionFraction -
+                          slowStartMinSrcCompletionFraction;
+    if (percentRange > 0) {
+      tasksFractionToSchedule =
+          (completedSourceTaskFraction - slowStartMinSrcCompletionFraction) /
+          percentRange;
+    } else if (completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+      // min and max are equal: schedule 100% only on reaching min.
+      tasksFractionToSchedule = 0;
+    }
+    // Clamp to [0, 1].
+    tasksFractionToSchedule =
+        Math.max(0f, Math.min(1f, tasksFractionToSchedule));
+
+    // (totalTasksToSchedule - numPendingTasks) is the number already released.
+    int numTasksToSchedule =
+        ((int) (tasksFractionToSchedule * totalTasksToSchedule) -
+         (totalTasksToSchedule - numPendingTasks));
+
+    // numTasksToSchedule can be negative if numSourceTasksCompleted does not
+    // increase monotonically.
+    if (numTasksToSchedule > 0) {
+      LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + 
+               managedVertex.getVertexId() + " with totalTasks: " + 
+               totalTasksToSchedule + ". " + numSourceTasksCompleted + 
+               " source tasks completed out of " + numSourceTasks + 
+               ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + 
+               " min: " + slowStartMinSrcCompletionFraction + 
+               " max: " + slowStartMaxSrcCompletionFraction);
+      schedulePendingTasks(numTasksToSchedule);
+    }
+  }
+
+  /**
+   * Reads the slow-start and auto-parallelism settings from {@code conf}.
+   * <p>
+   * If aggressive scheduling is enabled, both slow-start thresholds are
+   * forced to 0 after validation.
+   *
+   * @param conf configuration to read the settings from
+   * @throws IllegalArgumentException if the min fraction is negative or NaN,
+   *           or the max fraction is less than the min fraction or NaN
+   */
+  @Override
+  public void initialize(Configuration conf) {
+    this.slowStartMinSrcCompletionFraction = conf.getFloat(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
+    this.slowStartMaxSrcCompletionFraction = conf.getFloat(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT);
+
+    // Written as negated >= comparisons so that NaN values are rejected too
+    // (every ordered comparison involving NaN is false).
+    if (!(slowStartMinSrcCompletionFraction >= 0) ||
+        !(slowStartMaxSrcCompletionFraction >= slowStartMinSrcCompletionFraction)) {
+      throw new IllegalArgumentException(
+          "Invalid values for slowStartMinSrcCompletionFraction" +
+          "/slowStartMaxSrcCompletionFraction. Min cannot be < 0 and " +
+          "max cannot be < min. min: " + slowStartMinSrcCompletionFraction +
+          " max: " + slowStartMaxSrcCompletionFraction);
+    }
+
+    if (conf.getBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING,
+        TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT)) {
+      LOG.info("Setting min/max threshold to 0 due to aggressive scheduling");
+      this.slowStartMinSrcCompletionFraction = 0;
+      this.slowStartMaxSrcCompletionFraction = 0;
+    }
+
+    enableAutoParallelism = conf.getBoolean(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT);
+    desiredTaskInputDataSize = conf.getLong(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
+    minTaskParallelism = conf.getInt(
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM,
+        TezConfiguration.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 471ca26..9db4a9e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -114,7 +114,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
 
   protected final Configuration conf;
-  protected final int partition;
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
   private final TezTaskAttemptID attemptId;
@@ -128,6 +127,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected Token<JobTokenIdentifier> jobToken;
   private long launchTime = 0;
   private long finishTime = 0;
+  // TEZ-347 remove this and getShufflePort()
   private int shufflePort = -1;
   private String trackerName;
   private int httpPort;
@@ -152,8 +152,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   protected final String javaOpts;
   protected final boolean isRescheduled;
 
-  protected ProcessorDescriptor processorDescriptor;
-
   protected static final FailedTransitionHelper FAILED_HELPER =
       new FailedTransitionHelper();
 
@@ -259,11 +257,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   // TODO Remove TaskAttemptListener from the constructor.
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener tal, int partition,
+      TaskAttemptListener tal,
       Configuration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
-      ProcessorDescriptor processorDescriptor, TaskLocationHint locationHint,
+      TaskLocationHint locationHint,
       Resource resource, Map<String, LocalResource> localResources,
       Map<String, String> environment,
       String javaOpts, boolean isRescheduled) {
@@ -273,7 +271,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
     this.eventHandler = eventHandler;
     //Reported status
-    this.partition = partition;
     this.conf = conf;
     this.jobToken = jobToken;
     this.credentials = credentials;
@@ -282,7 +279,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.appContext = appContext;
     this.taskResource = resource;
     this.reportedStatus = new TaskAttemptStatus();
-    this.processorDescriptor = processorDescriptor;
     initTaskAttemptStatus(reportedStatus);
     RackResolver.init(conf);
     this.stateMachine = stateMachineFactory.make(this);
@@ -315,13 +311,12 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
+  /**
+   * Creates the task context for this attempt's remote task. It is built from
+   * the attempt id, the DAG's user name and name, the vertex name, the
+   * vertex's processor descriptor, and the vertex's input/output spec lists.
+   */
   TezTaskContext createRemoteTask() {
-    Vertex vertex = getTask().getVertex();
+    Vertex vertex = getVertex();
+    ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
     DAG dag = vertex.getDAG();
 
-    // TODO  TEZ-50 user and jobname
     return new TezEngineTaskContext(getID(), dag.getUserName(),
-        dag.getName(), getTask()
-        .getVertex().getName(), processorDescriptor,
+        dag.getName(), vertex.getName(), procDesc,
         vertex.getInputSpecList(), vertex.getOutputSpecList());
   }
 
@@ -528,6 +523,11 @@ public class TaskAttemptImpl implements TaskAttempt,
         .getVertex(attemptId.getTaskID().getVertexID())
         .getTask(attemptId.getTaskID());
   }
+  
+  /**
+   * Returns the vertex that owns this attempt, looked up by the attempt's
+   * vertex id in the DAG held by the app context.
+   */
+  Vertex getVertex() {
+    DAG dag = appContext.getDAG();
+    return dag.getVertex(attemptId.getTaskID().getVertexID());
+  }
 
   @SuppressWarnings("unchecked")
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 240c7bf..e83fe17 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -86,7 +87,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
 
   protected final Configuration conf;
-  protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final EventHandler eventHandler;
@@ -104,7 +104,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected boolean encryptedShuffle;
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
-  protected ProcessorDescriptor processorDescriptor;
   protected TaskLocationHint locationHint;
   protected Resource taskResource;
   protected Map<String, LocalResource> localResources;
@@ -274,18 +273,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
-  public TaskImpl(TezVertexID vertexId, int partition,
+  public TaskImpl(TezVertexID vertexId, int taskIndex,
       EventHandler eventHandler, Configuration conf,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
-      // TODO Recovery
-      //Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun,
-      //int startCount,
-      // TODO Metrics
-      //MRAppMetrics metrics,
       TaskHeartbeatHandler thh, AppContext appContext,
-      ProcessorDescriptor processorDescriptor,
       boolean leafVertex, TaskLocationHint locationHint, Resource resource,
       Map<String, LocalResource> localResources,
       Map<String, String> environment,
@@ -296,24 +289,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
-    // TODO TEZ-47 get from conf or API
     maxAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS,
                               TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT);
-    taskId = new TezTaskID(vertexId, partition);
-    this.partition = partition;
+    taskId = new TezTaskID(vertexId, taskIndex);
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
     this.eventHandler = eventHandler;
     this.credentials = credentials;
     this.jobToken = jobToken;
-    // TODO Metrics
-    //this.metrics = metrics;
     this.appContext = appContext;
-    // TODO Security
     this.encryptedShuffle = false;
-    //conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
-     //                                       MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
-    this.processorDescriptor = processorDescriptor;
 
     this.leafVertex = leafVertex;
     this.locationHint = locationHint;
@@ -321,42 +306,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     this.localResources = localResources;
     this.environment = environment;
     this.javaOpts = javaOpts;
-    // TODO: Recovery
-    /*
-    // See if this is from a previous generation.
-    if (completedTasksFromPreviousRun != null
-        && completedTasksFromPreviousRun.containsKey(taskId)) {
-      // This task has TaskAttempts from previous generation. We have to replay
-      // them.
-      LOG.info("Task is from previous run " + taskId);
-      TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId);
-      Map<TaskAttemptID, TaskAttemptInfo> allAttempts =
-          taskInfo.getAllTaskAttempts();
-      taskAttemptsFromPreviousGeneration = new ArrayList<TaskAttemptInfo>();
-      taskAttemptsFromPreviousGeneration.addAll(allAttempts.values());
-      Collections.sort(taskAttemptsFromPreviousGeneration,
-        RECOVERED_ATTEMPTS_COMPARATOR);
-    }
-
-    if (taskAttemptsFromPreviousGeneration.isEmpty()) {
-      // All the previous attempts are exhausted, now start with a new
-      // generation.
-
-      // All the new TaskAttemptIDs are generated based on MR
-      // ApplicationAttemptID so that attempts from previous lives don't
-      // over-step the current one. This assumes that a task won't have more
-      // than 1000 attempts in its single generation, which is very reasonable.
-      // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts
-      // and requires serious medical attention.
-      nextAttemptNumber = (startCount - 1) * 1000;
-    } else {
-      // There are still some TaskAttempts from previous generation, use them
-      nextAttemptNumber =
-          taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
-    }
-     */
-    // This "this leak" is okay because the retained pointer is in an
-    //  instance variable.
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -592,7 +541,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   // TODO remove hacky name lookup
   @Override
   public boolean needsWaitAfterOutputConsumable() {
-    if (processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
+    Vertex vertex = getVertex();
+    ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
+    if (processorDescriptor != null && 
+        processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) {
       return true;
     } else {
       return false;
@@ -612,9 +564,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
+  /**
+   * Creates a new attempt of this task with the given attempt number. The
+   * attempt is created with isRescheduled set when this task has already had
+   * failed attempts.
+   */
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
-        taskAttemptListener, 0, conf,
+        taskAttemptListener, conf,
         jobToken, credentials, clock, taskHeartbeatHandler,
-        appContext, processorDescriptor, locationHint, taskResource,
+        appContext, locationHint, taskResource,
         localResources, environment, javaOpts, (failedAttempts>0));
   }
 
@@ -738,8 +690,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (attempt.getFinishTime() != 0 && attempt.getLaunchTime() != 0)
         runTime = (int) (attempt.getFinishTime() - attempt.getLaunchTime());
 
+      // TODO TEZ-347. Get this event from Task instead of generating here
+      long dataSize = getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
       TezDependentTaskCompletionEvent tce = new TezDependentTaskCompletionEvent(
-          -1, attemptId, status, url, runTime);
+          -1, attemptId, status, url, runTime, dataSize);
 
       // raise the event to job so that it adds the completion event to its
       // data structures

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 41db1fe..df46765 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +58,6 @@ import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -107,6 +107,7 @@ import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
@@ -146,7 +147,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final AppContext appContext;
 
   private boolean lazyTasksCopyNeeded = false;
-  volatile Map<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
+  // must be a linked map for ordering
+  volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>();
+  private List<byte[]> taskUserPayloads = null;
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
   private Resource taskResource;
@@ -505,7 +508,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
-      int fromEventId, int maxEvents) {
+      TezTaskAttemptID attemptID, int fromEventId, int maxEvents) {
     TezDependentTaskCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
     readLock.lock();
     try {
@@ -514,6 +517,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             (sourceTaskAttemptCompletionEvents.size() - fromEventId));
         events = sourceTaskAttemptCompletionEvents.subList(fromEventId,
             actualMax + fromEventId).toArray(events);
+        // create a copy if user payload is different per task
+        if(taskUserPayloads != null && events.length > 0) {
+          int taskId = attemptID.getTaskID().getId();
+          byte[] userPayload = taskUserPayloads.get(taskId);
+          TezDependentTaskCompletionEvent event = events[0].clone();
+          event.setUserPayload(userPayload);
+          events[0] = event;
+        }
       }
       return events;
     } finally {
@@ -631,9 +642,64 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   @Override
   public void scheduleTasks(Collection<TezTaskID> taskIDs) {
-    for (TezTaskID taskID : taskIDs) {
-      eventHandler.handle(new TaskEvent(taskID,
-          TaskEventType.T_SCHEDULE));
+    // Dispatch under the read lock so that scheduling does not interleave with
+    // setParallelism, which changes the task set under the write lock
+    // (assuming both locks come from the same ReadWriteLock).
+    readLock.lock();
+    try {
+      for (TezTaskID taskID : taskIDs) {
+        eventHandler.handle(new TaskEvent(taskID,
+            TaskEventType.T_SCHEDULE));
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Reduces the number of tasks in this vertex to {@code parallelism}.
+   * <p>
+   * Tasks keep their insertion order, and the trailing tasks beyond
+   * {@code parallelism} are dropped. Every task must still be in
+   * {@code TaskState.NEW}. Setting the current value is a no-op, and
+   * increasing parallelism is rejected. On failure the vertex is left
+   * unchanged.
+   *
+   * @param parallelism the new number of tasks; must be >= 0 and must not
+   *          exceed the current number of tasks
+   * @param taskUserPayloads per-task user payloads indexed by task index, or
+   *          {@code null}; when non-null its size must equal
+   *          {@code parallelism}
+   * @throws IllegalArgumentException if {@code parallelism} is negative or
+   *           the payload list has the wrong size
+   * @throws TezUncheckedException if parallelism would increase, or if any task
+   *           has already left the NEW state
+   */
+  @Override
+  public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
+    writeLock.lock();
+    try {
+      Preconditions.checkArgument(parallelism >= 0,
+          "Parallelism cannot be negative: %s", parallelism);
+      Preconditions.checkArgument(
+          taskUserPayloads == null || taskUserPayloads.size() == parallelism,
+          "Userpayload must be set for all tasks or set to null");
+      // The equality check must come first; checking ">=" before it made this
+      // branch unreachable and turned a no-op into an exception.
+      if (parallelism == numTasks) {
+        LOG.info("Ignoring setParallelism to current value: " + parallelism);
+        return;
+      }
+      if (parallelism > numTasks) {
+        // not that hard to support perhaps. but checking right now since there
+        // is no use case for it and checking may catch other bugs.
+        throw new TezUncheckedException(
+            "Increasing parallelism is not supported");
+      }
+
+      // Assign to a local LinkedHashMap so that changing the type of tasks
+      // causes a compile error. We depend on LinkedHashMap insertion order to
+      // keep the first 'parallelism' tasks.
+      LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
+
+      // Validate every task before changing anything, so that a failure
+      // leaves the vertex untouched.
+      for (Task task : currentTasks.values()) {
+        if (task.getState() != TaskState.NEW) {
+          throw new TezUncheckedException(
+              "All tasks must be in initial state when changing parallelism"
+                  + " for vertex: " + getVertexId() + " name: " + getName());
+        }
+      }
+
+      LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
+      // Build a new map instead of removing entries in place. The current map
+      // may already be shared with readers: addTask treats it as
+      // copy-on-write via lazyTasksCopyNeeded.
+      LinkedHashMap<TezTaskID, Task> retainedTasks =
+          new LinkedHashMap<TezTaskID, Task>();
+      Iterator<Map.Entry<TezTaskID, Task>> iter =
+          currentTasks.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<TezTaskID, Task> entry = iter.next();
+        if (retainedTasks.size() < parallelism) {
+          retainedTasks.put(entry.getKey(), entry.getValue());
+        } else {
+          LOG.info("Removing task: " + entry.getKey());
+        }
+      }
+      this.tasks = retainedTasks;
+      this.numTasks = parallelism;
+      if (taskUserPayloads != null) {
+        this.taskUserPayloads = new ArrayList<byte[]>(taskUserPayloads);
+      }
+      assert tasks.size() == numTasks;
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -687,7 +753,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   protected void addTask(Task task) {
     synchronized (tasksSyncHandle) {
       if (lazyTasksCopyNeeded) {
-        Map<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>();
+        LinkedHashMap<TezTaskID, Task> newTasks = new LinkedHashMap<TezTaskID, Task>();
         newTasks.putAll(tasks);
         tasks = newTasks;
         lazyTasksCopyNeeded = false;
@@ -887,42 +953,28 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // create the Tasks but don't start them yet
         createTasks(vertex);
 
-        if (vertex.conf.getBoolean(
-            TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING,
-            TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT)) {
-          LOG.info("Using immediate start vertex scheduler due to aggressive scheduling");
-          vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
-        } else {
-          boolean hasBipartite = false;
-          if (vertex.sourceVertices != null) {
-            for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
-              if (edgeProperty.getConnectionPattern() == ConnectionPattern.BIPARTITE) {
-                hasBipartite = true;
-                break;
-              }
+        boolean hasBipartite = false;
+        if (vertex.sourceVertices != null) {
+          for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
+            if (edgeProperty.getConnectionPattern() == ConnectionPattern.BIPARTITE) {
+              hasBipartite = true;
+              break;
             }
           }
+        }
 
-          if (hasBipartite) {
-            // setup vertex scheduler
-            // TODO this needs to consider data size and perhaps API.
-            // Currently implicitly BIPARTITE is the only edge type
-            vertex.vertexScheduler = new BipartiteSlowStartVertexScheduler(
-                vertex,
-                vertex.conf
-                    .getFloat(
-                        TezConfiguration.TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MIN_SRC_FRACTION,
-                        TezConfiguration.TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MIN_SRC_FRACTION_DEFAULT),
-                vertex.conf
-                    .getFloat(
-                        TezConfiguration.TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION,
-                        TezConfiguration.TEZ_AM_SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION_DEFAULT));
-          } else {
-            // schedule all tasks upon vertex start
-            vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
-          }
+        if (hasBipartite) {
+          // setup vertex scheduler
+          // TODO this needs to consider data size and perhaps API.
+          // Currently implicitly BIPARTITE is the only edge type
+          vertex.vertexScheduler = new ShuffleVertexManager(vertex);
+        } else {
+          // schedule all tasks upon vertex start
+          vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
         }
 
+        vertex.vertexScheduler.initialize(vertex.conf);
+
         // FIXME how do we decide vertex needs a committer?
         // Answer: Do commit for every vertex
         // for now, only for leaf vertices
@@ -975,7 +1027,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 vertex.clock,
                 vertex.taskHeartbeatHandler,
                 vertex.appContext,
-                vertex.processorDescriptor,
                 vertex.targetVertices.isEmpty(),
                 locHint, vertex.taskResource,
                 vertex.localResources,
@@ -1160,6 +1211,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       TezTaskID taskId = attemptId.getTaskID();
       //make the previous completion event as obsolete if it exists
       if (TezDependentTaskCompletionEvent.Status.SUCCEEDED.equals(tce.getStatus())) {
+        vertex.vertexScheduler.onSourceTaskCompleted(attemptId, tce);
         Object successEventNo =
             vertex.successSourceAttemptCompletionEventNoMap.remove(taskId);
         if (successEventNo != null) {
@@ -1170,7 +1222,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         vertex.successSourceAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
       }
 
-      vertex.vertexScheduler.onSourceTaskCompleted(attemptId);
     }
   }
 
@@ -1251,15 +1302,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         taskKilled(vertex, task);
       }
 
-      vertex.vertexScheduler.onVertexCompleted();
       VertexState state = VertexImpl.checkVertexForCompletion(vertex);
       if(state == VertexState.RUNNING && forceTransitionToKillWait){
         return VertexState.TERMINATING;
       }
 
-      if(state == VertexState.SUCCEEDED) {
-        vertex.vertexScheduler.onVertexCompleted();
-      }
       return state;
     }
 
@@ -1366,6 +1413,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public int getOutputVerticesCount() {
     return this.targetVertices.size();
   }
+  
+  /**
+   * Returns the processor descriptor configured for this vertex.
+   */
+  @Override
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return this.processorDescriptor;
+  }
+
+  /**
+   * Returns the DAG currently held by the application context.
+   */
+  @Override
+  public DAG getDAG() {
+    return this.appContext.getDAG();
+  }
 
   private TezDAGID getDAGId() {
     return getDAG().getID();
@@ -1378,12 +1435,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public Resource getTaskResource() {
     return taskResource;
   }
-
-  @Override
-  public DAG getDAG() {
-    return appContext.getDAG();
-  }
-
   @VisibleForTesting
   String getProcessorName() {
     return this.processorDescriptor.getClassName();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index d3568a5..efe37bf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -477,7 +477,6 @@ public class TestDAGImpl {
   @Before
   public void setup() {
     conf = new Configuration();
-    conf.setBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING, false);
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2cc00dc..b2f7cbd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
@@ -91,10 +90,6 @@ import org.mockito.ArgumentCaptor;
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestTaskAttempt {
 
-  private static final ProcessorDescriptor MAP_PROCESSOR_DESC =
-      new ProcessorDescriptor(
-      "org.apache.tez.mapreduce.processor.map.MapProcessor");
-
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
     public FileStatus getFileStatus(Path f) throws IOException {
@@ -134,10 +129,10 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), 1, new Configuration(),
+        mock(TaskAttemptListener.class), new Configuration(),
         mock(Token.class), new Credentials(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        MAP_PROCESSOR_DESC, locationHint, Resource.newInstance(1024, 1),
+        locationHint, Resource.newInstance(1024, 1),
         new HashMap<String, LocalResource>(), new HashMap<String, String>(),
         "", false);
 
@@ -180,10 +175,10 @@ public class TestTaskAttempt {
     TezTaskID taskID = new TezTaskID(
         new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), 1, new Configuration(),
+        mock(TaskAttemptListener.class), new Configuration(),
         mock(Token.class), new Credentials(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        MAP_PROCESSOR_DESC, locationHint, Resource.newInstance(1024, 1),
+        locationHint, Resource.newInstance(1024, 1),
         new HashMap<String, LocalResource>(), new HashMap<String, String>(),
         "", false);
     TaskAttemptImpl spyTa = spy(taImpl);
@@ -331,9 +326,9 @@ public class TestTaskAttempt {
     doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
@@ -398,9 +393,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -493,9 +488,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
@@ -558,9 +553,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
@@ -625,9 +620,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -719,9 +714,9 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        taListener, 1, taskConf, mock(Token.class), new Credentials(),
+        taListener, taskConf, mock(Token.class), new Credentials(),
         new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
-        MAP_PROCESSOR_DESC, locationHint, resource, localResources,
+        locationHint, resource, localResources,
         environment, javaOpts, false);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
@@ -790,16 +785,16 @@ public class TestTaskAttempt {
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
-        EventHandler eventHandler, TaskAttemptListener tal, int partition,
+        EventHandler eventHandler, TaskAttemptListener tal,
         Configuration conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
-        ProcessorDescriptor processorDesc, TaskLocationHint locationHint,
+        TaskLocationHint locationHint,
         Resource resource, Map<String, LocalResource> localResources,
         Map<String, String> environment, String javaOpts, boolean isRescheduled) {
-      super(taskId, attemptNumber, eventHandler, tal, partition, conf,
+      super(taskId, attemptNumber, eventHandler, tal, conf,
           jobToken, credentials, clock, taskHeartbeatHandler, appContext,
-          processorDesc, locationHint, resource, localResources, environment,
+          locationHint, resource, localResources, environment,
           javaOpts, isRescheduled);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 412fcb5..35b6e7d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -50,6 +49,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskTerminationCause;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -88,7 +88,6 @@ public class TestTaskImpl {
   private Map<String, String> environment;
   private String javaOpts;
   private boolean leafVertex;
-  private ProcessorDescriptor mapProcDesc;
 
   private MockTaskImpl mockTask;
 
@@ -113,14 +112,12 @@ public class TestTaskImpl {
     environment = new HashMap<String, String>();
     javaOpts = "";
     leafVertex = false;
-    mapProcDesc = new ProcessorDescriptor(
-        "org.apache.tez.mapreduce.processor.map.MapProcessor");
+    Vertex vertex = mock(Vertex.class);
 
     mockTask = new MockTaskImpl(vertexId, partition,
         dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken,
-        credentials, clock, taskHeartbeatHandler, appContext,
-        mapProcDesc, leafVertex,
-        locationHint, taskResource, localResources, environment, javaOpts);
+        credentials, clock, taskHeartbeatHandler, appContext, leafVertex,
+        locationHint, taskResource, localResources, environment, javaOpts, vertex);
   }
 
   private TezTaskID getNewTaskID() {
@@ -375,28 +372,30 @@ public class TestTaskImpl {
   private class MockTaskImpl extends TaskImpl {
 
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
+    private Vertex vertex;
 
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
         TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken, Credentials credentials,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
-        ProcessorDescriptor processorDesc, boolean leafVertex,
+        boolean leafVertex,
         TaskLocationHint locationHint, Resource resource,
         Map<String, LocalResource> localResources,
-        Map<String, String> environment, String javaOpts) {
+        Map<String, String> environment, String javaOpts, Vertex vertex) {
       super(vertexId, partition, eventHandler, conf, taskAttemptListener,
-          jobToken, credentials, clock, thh, appContext, processorDesc,
+          jobToken, credentials, clock, thh, appContext,
           leafVertex, locationHint, resource, localResources, environment,
           javaOpts);
+      this.vertex = vertex;
     }
 
     @Override
     protected TaskAttemptImpl createAttempt(int attemptNumber) {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
-          attemptNumber, eventHandler, taskAttemptListener, attemptNumber,
+          attemptNumber, eventHandler, taskAttemptListener,
           conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext,
-          processorDescriptor, locationHint, taskResource, localResources,
+          locationHint, taskResource, localResources,
           environment, javaOpts, true);
       taskAttempts.add(attempt);
       return attempt;
@@ -415,6 +414,11 @@ public class TestTaskImpl {
     List<MockTaskAttemptImpl> getAttemptList() {
       return taskAttempts;
     }
+    
+    @Override
+    public Vertex getVertex() {
+      return vertex;
+    }
 
     protected void logJobHistoryTaskStartedEvent() {
     }
@@ -433,15 +437,15 @@ public class TestTaskImpl {
     private TaskAttemptState state = TaskAttemptState.NEW;
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
-        EventHandler eventHandler, TaskAttemptListener tal, int partition,
+        EventHandler eventHandler, TaskAttemptListener tal, 
         Configuration conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
-        AppContext appContext, ProcessorDescriptor processorDesc,
+        AppContext appContext,
         TaskLocationHint locationHing, Resource resource,
         Map<String, LocalResource> localResources,
         Map<String, String> environment, String javaOpts, boolean isRescheduled) {
-      super(taskId, attemptNumber, eventHandler, tal, partition, conf,
-          jobToken, credentials, clock, thh, appContext, processorDesc,
+      super(taskId, attemptNumber, eventHandler, tal, conf,
+          jobToken, credentials, clock, thh, appContext,
           locationHing, resource, localResources, environment, javaOpts,
           isRescheduled);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e368ede8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 23d5b82..f2a7be7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -22,7 +22,9 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -69,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
@@ -505,7 +508,6 @@ public class TestVertexImpl {
   public void setup() {
     conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
-    conf.setBoolean(TezConfiguration.TEZ_AM_AGGRESSIVE_SCHEDULING, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
     dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
@@ -649,6 +651,47 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
   }
+  
+  @Test//(timeout = 5000) NOTE(review): timeout is disabled here unlike sibling tests; restore?
+  public void testVertexSetParallelism() {
+    VertexImpl v2 = vertices.get("vertex2");
+    initVertex(v2);
+    Assert.assertEquals(2, v2.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v2.getTasks();
+    Assert.assertEquals(2, tasks.size());
+    TezTaskID firstTask = tasks.keySet().iterator().next();
+    
+    startVertex(v2);
+    
+    byte[] payload = new byte[0];
+    List<byte[]> taskPayloads = Collections.singletonList(payload);
+    v2.setParallelism(1, taskPayloads);
+    Assert.assertEquals(1, v2.getTotalTasks());
+    Assert.assertEquals(1, tasks.size());
+    // tasks (fetched before the resize) must reflect it: the last task goes, the first stays
+    Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
+    
+    VertexImpl v1 = vertices.get("vertex1");
+    TezTaskID t1_v1 = new TezTaskID(v1.getVertexId(), 0);
+    TezTaskAttemptID ta1_t1_v1 = new TezTaskAttemptID(t1_v1, 0);
+
+    TezDependentTaskCompletionEvent cEvt1 =
+        new TezDependentTaskCompletionEvent(1, ta1_t1_v1,
+            Status.SUCCEEDED, "", 3, 0);
+    v2.handle( 
+        new VertexEventSourceTaskAttemptCompleted(v2.getVertexId(), cEvt1));
+
+    TezTaskID t1_v2 = new TezTaskID(v2.getVertexId(), 0);
+    TezTaskAttemptID ta1_t1_v2 = new TezTaskAttemptID(t1_v2, 0);
+    TezDependentTaskCompletionEvent[] events =
+        v2.getTaskAttemptCompletionEvents(ta1_t1_v2, 0, 100);
+    Assert.assertEquals(1, events.length);
+    TezDependentTaskCompletionEvent clone = events[0]; 
+    // task 0's event must carry the exact payload array passed to setParallelism (array equals is identity)
+    Assert.assertEquals(payload, clone.getUserPayload());
+    // the returned event must be a distinct object from the one that was handled
+    Assert.assertFalse(cEvt1 == clone);
+  }
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
@@ -852,7 +895,7 @@ public class TestVertexImpl {
     VertexImpl v6 = vertices.get("vertex6");
     initVertex(v6);
     Assert.assertTrue(v6.getVertexScheduler()
-        instanceof BipartiteSlowStartVertexScheduler);
+        instanceof ShuffleVertexManager);
   }
 
   @SuppressWarnings("unchecked")
@@ -937,6 +980,7 @@ public class TestVertexImpl {
     TezTaskID t2_v4 = new TezTaskID(v4.getVertexId(), 1);
     TezTaskID t1_v5 = new TezTaskID(v5.getVertexId(), 0);
     TezTaskID t2_v5 = new TezTaskID(v5.getVertexId(), 1);
+    TezTaskID t1_v6 = new TezTaskID(v6.getVertexId(), 0);
 
     TezTaskAttemptID ta1_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
     TezTaskAttemptID ta2_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
@@ -944,25 +988,26 @@ public class TestVertexImpl {
     TezTaskAttemptID ta1_t1_v5 = new TezTaskAttemptID(t1_v5, 0);
     TezTaskAttemptID ta1_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
     TezTaskAttemptID ta2_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
+    TezTaskAttemptID ta1_t1_v6 = new TezTaskAttemptID(t1_v6, 0);
 
     TezDependentTaskCompletionEvent cEvt1 =
         new TezDependentTaskCompletionEvent(1, ta1_t1_v4,
-            Status.FAILED, "", 3);
+            Status.FAILED, "", 3, 0);
     TezDependentTaskCompletionEvent cEvt2 =
         new TezDependentTaskCompletionEvent(2, ta2_t1_v4,
-            Status.SUCCEEDED, "", 4);
+            Status.SUCCEEDED, "", 4, 1);
     TezDependentTaskCompletionEvent cEvt3 =
         new TezDependentTaskCompletionEvent(2, ta1_t2_v4,
-            Status.SUCCEEDED, "", 5);
+            Status.SUCCEEDED, "", 5, 2);
     TezDependentTaskCompletionEvent cEvt4 =
         new TezDependentTaskCompletionEvent(2, ta1_t1_v5,
-            Status.SUCCEEDED, "", 5);
+            Status.SUCCEEDED, "", 5, 3);
     TezDependentTaskCompletionEvent cEvt5 =
         new TezDependentTaskCompletionEvent(1, ta1_t2_v5,
-            Status.FAILED, "", 3);
+            Status.FAILED, "", 3, 4);
     TezDependentTaskCompletionEvent cEvt6 =
         new TezDependentTaskCompletionEvent(2, ta2_t2_v5,
-            Status.SUCCEEDED, "", 4);
+            Status.SUCCEEDED, "", 4, 5);
 
     v4.handle(new VertexEventTaskAttemptCompleted(cEvt1));
     v4.handle(new VertexEventTaskAttemptCompleted(cEvt2));
@@ -982,7 +1027,8 @@ public class TestVertexImpl {
 
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
-    Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
+    Assert.assertEquals(6,
+        v6.getTaskAttemptCompletionEvents(ta1_t1_v6, 0, 100).length);
 
   }
 


Mime
View raw message