Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D208918484 for ; Sat, 22 Aug 2015 07:26:01 +0000 (UTC) Received: (qmail 350 invoked by uid 500); 22 Aug 2015 07:26:01 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 240 invoked by uid 500); 22 Aug 2015 07:26:01 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 99683 invoked by uid 99); 22 Aug 2015 07:26:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Aug 2015 07:26:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E6ACDFD9E; Sat, 22 Aug 2015 07:26:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Sat, 22 Aug 2015 07:26:24 -0000 Message-Id: <1a9dc24e3eec4a3689658bdf9dd16e51@git.apache.org> In-Reply-To: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> References: <2a5d73e46f33459bb1e1ac71d0f152f9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [25/50] [abbrv] tez git commit: TEZ-2651. Pluggable services should not extend AbstractService. (sseth) TEZ-2651. Pluggable services should not extend AbstractService. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25a6a131 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25a6a131 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25a6a131 Branch: refs/heads/master Commit: 25a6a131b95fdeb1b8cdd04f91820ca114531806 Parents: cebbb01 Author: Siddharth Seth Authored: Tue Jul 28 14:55:40 2015 -0700 Committer: Siddharth Seth Committed: Fri Aug 21 18:14:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../serviceplugins/api/ContainerLauncher.java | 18 ++++++++++-- .../apache/tez/dag/api/TaskCommunicator.java | 30 +++++++++++++++++--- .../tez/dag/api/TaskCommunicatorContext.java | 5 ++++ .../org/apache/tez/dag/app/DAGAppMaster.java | 4 +-- .../dag/app/TaskAttemptListenerImpTezDag.java | 16 ++++++----- .../dag/app/TaskCommunicatorContextImpl.java | 9 ++++++ .../tez/dag/app/TezTaskCommunicatorImpl.java | 24 ++++++---------- .../dag/app/launcher/ContainerLauncherImpl.java | 6 ++-- .../app/launcher/ContainerLauncherRouter.java | 12 ++++++-- .../app/launcher/LocalContainerLauncher.java | 6 ++-- .../apache/tez/dag/app/MockDAGAppMaster.java | 6 ++-- .../app/TestTaskAttemptListenerImplTezDag.java | 8 +++--- .../TezTestServiceContainerLauncher.java | 6 ++-- .../TezTestServiceNoOpContainerLauncher.java | 2 +- .../TezTestServiceTaskCommunicatorImpl.java | 29 +++++++++---------- 16 files changed, 116 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index a51669d..e57f76f 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -36,5 +36,6 @@ ALL CHANGES: TEZ-2124. Change Node tracking to work per external container source. TEZ-2004. Define basic interface for pluggable ContainerLaunchers. TEZ-2005. Define basic interface for pluggable TaskScheduler. + TEZ-2651. Pluggable services should not extend AbstractService. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java index 218edb6..8337dcb 100644 --- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java @@ -17,6 +17,7 @@ package org.apache.tez.serviceplugins.api; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.service.AbstractService; +import org.apache.tez.common.ServicePluginLifecycle; /** * Plugin to allow custom container launchers to be written to launch containers on different types @@ -25,18 +26,29 @@ import org.apache.hadoop.service.AbstractService; @InterfaceAudience.Public @InterfaceStability.Unstable -public abstract class ContainerLauncher extends AbstractService { +public abstract class ContainerLauncher implements ServicePluginLifecycle { private final ContainerLauncherContext containerLauncherContext; // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService. // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal // configuration to the services if populated with the AM Configuration - public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) { - super(name); + public ContainerLauncher(ContainerLauncherContext containerLauncherContext) { this.containerLauncherContext = containerLauncherContext; } + @Override + public void initialize() throws Exception { + } + + @Override + public void start() throws Exception { + } + + @Override + public void shutdown() throws Exception { + } + public final ContainerLauncherContext getContext() { return this.containerLauncherContext; } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java index 05e437c..f221414 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java @@ -18,9 +18,9 @@ import java.net.InetSocketAddress; import java.util.Map; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.common.ServicePluginLifecycle; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -28,11 +28,33 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; // TODO TEZ-2003 Move this into the tez-api module -public abstract class TaskCommunicator extends AbstractService { - public TaskCommunicator(String name) { - super(name); +public abstract class TaskCommunicator implements ServicePluginLifecycle { + + private final TaskCommunicatorContext taskCommunicatorContext; + + public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) { + this.taskCommunicatorContext = taskCommunicatorContext; + } + + public TaskCommunicatorContext getContext() { + return taskCommunicatorContext; + } + + @Override + public void initialize() throws Exception { } + @Override + public void start() throws Exception { + } + + @Override + public void shutdown() throws Exception { + } + + // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for + // TaskSpec and related classes. (assuming that's efficient for execution) + // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct. // TODO When talking to an external service, this plugin implementer may need access to a host:port public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port); http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index b6e63f7..ab32ec1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -18,6 +18,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -35,6 +36,9 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc. + // TODO TEZ-2003 To be replaced by getInitialPayload + Configuration getInitialConfiguration(); + ApplicationAttemptId getApplicationAttemptId(); Credentials getCredentials(); @@ -42,6 +46,7 @@ public interface TaskCommunicatorContext { boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException; // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update + // KKK Rename this API TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException; boolean isKnownContainer(ContainerId containerId); http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 70c0bc6..e4d3f8b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1047,8 +1047,8 @@ public class DAGAppMaster extends AbstractService { String[] taskCommunicatorClasses, boolean isLocal) { TaskAttemptListener lis = - new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager, - taskCommunicatorClasses, isLocal); + new TaskAttemptListenerImpTezDag(context, thh, chh, + taskCommunicatorClasses, amConf, isLocal); return lis; } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 47b63dd..599c208 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections4.ListUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.dag.app.dag.event.TaskAttemptEvent; import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate; @@ -61,7 +62,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TezEvent; -import org.apache.tez.common.security.JobTokenSecretManager; @SuppressWarnings("unchecked") @@ -75,6 +75,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements private final AppContext context; private final TaskCommunicator[] taskCommunicators; private final TaskCommunicatorContext[] taskCommunicatorContexts; + protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers; protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; @@ -99,9 +100,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public TaskAttemptListenerImpTezDag(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager. - JobTokenSecretManager jobTokenSecretManager, String [] taskCommunicatorClassIdentifiers, + Configuration conf, boolean isPureLocalMode) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; @@ -118,9 +118,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length]; + this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length]; for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { - taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i); + taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i); taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i); + taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]); } // TODO TEZ-2118 Start using taskCommunicator indices properly } @@ -129,15 +131,15 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements public void serviceStart() { // TODO Why is init tied to serviceStart for (int i = 0 ; i < taskCommunicators.length ; i++) { - taskCommunicators[i].init(getConfig()); - taskCommunicators[i].start(); + taskCommunicatorServiceWrappers[i].init(getConfig()); + taskCommunicatorServiceWrappers[i].start(); } } @Override public void serviceStop() { for (int i = 0 ; i < taskCommunicators.length ; i++) { - taskCommunicators[i].stop(); + taskCommunicatorServiceWrappers[i].stop(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 50e006d..035db93 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -48,14 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver private final int taskCommunicatorIndex; private final ReentrantReadWriteLock.ReadLock dagChangedReadLock; private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock; + private final Configuration conf; private DAG dag; public TaskCommunicatorContextImpl(AppContext appContext, TaskAttemptListenerImpTezDag taskAttemptListener, + Configuration conf, int taskCommunicatorIndex) { this.context = appContext; this.taskAttemptListener = taskAttemptListener; + this.conf = conf; this.taskCommunicatorIndex = taskCommunicatorIndex; ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock(); @@ -64,6 +68,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver } @Override + public Configuration getInitialConfiguration() { + return conf; + } + + @Override public ApplicationAttemptId getApplicationAttemptId() { return context.getApplicationAttemptId(); } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 0374022..93b5b43 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -67,7 +67,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask( null, true, null, null, false); - private final TaskCommunicatorContext taskCommunicatorContext; private final TezTaskUmbilicalProtocol taskUmbilical; protected final ConcurrentMap registeredContainers = @@ -116,25 +115,24 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { * Construct the service. */ public TezTaskCommunicatorImpl(TaskCommunicatorContext taskCommunicatorContext) { - super(TezTaskCommunicatorImpl.class.getName()); - this.taskCommunicatorContext = taskCommunicatorContext; + super(taskCommunicatorContext); this.taskUmbilical = new TezTaskUmbilicalProtocolImpl(); - this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(); + this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString(); this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); } @Override - public void serviceStart() { + public void start() { startRpcServer(); } @Override - public void serviceStop() { + public void shutdown() { stopRpcServer(); } protected void startRpcServer() { - Configuration conf = getConfig(); + Configuration conf = getContext().getInitialConfiguration(); try { JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); @@ -281,10 +279,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { return sessionToken; } - protected TaskCommunicatorContext getTaskCommunicatorContext() { - return taskCommunicatorContext; - } - public TezTaskUmbilicalProtocol getUmbilical() { return this.taskUmbilical; } @@ -305,7 +299,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } task = getContainerTask(containerId); if (task != null && !task.shouldDie()) { - taskCommunicatorContext + getContext() .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId); } } @@ -317,7 +311,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { @Override public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException { - return taskCommunicatorContext.canCommit(taskAttemptId); + return getContext().canCommit(taskAttemptId); } @Override @@ -370,7 +364,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(), request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(), request.getPreRoutedStartIndex(), request.getMaxEvents()); - tResponse = taskCommunicatorContext.heartbeat(tRequest); + tResponse = getContext().heartbeat(tRequest); } TezHeartbeatResponse response = new TezHeartbeatResponse(); response.setLastRequestId(requestId); @@ -402,7 +396,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { ContainerInfo containerInfo = registeredContainers.get(containerId); ContainerTask task = null; if (containerInfo == null) { - if (taskCommunicatorContext.isKnownContainer(containerId)) { + if (getContext().isKnownContainer(containerId)) { LOG.info("Container with id: " + containerId + " is valid, but no longer registered, and will be killed"); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java index fe0178c..34c7bc0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java @@ -223,7 +223,7 @@ public class ContainerLauncherImpl extends ContainerLauncher { } public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) { - super(ContainerLauncherImpl.class.getName(), containerLauncherContext); + super(containerLauncherContext); this.conf = new Configuration(containerLauncherContext.getInitialConfiguration()); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, @@ -235,7 +235,7 @@ public class ContainerLauncherImpl extends ContainerLauncher { } @Override - public void serviceStart() { + public void start() { // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed cmProxy = new ContainerManagementProtocolProxy(conf); @@ -307,7 +307,7 @@ public class ContainerLauncherImpl extends ContainerLauncher { } @Override - public void serviceStop() { + public void shutdown() { if(!serviceStopped.compareAndSet(false, true)) { LOG.info("Ignoring multiple stops"); return; http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 9f741cf..7c6a6a4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; @@ -45,6 +46,7 @@ public class ContainerLauncherRouter extends AbstractService private final ContainerLauncher containerLaunchers[]; private final ContainerLauncherContext containerLauncherContexts[]; + protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers; private final AppContext appContext; @VisibleForTesting @@ -53,6 +55,8 @@ public class ContainerLauncherRouter extends AbstractService this.appContext = context; containerLaunchers = new ContainerLauncher[] {containerLauncher}; containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()}; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{ + new ServicePluginLifecycleAbstractService(containerLauncher)}; } // Accepting conf to setup final parameters, if required. @@ -75,6 +79,7 @@ public class ContainerLauncherRouter extends AbstractService } containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length]; containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length]; + containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length]; for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) { @@ -82,6 +87,7 @@ public class ContainerLauncherRouter extends AbstractService containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context, containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf); + containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]); } } @@ -130,21 +136,21 @@ public class ContainerLauncherRouter extends AbstractService @Override public void serviceInit(Configuration conf) { for (int i = 0 ; i < containerLaunchers.length ; i++) { - ((AbstractService) containerLaunchers[i]).init(conf); + containerLauncherServiceWrappers[i].init(conf); } } @Override public void serviceStart() { for (int i = 0 ; i < containerLaunchers.length ; i++) { - ((AbstractService) containerLaunchers[i]).start(); + containerLauncherServiceWrappers[i].start(); } } @Override public void serviceStop() { for (int i = 0 ; i < containerLaunchers.length ; i++) { - ((AbstractService) containerLaunchers[i]).stop(); + containerLauncherServiceWrappers[i].stop(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index a1b8e29..3975111 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -111,7 +111,7 @@ public class LocalContainerLauncher extends ContainerLauncher { // starts up. It's not possible to set these up via a static payload. // Will need some kind of mechanism to dynamically crate payloads / bind to parameters // after the AM starts up. - super(LocalContainerLauncher.class.getName(), containerLauncherContext); + super(containerLauncherContext); this.context = context; this.tal = taskAttemptListener; this.workingDirectory = workingDirectory; @@ -139,14 +139,14 @@ public class LocalContainerLauncher extends ContainerLauncher { } @Override - public void serviceStart() throws Exception { + public void start() throws Exception { eventHandlingThread = new Thread(new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner"); eventHandlingThread.start(); } @Override - public void serviceStop() throws Exception { + public void shutdown() throws Exception { if (!serviceStopped.compareAndSet(false, true)) { LOG.info("Service Already stopped. Ignoring additional stop"); return; http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 3c3c6a7..21ae5f7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -139,7 +139,7 @@ public class MockDAGAppMaster extends DAGAppMaster { public MockContainerLauncher(AtomicBoolean goFlag, ContainerLauncherContext containerLauncherContext) { - super("MockContainerLauncher", containerLauncherContext); + super(containerLauncherContext); this.goFlag = goFlag; } @@ -182,7 +182,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } @Override - public void serviceStart() throws Exception { + public void start() throws Exception { taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener(); taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0); eventHandlingThread = new Thread(this); @@ -199,7 +199,7 @@ public class MockDAGAppMaster extends DAGAppMaster { } @Override - public void serviceStop() throws Exception { + public void shutdown() throws Exception { if (eventHandlingThread != null) { eventHandlingThread.interrupt(); eventHandlingThread.join(2000l); http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index df643e4..41a7373 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -299,7 +299,7 @@ public class TestTaskAttemptListenerImplTezDag { sessionToken.setService(identifier.getJobId()); TokenCache.setSessionToken(sessionToken, credentials); taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false); // no exception happen, should started properly taskAttemptListener.init(conf); taskAttemptListener.start(); @@ -319,7 +319,7 @@ public class TestTaskAttemptListenerImplTezDag { conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port); taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false); taskAttemptListener.init(conf); taskAttemptListener.start(); int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort(); @@ -375,10 +375,10 @@ public class TestTaskAttemptListenerImplTezDag { public TaskAttemptListenerImplForTest(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - JobTokenSecretManager jobTokenSecretManager, String[] taskCommunicatorClassIdentifiers, + Configuration conf, boolean isPureLocalMode) { - super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers, + super(context, thh, chh, taskCommunicatorClassIdentifiers, conf, isPureLocalMode); } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java index dbf5054..85f9415 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java @@ -54,7 +54,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { // Configuration passed in here to set up final parameters public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) { - super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext); + super(containerLauncherContext); int numThreads = getContext().getInitialConfiguration().getInt( TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS, TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT); @@ -69,13 +69,13 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher { } @Override - public void serviceStart() { + public void start() { communicator.init(getContext().getInitialConfiguration()); communicator.start(); } @Override - public void serviceStop() { + public void shutdown() { communicator.stop(); } http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java index d3743e1..7b42296 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java @@ -27,7 +27,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher { public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) { - super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext); + super(containerLauncherContext); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/25a6a131/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java index 444498e..078ea79 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java @@ -23,7 +23,6 @@ import java.util.concurrent.RejectedExecutionException; import com.google.protobuf.ByteString; import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.Credentials; @@ -75,20 +74,20 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } @Override - public void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - this.communicator.init(conf); + public void initialize() throws Exception { + super.initialize(); + this.communicator.init(getContext().getInitialConfiguration()); } @Override - public void serviceStart() { - super.serviceStart(); + public void start() { + super.start(); this.communicator.start(); } @Override - public void serviceStop() { - super.serviceStop(); + public void shutdown() { + super.shutdown(); this.communicator.stop(); } @@ -132,7 +131,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl } // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. - getTaskCommunicatorContext() + getContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); communicator.submitWork(requestProto, host, port, new TezTestServiceCommunicator.ExecuteRequestCallback() { @@ -154,19 +153,19 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl RemoteException re = (RemoteException) t; String message = re.toString(); if (message.contains(RejectedExecutionException.class.getName())) { - getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); } else { - getTaskCommunicatorContext() + getContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.toString()); } } else { if (t instanceof IOException) { - getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(), + getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); } else { - getTaskCommunicatorContext() + getContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.getMessage()); } @@ -191,11 +190,11 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl builder.setAmPort(getAddress().getPort()); Credentials taskCredentials = new Credentials(); // Credentials can change across DAGs. Ideally construct only once per DAG. - taskCredentials.addAll(getTaskCommunicatorContext().getCredentials()); + taskCredentials.addAll(getContext().getCredentials()); ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName()); if (credentialsBinary == null) { - credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials()); + credentialsBinary = serializeCredentials(getContext().getCredentials()); credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate()); } else { credentialsBinary = credentialsBinary.duplicate();