Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E22EE188A0 for ; Wed, 24 Feb 2016 18:51:41 +0000 (UTC) Received: (qmail 27042 invoked by uid 500); 24 Feb 2016 18:51:35 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 26999 invoked by uid 500); 24 Feb 2016 18:51:35 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 26988 invoked by uid 99); 24 Feb 2016 18:51:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 18:51:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79322E8EBD; Wed, 24 Feb 2016 18:51:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sree@apache.org To: commits@tez.apache.org Date: Wed, 24 Feb 2016 18:51:35 -0000 Message-Id: <44cc53964f69428aa331867fc91161d1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/9] tez git commit: TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh) Repository: tez Updated Branches: refs/heads/TEZ-2980 6ae1ba74f -> 35a0c4e1c TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/941d1990 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/941d1990 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/941d1990 Branch: refs/heads/TEZ-2980 Commit: 941d1990f68e8a98b4ab2cd25688a117e29dd697 Parents: de3a074 Author: Hitesh Shah Authored: Fri Feb 19 14:05:33 2016 -0800 Committer: Hitesh Shah Committed: Fri Feb 19 14:05:33 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../dag/app/rm/YarnTaskSchedulerService.java | 7 ++- .../tez/dag/app/rm/TestContainerReuse.java | 64 ++++++++++++++++++-- 3 files changed, 66 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d10b47a..48eca4d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3123. Containers can get re-used even with conflicting local resources. TEZ-3117. Deadlock in Edge and Vertex code TEZ-3103. Shuffle can hang when memory to memory merging enabled TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). @@ -335,6 +336,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-3123. Containers can get re-used even with conflicting local resources. TEZ-3117. Deadlock in Edge and Vertex code TEZ-3103. Shuffle can hang when memory to memory merging enabled TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime). http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index c1c363b..bd4ac2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -2337,11 +2337,12 @@ public class YarnTaskSchedulerService extends TaskScheduler // Merge the container signatures to account for any changes to the container // footprint. For example, re-localization of additional resources will // cause the held container's signature to change. - lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature(); - if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) { + if (lastAssignedContainerSignature != null) { lastAssignedContainerSignature = signatureMatcher.union( - lastTaskInfo.getCookie().getContainerSignature(), + lastAssignedContainerSignature, taskInfo.getCookie().getContainerSignature()); + } else { + lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature(); } lastTaskInfo = taskInfo; } http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 78dc8fd..99c85ab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1173,7 +1173,8 @@ public class TestContainerReuse { TaskAttempt ta111 = mock(TaskAttempt.class); doReturn(taID111).when(ta111).getID(); doReturn("Mock for TA " + taID111.toString()).when(ta111).toString(); - AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR); + AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent( + taID111, ta111, resource1, host1, racks, priority1, v11LR); Map v12LR = Maps.newHashMap(); v12LR.put(rsrc1, lr1); @@ -1184,7 +1185,24 @@ public class TestContainerReuse { TaskAttempt ta112 = mock(TaskAttempt.class); doReturn(taID112).when(ta112).getID(); doReturn("Mock for TA " + taID112.toString()).when(ta112).toString(); - AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR); + AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent( + taID112, ta112, resource1, host1, racks, priority1, v12LR); + + //Vertex 1, Task 3, Attempt 1, host1 + TezTaskAttemptID taID113 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 3), 1); + TaskAttempt ta113 = mock(TaskAttempt.class); + doReturn(taID113).when(ta113).getID(); + doReturn("Mock for TA " + taID113.toString()).when(ta113).toString(); + AMSchedulerEventTALaunchRequest lrEvent13 = createLaunchRequestEvent( + taID113, ta113, resource1, host1, racks, priority1, new HashMap()); + //Vertex 1, Task 4, Attempt 1, host1 + TezTaskAttemptID taID114 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 4), 1); + TaskAttempt ta114 = mock(TaskAttempt.class); + doReturn(taID114).when(ta114).getID(); + doReturn("Mock for TA " + taID114.toString()).when(ta114).toString(); + AMSchedulerEventTALaunchRequest lrEvent14 = createLaunchRequestEvent( + taID114, ta114, resource1, host1, racks, priority1, new HashMap()); + drainNotifier.set(false); taskSchedulerManager.handleEvent(lrEvent11); @@ -1220,14 +1238,52 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); verifyDeAllocateTask(taskScheduler, ta112, true, null, null); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); - // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused. + // Task 3 + drainNotifier.set(false); + taskSchedulerManager.handleEvent(lrEvent13); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta113, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta113, true, null, null); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + // Task 4 + drainNotifier.set(false); + taskSchedulerManager.handleEvent(lrEvent14); + TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); + + verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1)); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta114, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); + drainableAppCallback.drain(); + verifyDeAllocateTask(taskScheduler, ta114, true, null, null); + verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); + eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); + eventHandler.reset(); + + + // Setup DAG2 with different resources. TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0); dagIDAnswer.setDAGID(dagID2);