Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF02F17F2D for ; Tue, 21 Apr 2015 04:05:26 +0000 (UTC) Received: (qmail 21934 invoked by uid 500); 21 Apr 2015 04:05:26 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 21897 invoked by uid 500); 21 Apr 2015 04:05:26 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 21888 invoked by uid 99); 21 Apr 2015 04:05:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Apr 2015 04:05:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 62379E099D; Tue, 21 Apr 2015 04:05:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <786ed23198dc46e7afeaa43b2d826131@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2212. Notify components on DAG completion. (sseth) Date: Tue, 21 Apr 2015 04:05:26 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master db0a50c5e -> 44046f8a4 TEZ-2212. Notify components on DAG completion. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/44046f8a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/44046f8a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/44046f8a Branch: refs/heads/master Commit: 44046f8a4ae7edab466f6c3789772c97955f5786 Parents: db0a50c Author: Siddharth Seth Authored: Mon Apr 20 21:05:04 2015 -0700 Committer: Siddharth Seth Committed: Mon Apr 20 21:05:04 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/AsyncDispatcher.java | 5 +++ .../org/apache/tez/dag/records/TezDAGID.java | 7 ++++ .../tez/dag/records/TezTaskAttemptID.java | 6 +++ .../org/apache/tez/dag/records/TezTaskID.java | 6 +++ .../org/apache/tez/dag/records/TezVertexID.java | 6 +++ .../org/apache/tez/dag/app/DAGAppMaster.java | 39 ++++++++++++++++++++ .../apache/tez/dag/app/TaskAttemptListener.java | 5 +++ .../dag/app/TaskAttemptListenerImpTezDag.java | 16 ++++++++ .../dag/event/DAGAppMasterEventDagCleanup.java | 31 ++++++++++++++++ .../app/dag/event/DAGAppMasterEventType.java | 4 +- .../tez/dag/app/launcher/ContainerLauncher.java | 5 +++ .../dag/app/launcher/ContainerLauncherImpl.java | 12 ++++++ .../app/launcher/LocalContainerLauncher.java | 9 +++++ .../dag/app/rm/LocalTaskSchedulerService.java | 2 +- .../dag/app/rm/TaskSchedulerEventHandler.java | 7 +++- .../tez/dag/app/rm/TaskSchedulerService.java | 2 +- .../dag/app/rm/YarnTaskSchedulerService.java | 2 +- .../app/rm/container/AMContainerHelpers.java | 5 +++ .../dag/app/rm/container/AMContainerMap.java | 6 +++ .../tez/dag/app/rm/node/AMNodeTracker.java | 6 +++ .../apache/tez/dag/app/MockDAGAppMaster.java | 11 +++++- 22 files changed, 187 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c44824c..64b3561 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2212. Notify components on DAG completion. TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads to tez.runtime.pipelined.sorter.sort.threads. TEZ-2333. Enable local fetch optimization by default. http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index b751260..5aaa4cf 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -326,4 +326,9 @@ public class AsyncDispatcher extends CompositeService implements Dispatcher { } }; } + + @Private + public int getQueueSize() { + return eventQueue.size(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java index d12f760..0fe1d44 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.text.NumberFormat; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.records.ApplicationId; import com.google.common.base.Preconditions; @@ -66,6 +67,12 @@ public class TezDAGID extends TezID { Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null"); return dagIdCache.getUnchecked(new TezDAGID(applicationId, id)); } + + @InterfaceAudience.Private + public static void clearCache() { + dagIdCache.invalidateAll(); + dagIdCache.cleanUp(); + } /** * Get a DAGID object from given parts. http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java index 0896b99..296d577 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java @@ -74,6 +74,12 @@ public class TezTaskAttemptID extends TezID { return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id)); } + @InterfaceAudience.Private + public static void clearCache() { + taskAttemptIDCache.invalidateAll(); + taskAttemptIDCache.cleanUp(); + } + private TezTaskAttemptID(TezTaskID taskId, int id) { super(id); if(taskId == null) { http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java index 0a7bc4a..b4c7b32 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java @@ -81,6 +81,12 @@ public class TezTaskID extends TezID { return taskIDCache.getUnchecked(new TezTaskID(vertexID, id)); } + @InterfaceAudience.Private + public static void clearCache() { + taskIDCache.invalidateAll(); + taskIDCache.cleanUp(); + } + private TezTaskID(TezVertexID vertexID, int id) { super(id); Preconditions.checkArgument(vertexID != null, "vertexID cannot be null"); http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java index a9b2625..93ef551 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java @@ -83,6 +83,12 @@ public class TezVertexID extends TezID { return vertexIDCache.getUnchecked(new TezVertexID(dagId, id)); } + @InterfaceAudience.Private + public static void clearCache() { + vertexIDCache.invalidateAll(); + vertexIDCache.cleanUp(); + } + private TezVertexID(TezDAGID dagId, int id) { super(id); this.dagId = dagId; http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 2924b25..3dd9e4c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -57,6 +57,9 @@ import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -661,7 +664,19 @@ public class DAGAppMaster extends AbstractService { } if (!state.equals(DAGAppMasterState.ERROR)) { if (!sessionStopped.get()) { + LOG.info("Central Dispatcher queue size after DAG completion, before cleanup: " + + dispatcher.getQueueSize()); LOG.info("Waiting for next DAG to be submitted."); + + // Sending this via the event queue, in case there are pending events which need to be + // processed. TaskKilled for example, or ContainerCompletions. + // The DAG needs to be part of the event, since the dag can get reset when the next + // dag is submitted. The next DAG, however, will not start executing till the cleanup + // is complete, since execution start is on the same dispatcher. + sendEvent(new DAGAppMasterEventDagCleanup(context.getCurrentDAG())); + + // Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events. + // However, eventually it needs to be moved out. this.taskSchedulerEventHandler.dagCompleted(); state = DAGAppMasterState.IDLE; } else { @@ -689,6 +704,27 @@ public class DAGAppMaster extends AbstractService { this.state = DAGAppMasterState.KILLED; shutdownHandler.shutdown(true); break; + case DAG_CLEANUP: + DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event; + LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + + cleanupEvent.getDag().getID()); + containerLauncher.dagComplete(cleanupEvent.getDag()); + taskAttemptListener.dagComplete(cleanupEvent.getDag()); + nodes.dagComplete(cleanupEvent.getDag()); + containers.dagComplete(cleanupEvent.getDag()); + TezTaskAttemptID.clearCache(); + TezTaskID.clearCache(); + TezVertexID.clearCache(); + TezDAGID.clearCache(); + LOG.info("Completed cleanup for DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" + + cleanupEvent.getDag().getID()); + break; + case NEW_DAG_SUBMITTED: + // Inform sub-components that a new DAG has been submitted. + taskSchedulerEventHandler.dagSubmitted(); + containerLauncher.dagSubmitted(); + taskAttemptListener.dagSubmitted(); + break; default: throw new TezUncheckedException( "AppMaster: No handler for event type: " + event.getType()); @@ -2081,6 +2117,9 @@ public class DAGAppMaster extends AbstractService { // End of creating the job. ((RunningAppContext) context).setDAG(currentDAG); + // Send out an event to inform components that a new DAG has been submitted. + // Information about this DAG is available via the context. + sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.NEW_DAG_SUBMITTED)); // create a job event for job initialization DAGEvent initDagEvent = new DAGEvent(currentDAG.getID(), DAGEventType.DAG_INIT); // Send init to the job (this does NOT trigger job execution) http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java index aeb0cd5..9caa7cf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app; import java.net.InetSocketAddress; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; /** @@ -38,4 +39,8 @@ public interface TaskAttemptListener { void unregisterTaskAttempt(TezTaskAttemptID attemptID); + void dagComplete(DAG dag); + + void dagSubmitted(); + } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index a01ca39..b64283b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -282,6 +282,22 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } @Override + public void dagComplete(DAG dag) { + // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures. + // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die. + // Container structures remain unchanged - since they could be re-used across restarts. + // This becomes more relevant when task kills without container kills are allowed. + + // TODO TEZ-2336. Send a signal to containers indicating DAG completion. + } + + @Override + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. + } + + @Override public void registerRunningContainer(ContainerId containerId) { if (LOG.isDebugEnabled()) { LOG.debug("ContainerId: " + containerId http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java new file mode 100644 index 0000000..2d1a23a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDagCleanup.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.event; + +import org.apache.tez.dag.app.dag.DAG; + +public class DAGAppMasterEventDagCleanup extends DAGAppMasterEvent { + + private final DAG dag; + + public DAGAppMasterEventDagCleanup(DAG dag) { + super(DAGAppMasterEventType.DAG_CLEANUP); + this.dag = dag; + } + + public DAG getDag() { + return this.dag; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java index ee03ef5..5a102a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java @@ -22,5 +22,7 @@ public enum DAGAppMasterEventType { INTERNAL_ERROR, AM_REBOOT, DAG_FINISHED, - SCHEDULING_SERVICE_ERROR + SCHEDULING_SERVICE_ERROR, + NEW_DAG_SUBMITTED, // Indicates a new dag being submitted, to notify sub-components + DAG_CLEANUP } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java index 35cdeda..305eb50 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java @@ -20,8 +20,13 @@ package org.apache.tez.dag.app.launcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; public interface ContainerLauncher extends EventHandler { + + void dagComplete(DAG dag); + + void dagSubmitted(); } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index a1eb2a7..94889a1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -110,6 +111,17 @@ public class ContainerLauncherImpl extends AbstractService implements } } + @Override + public void dagComplete(DAG dag) { + // Nothing required at the moment. Containers are shared across DAGs + } + + @Override + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. + } + private static enum ContainerState { PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 5c8aab6..9faf8c0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -43,6 +43,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -154,6 +155,14 @@ public class LocalContainerLauncher extends AbstractService implements callbackExecutor.shutdownNow(); } + @Override + public void dagComplete(DAG dag) { + } + + @Override + public void dagSubmitted() { + } + // Thread to monitor the queue of incoming NMCommunicator events private class TezSubTaskRunner implements Runnable { @Override http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index ce01bfb..51d8b9d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -112,7 +112,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { } @Override - public void resetMatchLocalityForAllHeldContainers() { + public void dagComplete() { } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 250446d..19db660 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -563,7 +563,12 @@ public class TaskSchedulerEventHandler extends AbstractService } public void dagCompleted() { - taskScheduler.resetMatchLocalityForAllHeldContainers(); + taskScheduler.dagComplete(); + } + + public void dagSubmitted() { + // Nothing to do right now. Indicates that a new DAG has been submitted and + // the context has updated information. } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java index 096069b..48d5455 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java @@ -43,7 +43,7 @@ public abstract class TaskSchedulerService extends AbstractService{ public abstract int getClusterNodeCount(); - public abstract void resetMatchLocalityForAllHeldContainers(); + public abstract void dagComplete(); public abstract Resource getTotalResources(); http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 66a6f33..44f5484 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -838,7 +838,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } @Override - public synchronized void resetMatchLocalityForAllHeldContainers() { + public synchronized void dagComplete() { for (HeldContainer heldContainer : heldContainers.values()) { heldContainer.resetLocalityMatchLevel(); } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 41e814b..470fa56 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -65,6 +65,11 @@ public class AMContainerHelpers { private static Map commonContainerSpecs = new HashMap(); + public static void dagComplete(TezDAGID dagId) { + synchronized (commonContainerSpecLock) { + commonContainerSpecs.remove(dagId); + } + } /** * Create a {@link LocalResource} record with all the given parameters. http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index 6037a3a..574c38e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm.container; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.service.AbstractService; @@ -74,4 +75,9 @@ public class AMContainerMap extends AbstractService implements EventHandler values() { return containerMap.values(); } + + public void dagComplete(DAG dag){ + AMContainerHelpers.dagComplete(dag.getID()); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index a0be825..102cbe9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -205,4 +206,9 @@ public class AMNodeTracker extends AbstractService implements public boolean isBlacklistingIgnored() { return this.ignoreBlacklisting; } + + public void dagComplete(DAG dag) { + // TODO TEZ-2337 Maybe reset failures from previous DAGs + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/44046f8a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index fca15fd..92dfdb5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.tez.dag.app.dag.DAG; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -127,7 +128,7 @@ public class MockDAGAppMaster extends DAGAppMaster { AtomicBoolean startScheduling = new AtomicBoolean(true); AtomicBoolean goFlag; boolean updateProgress = true; - + LinkedBlockingQueue containersToProcess = new LinkedBlockingQueue(); Map preemptedTasks = Maps.newConcurrentMap(); @@ -139,6 +140,14 @@ public class MockDAGAppMaster extends DAGAppMaster { this.goFlag = goFlag; } + @Override + public void dagComplete(DAG dag) { + } + + @Override + public void dagSubmitted() { + } + public class ContainerData { ContainerId cId; TezTaskAttemptID taId;