Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 999DC17D61 for ; Mon, 29 Sep 2014 00:35:08 +0000 (UTC) Received: (qmail 58789 invoked by uid 500); 29 Sep 2014 00:35:08 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 58700 invoked by uid 500); 29 Sep 2014 00:35:08 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 58199 invoked by uid 99); 29 Sep 2014 00:35:08 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2014 00:35:08 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C84729B9C56; Mon, 29 Sep 2014 00:35:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Mon, 29 Sep 2014 00:35:21 -0000 Message-Id: <06293c9b334a496cb7971da2cf0bf0c8@git.apache.org> In-Reply-To: <82bc55d5adc141e4922f88c793b00754@git.apache.org> References: <82bc55d5adc141e4922f88c793b00754@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/50] [abbrv] git commit: TEZ-1569. Add tests for preemption (bikas) TEZ-1569. 
Add tests for preemption (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5e5683ab Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5e5683ab Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5e5683ab Branch: refs/heads/branch-0.5 Commit: 5e5683ab5e89f1fd88883ed40574c8fb04316dd1 Parents: 8e382b3 Author: Bikas Saha Authored: Fri Sep 12 14:15:56 2014 -0700 Committer: Bikas Saha Committed: Fri Sep 12 14:15:56 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 3 +- .../java/org/apache/tez/client/TezClient.java | 2 +- .../java/org/apache/tez/client/LocalClient.java | 15 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 15 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 6 + .../apache/tez/dag/app/MockDAGAppMaster.java | 245 +++++++++++++++++++ .../org/apache/tez/dag/app/MockLocalClient.java | 48 ++++ .../org/apache/tez/dag/app/MockTezClient.java | 48 ++++ .../org/apache/tez/dag/app/TestPreemption.java | 203 +++++++++++++++ 9 files changed, 576 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 87729b3..73a3671 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ ALL CHANGES: TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts. TEZ-1578. Remove TeraSort from Tez codebase. + TEZ-1569. Add tests for preemption Release 0.5.1: Unreleased @@ -33,7 +34,7 @@ ALL CHANGES TEZ-1534. Make client side configs available to AM and tasks. TEZ-1574. Support additional formats for the tez deployed archive -Release 0.5.0: Unreleased +Release 0.5.0: 2014-09-03 INCOMPATIBLE CHANGES TEZ-1038. 
Move TaskLocationHint outside of VertexLocationHint. http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index 13ca2dc..77ab20c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -123,7 +123,7 @@ public class TezClient { } @Private - TezClient(String name, TezConfiguration tezConf, boolean isSession, + protected TezClient(String name, TezConfiguration tezConf, boolean isSession, @Nullable Map<String, LocalResource> localResources, @Nullable Credentials credentials) { this.clientName = name; http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index 0b615fa..0a95cf0 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +55,8 @@ import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.utils.EnvironmentUpdateUtils; +import com.google.common.annotations.VisibleForTesting; + public class LocalClient
extends FrameworkClient { public static final Logger LOG = Logger.getLogger(LocalClient.class); @@ -286,8 +289,7 @@ public class LocalClient extends FrameworkClient { int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT; long appSubmitTime = System.currentTimeMillis(); - dagAppMaster = - new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + dagAppMaster = createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir.toUri().getPath()); clientHandler = new DAGClientHandler(dagAppMaster); @@ -305,4 +307,13 @@ public class LocalClient extends FrameworkClient { return thread; } + + // this can be overridden by test code to create a mock app + @VisibleForTesting + protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId, + ContainerId cId, String currentHost, int nmPort, int nmHttpPort, + Clock clock, long appSubmitTime, boolean isSession, String userDir) { + return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + new SystemClock(), appSubmitTime, isSession, userDir); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 9cd716a..ea0ab3b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -156,12 +156,12 @@ import com.google.common.base.Function; import com.google.common.collect.Maps; /** - * The Map-Reduce Application Master. + * The Tez DAG Application Master. * The state machine is encapsulated in the implementation of Job interface. * All state changes happens via Job interface. 
Each event * results in a Finite State Transition in Job. * - * MR AppMaster is the composition of loosely coupled services. The services + * Tez DAG AppMaster is the composition of loosely coupled services. The services * interact with each other via events. The components resembles the * Actors model. The component acts on received event and send out the * events to other components. @@ -443,6 +443,11 @@ public class DAGAppMaster extends AbstractService { System.exit(0); } } + + @VisibleForTesting + protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() { + return taskSchedulerEventHandler; + } private synchronized void handle(DAGAppMasterEvent event) { switch (event.getType()) { @@ -1434,14 +1439,14 @@ public class DAGAppMaster extends AbstractService { } return null; } - + @Override public synchronized void serviceStart() throws Exception { //start all the components startServices(); super.serviceStart(); - + // metrics system init is really init & start. // It's more test friendly to put it here. 
DefaultMetricsSystem.initialize("DAGAppMaster"); @@ -1882,7 +1887,7 @@ public class DAGAppMaster extends AbstractService { UserGroupInformation.setConfiguration(conf); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); - + appMaster.appMasterUgi = UserGroupInformation .createRemoteUser(jobUserName); appMaster.appMasterUgi.addCredentials(credentials); http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index daaa81b..23f9096 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -105,6 +105,7 @@ import org.apache.tez.dag.history.events.DAGStartedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent; import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; import org.apache.tez.dag.utils.RelocalizationUtils; @@ -690,6 +691,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } return vertex.getVertexStatus(statusOptions); } + + public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) { + return (TaskAttemptImpl) getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID()) + .getAttempt(taId); + } protected void initializeVerticesAndStart() { for (Vertex v : vertices.values()) { http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java new file mode 100644 index 0000000..9fe9c4d --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -0,0 +1,245 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.dag.app; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.common.ContainerContext; +import org.apache.tez.common.ContainerTask; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; +import org.apache.tez.dag.app.launcher.ContainerLauncher; +import org.apache.tez.dag.app.rm.NMCommunicatorEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; +import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEvent; +import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; +import org.apache.tez.dag.app.rm.container.AMContainerEventType; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; +import org.apache.tez.runtime.api.impl.EventMetaData; +import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; + +import com.google.common.collect.Maps; + +@SuppressWarnings("unchecked") +public class MockDAGAppMaster extends DAGAppMaster { + + MockContainerLauncher containerLauncher; + + // mock container launcher does not launch real tasks. 
+ // Upon, launch of a container is simulates the container asking for tasks + // Upon receiving a task it simulates completion of the tasks + // It can be used to preempt the container for a given task + public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable { + + BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>(); + Thread eventHandlingThread; + + Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap(); + TaskAttemptListenerImpTezDag taListener; + + AtomicBoolean startScheduling = new AtomicBoolean(true); + AtomicBoolean goFlag; + + Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap(); + + public MockContainerLauncher(AtomicBoolean goFlag) { + super("MockContainerLauncher"); + this.goFlag = goFlag; + } + + public class ContainerData { + ContainerId cId; + TezTaskAttemptID taId; + String vName; + boolean completed; + + public ContainerData(ContainerId cId) { + this.cId = cId; + } + + void clear() { + taId = null; + vName = null; + completed = false; + } + } + + @Override + public void serviceStart() throws Exception { + taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener(); + eventHandlingThread = new Thread(this); + eventHandlingThread.start(); + } + + @Override + public void serviceStop() throws Exception { + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + eventHandlingThread.join(2000l); + } + } + + @Override + public void handle(NMCommunicatorEvent event) { + switch (event.getType()) { + case CONTAINER_LAUNCH_REQUEST: + launch((NMCommunicatorLaunchRequestEvent) event); + break; + case CONTAINER_STOP_REQUEST: + stop((NMCommunicatorStopRequestEvent)event); + break; + } + } + + + void waitToGo() { + synchronized (goFlag) { + goFlag.set(true); + goFlag.notify(); + try { + goFlag.wait(); + } catch (InterruptedException e) { + throw new TezUncheckedException(e); + } + } + } + + public void startScheduling(boolean value) { + startScheduling.set(value); + } + + public Map<ContainerId, ContainerData> getContainers() { + return
containers; + } + + public void preemptContainerForTask(TezTaskID tId, int uptoVersion) { + preemptedTasks.put(tId, uptoVersion); + } + + public void preemptContainer(ContainerData cData) { + getTaskSchedulerEventHandler().containerCompleted(null, + ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED)); + cData.clear(); + } + + void stop(NMCommunicatorStopRequestEvent event) { + // remove from simulated container list + containers.remove(event.getContainerId()); + getContext().getEventHandler().handle( + new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT)); + } + + void launch(NMCommunicatorLaunchRequestEvent event) { + // launch container by putting it in simulated container list + containers.put(event.getContainerId(), new ContainerData(event.getContainerId())); + getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId())); + } + + @Override + public void run() { + // wait for test to sync with us and get a reference to us. 
Go when sync is done + waitToGo(); + while(true) { + if (!startScheduling.get()) { // schedule when asked to do so by the test code + continue; + } + for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) { + ContainerData cData = entry.getValue(); + ContainerId cId = entry.getKey(); + if (cData.taId == null) { + // if container is not assigned a task, ask for a task + try { + ContainerTask cTask = taListener.getTask(new ContainerContext(cId.toString())); + if (cTask == null) { + continue; + } + if (cTask.shouldDie()) { + containers.remove(cId); + } else { + cData.taId = cTask.getTaskSpec().getTaskAttemptID(); + cData.vName = cTask.getTaskSpec().getVertexName(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } else if (!cData.completed) { + // container is assigned a task and task is not completed + // complete the task or preempt the task + Integer version = preemptedTasks.get(cData.taId.getTaskID()); + if (version != null && cData.taId.getId() <= version.intValue()) { + preemptContainer(cData); + } else { + // send a done notification + TezVertexID vertexId = cData.taId.getTaskID().getVertexID(); + cData.completed = true; + getContext().getEventHandler().handle( + new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent( + new TaskAttemptCompletedEvent(), new EventMetaData( + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))))); + cData.clear(); + } + } + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + System.out.println("Interrupted in mock container launcher thread"); + break; + } + } + } + + } + + public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, + String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, + boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag) { + super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, + isSession, workingDirectory); + containerLauncher = new
MockContainerLauncher(launcherGoFlag); + } + + // use mock container launcher for tests + @Override + protected ContainerLauncher createContainerLauncher(final AppContext context) + throws UnknownHostException { + return containerLauncher; + } + + public MockContainerLauncher getContainerLauncher() { + return containerLauncher; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java new file mode 100644 index 0000000..7e408e1 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java @@ -0,0 +1,48 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.dag.app; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.client.LocalClient; + +public class MockLocalClient extends LocalClient { + MockDAGAppMaster mockApp; + AtomicBoolean mockAppLauncherGoFlag; + + public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) { + this.mockAppLauncherGoFlag = mockAppLauncherGoFlag; + } + + protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId, + ContainerId cId, String currentHost, int nmPort, int nmHttpPort, + Clock clock, long appSubmitTime, boolean isSession, String userDir) { + mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, + new SystemClock(), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag); + return mockApp; + } + + public MockDAGAppMaster getMockApp() { + return mockApp; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java new file mode 100644 index 0000000..617415e --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java @@ -0,0 +1,48 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.FrameworkClient; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; + +public class MockTezClient extends TezClient { + MockLocalClient client; + + MockTezClient(String name, TezConfiguration tezConf, boolean isSession, + Map<String, LocalResource> localResources, Credentials credentials, + AtomicBoolean mockAppLauncherGoFlag) { + super(name, tezConf, isSession, localResources, credentials); + this.client = new MockLocalClient(mockAppLauncherGoFlag); + } + + protected FrameworkClient createFrameworkClient() { + return client; + } + + public MockLocalClient getLocalClient() { + return client; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java new file mode 100644 index 0000000..c7aacd4 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java @@ -0,0 +1,203 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership.
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.app; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; +import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; +import org.apache.tez.dag.app.dag.impl.DAGImpl; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings("deprecation") +public class TestPreemption { + + static Configuration defaultConf; + 
static FileSystem localFs; + static Path workDir; + + static { + try { + defaultConf = new Configuration(false); + defaultConf.set("fs.defaultFS", "file:///"); + defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + localFs = FileSystem.getLocal(defaultConf); + workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), + "TestDAGAppMaster").makeQualified(localFs); + } catch (IOException e) { + throw new RuntimeException("init failure", e); + } + } + + MockDAGAppMaster mockApp; + MockContainerLauncher mockLauncher; + + int dagCount = 0; + + DAG createDAG(DataMovementType dmType) { + DAG dag = DAG.create("test-" + dagCount++); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5); + Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5); + Edge eAB = Edge.create(vA, vB, + EdgeProperty.create(dmType, DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, OutputDescriptor.create("O.class"), + InputDescriptor.create("I.class"))); + + dag.addVertex(vA).addVertex(vB).addEdge(eAB); + return dag; + } + + @Test + public void testPreemptionWithoutSession() throws Exception { + System.out.println("TestPreemptionWithoutSession"); + TezConfiguration tezconf = new TezConfiguration(defaultConf); + tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); + AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); + MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null, + mockAppLauncherGoFlag); + tezClient.start(); + + DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER)); + // now the MockApp has been started. 
sync with it to get the launcher + syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient); + + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + int vertexIndex = 0; + int upToTaskVersion = 3; + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + + mockLauncher.preemptContainerForTask(taId.getTaskID(), upToTaskVersion); + mockLauncher.startScheduling(true); + + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + + for (int i=0; i<=upToTaskVersion; ++i) { + TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i); + TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId); + Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState()); + } + + tezClient.stop(); + } + + @Test + public void testPreemptionWithSession() throws Exception { + System.out.println("TestPreemptionWithSession"); + MockTezClient tezClient = createTezSession(); + testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather"); + testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather"); + testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast"); + testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast"); + testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1"); + testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1"); + testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather"); + testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather"); + testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast"); + 
testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast"); + testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1"); + testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1"); + tezClient.stop(); + } + + MockTezClient createTezSession() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); + AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); + MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null, + mockAppLauncherGoFlag); + tezClient.start(); + syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient); + return tezClient; + } + + void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, + MockTezClient tezClient) throws Exception { + synchronized (mockAppLauncherGoFlag) { + while (!mockAppLauncherGoFlag.get()) { + mockAppLauncherGoFlag.wait(); + } + mockApp = tezClient.getLocalClient().getMockApp(); + mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(allowScheduling); + mockAppLauncherGoFlag.notify(); + } + } + + void testPreemptionSingle(MockTezClient tezClient, DAG dag, int vertexIndex, String info) + throws Exception { + testPreemptionJob(tezClient, dag, vertexIndex, 0, info + "-Single"); + } + + void testPreemptionMultiple(MockTezClient tezClient, DAG dag, int vertexIndex, String info) + throws Exception { + testPreemptionJob(tezClient, dag, vertexIndex, 3, info + "-Multiple"); + } + + void testPreemptionJob(MockTezClient tezClient, DAG dag, int vertexIndex, + int upToTaskVersion, String info) throws Exception { + System.out.println("TestPreemption - Running - " + info); + TezConfiguration tezconf = new TezConfiguration(defaultConf); + tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); + + mockLauncher.startScheduling(false); // turn off 
scheduling to block DAG before submitting it + DAGClient dagClient = tezClient.submitDAG(dag); + + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0); + + mockLauncher.preemptContainerForTask(taId.getTaskID(), upToTaskVersion); + mockLauncher.startScheduling(true); + + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + + for (int i=0; i<=upToTaskVersion; ++i) { + TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i); + TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId); + Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState()); + } + + System.out.println("TestPreemption - Done running - " + info); + } +}