tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2421. Deadlock in AM because attempt and vertex locking each other out (zjffdu)
Date Mon, 11 May 2015 05:52:18 GMT
Repository: tez
Updated Branches:
  refs/heads/master ce69aa1e2 -> ed7f1abbc


TEZ-2421. Deadlock in AM because attempt and vertex locking each other out (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ed7f1abb
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ed7f1abb
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ed7f1abb

Branch: refs/heads/master
Commit: ed7f1abbce54093f56f33c35c8ac92d9e433760f
Parents: ce69aa1
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Mon May 11 13:51:59 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Mon May 11 13:51:59 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/dag/app/dag/Task.java   |   7 +-
 .../app/dag/event/TaskEventScheduleTask.java    |  42 ++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  25 ++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  29 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 206 +++++++++++--------
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  14 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |  79 +++----
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |  26 +--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  27 +++
 .../apache/tez/runtime/api/impl/TaskSpec.java   |  29 +++
 11 files changed, 327 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index efb19b2..b85a8fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-2421. Deadlock in AM because attempt and vertex locking each other out
   TEZ-2426. Ensure the eventRouter thread completes before switching to a new task and thread safety fixes in IPOContexts.
   TEZ-2412. Should kill vertex in DAGImpl#VertexRerunWhileCommitting
   TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index b798fce..177ee8a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -22,11 +22,13 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
 /**
@@ -65,5 +67,8 @@ public interface Task {
   TaskState restoreFromEvent(HistoryEvent historyEvent);
 
   public void registerTezEvent(TezEvent tezEvent);
-
+  
+  public TaskSpec getBaseTaskSpec();
+  
+  public TaskLocationHint getTaskLocationHint();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
new file mode 100644
index 0000000..696602a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventScheduleTask.java
@@ -0,0 +1,42 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class TaskEventScheduleTask extends TaskEvent {
+  private final TaskSpec baseTaskSpec;
+  private final TaskLocationHint locationHint;
+  
+  public TaskEventScheduleTask(TezTaskID taskId, TaskSpec baseTaskSpec, TaskLocationHint locationHint) {
+    super(taskId, TaskEventType.T_SCHEDULE);
+    this.baseTaskSpec = baseTaskSpec;
+    this.locationHint = locationHint;
+  }
+  
+  public TaskSpec getBaseTaskSpec() {
+    return baseTaskSpec;
+  }
+  
+  public TaskLocationHint getTaskLocationHint() {
+    return locationHint;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index b1c0acc..036022e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -456,14 +455,19 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
-    Vertex vertex = getVertex();
-    ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
-    int taskId = getTaskID().getId();
+    TaskSpec baseTaskSpec = task.getBaseTaskSpec();
+    if (baseTaskSpec == null) {
+      // since recovery does not follow normal transitions, TaskEventScheduleTask
+      // is not being honored by the recovery code path. Using this to workaround 
+      // until recovery is fixed. Calling the non-locking internal method of the vertex
+      // to get the taskSpec directly. Since everything happens on the central dispatcher 
+      // during recovery this is deadlock free for now. TEZ-1019 should remove the need for this.
+      baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
+    }
     return new TaskSpec(getID(),
-        vertex.getDAG().getName(),
-        vertex.getName(), vertex.getTotalTasks(), procDesc,
-        vertex.getInputSpecList(taskId), vertex.getOutputSpecList(taskId), 
-        vertex.getGroupInputSpecList(taskId));
+        baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+        baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+        baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
   }
 
   @Override
@@ -935,9 +939,8 @@ public class TaskAttemptImpl implements TaskAttempt,
 //    sendEvent(new TaskCleanupEvent(this.attemptId, this.committer, taContext));
   }
   
-  @VisibleForTesting
-  protected TaskLocationHint getTaskLocationHint() {
-    return getVertex().getTaskLocationHint(getTaskID());
+  private TaskLocationHint getTaskLocationHint() {
+    return task.getTaskLocationHint();
   }
 
   protected String[] resolveHosts(String[] src) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 2e884e7..de5ab2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -73,6 +73,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -92,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
@@ -128,6 +130,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   //private final MRAppMetrics metrics;
   protected final AppContext appContext;
   private final Resource taskResource;
+  private TaskSpec baseTaskSpec;
+  private TaskLocationHint locationHint;
   private final ContainerContext containerContext;
   @VisibleForTesting
   long scheduledTime;
@@ -516,6 +520,26 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       readLock.unlock();
     }
   }
+  
+  @Override
+  public TaskSpec getBaseTaskSpec() {
+    readLock.lock();
+    try {
+      return baseTaskSpec;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  @Override
+  public TaskLocationHint getTaskLocationHint() {
+    readLock.lock();
+    try {
+      return locationHint;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
   @Override
   public List<String> getDiagnostics() {
@@ -1021,6 +1045,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskEventScheduleTask scheduleEvent = (TaskEventScheduleTask) event;
+      task.locationHint = scheduleEvent.getTaskLocationHint();
+      task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
       task.addAndScheduleAttempt();
       task.scheduledTime = task.clock.getTime();
       task.logJobHistoryTaskStartedEvent();

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6b208b0..80a0358 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -114,6 +114,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -171,6 +172,7 @@ import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 
@@ -1417,66 +1419,96 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
   }
   
-  void setupEdgeRouting() throws AMUserCodeException {
+  boolean setupEdgeRouting() throws AMUserCodeException {
+    boolean doOnDemand = useOnDemandRouting;
     for (Edge e : sourceVertices.values()) {
       boolean edgeDoingOnDemand = e.routingToBegin();
-      if (useOnDemandRouting && !edgeDoingOnDemand) {
-        useOnDemandRouting = false;
+      if (doOnDemand && !edgeDoingOnDemand) {
+        doOnDemand = false;
         LOG.info("Not using ondemand routing because of edge between " + e.getSourceVertexName()
             + " and " + getLogIdentifier());
       }
     }
+    return doOnDemand;
   }
   
   private void unsetTasksNotYetScheduled() throws AMUserCodeException {
     if (tasksNotYetScheduled) {
-      setupEdgeRouting();
-      tasksNotYetScheduled = false;
-      // only now can we be sure of the edge manager type. so until now
-      // we will accumulate pending tasks in case legacy routing gets used.
-      // this is only needed to support mixed mode routing. Else for
-      // on demand routing events can be directly added to taskEvents when 
-      // they arrive in handleRoutedEvents instead of first caching them in 
-      // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents
-      // can be removed.
-      if (!pendingTaskEvents.isEmpty()) {
-        LOG.info("Routing pending task events for vertex: " + logIdentifier);
-        try {
-          handleRoutedTezEvents(pendingTaskEvents, false, true);
-        } catch (AMUserCodeException e) {
-          String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
-          LOG.error(msg, e);
-          addDiagnostic(msg + ", " + e.getMessage() + ", "
-              + ExceptionUtils.getStackTrace(e.getCause()));
-          eventHandler.handle(new VertexEventTermination(vertexId,
-              VertexTerminationCause.AM_USERCODE_FAILURE));
-          return;
+      boolean doOnDemand = setupEdgeRouting();
+      // change state under lock
+      writeLock.lock();
+      try {
+        useOnDemandRouting = doOnDemand;
+        tasksNotYetScheduled = false;
+        // only now can we be sure of the edge manager type. so until now
+        // we will accumulate pending tasks in case legacy routing gets used.
+        // this is only needed to support mixed mode routing. Else for
+        // on demand routing events can be directly added to taskEvents when 
+        // they arrive in handleRoutedEvents instead of first caching them in 
+        // pendingTaskEvents. When legacy routing is removed then pendingTaskEvents
+        // can be removed.
+        if (!pendingTaskEvents.isEmpty()) {
+          LOG.info("Routing pending task events for vertex: " + logIdentifier);
+          try {
+            handleRoutedTezEvents(pendingTaskEvents, false, true);
+          } catch (AMUserCodeException e) {
+            String msg = "Exception in " + e.getSource() + ", vertex=" + logIdentifier;
+            LOG.error(msg, e);
+            addDiagnostic(msg + ", " + e.getMessage() + ", "
+                + ExceptionUtils.getStackTrace(e.getCause()));
+            eventHandler.handle(new VertexEventTermination(vertexId,
+                VertexTerminationCause.AM_USERCODE_FAILURE));
+            return;
+          }
+          pendingTaskEvents.clear();
         }
-        pendingTaskEvents.clear();
+      } finally {
+        writeLock.unlock();
       }
     }
   }
   
+  TaskSpec createRemoteTaskSpec(int taskIndex) throws AMUserCodeException {
+    return TaskSpec.createBaseTaskSpec(getDAG().getName(),
+        getName(), getTotalTasks(), getProcessorDescriptor(),
+        getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
+        getGroupInputSpecList(taskIndex));
+  }
+  
   @Override
   public void scheduleTasks(List<TaskWithLocationHint> tasksToSchedule) {
-    writeLock.lock();
     try {
       unsetTasksNotYetScheduled();
-      for (TaskWithLocationHint task : tasksToSchedule) {
-        if (numTasks <= task.getTaskIndex().intValue()) {
-          throw new TezUncheckedException(
-              "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
-        }
-        TaskLocationHint locationHint = task.getTaskLocationHint();
-        if (locationHint != null) {
-          if (taskLocationHints == null) {
-            taskLocationHints = new TaskLocationHint[numTasks];
+      // update state under write lock
+      writeLock.lock();
+      try {
+        for (TaskWithLocationHint task : tasksToSchedule) {
+          if (numTasks <= task.getTaskIndex().intValue()) {
+            throw new TezUncheckedException(
+                "Invalid taskId: " + task.getTaskIndex() + " for vertex: " + logIdentifier);
           }
-          taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+          TaskLocationHint locationHint = task.getTaskLocationHint();
+          if (locationHint != null) {
+            if (taskLocationHints == null) {
+              taskLocationHints = new TaskLocationHint[numTasks];
+            }
+            taskLocationHints[task.getTaskIndex().intValue()] = locationHint;
+          }
+        }
+      } finally {
+        writeLock.unlock();
+      }
+      
+      readLock.lock();
+      try {
+        for (TaskWithLocationHint task : tasksToSchedule) {
+          TezTaskID taskId = TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue());
+          TaskSpec baseTaskSpec = createRemoteTaskSpec(taskId.getId());
+          eventHandler.handle(new TaskEventScheduleTask(taskId, baseTaskSpec,
+              getTaskLocationHint(taskId)));
         }
-        eventHandler.handle(new TaskEvent(
-            TezTaskID.getInstance(vertexId, task.getTaskIndex().intValue()),
-            TaskEventType.T_SCHEDULE));
+      } finally {
+        readLock.unlock();
       }
     } catch (AMUserCodeException e) {
       String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
@@ -1485,8 +1517,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
       // throw an unchecked exception to stop the vertex manager that invoked this.
       throw new TezUncheckedException(e);
-    } finally {
-      writeLock.unlock();
     }
   }
   
@@ -4632,50 +4662,58 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return taskLocationHints;
   }
 
-  // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
-    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
-        + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
-    if (rootInputDescriptors != null) {
-      for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
-           rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
-        inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
-            rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
-                rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
+  public List<InputSpec> getInputSpecList(int taskIndex) throws AMUserCodeException {
+    readLock.lock();
+    try {
+      List<InputSpec> inputSpecList = new ArrayList<InputSpec>(this.getInputVerticesCount()
+          + (rootInputDescriptors == null ? 0 : rootInputDescriptors.size()));
+      if (rootInputDescriptors != null) {
+        for (Entry<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
+             rootInputDescriptorEntry : rootInputDescriptors.entrySet()) {
+          inputSpecList.add(new InputSpec(rootInputDescriptorEntry.getKey(),
+              rootInputDescriptorEntry.getValue().getIODescriptor(), rootInputSpecs.get(
+                  rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex)));
+        }
       }
+      for(Vertex vertex : getInputVertices().keySet()) {
+        /**
+         * It is possible that setParallelism is in the middle of processing in target vertex with
+         * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
+         * get consistent view.
+         * Refer TEZ-2251
+         */
+        InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
+        // TODO DAGAM This should be based on the edge type.
+        inputSpecList.add(inputSpec);
+      }
+      return inputSpecList;
+    } finally {
+      readLock.unlock();
     }
-    for(Vertex vertex : getInputVertices().keySet()) {
-      /**
-       * It is possible that setParallelism is in the middle of processing in target vertex with
-       * its write lock. So we need to get inputspec by acquiring read lock in target vertex to
-       * get consistent view.
-       * Refer TEZ-2251
-       */
-      InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex);
-      // TODO DAGAM This should be based on the edge type.
-      inputSpecList.add(inputSpec);
-    }
-    return inputSpecList;
   }
 
-  // TODO Eventually remove synchronization.
   @Override
-  public synchronized List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
-    List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
-        + this.additionalOutputSpecs.size());
-    outputSpecList.addAll(additionalOutputSpecs);
-    for(Vertex vertex : targetVertices.keySet()) {
-      /**
-       * It is possible that setParallelism (which could change numTasks) is in the middle of
-       * processing in target vertex with its write lock. So we need to get outputspec by
-       * acquiring read lock in target vertex to get consistent view.
-       * Refer TEZ-2251
-       */
-      OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
-      outputSpecList.add(outputSpec);
-    }
-    return outputSpecList;
+  public List<OutputSpec> getOutputSpecList(int taskIndex) throws AMUserCodeException {
+    readLock.lock();
+    try {
+      List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount()
+          + this.additionalOutputSpecs.size());
+      outputSpecList.addAll(additionalOutputSpecs);
+      for(Vertex vertex : targetVertices.keySet()) {
+        /**
+         * It is possible that setParallelism (which could change numTasks) is in the middle of
+         * processing in target vertex with its write lock. So we need to get outputspec by
+         * acquiring read lock in target vertex to get consistent view.
+         * Refer TEZ-2251
+         */
+        OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex);
+        outputSpecList.add(outputSpec);
+      }
+      return outputSpecList;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws
@@ -4703,10 +4741,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
 
-  //TODO Eventually remove synchronization.
   @Override
-  public synchronized List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
-    return groupInputSpecList;
+  public List<GroupInputSpec> getGroupInputSpecList(int taskIndex) {
+    readLock.lock();
+    try {
+      return groupInputSpecList;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index d2aa2d0..fff95b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -93,7 +93,6 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
@@ -975,12 +974,7 @@ public class TestDAGImpl {
     dispatcher.await();
 
     VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
-    LOG.info(String.valueOf(v2.getTasks().size()));
-    Task t1= v2.getTask(0);
-    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
-
-    Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
-    String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+    String diag = StringUtils.join(v2.getDiagnostics(), ",");
     Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name()));
   }
 
@@ -998,11 +992,7 @@ public class TestDAGImpl {
     Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
 
     VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
-    Task t1= v1.getTask(0);
-    TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskId(), 0));
-
-    Assert.assertEquals(TaskAttemptStateInternal.FAILED, ta1.getInternalState());
-    String diag = StringUtils.join(ta1.getDiagnostics(), ",");
+    String diag = StringUtils.join(v1.getDiagnostics(), ",");
     Assert.assertTrue(diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name()));
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 60c4c88..86251cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -65,6 +65,7 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -100,6 +101,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -113,11 +115,19 @@ public class TestTaskAttempt {
       return new FileStatus(1, false, 1, 1, 1, f);
     }
   }
+  
+  Task mockTask;
+  TaskLocationHint locationHint;
 
   @BeforeClass
   public static void setup() {
     MockDNSToSwitchMapping.initializeMockRackResolver();
   }
+  
+  @Before
+  public void setupTest() {
+    mockTask = mock(Task.class);
+  }
 
   @Test(timeout = 5000)
   public void testLocalityRequest() {
@@ -129,14 +139,14 @@ public class TestTaskAttempt {
     hosts.add("host1");
     hosts.add("host2");
     hosts.add("host3");
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(hosts, null);
+    locationHint = TaskLocationHint.createTaskLocationHint(hosts, null);
 
     TezTaskID taskID = TezTaskID.getInstance(
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+        false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
     TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
 
@@ -148,6 +158,8 @@ public class TestTaskAttempt {
       fail("Second event not of type "
           + AMSchedulerEventTALaunchRequest.class.getName());
     }
+    
+    verify(mockTask, times(1)).getTaskLocationHint();
     // TODO Move the Rack request check to the client after TEZ-125 is fixed.
     Set<String> requestedRacks = taImpl.taskRacks;
     assertEquals(1, requestedRacks.size());
@@ -169,12 +181,12 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        null, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+        false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
     TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
-        null, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
+        true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
@@ -224,7 +236,7 @@ public class TestTaskAttempt {
     String hosts[] = new String[] { "127.0.0.1", "host2", "host3" };
     Set<String> resolved = new TreeSet<String>(
         Arrays.asList(new String[]{ "host1", "host2", "host3" }));
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new TreeSet<String>(Arrays.asList(hosts)), null);
 
     TezTaskID taskID = TezTaskID.getInstance(
@@ -232,7 +244,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(),
         new SystemClock(), mock(TaskHeartbeatHandler.class),
-        mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
+        mock(AppContext.class), false, Resource.newInstance(1024,
             1), createFakeContainerContext(), false);
 
     TaskAttemptImpl spyTa = spy(taImpl);
@@ -280,7 +292,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -289,7 +301,7 @@ public class TestTaskAttempt {
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false,
+        mock(TaskHeartbeatHandler.class), mockAppContext, false,
         resource, createFakeContainerContext(), false);
 
     NodeId nid = NodeId.newInstance("127.0.0.1", 0);
@@ -330,7 +342,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -353,7 +365,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -431,7 +443,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -454,7 +466,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
@@ -496,7 +508,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -519,7 +531,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -589,7 +601,7 @@ public class TestTaskAttempt {
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
     taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -612,7 +624,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -720,7 +732,7 @@ public class TestTaskAttempt {
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
     taskConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -743,7 +755,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -811,7 +823,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -834,7 +846,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
@@ -906,7 +918,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -929,7 +941,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
@@ -1009,7 +1021,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -1032,7 +1044,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), true);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
@@ -1109,7 +1121,7 @@ public class TestTaskAttempt {
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     taskConf.setBoolean("fs.file.impl.disable.cache", true);
 
-    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(
+    locationHint = TaskLocationHint.createTaskLocationHint(
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
@@ -1132,7 +1144,7 @@ public class TestTaskAttempt {
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
-        mockHeartbeatHandler, appCtx, locationHint, false,
+        mockHeartbeatHandler, appCtx, false,
         resource, createFakeContainerContext(), false);
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
@@ -1231,29 +1243,24 @@ public class TestTaskAttempt {
   };
 
   private class MockTaskAttemptImpl extends TaskAttemptImpl {
-    TaskLocationHint locationHint;
-
+    
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal,
         Configuration conf, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
-        TaskLocationHint locationHint,  boolean isRescheduled,
+        boolean isRescheduled,
         Resource resource, ContainerContext containerContext, boolean leafVertex) {
       super(taskId, attemptNumber, eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex, mock(TaskImpl.class));
-      this.locationHint = locationHint;
+          isRescheduled, resource, containerContext, leafVertex, mockTask);
+      when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
     }
+
     
     Vertex mockVertex = mock(Vertex.class);
     boolean inputFailedReported = false;
     
     @Override
-    public TaskLocationHint getTaskLocationHint() {
-      return locationHint;
-    }
-    
-    @Override
     protected Vertex getVertex() {
       return mockVertex;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 9da3fab..1ecabef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -56,7 +56,7 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
@@ -70,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Before;
@@ -105,6 +106,7 @@ public class TestTaskImpl {
   private NodeId mockNodeId;
 
   private MockTaskImpl mockTask;
+  private TaskSpec mockTaskSpec;
   
   @SuppressWarnings("rawtypes")
   class TestEventHandler implements EventHandler<Event> {
@@ -149,8 +151,9 @@ public class TestTaskImpl {
     
     mockTask = new MockTaskImpl(vertexId, partition,
         eventHandler, conf, taskAttemptListener, clock,
-        taskHeartbeatHandler, appContext, leafVertex, locationHint,
+        taskHeartbeatHandler, appContext, leafVertex,
         taskResource, containerContext, vertex);
+    mockTaskSpec = mock(TaskSpec.class);
   }
 
   private TezTaskID getNewTaskID() {
@@ -159,8 +162,10 @@ public class TestTaskImpl {
   }
 
   private void scheduleTaskAttempt(TezTaskID taskId) {
-    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
+    mockTask.handle(new TaskEventScheduleTask(taskId, mockTaskSpec, locationHint));
     assertTaskScheduledState();
+    assertEquals(mockTaskSpec, mockTask.getBaseTaskSpec());
+    assertEquals(locationHint, mockTask.getTaskLocationHint());
   }
 
   private void sendTezEventsToTask(TezTaskID taskId, int numTezEvents) {
@@ -671,19 +676,17 @@ public class TestTaskImpl {
 
     private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
     private Vertex vertex;
-    TaskLocationHint locationHint;
 
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
         TaskAttemptListener taskAttemptListener, Clock clock,
         TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
-        TaskLocationHint locationHint, Resource resource,
+        Resource resource,
         ContainerContext containerContext, Vertex vertex) {
       super(vertexId, partition, eventHandler, conf, taskAttemptListener,
           clock, thh, appContext, leafVertex, resource,
           containerContext, mock(StateChangeNotifier.class), vertex);
       this.vertex = vertex;
-      this.locationHint = locationHint;
     }
 
     @Override
@@ -691,7 +694,7 @@ public class TestTaskImpl {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
           attemptNumber, eventHandler, taskAttemptListener,
           conf, clock, taskHeartbeatHandler, appContext,
-          locationHint, true, taskResource, containerContext);
+          true, taskResource, containerContext);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -730,21 +733,14 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
-    TaskLocationHint locationHint;
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
         EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
-        TaskLocationHint locationHint, boolean isRescheduled,
+        boolean isRescheduled,
         Resource resource, ContainerContext containerContext) {
       super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
           appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class));
-      this.locationHint = locationHint;
-    }
-
-    @Override 
-    public TaskLocationHint getTaskLocationHint() {
-      return locationHint;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a8eaca1..6c94465 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -131,6 +131,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
@@ -352,9 +353,11 @@ public class TestVertexImpl {
   }
 
   private class TaskEventDispatcher implements EventHandler<TaskEvent> {
+    List<TaskEvent> events = Lists.newArrayList();
     @SuppressWarnings("unchecked")
     @Override
     public void handle(TaskEvent event) {
+      events.add(event);
       VertexImpl vertex = vertexIdMap.get(event.getTaskID().getVertexID());
       Task task = vertex.getTask(event.getTaskID());
       if (task != null) {
@@ -2706,6 +2709,30 @@ public class TestVertexImpl {
   }
   
   @Test(timeout = 5000)
+  public void testVertexScheduleSendEvent() throws Exception {
+    VertexImpl v3 = vertices.get("vertex3");
+    v3.vertexReconfigurationPlanned();
+    initAllVertices(VertexState.INITED);
+    Assert.assertEquals(2, v3.getTotalTasks());
+    Map<TezTaskID, Task> tasks = v3.getTasks();
+    Assert.assertEquals(2, tasks.size());
+
+    VertexImpl v1 = vertices.get("vertex1");
+    startVertex(vertices.get("vertex2"));
+    startVertex(v1);
+    v3.reconfigureVertex(10, null, null);
+    checkTasks(v3, 10);
+    taskEventDispatcher.events.clear();
+    TaskLocationHint mockLocation = mock(TaskLocationHint.class);
+    v3.scheduleTasks(Collections.singletonList(new TaskWithLocationHint(new Integer(0), mockLocation)));
+    dispatcher.await();
+    Assert.assertEquals(1, taskEventDispatcher.events.size());
+    TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
+    Assert.assertEquals(mockLocation, event.getTaskLocationHint());
+    Assert.assertNotNull(event.getBaseTaskSpec());
+  }
+  
+  @Test(timeout = 5000)
   public void testVertexSetParallelismFailAfterSchedule() throws Exception {
     VertexImpl v3 = vertices.get("vertex3");
     v3.vertexReconfigurationPlanned();

http://git-wip-us.apache.org/repos/asf/tez/blob/ed7f1abb/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index cce063f..4dc57e2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -45,6 +45,35 @@ public class TaskSpec implements Writable {
 
   public TaskSpec() {
   }
+  
+  public static TaskSpec createBaseTaskSpec(String dagName, String vertexName,
+      int vertexParallelism, ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+      @Nullable List<GroupInputSpec> groupInputSpecList) {
+    return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+        outputSpecList, groupInputSpecList);
+  }
+
+  public TaskSpec(
+      String dagName, String vertexName,
+      int vertexParallelism,
+      ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
+      @Nullable List<GroupInputSpec> groupInputSpecList) {
+    checkNotNull(dagName, "dagName is null");
+    checkNotNull(vertexName, "vertexName is null");
+    checkNotNull(processorDescriptor, "processorDescriptor is null");
+    checkNotNull(inputSpecList, "inputSpecList is null");
+    checkNotNull(outputSpecList, "outputSpecList is null");
+    this.taskAttemptId = null;
+    this.dagName = StringInterner.weakIntern(dagName);
+    this.vertexName = StringInterner.weakIntern(vertexName);
+    this.processorDescriptor = processorDescriptor;
+    this.inputSpecList = inputSpecList;
+    this.outputSpecList = outputSpecList;
+    this.groupInputSpecList = groupInputSpecList;
+    this.vertexParallelism = vertexParallelism;
+  }
 
   public TaskSpec(TezTaskAttemptID taskAttemptID,
       String dagName, String vertexName,


Mime
View raw message