Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2624E18973 for ; Fri, 21 Aug 2015 01:36:11 +0000 (UTC) Received: (qmail 173 invoked by uid 500); 21 Aug 2015 01:36:11 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 99990 invoked by uid 500); 21 Aug 2015 01:36:11 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 98054 invoked by uid 99); 21 Aug 2015 01:36:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 01:36:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A7C1CE7DF9; Fri, 21 Aug 2015 01:36:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Fri, 21 Aug 2015 01:36:49 -0000 Message-Id: <866431f56e684d25ba8b90d76967df42@git.apache.org> In-Reply-To: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> References: <0a6214bdcf644e979ab2906bb3bbf947@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] tez git commit: TEZ-2678. Fix comments from reviews - part 1. (sseth) http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 8e8224a..0a02f9e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.rm; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; @@ -38,6 +39,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tez.common.TezUtils; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -213,10 +216,10 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent(lrTa31); taskSchedulerEventHandler.handleEvent( - new AMSchedulerEventTAEnded( - ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + new AMSchedulerEventTAEnded( + ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerEventHandler, times(1)).taskAllocated( eq(0), eq(ta31), any(Object.class), eq(containerHost1)); verify(rmClient, times(0)).releaseAssignedContainer( @@ -226,7 +229,7 @@ public class TestContainerReuse { taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + TaskAttemptState.SUCCEEDED, null, null, 0)); long currentTs = System.currentTimeMillis(); Throwable exception = null; @@ -332,16 +335,17 @@ public class TestContainerReuse { TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), + eq(containerHost2)); // Adding the event later so that task1 assigned to containerHost1 is deterministic. taskSchedulerEventHandler.handleEvent(lrTa31); taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta21, true, null, null); verify(taskSchedulerEventHandler, times(0)).taskAllocated( eq(0), eq(ta31), any(Object.class), eq(containerHost2)); verify(rmClient, times(1)).releaseAssignedContainer( @@ -431,12 +435,15 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), + eq(container1)); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -444,19 +451,25 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null)); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1)); + verifyDeAllocateTask(taskScheduler, ta12, true, null, null); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), + eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); // Verify no re-use if a previous task fails. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, + "TIMEOUT", 0)); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1)); - verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null)); + verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), + eq(container1)); + verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT"); verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -468,12 +481,15 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container2)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), + eq(container2)); // Task assigned to container completed successfully. No pending requests. Container should be released. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta14, true, null, null); verify(rmClient).releaseAssignedContainer(eq(container2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -570,13 +586,15 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container1)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), + eq(container1)); // First task had profiling on. This container can not be reused further. taskSchedulerEventHandler.handleEvent( - new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); @@ -620,9 +638,11 @@ public class TestContainerReuse { // Verify that the container can not be reused when profiling option is turned on // Even for 2 tasks having same profiling option can have container reusability. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta13, true, null, null); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2)); verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); @@ -662,12 +682,15 @@ public class TestContainerReuse { taskScheduler.onContainersAllocated(Collections.singletonList(container3)); TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); drainableAppCallback.drain(); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), + eq(container3)); //Ensure task 6 (of vertex 1) is allocated to same container - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta15, true, null, null); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); eventHandler.reset(); @@ -769,10 +792,10 @@ public class TestContainerReuse { // Container should not be immediately assigned to task 2 // until delay expires. taskSchedulerEventHandler.handleEvent( - new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + new AMSchedulerEventTAEnded(ta11, container1.getId(), + TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerEventHandler, times(0)).taskAllocated( eq(0), eq(ta12), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -787,7 +810,7 @@ public class TestContainerReuse { // TA12 completed. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta12, container1.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(3000l); @@ -897,9 +920,9 @@ public class TestContainerReuse { // Container should be assigned to task21. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta11, container1.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta11, true, null, null); verify(taskSchedulerEventHandler).taskAllocated( eq(0), eq(ta21), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); @@ -907,7 +930,7 @@ public class TestContainerReuse { // Task 2 completes. taskSchedulerEventHandler.handleEvent( new AMSchedulerEventTAEnded(ta21, container1.getId(), - TaskAttemptState.SUCCEEDED, null, 0)); + TaskAttemptState.SUCCEEDED, null, null, 0)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); LOG.info("Sleeping to ensure that the scheduling loop runs"); @@ -1008,9 +1031,11 @@ public class TestContainerReuse { assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta111, true, null, null); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1020,9 +1045,11 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta112, true, null, null); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1054,16 +1081,19 @@ public class TestContainerReuse { // TODO This is terrible, need a better way to ensure the scheduling loop has run LOG.info("Sleeping to ensure that the scheduling loop runs"); Thread.sleep(6000l); - verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1)); + verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), + eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); eventHandler.reset(); - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta211, true, null, null); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1174,9 +1204,11 @@ public class TestContainerReuse { assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); // Task assigned to container completed successfully. Container should be re-used. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent( + new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, + null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta111, true, null, null); verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); @@ -1186,9 +1218,9 @@ public class TestContainerReuse { // Task assigned to container completed successfully. // Verify reuse across hosts. - taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0)); + taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); drainableAppCallback.drain(); - verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null)); + verifyDeAllocateTask(taskScheduler, ta112, true, null, null); verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); eventHandler.reset(); @@ -1301,6 +1333,7 @@ public class TestContainerReuse { } private Container createContainer(int id, String host, Resource resource, Priority priority) { + @SuppressWarnings("deprecation") ContainerId containerID = ContainerId.newInstance( ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), id); @@ -1368,4 +1401,17 @@ public class TestContainerReuse { return this.dagID; } } + + private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded, + TaskAttemptEndReason endReason, String diagContains) { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(taskScheduler) + .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + if (diagContains == null) { + assertNull(argumentCaptor.getValue()); + } else { + assertTrue(argumentCaptor.getValue().contains(diagContains)); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index c637f5f..3b2de34 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -93,7 +93,7 @@ public class TestLocalTaskSchedulerService { Task task = mock(Task.class); taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null); - taskSchedulerService.deallocateTask(task, false, null); + taskSchedulerService.deallocateTask(task, false, null, null); // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first taskSchedulerService.startRequestHandlerThread(); @@ -128,7 +128,7 @@ public class TestLocalTaskSchedulerService { MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler(); requestHandler.drainRequest(1); - taskSchedulerService.deallocateTask(task, false, null); + taskSchedulerService.deallocateTask(task, false, null, null); requestHandler.drainRequest(2); assertEquals(1, requestHandler.deallocateCount); assertEquals(1, requestHandler.allocateCount); http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 6af9815..d956ff9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -193,7 +193,7 @@ public class TestTaskScheduler { addContainerRequest((CookieContainerRequest) any()); // returned from task requests before allocation happens - assertFalse(scheduler.deallocateTask(mockTask1, true, null)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null, null)); verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class)); verify(mockRMClient, times(1)). removeContainerRequest((CookieContainerRequest) any()); @@ -201,7 +201,7 @@ public class TestTaskScheduler { releaseAssignedContainer((ContainerId) any()); // deallocating unknown task - assertFalse(scheduler.deallocateTask(mockTask1, true, null)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null, null)); verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class)); verify(mockRMClient, times(1)). removeContainerRequest((CookieContainerRequest) any()); @@ -346,7 +346,7 @@ public class TestTaskScheduler { verify(mockRMClient).releaseAssignedContainer(mockCId4); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask1, true, null)); + assertTrue(scheduler.deallocateTask(mockTask1, true, null, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId1); verify(mockRMClient).releaseAssignedContainer(mockCId1); @@ -466,7 +466,7 @@ public class TestTaskScheduler { verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any()); verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask4, true, null)); + assertTrue(scheduler.deallocateTask(mockTask4, true, null, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId6); verify(mockRMClient).releaseAssignedContainer(mockCId6); @@ -496,7 +496,7 @@ public class TestTaskScheduler { removeContainerRequest((CookieContainerRequest) any()); verify(mockRMClient, times(8)).addContainerRequest( (CookieContainerRequest) any()); - assertFalse(scheduler.deallocateTask(mockTask1, true, null)); + assertFalse(scheduler.deallocateTask(mockTask1, true, null, null)); List mockUpdatedNodes = mock(List.class); scheduler.onNodesUpdated(mockUpdatedNodes); @@ -760,7 +760,7 @@ public class TestTaskScheduler { verify(mockRMClient).releaseAssignedContainer(mockCId4); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask1, true, null)); + assertTrue(scheduler.deallocateTask(mockTask1, true, null, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId1); verify(mockRMClient).releaseAssignedContainer(mockCId1); @@ -890,7 +890,7 @@ public class TestTaskScheduler { verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any()); verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask4, true, null)); + assertTrue(scheduler.deallocateTask(mockTask4, true, null, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId6); verify(mockRMClient).releaseAssignedContainer(mockCId6); @@ -979,8 +979,8 @@ public class TestTaskScheduler { // container7 allocated to the task with affinity for it verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7); // deallocate allocated task - assertTrue(scheduler.deallocateTask(mockTask5, true, null)); - assertTrue(scheduler.deallocateTask(mockTask6, true, null)); + assertTrue(scheduler.deallocateTask(mockTask5, true, null, null)); + assertTrue(scheduler.deallocateTask(mockTask6, true, null, null)); drainableAppCallback.drain(); verify(mockApp).containerBeingReleased(mockCId7); verify(mockApp).containerBeingReleased(mockCId8); http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 3e68a4c..1550085 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -107,7 +107,7 @@ public class TestTaskSchedulerEventHandler { class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler { - AtomicBoolean notify = new AtomicBoolean(false); + final AtomicBoolean notify = new AtomicBoolean(false); public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, @@ -120,7 +120,7 @@ public class TestTaskSchedulerEventHandler { protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { taskSchedulers[0] = mockTaskScheduler; - taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]); + taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]); } @Override @@ -154,7 +154,6 @@ public class TestTaskSchedulerEventHandler { mockWebUIService = mock(WebUIService.class); when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000)); - Configuration conf = new Configuration(false); schedulerHandler = new MockTaskSchedulerEventHandler( mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService); } @@ -412,9 +411,8 @@ public class TestTaskSchedulerEventHandler { @Test(timeout = 5000) public void testNoSchedulerSpecified() throws IOException { try { - TSEHForMultipleSchedulersTest tseh = - new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler, - mockSigMatcher, mockWebUIService, null, false); + new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler, + mockSigMatcher, mockWebUIService, null, false); fail("Expecting an IllegalStateException with no schedulers specified"); } catch (IllegalArgumentException e) { } @@ -686,7 +684,8 @@ public class TestTaskSchedulerEventHandler { @Override public boolean deallocateTask(Object task, boolean taskSucceeded, - TaskAttemptEndReason endReason) { + TaskAttemptEndReason endReason, + String diagnostics) { return false; } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index f9952d8..13fa4c5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -127,7 +127,8 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), + eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority()); @@ -137,14 +138,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); + verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -178,21 +179,23 @@ public class TestAMContainer { wc.verifyNoOutgoingEvents(); assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), + eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); - assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); + assertEquals(wc.taskAttemptID, + argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); wc.taskAttemptSucceeded(wc.taskAttemptID); wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); + verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -228,22 +231,25 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), + eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); - assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); + assertEquals(wc.taskAttemptID, + argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); // Attempt succeeded wc.taskAttemptSucceeded(wc.taskAttemptID); wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER); + verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taId2); wc.verifyState(AMContainerState.RUNNING); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), + eq(0)); assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID()); @@ -252,14 +258,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER); + verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).unregister(wc.containerID); assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); @@ -292,7 +298,8 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, + "received a STOP_REQUEST"); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -329,7 +336,8 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, + "received a STOP_REQUEST"); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -352,7 +360,8 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR, + "Multiple simultaneous taskAttempt"); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -390,7 +399,8 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR, + "Multiple simultaneous taskAttempt"); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -426,7 +436,8 @@ public class TestAMContainer { wc.containerTimedOut(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, + "timed out"); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -460,7 +471,8 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, + "received a STOP_REQUEST"); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -494,7 +506,8 @@ public class TestAMContainer { wc.launchFailed(); wc.verifyState(AMContainerState.STOPPING); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED, + "launchFailed"); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -544,7 +557,7 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -571,10 +584,10 @@ public class TestAMContainer { wc.assignTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR, "DiskFailed"); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed"); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -602,10 +615,11 @@ public class TestAMContainer { wc.assignTaskAttempt(wc.taskAttemptID); - wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); + wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED, "NodeFailed"); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED, + "NodeFailed"); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -636,7 +650,7 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -665,7 +679,7 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -697,10 +711,12 @@ public class TestAMContainer { wc.containerLaunched(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); + wc.containerCompleted(ContainerExitStatus.PREEMPTED, + TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "Container preempted externally"); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, + ContainerEndReason.EXTERNAL_PREEMPTION, "Container preempted externally"); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -734,11 +750,12 @@ public class TestAMContainer { wc.containerLaunched(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); + wc.containerCompleted(ContainerExitStatus.INVALID, + TaskAttemptTerminationCause.INTERNAL_PREEMPTION, "Container preempted internally"); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, - ContainerEndReason.INTERNAL_PREEMPTION); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, + ContainerEndReason.INTERNAL_PREEMPTION, "Container preempted internally"); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -772,10 +789,11 @@ public class TestAMContainer { wc.containerLaunched(); wc.verifyState(AMContainerState.RUNNING); - wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); + wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, + TaskAttemptTerminationCause.NODE_DISK_ERROR, "NodeDiskError"); wc.verifyState(AMContainerState.COMPLETED); verify(wc.tal).registerRunningContainer(wc.containerID, 0); - verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER); + verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "NodeDiskError"); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -1194,6 +1212,7 @@ public class TestAMContainer { public AMContainerImpl amContainer; + @SuppressWarnings("deprecation") // ContainerId public WrappedContainer(boolean shouldProfile, String profileString) { applicationID = ApplicationId.newInstance(rmIdentifier, 1); appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1); @@ -1286,7 +1305,8 @@ public class TestAMContainer { Token jobToken = mock(Token.class); TokenCache.setSessionToken(jobToken, credentials); amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, - new ContainerContext(localResources, credentials, new HashMap(), ""), 0, 0)); + new ContainerContext(localResources, credentials, new HashMap(), ""), 0, + 0)); } public void assignTaskAttempt(TezTaskAttemptID taID) { @@ -1333,10 +1353,12 @@ public class TestAMContainer { amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null, TaskAttemptTerminationCause.CONTAINER_EXITED)); } - - public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) { + + public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause, + String diagnostics) { reset(eventHandler); - amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause)); + amContainer.handle( + new AMContainerEventCompleted(containerID, exitStatus, diagnostics, errCause)); } public void containerTimedOut() { @@ -1417,4 +1439,33 @@ public class TestAMContainer { LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1, 1000000); return lr; } + + private void verifyUnregisterRunningContainer(TaskAttemptListener tal, ContainerId containerId, + int taskCommId, + ContainerEndReason containerEndReason, + String diagContains) { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(tal).unregisterRunningContainer(eq(containerId), eq(taskCommId), eq(containerEndReason), + argumentCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + if (diagContains != null) { + assertTrue(argumentCaptor.getValue().contains(diagContains)); + } else { + assertNull(argumentCaptor.getValue()); + } + } + + private void verifyUnregisterTaskAttempt(TaskAttemptListener tal, TezTaskAttemptID taId, + int taskCommId, TaskAttemptEndReason endReason, + String diagContains) { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + verify(tal) + .unregisterTaskAttempt(eq(taId), eq(taskCommId), eq(endReason), argumentCaptor.capture()); + assertEquals(1, argumentCaptor.getAllValues().size()); + if (diagContains != null) { + assertTrue(argumentCaptor.getValue().contains(diagContains)); + } else { + assertNull(argumentCaptor.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index 611e8cc..4883351 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -183,18 +183,26 @@ public class JoinValidate extends TezExampleBase { } } + // This is for internal use only, to use this example for external service testing. + // Not meant as documentation for the example. protected VertexExecutionContext getDefaultExecutionContext() { return null; } + // This is for internal use only, to use this example for external service testing. + // Not meant as documentation for the example. protected VertexExecutionContext getLhsExecutionContext() { return null; } + // This is for internal use only, to use this example for external service testing. + // Not meant as documentation for the example. protected VertexExecutionContext getRhsExecutionContext() { return null; } + // This is for internal use only, to use this example for external service testing. + // Not meant as documentation for the example. protected VertexExecutionContext getValidateExecutionContext() { return null; } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-ext-service-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml index f95f4ca..5a1907f 100644 --- a/tez-ext-service-tests/pom.xml +++ b/tez-ext-service-tests/pom.xml @@ -81,11 +81,6 @@ org.apache.hadoop hadoop-common - test - - - org.apache.hadoop - hadoop-common test-jar test http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index 17f8a87..8b91dde 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -57,7 +57,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { private final ConcurrentMap runningTasks = new ConcurrentHashMap(); - // AppIdIdentifier to avoid conflicts with other containres in the system. + // AppIdIdentifier to avoid conflicts with other containers in the system. // Per instance private final int memoryPerInstance; @@ -181,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { } @Override - public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) { + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) { ContainerId containerId = runningTasks.remove(task); if (containerId == null) { LOG.error("Could not determine ContainerId for task: " + task + @@ -235,6 +235,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler { .newInstance(appId, appAttemptId.getAttemptId()); } + @SuppressWarnings("deprecation") public Container createContainer(Resource capability, Priority priority, String hostname, int port) { ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, port); http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index ef8f9e4..127967a 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -98,8 +98,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } @Override - public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { - super.registerContainerEnd(containerId, endReason); + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) { + super.registerContainerEnd(containerId, endReason, diagnostics); } @Override @@ -154,7 +154,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl String message = re.toString(); if (message.contains(RejectedExecutionException.class.getName())) { getContext().taskKilled(taskSpec.getTaskAttemptID(), - TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); + TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy"); } else { getContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, @@ -175,8 +175,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } @Override - public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) { - super.unregisterRunningTaskAttempt(taskAttemptID, endReason); + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) { + super.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics); // Nothing else to do for now. The push API in the test does not support termination of a running task } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index 472a43c..3b4c768 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -454,7 +454,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun try { shouldDie = !taskRunner.run(); if (shouldDie) { - LOG.info("Got a shouldDie notification via hearbeats. Shutting down"); + LOG.info("Got a shouldDie notification via heartbeats. Shutting down"); return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null, "Asked to die by the AM"); } http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index d8539c5..7fd4c75 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -273,10 +273,10 @@ public class TezTaskRunner2 { isFirstError = true; killTaskRequested.set(true); } else { - logErrorIngored("killTask", null); + logErrorIgnored("killTask", null); } } else { - logErrorIngored("killTask", null); + logErrorIgnored("killTask", null); } } if (isFirstError) { @@ -331,10 +331,10 @@ public class TezTaskRunner2 { errorReporterToAm.set(true); oobSignalErrorInProgress = true; } else { - logErrorIngored("signalFatalError", message); + logErrorIgnored("signalFatalError", message); } } else { - logErrorIngored("signalFatalError", message); + logErrorIgnored("signalFatalError", message); } } @@ -394,14 +394,14 @@ public class TezTaskRunner2 { registerFirstException(t, null); isFirstError = true; } else { - logErrorIngored("umbilicalFatalError", null); + logErrorIgnored("umbilicalFatalError", null); } // A race is possible between a task succeeding, and a subsequent timed heartbeat failing. // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded // method does not throw an exception, in which case task success is registered with the AM. // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter } else { - logErrorIngored("umbilicalFatalError", null); + logErrorIgnored("umbilicalFatalError", null); } // Since this error came from the taskReporter - there's no point attempting to report a failure back to it. // However, the task does need to be cleaned up @@ -425,7 +425,7 @@ public class TezTaskRunner2 { logAborting("shutdownRequested"); killTaskInternal(); } else { - logErrorIngored("shutdownRequested", null); + logErrorIgnored("shutdownRequested", null); } } } @@ -474,7 +474,7 @@ public class TezTaskRunner2 { (successReportAttempted ? "success" : "failure/killed"), t); } - private void logErrorIngored(String ignoredEndReason, String errorMessage) { + private void logErrorIgnored(String ignoredEndReason, String errorMessage) { LOG.info( "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ", ignoredEndReason, task.getTaskAttemptID(), http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java index fc42da3..7502c41 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java @@ -437,6 +437,7 @@ public class TaskExecutionTestHelpers { } } + @SuppressWarnings("deprecation") public static ContainerId createContainerId(ApplicationId appId) { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/6b0c142f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java index c1616af..c3c4705 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java @@ -40,6 +40,7 @@ public class TestContainerExecution { executor = MoreExecutors.listeningDecorator(rawExecutor); ApplicationId appId = ApplicationId.newInstance(10000, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + @SuppressWarnings("deprecation") ContainerId containerId = ContainerId.newInstance(appAttemptId, 1); TaskExecutionTestHelpers.TezTaskUmbilicalForTest