Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B9992200AED for ; Wed, 4 May 2016 00:00:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B84BA1609F4; Wed, 4 May 2016 00:00:14 +0200 (CEST) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B9E021609F5 for ; Wed, 4 May 2016 00:00:13 +0200 (CEST) Received: (qmail 12989 invoked by uid 500); 3 May 2016 22:00:12 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 12980 invoked by uid 99); 3 May 2016 22:00:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2016 22:00:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5E94DFD9F; Tue, 3 May 2016 22:00:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Date: Tue, 03 May 2016 22:00:12 -0000 Message-Id: <482372fb0664488aa626437ac297a155@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] tez git commit: TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh) archived-at: Tue, 03 May 2016 22:00:14 -0000 Repository: tez Updated Branches: refs/heads/branch-0.7.1 258d68131 -> 81c09a069 TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8daa21b6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8daa21b6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8daa21b6 Branch: refs/heads/branch-0.7.1 Commit: 8daa21b649e67a4e4365bbe744cfeba770abe97d Parents: 0ba1e97 Author: Hitesh Shah Authored: Tue May 3 14:54:49 2016 -0700 Committer: Hitesh Shah Committed: Tue May 3 14:54:49 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 44 +++++++++++--------- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 +++++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 20 +++------ .../app/dag/impl/TestTaskAttemptRecovery.java | 5 ++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 14 ++++--- 6 files changed, 53 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 36d3d55..8a9ae7f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3193. Deadlock in AM during task commit request. TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks TEZ-3213. Uncaught exception during vertex recovery leads to invalid state transition loop. TEZ-3224. User payload is not initialized before creating vertex manager plugin. http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 9b8fd80..a2da34a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -62,7 +62,6 @@ import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; -import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; import org.apache.tez.dag.app.dag.Vertex; @@ -99,7 +98,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto; -import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; @@ -180,8 +178,9 @@ public class TaskAttemptImpl implements TaskAttempt, private String nodeHttpAddress; private String nodeRackName; - private final Task task; private final Vertex vertex; + private final TaskLocationHint locationHint; + private TaskSpec taskSpec; @VisibleForTesting boolean appendNextDataEvent = true; @@ -451,22 +450,25 @@ public class TaskAttemptImpl implements TaskAttempt, private boolean recoveryStartEventSeen = false; @SuppressWarnings("rawtypes") - public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, - Task task) { - this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock, + Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) { + this(attemptId, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex, - task, null); + vertex, locationHint, taskSpec, null); } - public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler, + + @SuppressWarnings("rawtypes") + public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex, - Task task, TezTaskAttemptID schedulingCausalTA) { + Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec, + TezTaskAttemptID schedulingCausalTA) { MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration @@ -482,15 +484,16 @@ public class TaskAttemptImpl implements TaskAttempt, ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); - this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); + this.attemptId = attemptId; this.eventHandler = eventHandler; //Reported status this.conf = conf; this.clock = clock; this.taskHeartbeatHandler = taskHeartbeatHandler; this.appContext = appContext; - this.task = task; - this.vertex = this.task.getVertex(); + this.vertex = vertex; + this.locationHint = locationHint; + this.taskSpec = taskSpec; this.creationCausalTA = schedulingCausalTA; this.creationTime = clock.getTime(); @@ -533,20 +536,20 @@ public class TaskAttemptImpl implements TaskAttempt, return creationCausalTA; } - TaskSpec createRemoteTaskSpec() throws AMUserCodeException { - TaskSpec baseTaskSpec = task.getBaseTaskSpec(); - if (baseTaskSpec == null) { + TaskSpec getTaskSpec() throws AMUserCodeException { + if (taskSpec == null) { // since recovery does not follow normal transitions, TaskEventScheduleTask // is not being honored by the recovery code path. Using this to workaround // until recovery is fixed. Calling the non-locking internal method of the vertex // to get the taskSpec directly. Since everything happens on the central dispatcher // during recovery this is deadlock free for now. TEZ-1019 should remove the need for this. - baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId()); - } - return new TaskSpec(getID(), + TaskSpec baseTaskSpec = ((VertexImpl) vertex).createRemoteTaskSpec(getID().getTaskID().getId()); + taskSpec = new TaskSpec(getID(), baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); + } + return taskSpec; } @Override @@ -1065,7 +1068,7 @@ public class TaskAttemptImpl implements TaskAttempt, } private TaskLocationHint getTaskLocationHint() { - return task.getTaskLocationHint(); + return locationHint; } protected String[] resolveHosts(String[] src) { @@ -1170,7 +1173,7 @@ public class TaskAttemptImpl implements TaskAttempt, // Create the remote task. TaskSpec remoteTaskSpec; try { - remoteTaskSpec = ta.createRemoteTaskSpec(); + remoteTaskSpec = ta.getTaskSpec(); if (LOG.isDebugEnabled()) { LOG.debug("remoteTaskSpec:" + remoteTaskSpec); } @@ -1183,6 +1186,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptTerminationCause.APPLICATION_ERROR)); return TaskAttemptStateInternal.FAILED; } + // Create startTaskRequest String[] requestHosts = new String[0]; http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ad678d7..4a0742f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -95,6 +95,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; @@ -826,9 +827,17 @@ public class TaskImpl implements Task, EventHandler { } TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) { - return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler, + TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber); + TaskSpec taskSpec = null; + if (baseTaskSpec != null) { + taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(), + baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(), + baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs()); + } + return new TaskAttemptImpl(attemptId, eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, - (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA); + (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(), + locationHint, taskSpec, schedulingCausalTA); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 6f06f2d..97108d4 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -100,6 +100,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.InputReadErrorEvent; import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -122,7 +123,6 @@ public class TestTaskAttempt { } AppContext appCtx; - Task mockTask; TaskLocationHint locationHint; @BeforeClass @@ -133,7 +133,6 @@ public class TestTaskAttempt { @Before public void setupTest() { appCtx = mock(AppContext.class); - mockTask = mock(Task.class); HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); } @@ -168,7 +167,6 @@ public class TestTaskAttempt { + AMSchedulerEventTALaunchRequest.class.getName()); } - verify(mockTask, times(1)).getTaskLocationHint(); // TODO Move the Rack request check to the client after TEZ-125 is fixed. Set requestedRacks = taImpl.taskRacks; assertEquals(1, requestedRacks.size()); @@ -1567,23 +1565,17 @@ public class TestTaskAttempt { TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, boolean leafVertex) { - super(taskId, attemptNumber, eventHandler, tal, conf, + super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber), + eventHandler, tal, conf, clock, taskHeartbeatHandler, appContext, - isRescheduled, resource, containerContext, leafVertex, mockTask); - when(mockTask.getTaskLocationHint()).thenReturn(locationHint); + isRescheduled, resource, containerContext, leafVertex, mock(Vertex.class), + locationHint, null); } - - Vertex mockVertex = mock(Vertex.class); boolean inputFailedReported = false; @Override - protected Vertex getVertex() { - return mockVertex; - } - - @Override - protected TaskSpec createRemoteTaskSpec() { + protected TaskSpec getTaskSpec() { // FIXME return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java index cd37ab9..197a442 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java @@ -62,6 +62,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -148,11 +149,11 @@ public class TestTaskAttemptRecovery { TezTaskID taskId = TezTaskID.fromString("task_1407371892933_0001_1_00_000000"); ta = - new TaskAttemptImpl(taskId, 0, mockEventHandler, + new TaskAttemptImpl(TezBuilderUtils.newTaskAttemptId(taskId, 0), mockEventHandler, mock(TaskAttemptListener.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext, false, Resource.newInstance(1, 1), - mock(ContainerContext.class), false, mockTask); + mock(ContainerContext.class), false, mockVertex, null, null); taId = ta.getID(); } http://git-wip-us.apache.org/repos/asf/tez/blob/8daa21b6/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 3b154a5..b7a6d21 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -72,6 +72,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -888,8 +889,9 @@ public class TestTaskImpl { @Override protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) { - MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(), - attemptNumber, eventHandler, taskAttemptListener, + MockTaskAttemptImpl attempt = new MockTaskAttemptImpl( + TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber), + eventHandler, taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext, true, taskResource, containerContext, schedCausalTA); taskAttempts.add(attempt); @@ -934,14 +936,14 @@ public class TestTaskImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; - public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber, + public MockTaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler, TaskAttemptListener tal, Configuration conf, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, boolean isRescheduled, Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) { - super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh, - appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), - schedCausalTA); + super(attemptId, eventHandler, tal, conf, clock, thh, + appContext, isRescheduled, resource, containerContext, false, + mock(Vertex.class), locationHint, mockTaskSpec, schedCausalTA); } @Override