tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] tez git commit: TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks (bikas)
Date Wed, 05 Aug 2015 18:06:16 GMT
Repository: tez
Updated Branches:
  refs/heads/master cc1d89cba -> 7b45e9a14


http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index b164a6d..d59439e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.*;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,9 +33,10 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,13 +44,13 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
 
 @SuppressWarnings("unchecked")
 public class TestInputReadyVertexManager {
   
   @Captor
-  ArgumentCaptor<List<TaskWithLocationHint>> requestCaptor;
+  ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor;
   
   @Before
   public void init() {
@@ -77,23 +77,23 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
-    
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
+
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
     // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // then own vertex started
-    manager.onVertexStarted(initialCompletions);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    verify(mockContext, times(0)).scheduleTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(2, requestCaptor.getValue().size());
   }
   
@@ -118,36 +118,36 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
     // source vertex configured
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
-    manager.onVertexStarted(initialCompletions);
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(0, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -175,28 +175,28 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
     mockInputVertices.put(mockSrcVertexId1, eProp1);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(1)).vertexReconfigurationPlanned();
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // ok to have source task complete before anything else
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     // first own vertex started
-    manager.onVertexStarted(initialCompletions);
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
     // no scheduling as we are not configured yet
-    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
     // then source vertex configured. now we start
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).doneReconfiguringVertex();
     
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -247,7 +247,7 @@ public class TestInputReadyVertexManager {
     mockInputVertices.put(mockSrcVertexId2, eProp2);
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     
-    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList();
     
     // 1-1 sources do not match managed tasks. setParallelism called to make them match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
@@ -280,8 +280,8 @@ public class TestInputReadyVertexManager {
     
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     
-    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-    initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
+    initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
     verify(mockContext, times(3)).vertexReconfigurationPlanned();
@@ -293,44 +293,53 @@ public class TestInputReadyVertexManager {
     verify(mockContext, times(2)).doneReconfiguringVertex();
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate
-    manager.onSourceTaskCompleted(mockSrcVertexId2, 1);
-    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
-    manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done
-    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+    verify(mockContext, times(0)).scheduleTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(0, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion triggers since other 1-1 is done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
-    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1));
+    verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(1, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     // 1-1 completion does not trigger since other 1-1 is not done
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
-    verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+    verify(mockContext, times(2)).scheduleTasks(anyList());
     // 1-1 completion trigger start
-    manager.onSourceTaskCompleted(mockSrcVertexId2, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+    verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
-    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
     Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getVertexName());
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
     
     // no more starts
-    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
-    verify(mockContext, times(3)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+    verify(mockContext, times(3)).scheduleTasks(anyList());
     
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 3a3c71b..18d4bc1 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -38,10 +38,17 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
 import org.apache.tez.runtime.api.VertexStatistics;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -75,6 +82,10 @@ import static org.mockito.Mockito.when;
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestShuffleVertexManager {
 
+  TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+  int taskId = 0;
+  List<TaskAttemptIdentifier> emptyCompletions = null;
+
   @Test(timeout = 5000)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {
     Configuration conf = new Configuration();
@@ -163,12 +174,12 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           scheduledTasks.clear();
-          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-          for (TaskWithLocationHint task : tasks) {
+          List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+          for (ScheduleTaskRequest task : tasks) {
             scheduledTasks.add(task.getTaskIndex());
           }
           return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
     
     final Map<String, EdgeManagerPlugin> newEdgeManagers =
         new HashMap<String, EdgeManagerPlugin>();
@@ -224,7 +235,7 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
 
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     verify(mockContext, times(2)).vertexReconfigurationPlanned();
     Assert.assertTrue(manager.bipartiteSources == 2);
     
@@ -247,7 +258,7 @@ public class TestShuffleVertexManager {
     verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
     Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
     // trigger start and processing of pending notification events
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
     verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
     Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -257,20 +268,18 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
 
-    ByteBuffer payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer();
-    VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
     // parallelism not change due to large data size
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
     verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     manager.onVertexManagerEventReceived(vmEvent);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
     verify(mockContext, times(2)).doneReconfiguringVertex();
     // trigger scheduling
@@ -290,49 +299,54 @@ public class TestShuffleVertexManager {
     //{5,9,12,18} in bitmap
     long[] sizes = new long[]{(0l), (1000l * 1000l),
                               (1010 * 1000l * 1000l), (50 * 1000l * 1000l)};
-    RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
-    DataOutputBuffer dout = new DataOutputBuffer();
-    partitionStats.serialize(dout);
-    ByteString
-        partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L)
-            .setPartitionStats(partitionStatsBytes).build().toByteString().asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex");
 
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
+    TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId1));
+    manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
+
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+    Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+    Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+    Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
+
+    // sending again from a different version of the same task has not impact
+    TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
+    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId2));
     manager.onVertexManagerEventReceived(vmEvent);
+    Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
 
-    Assert.assertEquals(manager.stats.length, 4);
-    Assert.assertEquals(manager.stats[0], 0); //0 MB bucket
-    Assert.assertEquals(manager.stats[1], 1); //1 MB bucket
-    Assert.assertEquals(manager.stats[2], 100); //100 MB bucket
-    Assert.assertEquals(manager.stats[3], 10); //10 MB bucket
+    Assert.assertEquals(4, manager.stats.length);
+    Assert.assertEquals(0, manager.stats[0]); //0 MB bucket
+    Assert.assertEquals(1, manager.stats[1]); //1 MB bucket
+    Assert.assertEquals(100, manager.stats[2]); //100 MB bucket
+    Assert.assertEquals(10, manager.stats[3]); //10 MB bucket
 
     /**
      * Test for TEZ-978
      * Delay determining parallelism until enough data has been received.
      */
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
 
     //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
     manager = createManager(conf, mockContext, 0.01f, 0.75f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //First task in src1 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.determineParallelismAndApply() == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
@@ -341,8 +355,9 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
 
     //Second task in src1 completed with small payload
+    vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent); //small payload
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
     Assert.assertTrue(manager.determineParallelismAndApply() == false);
     Assert.assertEquals(4, manager.pendingTasks.size());
@@ -352,17 +367,14 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
 
     //First task in src2 completed (with larger payload) to trigger determining parallelism
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString()
-            .asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
     Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
     verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertEquals(1, manager.pendingTasks.size());
     Assert.assertEquals(1, scheduledTasks.size());
     Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
@@ -374,14 +386,11 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString()
-            .asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
+    vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
 
     //min/max fraction of 0.0/0.2
     manager = createManager(conf, mockContext, 0.0f, 0.2f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -391,7 +400,7 @@ public class TestShuffleVertexManager {
     //send 7 events with payload size as 100
     for(int i=0;i<7;i++) {
       manager.onVertexManagerEventReceived(vmEvent); //small payload
-      manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
+      manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
       //should not change parallelism
       verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
     }
@@ -401,7 +410,7 @@ public class TestShuffleVertexManager {
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
 
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
     //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
     verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class),
         anyMap());
@@ -413,41 +422,39 @@ public class TestShuffleVertexManager {
 
     // parallelism changed due to small data size
     scheduledTasks.clear();
-    payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
-    vmEvent = VertexManagerEvent.create("Vertex", payload);
 
     manager = createManager(conf, mockContext, 0.5f, 0.5f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
     // ignore duplicate completion
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertEquals(4, manager.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
     Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
-    
+    vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
     manager.onVertexManagerEventReceived(vmEvent);
     //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
 
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     // managedVertex tasks reduced
     verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
@@ -461,7 +468,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
     
     // more completions dont cause recalculation of parallelism
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
     Assert.assertEquals(2, newEdgeManagers.size());
     
@@ -537,7 +544,7 @@ public class TestShuffleVertexManager {
     mockInputVertices.put(mockSrcVertexId3, eProp3);
     try {
       manager = createManager(conf, mockContext, 0.1f, 0.1f);
-      manager.onVertexStarted(null);
+      manager.onVertexStarted(emptyCompletions);
       Assert.assertFalse(true);
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains(
@@ -549,7 +556,7 @@ public class TestShuffleVertexManager {
     
     // check initialization
     manager = createManager(conf, mockContext, 0.1f, 0.1f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 2);
 
     final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
@@ -557,17 +564,17 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           scheduledTasks.clear();
-          List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-          for (TaskWithLocationHint task : tasks) {
+          List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+          for (ScheduleTaskRequest task : tasks) {
             scheduledTasks.add(task.getTaskIndex());
           }
           return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
     
     // source vertices have 0 tasks. immediate start of all managed tasks
     when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
     when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     
@@ -594,7 +601,7 @@ public class TestShuffleVertexManager {
     
     // source vertex have some tasks. min, max == 0
     manager = createManager(conf, mockContext, 0, 0);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.totalTasksToSchedule == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
@@ -607,122 +614,122 @@ public class TestShuffleVertexManager {
     
     // min, max > 0 and min == max
     manager = createManager(conf, mockContext, 0.25f, 0.25f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
     
     // min, max > 0 and min == max == absolute max 1.0
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
     // min, max > 0 and min == max
     manager = createManager(conf, mockContext, 1.0f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     // task completion from non-bipartite stage does nothing
-    manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3);
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.isEmpty());
     Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
     
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 0.75f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
     // completion of same task again should not get counted
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
     scheduledTasks.clear();
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // we are done. no action
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
 
     // min, max > and min < max
     manager = createManager(conf, mockContext, 0.25f, 1.0f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 2);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-    manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 1);
     Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-    manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 0);
     Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
     Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
@@ -732,9 +739,10 @@ public class TestShuffleVertexManager {
 
   /**
    * Tasks should be scheduled only when all source vertices are configured completely
+   * @throws IOException 
    */
   @Test(timeout = 5000)
-  public void test_Tez1649_with_scatter_gather_edges() {
+  public void test_Tez1649_with_scatter_gather_edges() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(
         ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
@@ -778,10 +786,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
-    ByteBuffer payload =
-        VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer();
-    VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
-
+    VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
     // check initialization
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
 
@@ -790,14 +795,14 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         scheduledTasks.clear();
-        List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-        for (TaskWithLocationHint task : tasks) {
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
           scheduledTasks.add(task.getTaskIndex());
         }
         return null;
-      }}).when(mockContext_R2).scheduleVertexTasks(anyList());
+      }}).when(mockContext_R2).scheduleTasks(anyList());
 
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 3);
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
@@ -811,15 +816,15 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Send events for all tasks of m3.
-    manager.onSourceTaskCompleted(m3, new Integer(0));
-    manager.onSourceTaskCompleted(m3, new Integer(1));
-    manager.onSourceTaskCompleted(m3, new Integer(2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
 
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Send an event for m2. But still we need to wait for at least 1 event from r1.
-    manager.onSourceTaskCompleted(m2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
@@ -848,7 +853,7 @@ public class TestShuffleVertexManager {
     when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
 
     manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
@@ -858,7 +863,7 @@ public class TestShuffleVertexManager {
 
     // Only need completed configuration notification from m3
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(m3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
@@ -885,7 +890,10 @@ public class TestShuffleVertexManager {
               .build().toByteString()
               .asReadOnlyByteBuffer();
     }
+    TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName,
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0));
     VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+    vmEvent.setProducerAttemptIdentifier(taId);
     return vmEvent;
   }
 
@@ -940,16 +948,16 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         scheduledTasks.clear();
-        List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-        for (TaskWithLocationHint task : tasks) {
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
           scheduledTasks.add(task.getTaskIndex());
         }
         return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
 
     // check initialization
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 1);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
@@ -960,7 +968,7 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //Send an event for r1.
-    manager.onSourceTaskCompleted(r1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
@@ -975,13 +983,13 @@ public class TestShuffleVertexManager {
     manager.onVertexManagerEventReceived(vmEvent); //send VM event
 
     //Send an event for m2.
-    manager.onSourceTaskCompleted(m2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
     //Send an event for m3.
     manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(m3, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
 
@@ -1043,16 +1051,16 @@ public class TestShuffleVertexManager {
       public Object answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         scheduledTasks.clear();
-        List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
-        for (TaskWithLocationHint task : tasks) {
+        List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+        for (ScheduleTaskRequest task : tasks) {
           scheduledTasks.add(task.getTaskIndex());
         }
         return null;
-      }}).when(mockContext).scheduleVertexTasks(anyList());
+      }}).when(mockContext).scheduleTasks(anyList());
 
     // check initialization
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     Assert.assertTrue(manager.bipartiteSources == 1);
 
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
@@ -1063,13 +1071,13 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //Send events for 2 tasks of r1.
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
     //Send an event for m2.
-    manager.onSourceTaskCompleted(m2, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
@@ -1082,7 +1090,7 @@ public class TestShuffleVertexManager {
     //Still, wait for a configuration to be completed from other edges
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
 
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
@@ -1094,9 +1102,9 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
 
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
-    manager.onSourceTaskCompleted(r1, new Integer(2));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2));
     //Tasks from non-scatter edges of m2 and m3 are not complete.
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
@@ -1109,7 +1117,7 @@ public class TestShuffleVertexManager {
     //try with a zero task vertex (with non-scatter-gather edges)
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -1118,7 +1126,7 @@ public class TestShuffleVertexManager {
     when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast
 
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
 
     Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
@@ -1126,8 +1134,8 @@ public class TestShuffleVertexManager {
     Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
 
     //Send 2 events for tasks of r1.
-    manager.onSourceTaskCompleted(r1, new Integer(0));
-    manager.onSourceTaskCompleted(r1, new Integer(1));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
     Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 0);
 
@@ -1139,7 +1147,7 @@ public class TestShuffleVertexManager {
     //try with all zero task vertices in non-SG edges
     scheduledTasks.clear();
     manager = createManager(conf, mockContext, 0.001f, 0.001f);
-    manager.onVertexStarted(null);
+    manager.onVertexStarted(emptyCompletions);
     when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
     when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -1149,10 +1157,22 @@ public class TestShuffleVertexManager {
 
     //Send 1 events for tasks of r1.
     manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
-    manager.onSourceTaskCompleted(r1, new Integer(0));
+    manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
   }
+  
+  public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
+    VertexIdentifier mockVertex = mock(VertexIdentifier.class);
+    when(mockVertex.getName()).thenReturn(vName);
+    TaskIdentifier mockTask = mock(TaskIdentifier.class);
+    when(mockTask.getIdentifier()).thenReturn(tId);
+    when(mockTask.getVertexIdentifier()).thenReturn(mockVertex);
+    TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class);
+    when(mockAttempt.getIdentifier()).thenReturn(0);
+    when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask);
+    return mockAttempt;
+  }
 
   private ShuffleVertexManager createManager(Configuration conf,
       VertexManagerPluginContext context, float min, float max) {

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 1d17b23..74efee2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import org.slf4j.Logger;
@@ -56,7 +55,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -71,6 +70,7 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 import org.junit.After;
@@ -522,8 +522,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -531,7 +531,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -561,8 +562,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -570,7 +571,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -601,8 +603,8 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+      super.onSourceTaskCompleted(attempt);
       completedTaskNum ++;
       if (getContext().getDAGAttemptNumber() == 1) {
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -610,7 +612,8 @@ public class TestAMRecovery {
             System.exit(-1);
           }
         } else {
-          if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+          if (completedTaskNum == getContext().
+              getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
             System.exit(-1);
           }
         }
@@ -642,26 +645,26 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions)
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions)
         throws Exception {
       if (getContext().getDAGAttemptNumber() == 1) {
         // only schedule one task if it is partiallyFinished case
         if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
-          getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+          getContext().scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
           return ;
         }
       }
       // schedule all tasks when it is not partiallyFinished
       int taskNum = getContext().getVertexNumTasks(getContext().getVertexName());
-      List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>();
+      List<ScheduleTaskRequest> taskWithLocationHints = new ArrayList<ScheduleTaskRequest>();
       for (int i=0;i<taskNum;++i) {
-        taskWithLocationHints.add(new TaskWithLocationHint(i, null));
+        taskWithLocationHints.add(ScheduleTaskRequest.create(i, null));
       }
-      getContext().scheduleVertexTasks(taskWithLocationHints);
+      getContext().scheduleTasks(taskWithLocationHints);
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId)
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt)
         throws Exception {
       
     }
@@ -703,9 +706,9 @@ public class TestAMRecovery {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       int curAttempt = getContext().getDAGAttemptNumber();
-      super.onSourceTaskCompleted(srcVertexName, taskId);
+      super.onSourceTaskCompleted(attempt);
       int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1);
       LOG.info("failOnAttempt:" + failOnAttempt);
       LOG.info("curAttempt:" + curAttempt);

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 49bb9f5..b8b46cb 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
@@ -686,7 +687,7 @@ public class TestExceptionPropagation {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
       if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) {
         throw new RuntimeException(this.exLocation.name());
       }
@@ -729,11 +730,11 @@ public class TestExceptionPropagation {
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
         throw new RuntimeException(this.exLocation.name());
       }
-      super.onSourceTaskCompleted(srcVertexName, attemptId);
+      super.onSourceTaskCompleted(attempt);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 7244d8d..5c6f855 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -40,7 +40,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.InputInitializer;
@@ -57,15 +58,12 @@ import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MultiAttemptDAG {
@@ -102,14 +100,11 @@ public class MultiAttemptDAG {
     }
 
     @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
+    public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
       if (completions != null) {
-        for (Entry<String, List<Integer>> entry : completions.entrySet()) {
-          LOG.info("Received completion events on vertexStarted"
-              + ", vertex=" + entry.getKey()
-              + ", completions=" + entry.getValue().size());
-          numCompletions.addAndGet(entry.getValue().size());
-        }
+        LOG.info("Received completion events on vertexStarted"
+            + ", completions=" + completions.size());
+        numCompletions.addAndGet(completions.size());
       }
       maybeScheduleTasks();
     }
@@ -129,20 +124,20 @@ public class MultiAttemptDAG {
         } else if (successAttemptId == getContext().getDAGAttemptNumber()) {
           LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName());
           int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
-          List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
+          List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
           for (int i=0; i<numTasks; ++i) {
-            scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
+            scheduledTasks.add(ScheduleTaskRequest.create(i, null));
           }
-          getContext().scheduleVertexTasks(scheduledTasks);
+          getContext().scheduleTasks(scheduledTasks);
         }
       }
     }
 
     @Override
-    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
       LOG.info("Received completion events for source task"
-          + ", vertex=" + srcVertexName
-          + ", taskIdx=" + taskId);
+          + ", vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName()
+          + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier());
       numCompletions.incrementAndGet();
       maybeScheduleTasks();
     }


Mime
View raw message