tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/3] tez git commit: TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)
Date Tue, 03 May 2016 22:00:12 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7.1 258d68131 -> 81c09a069


TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8daa21b6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8daa21b6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8daa21b6

Branch: refs/heads/branch-0.7.1
Commit: 8daa21b649e67a4e4365bbe744cfeba770abe97d
Parents: 0ba1e97
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue May 3 14:54:49 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue May 3 14:54:49 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 44 +++++++++++---------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 13 +++++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 20 +++------
 .../app/dag/impl/TestTaskAttemptRecovery.java   |  5 ++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 14 ++++---
 6 files changed, 53 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 36d3d55..8a9ae7f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3193. Deadlock in AM during task commit request. 
   TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin. 

http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 9b8fd80..a2da34a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -62,7 +62,6 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -99,7 +98,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
-import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -180,8 +178,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String nodeHttpAddress;
   private String nodeRackName;
   
-  private final Task task;
   private final Vertex vertex;
+  private final TaskLocationHint locationHint;
+  private TaskSpec taskSpec;
 
   @VisibleForTesting
   boolean appendNextDataEvent = true;
@@ -451,22 +450,25 @@ public class TaskAttemptImpl implements TaskAttempt,
   private boolean recoveryStartEventSeen = false;
 
   @SuppressWarnings("rawtypes")
-  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+  public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Task task) {
-    this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+    this(attemptId, eventHandler, taskAttemptListener, conf, clock,
         taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
-        task, null);
+        vertex, locationHint, taskSpec, null);
   }
-  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+
+  @SuppressWarnings("rawtypes")
+  public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Task task, TezTaskAttemptID schedulingCausalTA) {
+      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
+      TezTaskAttemptID schedulingCausalTA) {
 
     MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
@@ -482,15 +484,16 @@ public class TaskAttemptImpl implements TaskAttempt,
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
-    this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+    this.attemptId = attemptId;
     this.eventHandler = eventHandler;
     //Reported status
     this.conf = conf;
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
-    this.task = task;
-    this.vertex = this.task.getVertex();
+    this.vertex = vertex;
+    this.locationHint = locationHint;
+    this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
 
@@ -533,20 +536,20 @@ public class TaskAttemptImpl implements TaskAttempt,
     return creationCausalTA;
   }
 
-  TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
-    TaskSpec baseTaskSpec = task.getBaseTaskSpec();
-    if (baseTaskSpec == null) {
+  TaskSpec getTaskSpec() throws AMUserCodeException {
+    if (taskSpec == null) {
       // since recovery does not follow normal transitions, TaskEventScheduleTask
       // is not being honored by the recovery code path. Using this to workaround 
       // until recovery is fixed. Calling the non-locking internal method of the vertex
      // to get the taskSpec directly. Since everything happens on the central dispatcher
      // during recovery this is deadlock free for now. TEZ-1019 should remove the need for this.
-      baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
-    }
-    return new TaskSpec(getID(),
+      TaskSpec baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId());
+      taskSpec = new TaskSpec(getID(),
         baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
         baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
         baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+    }
+    return taskSpec;
   }
 
   @Override
@@ -1065,7 +1068,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
   
   private TaskLocationHint getTaskLocationHint() {
-    return task.getTaskLocationHint();
+    return locationHint;
   }
 
   protected String[] resolveHosts(String[] src) {
@@ -1170,7 +1173,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       // Create the remote task.
       TaskSpec remoteTaskSpec;
       try {
-        remoteTaskSpec = ta.createRemoteTaskSpec();
+        remoteTaskSpec = ta.getTaskSpec();
         if (LOG.isDebugEnabled()) {
           LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
         }
@@ -1183,6 +1186,7 @@ public class TaskAttemptImpl implements TaskAttempt,
                 TaskAttemptTerminationCause.APPLICATION_ERROR));
         return TaskAttemptStateInternal.FAILED;
       }
+
       // Create startTaskRequest
 
       String[] requestHosts = new String[0];

http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index ad678d7..4a0742f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -95,6 +95,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
@@ -826,9 +827,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
-    return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
+    TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+    TaskSpec taskSpec = null;
+    if (baseTaskSpec != null) {
+      taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+          baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+          baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+    }
+    return new TaskAttemptImpl(attemptId, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
+        locationHint, taskSpec, schedulingCausalTA);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6f06f2d..97108d4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -100,6 +100,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -122,7 +123,6 @@ public class TestTaskAttempt {
   }
   
   AppContext appCtx;
-  Task mockTask;
   TaskLocationHint locationHint;
 
   @BeforeClass
@@ -133,7 +133,6 @@ public class TestTaskAttempt {
   @Before
   public void setupTest() {
     appCtx = mock(AppContext.class);
-    mockTask = mock(Task.class);
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
   }
@@ -168,7 +167,6 @@ public class TestTaskAttempt {
           + AMSchedulerEventTALaunchRequest.class.getName());
     }
     
-    verify(mockTask, times(1)).getTaskLocationHint();
     // TODO Move the Rack request check to the client after TEZ-125 is fixed.
     Set<String> requestedRacks = taImpl.taskRacks;
     assertEquals(1, requestedRacks.size());
@@ -1567,23 +1565,17 @@ public class TestTaskAttempt {
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, boolean leafVertex) {
-      super(taskId, attemptNumber, eventHandler, tal, conf,
+      super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
+          eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex, mockTask);
-      when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
+          isRescheduled, resource, containerContext, leafVertex, mock(Vertex.class),
+          locationHint, null);
     }
 
-    
-    Vertex mockVertex = mock(Vertex.class);
     boolean inputFailedReported = false;
     
     @Override
-    protected Vertex getVertex() {
-      return mockVertex;
-    }
-
-    @Override
-    protected TaskSpec createRemoteTaskSpec() {
+    protected TaskSpec getTaskSpec() {
       // FIXME
       return null;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index cd37ab9..197a442 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -148,11 +149,11 @@ public class TestTaskAttemptRecovery {
     TezTaskID taskId =
         TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
     ta =
-        new TaskAttemptImpl(taskId, 0, mockEventHandler,
+        new TaskAttemptImpl(TezBuilderUtils.newTaskAttemptId(taskId, 0), mockEventHandler,
             mock(TaskAttemptListener.class), new Configuration(),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
-            mock(ContainerContext.class), false, mockTask);
+            mock(ContainerContext.class), false, mockVertex, null, null);
     taId = ta.getID();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 3b154a5..b7a6d21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -72,6 +72,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -888,8 +889,9 @@ public class TestTaskImpl {
 
     @Override
    protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
-      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
-          attemptNumber, eventHandler, taskAttemptListener,
+      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
+          TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
+          eventHandler, taskAttemptListener,
           conf, clock, taskHeartbeatHandler, appContext,
           true, taskResource, containerContext, schedCausalTA);
       taskAttempts.add(attempt);
@@ -934,14 +936,14 @@ public class TestTaskImpl {
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
 
-    public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+    public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
         EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
         boolean isRescheduled,
        Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
-      super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class),
-          schedCausalTA);
+      super(attemptId, eventHandler, tal, conf, clock, thh,
+          appContext, isRescheduled, resource, containerContext, false,
+          mock(Vertex.class), locationHint, mockTaskSpec, schedCausalTA);
     }
     
     @Override


Mime
View raw message