Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6F13010B43 for ; Sat, 21 Feb 2015 01:21:09 +0000 (UTC) Received: (qmail 3953 invoked by uid 500); 21 Feb 2015 01:21:09 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 3888 invoked by uid 500); 21 Feb 2015 01:21:09 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 3529 invoked by uid 99); 21 Feb 2015 01:21:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Feb 2015 01:21:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CAECDE05DC; Sat, 21 Feb 2015 01:21:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 21 Feb 2015 01:21:18 -0000 Message-Id: <0e401dd8ae124c00bfbdff4fe9066cf7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/16] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth) http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java index 60782e6..12390b2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java @@ -59,7 +59,7 @@ public class TestLocalTaskScheduler { TezConfiguration tezConf = new TezConfiguration(); tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS); - LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext()); + LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000); HashMap taskAllocations = new LinkedHashMap(); PriorityBlockingQueue taskRequestQueue = new PriorityBlockingQueue(); TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class); http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index 3cf4f6c..25cf4b5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.concurrent.BlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -81,8 +83,12 @@ public class TestLocalTaskSchedulerService { */ @Test(timeout = 5000) public void testDeallocationBeforeAllocation() { + AppContext appContext = mock(AppContext.class); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class)); + (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); taskSchedulerService.init(new Configuration()); taskSchedulerService.start(); @@ -105,8 +111,12 @@ public class TestLocalTaskSchedulerService { */ @Test(timeout = 5000) public void testDeallocationAfterAllocation() { + AppContext appContext = mock(AppContext.class); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); + doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class)); + (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); taskSchedulerService.init(new Configuration()); taskSchedulerService.start(); @@ -132,13 +142,13 @@ public class TestLocalTaskSchedulerService { String appHostName, int appHostPort, String appTrackingUrl, AppContext appContext) { super(appClient, containerSignatureMatcher, appHostName, appHostPort, - appTrackingUrl, appContext); + appTrackingUrl, 10000l, appContext); } @Override public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue, - new LocalContainerFactory(appContext), + new LocalContainerFactory(appContext, customContainerAppId), taskAllocations, appClientDelegate, conf); http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 5c09ed1..780c485 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -79,7 +79,7 @@ public class TestTaskSchedulerEventHandler { public MockTaskSchedulerEventHandler(AppContext appContext, DAGClientServer clientService, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { - super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}); + super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false); } @Override @@ -200,9 +200,14 @@ public class TestTaskSchedulerEventHandler { Configuration conf = new Configuration(false); schedulerHandler.init(conf); schedulerHandler.start(); - + + AMContainer mockAmContainer = mock(AMContainer.class); + when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0); + when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0); + when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0); ContainerId mockCId = mock(ContainerId.class); verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any()); + when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer); schedulerHandler.preemptContainer(mockCId); verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId); Assert.assertEquals(1, mockEventHandler.events.size()); http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 5d18dae..d9a926d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers { EventHandler eventHandler, TezAMRMClientAsync amrmClientAsync, ContainerSignatureMatcher containerSignatureMatcher) { - super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}); + super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; } http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index 22c0559..499368f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ContainerContext; @@ -104,7 +105,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. @@ -121,7 +122,7 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -130,14 +131,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -156,7 +157,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); // Container Launched wc.containerLaunched(); @@ -171,7 +172,7 @@ public class TestAMContainer { wc.verifyNoOutgoingEvents(); assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -179,13 +180,13 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(1, wc.amContainer.getAllTaskAttempts().size()); @@ -204,7 +205,7 @@ public class TestAMContainer { wc.verifyState(AMContainerState.LAUNCHING); // 1 Launch request. wc.verifyCountAndGetOutgoingEvents(1); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. @@ -221,7 +222,7 @@ public class TestAMContainer { // Once for the previous NO_TASKS, one for the actual task. verify(wc.chh).register(wc.containerID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(1, argumentCaptor.getAllValues().size()); assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID()); @@ -230,13 +231,13 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID); + verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0); TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taId2); wc.verifyState(AMContainerState.RUNNING); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID()); @@ -245,14 +246,14 @@ public class TestAMContainer { wc.verifyState(AMContainerState.IDLE); wc.verifyNoOutgoingEvents(); assertNull(wc.amContainer.getCurrentTaskAttempt()); - verify(wc.tal).unregisterTaskAttempt(taId2); + verify(wc.tal).unregisterTaskAttempt(taId2, 0); // Container completed wc.containerCompleted(); wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertEquals(2, wc.amContainer.getAllTaskAttempts().size()); @@ -285,7 +286,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -322,7 +323,7 @@ public class TestAMContainer { wc.verifyHistoryStopEvent(); wc.verifyState(AMContainerState.COMPLETED); wc.verifyNoOutgoingEvents(); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); assertNull(wc.amContainer.getCurrentTaskAttempt()); @@ -345,7 +346,7 @@ public class TestAMContainer { wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -377,13 +378,13 @@ public class TestAMContainer { wc.launchContainer(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taID2); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 for NM stop request. 2 TERMINATING to TaskAttempt. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3); @@ -419,7 +420,7 @@ public class TestAMContainer { wc.containerTimedOut(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -453,7 +454,7 @@ public class TestAMContainer { wc.stopRequest(); wc.verifyState(AMContainerState.STOP_REQUESTED); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).unregister(wc.containerID); // 1 to TA, 1 for RM de-allocate. outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); @@ -483,11 +484,11 @@ public class TestAMContainer { wc.launchContainer(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); - verify(wc.tal).registerRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); wc.launchFailed(); wc.verifyState(AMContainerState.STOPPING); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -536,8 +537,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -566,8 +567,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -597,8 +598,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1); verifyUnOrderedOutgoingEventTypes(outgoingEvents, @@ -628,8 +629,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -657,8 +658,8 @@ public class TestAMContainer { wc.containerCompleted(); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -692,8 +693,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -729,8 +730,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -766,8 +767,8 @@ public class TestAMContainer { wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR); wc.verifyState(AMContainerState.COMPLETED); - verify(wc.tal).registerRunningContainer(wc.containerID); - verify(wc.tal).unregisterRunningContainer(wc.containerID); + verify(wc.tal).registerRunningContainer(wc.containerID, 0); + verify(wc.tal).unregisterRunningContainer(wc.containerID, 0); verify(wc.chh).register(wc.containerID); verify(wc.chh).unregister(wc.containerID); @@ -1010,7 +1011,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.assignTaskAttempt(wc.taskAttemptID); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task1 = argumentCaptor.getAllValues().get(0); assertEquals(0, task1.getAdditionalResources().size()); wc.taskAttemptSucceeded(wc.taskAttemptID); @@ -1023,7 +1024,7 @@ public class TestAMContainer { TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2); wc.assignTaskAttempt(taID2, additionalResources, new Credentials()); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task2 = argumentCaptor.getAllValues().get(1); Map pullTaskAdditionalResources = task2.getAdditionalResources(); assertEquals(2, pullTaskAdditionalResources.size()); @@ -1046,7 +1047,7 @@ public class TestAMContainer { TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(wc.taskID, 3); wc.assignTaskAttempt(taID3, new HashMap(), new Credentials()); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); AMContainerTask task3 = argumentCaptor.getAllValues().get(2); assertEquals(0, task3.getAdditionalResources().size()); wc.taskAttemptSucceeded(taID3); @@ -1099,7 +1100,7 @@ public class TestAMContainer { wc.containerLaunched(); wc.assignTaskAttempt(attempt11, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(0); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); @@ -1108,7 +1109,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt12, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(1); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1118,7 +1119,7 @@ public class TestAMContainer { wc.setNewDAGID(dagID2); wc.assignTaskAttempt(attempt21, LRs, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(2); assertTrue(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1126,7 +1127,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt22, LRs, null); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(3); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1136,7 +1137,7 @@ public class TestAMContainer { wc.setNewDAGID(dagID3); wc.assignTaskAttempt(attempt31, LRs , dag3Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(4); assertTrue(fetchedTask.haveCredentialsChanged()); assertNotNull(fetchedTask.getCredentials()); @@ -1146,7 +1147,7 @@ public class TestAMContainer { wc.assignTaskAttempt(attempt32, LRs, dag1Credentials); argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class); - verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID)); + verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0)); fetchedTask = argumentCaptor.getAllValues().get(5); assertFalse(fetchedTask.haveCredentialsChanged()); assertNull(fetchedTask.getCredentials()); @@ -1198,9 +1199,10 @@ public class TestAMContainer { chh = mock(ContainerHeartbeatHandler.class); - InetSocketAddress addr = new InetSocketAddress("localhost", 0); tal = mock(TaskAttemptListener.class); - doReturn(addr).when(tal).getAddress(); + TaskCommunicator taskComm = mock(TaskCommunicator.class); + doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); + doReturn(taskComm).when(tal).getTaskCommunicator(0); dagID = TezDAGID.getInstance(applicationID, 1); vertexID = TezVertexID.getInstance(dagID, 1); @@ -1226,7 +1228,7 @@ public class TestAMContainer { doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID(); amContainer = new AMContainerImpl(container, chh, tal, - new ContainerContextMatcher(), appContext); + new ContainerContextMatcher(), appContext, 0, 0, 0); } public WrappedContainer() { @@ -1276,7 +1278,7 @@ public class TestAMContainer { Token jobToken = mock(Token.class); TokenCache.setSessionToken(jobToken, credentials); amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID, - new ContainerContext(localResources, credentials, new HashMap(), ""))); + new ContainerContext(localResources, credentials, new HashMap(), ""), 0, 0)); } public void assignTaskAttempt(TezTaskAttemptID taID) { http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index 61371e8..dee4541 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskAttemptListener; @@ -43,8 +44,9 @@ public class TestAMContainerMap { private TaskAttemptListener mockTaskAttemptListener() { TaskAttemptListener tal = mock(TaskAttemptListener.class); - InetSocketAddress socketAddr = new InetSocketAddress("localhost", 21000); - doReturn(socketAddr).when(tal).getAddress(); + TaskCommunicator taskComm = mock(TaskCommunicator.class); + doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress(); + doReturn(taskComm).when(tal).getTaskCommunicator(0); return tal; } http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index a8d4ec9..0708b0f 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -19,6 +19,7 @@ package org.apache.tez.examples; import java.io.IOException; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -130,7 +131,7 @@ public class JoinValidate extends TezExampleBase { private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) throws IOException { - DAG dag = DAG.create("JoinValidate"); + DAG dag = DAG.create(getDagName()); // Configuration for intermediate output - shared by Vertex1 and Vertex2 // This should only be setting selective keys from the underlying conf. Fix after there's a @@ -147,15 +148,18 @@ public class JoinValidate extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + setVertexProperties(lhsVertex, getLhsVertexProperties()); Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create( ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + setVertexProperties(rhsVertex, getRhsVertexProperties()); Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create( JoinValidateProcessor.class.getName()), numPartitions); + setVertexProperties(joinValidateVertex, getValidateVertexProperties()); Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty()); @@ -165,6 +169,30 @@ public class JoinValidate extends TezExampleBase { return dag; } + private void setVertexProperties(Vertex vertex, Map properties) { + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + vertex.setConf(entry.getKey(), entry.getValue()); + } + } + } + + protected Map getLhsVertexProperties() { + return null; + } + + protected Map getRhsVertexProperties() { + return null; + } + + protected Map getValidateVertexProperties() { + return null; + } + + protected String getDagName() { + return "JoinValidate"; + } + public static class JoinValidateProcessor extends SimpleProcessor { private static final Log LOG = LogFactory.getLog(JoinValidateProcessor.class); http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index e83165b..27356bc 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -14,6 +14,8 @@ package org.apache.tez.dag.app.launcher; +import java.net.InetSocketAddress; + import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; @@ -124,7 +126,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) { RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder(); - builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort()); + InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress(); + builder.setAmHost(address.getHostName()).setAmPort(address.getPort()); builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId()); builder.setApplicationIdString( event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString()); http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java index e3c18bf..5657f86 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java @@ -14,7 +14,6 @@ package org.apache.tez.dag.app.rm; -import java.io.IOException; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -32,25 +31,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; import org.apache.tez.service.TezTestServiceConfConstants; -// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes. - public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class); @@ -71,7 +62,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private final ConcurrentMap runningTasks = new ConcurrentHashMap(); - private final AMRMClientAsync amRmClient; + // AppIdIdentifier to avoid conflicts with other containres in the system. // Per instance private final int memoryPerInstance; @@ -82,10 +73,13 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private final Resource resourcePerContainer; + // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode), + // and take care of YARN registration. public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname, int clientPort, String trackingUrl, + long customAppIdIdentifier, Configuration conf) { // Accepting configuration here to allow setting up fields as final super(TezTestServiceTaskSchedulerService.class.getName()); @@ -93,7 +87,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { this.appClientDelegate = createAppCallbackDelegate(appClient); this.appContext = appContext; this.serviceHosts = new LinkedList(); - this.containerFactory = new ContainerFactory(appContext); + this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); this.memoryPerInstance = conf .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1); @@ -123,7 +117,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance); int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance); this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer); - this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler()); String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS); if (hosts == null || hosts.length == 0) { @@ -143,36 +136,8 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } @Override - public void serviceInit(Configuration conf) { - amRmClient.init(conf); - } - - @Override - public void serviceStart() { - amRmClient.start(); - RegisterApplicationMasterResponse response; - try { - amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - } - - @Override public void serviceStop() { if (!this.isStopped.getAndSet(true)) { - - try { - TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus(); - amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage, - status.postCompletionTrackingUrl); - } catch (YarnException e) { - throw new TezUncheckedException(e); - } catch (IOException e) { - throw new TezUncheckedException(e); - } appCallbackExecutor.shutdownNow(); } } @@ -264,7 +229,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { private ExecutorService createAppCallbackExecutorService() { return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); + .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build()); } private TaskSchedulerAppCallback createAppCallbackDelegate( @@ -274,7 +239,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } private String selectHost(String[] requestedHosts) { - String host = null; + String host; if (requestedHosts != null && requestedHosts.length > 0) { Arrays.sort(requestedHosts); host = requestedHosts[0]; @@ -287,17 +252,19 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { } static class ContainerFactory { - final AppContext appContext; AtomicInteger nextId; - - public ContainerFactory(AppContext appContext) { - this.appContext = appContext; - this.nextId = new AtomicInteger(2); + final ApplicationAttemptId customAppAttemptId; + + public ContainerFactory(AppContext appContext, long appIdLong) { + this.nextId = new AtomicInteger(1); + ApplicationId appId = ApplicationId + .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + this.customAppAttemptId = ApplicationAttemptId + .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); } public Container createContainer(Resource capability, Priority priority, String hostname, int port) { - ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId(); - ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement()); + ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement()); NodeId nodeId = NodeId.newInstance(hostname, port); String nodeHttpAddress = "hostname:0"; @@ -311,37 +278,4 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService { return container; } } - - private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler { - - @Override - public void onContainersCompleted(List statuses) { - - } - - @Override - public void onContainersAllocated(List containers) { - - } - - @Override - public void onShutdownRequest() { - - } - - @Override - public void onNodesUpdated(List updatedNodes) { - - } - - @Override - public float getProgress() { - return 0; - } - - @Override - public void onError(Throwable e) { - - } - } } http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java new file mode 100644 index 0000000..e5d2e3b --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.examples; + +import java.util.Map; + +public class JoinValidateConfigured extends JoinValidate { + + private final Map lhsProps; + private final Map rhsProps; + private final Map validateProps; + private final String dagNameSuffix; + + public JoinValidateConfigured(Map lhsProps, Map rhsProps, + Map validateProps, String dagNameSuffix) { + this.lhsProps = lhsProps; + this.rhsProps = rhsProps; + this.validateProps = validateProps; + this.dagNameSuffix = dagNameSuffix; + } + + @Override + protected Map getLhsVertexProperties() { + return this.lhsProps; + } + + @Override + protected Map getRhsVertexProperties() { + return this.rhsProps; + } + + @Override + protected Map getValidateVertexProperties() { + return this.validateProps; + } + + @Override + protected String getDagName() { + return "JoinValidate_" + dagNameSuffix; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/a2190b17/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index ae7e7f8..9c149c6 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -15,11 +15,11 @@ package org.apache.tez.tests; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Map; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,13 +28,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.JoinDataGen; -import org.apache.tez.examples.JoinValidate; +import org.apache.tez.examples.JoinValidateConfigured; import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; @@ -47,23 +48,31 @@ public class TestExternalTezServices { private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; - private static MiniTezCluster tezCluster; - private static MiniDFSCluster dfsCluster; - private static MiniTezTestServiceCluster tezTestServiceCluster; + private static volatile MiniTezCluster tezCluster; + private static volatile MiniDFSCluster dfsCluster; + private static volatile MiniTezTestServiceCluster tezTestServiceCluster; - private static Configuration clusterConf = new Configuration(); - private static Configuration confForJobs; + private static volatile Configuration clusterConf = new Configuration(); + private static volatile Configuration confForJobs; - private static FileSystem remoteFs; - private static FileSystem localFs; + private static volatile FileSystem remoteFs; + private static volatile FileSystem localFs; - private static TezClient sharedTezClient; + private static volatile TezClient sharedTezClient; + + private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName()); + private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath"); + private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath"); + + private static final Map PROPS_EXT_SERVICE_PUSH = Maps.newHashMap(); + private static final Map PROPS_REGULAR_CONTAINERS = Maps.newHashMap(); + private static final Map PROPS_IN_AM = Maps.newHashMap(); private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName() + "-tmpDir"; @BeforeClass - public static void setup() throws IOException, TezException, InterruptedException { + public static void setup() throws Exception { localFs = FileSystem.getLocal(clusterConf); @@ -108,27 +117,79 @@ public class TestExternalTezServices { remoteFs.mkdirs(stagingDirPath); // This is currently configured to push tasks into the Service, and then use the standard RPC confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + + confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, - EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); + confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, +// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); - TezConfiguration tezConf = new TezConfiguration(confForJobs); + // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level. + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + + // Setup various executor sets + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); + PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); + + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT); + + + // Create a session to use for all tests. + TezConfiguration tezClientConf = new TezConfiguration(confForJobs); sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session", - tezConf, true); + tezClientConf, true); sharedTezClient.start(); LOG.info("Shared TezSession started"); sharedTezClient.waitTillReady(); LOG.info("Shared TezSession ready for submission"); + // Generate the join data set used for each run. + // Can a timeout be enforced here ? + remoteFs.mkdirs(SRC_DATA_DIR); + Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1"); + Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2"); + TezConfiguration tezConf = new TezConfiguration(confForJobs); + // Generate join data - with 2 tasks. + JoinDataGen dataGen = new JoinDataGen(); + String[] dataGenArgs = new String[]{ + dataPath1.toString(), "1048576", dataPath2.toString(), "524288", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"}; + assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient)); + // Run the actual join - with 2 reducers + HashJoinExample joinExample = new HashJoinExample(); + String[] args = new String[]{ + dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()}; + assertEquals(0, joinExample.run(tezConf, args, sharedTezClient)); + + LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result"); } @AfterClass @@ -156,35 +217,50 @@ public class TestExternalTezServices { @Test(timeout = 60000) - public void test1() throws Exception { - Path testDir = new Path("/tmp/testHashJoinExample"); + public void testAllInService() throws Exception { + int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers. + runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, + PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH); + } - remoteFs.mkdirs(testDir); + @Test(timeout = 60000) + public void testAllInContainers() throws Exception { + int expectedExternalSubmissions = 0; // All in containers + runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS); + } - Path dataPath1 = new Path(testDir, "inPath1"); - Path dataPath2 = new Path(testDir, "inPath2"); - Path expectedOutputPath = new Path(testDir, "expectedOutputPath"); - Path outPath = new Path(testDir, "outPath"); + @Test(timeout = 60000) + public void testMixed1() throws Exception { // M-ExtService, R-containers + int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers. + runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, + PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS); + } - TezConfiguration tezConf = new TezConfiguration(confForJobs); + @Test(timeout = 60000) + public void testMixed2() throws Exception { // M-Containers, R-ExtService + int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers. + runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH); + } - JoinDataGen dataGen = new JoinDataGen(); - String[] dataGenArgs = new String[]{ - dataPath1.toString(), "1048576", dataPath2.toString(), "524288", - expectedOutputPath.toString(), "2"}; - assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient)); - HashJoinExample joinExample = new HashJoinExample(); - String[] args = new String[]{ - dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()}; - assertEquals(0, joinExample.run(tezConf, args, sharedTezClient)); + private void runJoinValidate(String name, int extExpectedCount, Map lhsProps, + Map rhsProps, + Map validateProps) throws + Exception { + int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); - JoinValidate joinValidate = new JoinValidate(); - String[] validateArgs = new String[]{ - expectedOutputPath.toString(), outPath.toString(), "3"}; + TezConfiguration tezConf = new TezConfiguration(confForJobs); + JoinValidateConfigured joinValidate = + new JoinValidateConfigured(lhsProps, rhsProps, + validateProps, name); + String[] validateArgs = new String[]{"-disableSplitGrouping", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient)); // Ensure this was actually submitted to the external cluster - assertTrue(tezTestServiceCluster.getNumSubmissions() > 0); + assertEquals(extExpectedCount, + (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount)); } }