Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D39B4200CF7 for ; Tue, 5 Sep 2017 07:10:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D1EAD1635D5; Tue, 5 Sep 2017 05:10:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6AA6F163592 for ; Tue, 5 Sep 2017 07:10:35 +0200 (CEST) Received: (qmail 76179 invoked by uid 500); 5 Sep 2017 05:10:30 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 73348 invoked by uid 99); 5 Sep 2017 05:10:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Sep 2017 05:10:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9BBBCF5681; Tue, 5 Sep 2017 05:10:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 05 Sep 2017 05:10:38 -0000 Message-Id: <958255a5f54c498586747d63e692b384@git.apache.org> In-Reply-To: <4fca20bef1544afaaef6959a814e6665@git.apache.org> References: <4fca20bef1544afaaef6959a814e6665@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/51] [abbrv] hadoop git commit: YARN-7050. Post cleanup after YARN-6903, removal of org.apache.slider package. Contributed by Jian He archived-at: Tue, 05 Sep 2017 05:10:38 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java deleted file mode 100644 index da122da..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.server.appmaster.state.RoleInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Map; - -/** - * Probe for a port being open. - */ -public class PortProbe extends Probe { - protected static final Logger log = LoggerFactory.getLogger(PortProbe.class); - private final int port; - private final int timeout; - - public PortProbe(int port, int timeout) { - super("Port probe of " + port + " for " + timeout + "ms", null); - this.port = port; - this.timeout = timeout; - } - - public static PortProbe create(Map props) - throws IOException { - int port = getPropertyInt(props, PORT_PROBE_PORT, null); - - if (port >= 65536) { - throw new IOException(PORT_PROBE_PORT + " " + port + " is out of " + - "range"); - } - - int timeout = getPropertyInt(props, PORT_PROBE_CONNECT_TIMEOUT, - PORT_PROBE_CONNECT_TIMEOUT_DEFAULT); - - return new PortProbe(port, timeout); - } - - /** - * Try to connect to the (host,port); a failure to connect within - * the specified timeout is a failure. - * @param instance role instance - * @return the outcome - */ - @Override - public ProbeStatus ping(ComponentInstance instance) { - ProbeStatus status = new ProbeStatus(); - - if (instance.getContainerStatus() == null || SliderUtils - .isEmpty(instance.getContainerStatus().getIPs())) { - status.fail(this, new IOException( - instance.getCompInstanceName() + ": IP is not available yet")); - return status; - } - - String ip = instance.getContainerStatus().getIPs().get(0); - InetSocketAddress sockAddr = new InetSocketAddress(ip, port); - Socket socket = new Socket(); - try { - if (log.isDebugEnabled()) { - log.debug(instance.getCompInstanceName() + ": Connecting " + sockAddr - .toString() + ", timeout=" + MonitorUtils - .millisToHumanTime(timeout)); - } - socket.connect(sockAddr, timeout); - status.succeed(this); - } catch (Throwable e) { - String error = - instance.getCompInstanceName() + ": Probe " + sockAddr + " failed"; - log.debug(error, e); - status.fail(this, new IOException(error, e)); - } finally { - IOUtils.closeSocket(socket); - } - return status; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java deleted file mode 100644 index 4809b45..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.service.compinstance.ComponentInstance; -import org.apache.slider.server.appmaster.state.RoleInstance; - -import java.io.IOException; -import java.util.Map; - -/** - * Base class of all probes. - */ -public abstract class Probe implements MonitorKeys { - - protected final Configuration conf; - private String name; - - /** - * Create a probe of a specific name - * - * @param name probe name - * @param conf configuration being stored. - */ - public Probe(String name, Configuration conf) { - this.name = name; - this.conf = conf; - } - - - protected void setName(String name) { - this.name = name; - } - - public String getName() { - return name; - } - - - @Override - public String toString() { - return getName(); - } - - public static String getProperty(Map props, String name, - String defaultValue) throws IOException { - String value = props.get(name); - if (StringUtils.isEmpty(value)) { - if (defaultValue == null) { - throw new IOException(name + " not specified"); - } - return defaultValue; - } - return value; - } - - public static int getPropertyInt(Map props, String name, - Integer defaultValue) throws IOException { - String value = props.get(name); - if (StringUtils.isEmpty(value)) { - if (defaultValue == null) { - throw new IOException(name + " not specified"); - } - return defaultValue; - } - return Integer.parseInt(value); - } - - /** - * perform any prelaunch initialization - */ - public void init() throws IOException { - - } - - /** - * Ping the endpoint. All exceptions must be caught and included in the - * (failure) status. - * - * @param instance instance to ping - * @return the status - */ - public abstract ProbeStatus ping(ComponentInstance instance); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java deleted file mode 100644 index 24668bd..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.servicemonitor; - -import java.io.Serializable; -import java.util.Date; - -/** - * Status message of a probe. This is designed to be sent over the wire, though the exception - * Had better be unserializable at the far end if that is to work. - */ -public final class ProbeStatus implements Serializable { - private static final long serialVersionUID = 165468L; - - private long timestamp; - private String timestampText; - private boolean success; - private boolean realOutcome; - private String message; - private Throwable thrown; - private transient Probe originator; - - public ProbeStatus() { - } - - public ProbeStatus(long timestamp, String message, Throwable thrown) { - this.success = false; - this.message = message; - this.thrown = thrown; - setTimestamp(timestamp); - } - - public ProbeStatus(long timestamp, String message) { - this.success = true; - setTimestamp(timestamp); - this.message = message; - this.thrown = null; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - timestampText = new Date(timestamp).toString(); - } - - public boolean isSuccess() { - return success; - } - - /** - * Set both the success and the real outcome bits to the same value - * @param success the new value - */ - public void setSuccess(boolean success) { - this.success = success; - realOutcome = success; - } - - public String getTimestampText() { - return timestampText; - } - - public boolean getRealOutcome() { - return realOutcome; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public Throwable getThrown() { - return thrown; - } - - public void setThrown(Throwable thrown) { - this.thrown = thrown; - } - - /** - * Get the probe that generated this result. May be null - * @return a possibly null reference to a probe - */ - public Probe getOriginator() { - return originator; - } - - /** - * The probe has succeeded -capture the current timestamp, set - * success to true, and record any other data needed. - * @param probe probe - */ - public void succeed(Probe probe) { - finish(probe, true, probe.getName(), null); - } - - /** - * A probe has failed either because the test returned false, or an exception - * was thrown. The {@link #success} field is set to false, any exception - * thrown is recorded. - * @param probe probe that failed - * @param thrown an exception that was thrown. - */ - public void fail(Probe probe, Throwable thrown) { - finish(probe, false, "Failure in " + probe, thrown); - } - - public void finish(Probe probe, boolean succeeded, String text, Throwable thrown) { - setTimestamp(System.currentTimeMillis()); - setSuccess(succeeded); - originator = probe; - message = text; - this.thrown = thrown; - } - - @Override - public String toString() { - LogEntryBuilder builder = new LogEntryBuilder("Probe Status"); - builder.elt("time", timestampText) - .elt("outcome", (success ? "success" : "failure")); - - if (success != realOutcome) { - builder.elt("originaloutcome", (realOutcome ? "success" : "failure")); - } - builder.elt("message", message); - if (thrown != null) { - builder.elt("exception", thrown); - } - - return builder.toString(); - } - - /** - * Flip the success bit on while the real outcome bit is kept false - */ - public void markAsSuccessful() { - success = true; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java deleted file mode 100644 index 43f0e4e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.utility; - -import org.apache.hadoop.registry.client.api.RegistryConstants; -import org.apache.hadoop.registry.client.api.RegistryOperations; -import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.slider.common.tools.ConfigHelper; -import org.apache.slider.common.tools.SliderUtils; -import org.apache.slider.core.exceptions.BadCommandArgumentsException; -import org.apache.slider.core.exceptions.BadConfigException; -import org.apache.slider.core.zk.ZookeeperUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base service for the standard slider client/server services - */ -public abstract class AbstractSliderLaunchedService extends - LaunchedWorkflowCompositeService { - private static final Logger log = - LoggerFactory.getLogger(AbstractSliderLaunchedService.class); - - protected AbstractSliderLaunchedService(String name) { - super(name); - // make sure all the yarn configs get loaded - ConfigHelper.registerDeprecatedConfigItems(); - } - - /** - * look up the registry quorum from the config - * @return the quorum string - * @throws BadConfigException if it is not there or invalid - */ - public String lookupZKQuorum() throws BadConfigException { - - String registryQuorum = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM); - - // though if neither is set: trouble - if (SliderUtils.isUnset(registryQuorum)) { - throw new BadConfigException( - "No Zookeeper quorum provided in the" - + " configuration property " + RegistryConstants.KEY_REGISTRY_ZK_QUORUM - ); - } - ZookeeperUtils.splitToHostsAndPortsStrictly(registryQuorum); - return registryQuorum; - } - - /** - * Create, adopt ,and start the YARN registration service - * @return the registry operations service, already deployed as a child - * of the AbstractSliderLaunchedService instance. - */ - public RegistryOperations startRegistryOperationsService() - throws BadConfigException { - - // push back the slider registry entry if needed - RegistryOperations registryWriterService = - createRegistryOperationsInstance(); - deployChildService(registryWriterService); - return registryWriterService; - } - - /** - * Create the registry operations instance. This is to allow - * subclasses to instantiate a subclass service - * @return an instance to match to the lifecycle of this service - */ - protected RegistryOperations createRegistryOperationsInstance() { - return RegistryOperationsFactory.createInstance("YarnRegistry", getConfig()); - } - - /** - * Utility method to require an argument to be set (non null, non-empty) - * @param argname argument name - * @param value value - * @throws BadCommandArgumentsException if the condition is not met - */ - protected static void requireArgumentSet(String argname, String value) - throws BadCommandArgumentsException { - require(isSet(value), "Required argument %s missing", argname ); - } - - /** - * Require a condition to hold; throw {@link BadCommandArgumentsException} if not. - * The exception text is the formatted message. - * @param condition condition - * @param message string to format - * @param args list of arguments to format. - * @throws BadCommandArgumentsException - */ - protected static void require(boolean condition, String message, - Object... args) - throws BadCommandArgumentsException { - if (!condition) { - throw new BadCommandArgumentsException(message, args); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java deleted file mode 100644 index 40ceab8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.utility; - -import org.apache.hadoop.service.Service; -import org.apache.hadoop.service.ServiceStateChangeListener; - -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Wait for a service to stop. - * - * WARNING: the notification may come in as soon as the service enters - * the stopped state: it may take some time for the actual stop operation - * to complete. - */ -public class EndOfServiceWaiter implements ServiceStateChangeListener { - - private final AtomicBoolean finished = new AtomicBoolean(false); - private final String name; - private Service service; - - /** - * Wait for a service; use the service name as this instance's name - * @param service service - */ - public EndOfServiceWaiter(Service service) { - this(service.getName(), service); - } - - - /** - * Wait for a service - * @param name name for messages - * @param service service - */ - public EndOfServiceWaiter(String name, Service service) { - this.name = name; - this.service = service; - service.registerServiceListener(this); - } - - public synchronized void waitForServiceToStop(long timeout) throws - InterruptedException, TimeoutException { - service.waitForServiceToStop(timeout); - if (!finished.get()) { - wait(timeout); - if (!finished.get()) { - throw new TimeoutException(name - + " did not finish after " + timeout + - " milliseconds"); - } - } - } - - /** - * Wait for service state change callbacks; notify self if the service has - * now stopped - * @param service service - */ - @Override - public synchronized void stateChanged(Service service) { - if (service.isInState(Service.STATE.STOPPED)) { - finished.set(true); - notify(); - } - } - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java deleted file mode 100644 index bcd1969..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.utility; - -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.Service; -import org.apache.slider.core.main.LauncherExitCodes; -import org.apache.slider.core.main.RunService; -import org.apache.slider.server.services.workflow.WorkflowCompositeService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a workflow compositoe service which can be launched from the CLI - * ... catches the arguments and implements a stub runService operation. - */ -public class LaunchedWorkflowCompositeService extends WorkflowCompositeService - implements RunService { - private static final Logger log = LoggerFactory.getLogger( - LaunchedWorkflowCompositeService.class); - private String[] argv; - - public LaunchedWorkflowCompositeService(String name) { - super(name); - } - - public LaunchedWorkflowCompositeService(String name, Service... children) { - super(name, children); - } - - /** - * Implementation of set-ness, groovy definition of true/false for a string - * @param s - * @return true iff the string is non-null and non-empty - */ - protected static boolean isUnset(String s) { - return StringUtils.isEmpty(s); - } - - protected static boolean isSet(String s) { - return StringUtils.isNotEmpty(s); - } - - protected String[] getArgv() { - return argv; - } - - /** - * Pre-init argument binding - * @param config the initial configuration build up by the - * service launcher. - * @param args argument list list of arguments passed to the command line - * after any launcher-specific commands have been stripped. - * @return the configuration - * @throws Exception - */ - @Override - public Configuration bindArgs(Configuration config, String... args) throws - Exception { - this.argv = args; - if (log.isDebugEnabled()) { - log.debug("Binding {} Arguments:", args.length); - - StringBuilder builder = new StringBuilder(); - for (String arg : args) { - builder.append('"').append(arg).append("\" "); - } - log.debug(builder.toString()); - } - return config; - } - - @Override - public int runService() throws Throwable { - return LauncherExitCodes.EXIT_SUCCESS; - } - - @Override - public synchronized void addService(Service service) { - Preconditions.checkArgument(service != null, "null service argument"); - super.addService(service); - } - - /** - * Run a child service -initing and starting it if this - * service has already passed those parts of its own lifecycle - * @param service the service to start - */ - protected boolean deployChildService(Service service) { - service.init(getConfig()); - addService(service); - if (isInState(STATE.STARTED)) { - service.start(); - return true; - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java deleted file mode 100644 index 6ab9de6..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.utility; - -import org.apache.slider.server.appmaster.web.rest.RestPaths; - -import java.util.regex.Pattern; - -/** - * Utility class to validate strings against a predefined pattern. - */ -public class PatternValidator { - - public static final String E_INVALID_NAME = - "Invalid name %s does not match the pattern pattern %s "; - private final Pattern valid; - private final String pattern; - - public PatternValidator(String pattern) { - this.pattern = pattern; - valid = Pattern.compile(pattern); - } - - /** - * Validate the name -restricting it to the set defined in - * {@link RestPaths#PUBLISHED_CONFIGURATION_REGEXP} - * @param name name to validate - * @throws IllegalArgumentException if not a valid name - */ - public void validate(String name) { - if (!matches(name)) { - throw new IllegalArgumentException( - String.format(E_INVALID_NAME, name, pattern)); - } - } - - /** - * Query to see if the pattern matches - * @param name name to validate - * @return true if the string matches the pattern - */ - public boolean matches(String name) { - return valid.matcher(name).matches(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java deleted file mode 100644 index ebfcb99..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.utility; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.webapp.WebApp; - -/** - * Contains a webapp reference and stops it in teardown if non-null - *

- * It does not start the application. - * Access to the field is not synchronized across threads; it is the - * responsibility of the caller. - */ -public class WebAppService extends AbstractService { - - private volatile T webApp; - - public WebAppService(String name) { - super(name); - } - - public WebAppService(String name, T app) { - super(name); - webApp = app; - } - - public T getWebApp() { - return webApp; - } - - public void setWebApp(T webApp) { - this.webApp = webApp; - } - - - @Override - protected void serviceStart() throws Exception { - - } - - /** - * Stop operation stops the webapp; sets the reference to null - * @throws Exception - */ - @Override - protected void serviceStop() throws Exception { - if (webApp != null) { - webApp.stop(); - webApp = null; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java deleted file mode 100644 index 8b711aa..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import org.apache.hadoop.service.AbstractService; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Service that closes the closeable supplied during shutdown, if not null. - * - * As the Service interface itself extends Closeable, this service - * can be used to shut down other services if desired. - */ -public class ClosingService extends AbstractService { - - private C closeable; - - public ClosingService(String name) { - super(name); - } - - /** - * Construct an instance of the service - * @param name service name - * @param closeable closeable to close (may be null) - */ - public ClosingService(String name, - C closeable) { - super(name); - this.closeable = closeable; - } - - /** - * Construct an instance of the service, using the default name - * @param closeable closeable to close (may be null) - */ - public ClosingService(C closeable) { - this("ClosingService", closeable); - } - - - /** - * Get the closeable - * @return the closeable - */ - public synchronized C getCloseable() { - return closeable; - } - - /** - * Set or update the closeable. - * @param closeable - */ - public synchronized void setCloseable(C closeable) { - this.closeable = closeable; - } - - /** - * Stop routine will close the closeable -if not null - and set the - * reference to null afterwards - * This operation does raise any exception on the close, though it does - * record it - */ - @Override - protected void serviceStop() { - C target = getCloseable(); - if (target != null) { - try { - target.close(); - } catch (IOException ioe) { - noteFailure(ioe); - } - setCloseable(null); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java deleted file mode 100644 index 352be49..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import org.apache.hadoop.service.ServiceStateException; -import org.apache.slider.core.main.ServiceLaunchException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Service wrapper for an external program that is launched and can/will terminate. - * This service is notified when the subprocess terminates, and stops itself - * and converts a non-zero exit code into a failure exception. - * - *

- * Key Features: - *

    - *
  1. The property {@link #executionTimeout} can be set to set a limit - * on the duration of a process
  2. - *
  3. Output is streamed to the output logger provided
  4. . - *
  5. The most recent lines of output are saved to a linked list
  6. . - *
  7. A synchronous callback, {@link LongLivedProcessLifecycleEvent}, is raised on the start - * and finish of a process.
  8. - *
- * - * Usage: - *

- * The service can be built in the constructor, {@link #ForkedProcessService(String, Map, List)}, - * or have its simple constructor used to instantiate the service, then the - * {@link #build(Map, List)} command used to define the environment variables - * and list of commands to execute. One of these two options MUST be exercised - * before calling the services's {@link #start()} method. - *

- * The forked process is executed in the service's {@link #serviceStart()} method; - * if still running when the service is stopped, {@link #serviceStop()} will - * attempt to stop it. - *

- * - * The service delegates process execution to {@link LongLivedProcess}, - * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}. - * When the service receives a callback notifying that the process has completed, - * it calls its {@link #stop()} method. If the error code was non-zero, - * the service is logged as having failed. - */ -public class ForkedProcessService - extends WorkflowExecutorService - implements LongLivedProcessLifecycleEvent, Runnable { - - /** - * Log for the forked master process - */ - private static final Logger LOG = - LoggerFactory.getLogger(ForkedProcessService.class); - - private final AtomicBoolean processTerminated = new AtomicBoolean(false); - private boolean processStarted = false; - private LongLivedProcess process; - private int executionTimeout = -1; - private int timeoutCode = 1; - /** - log to log to; defaults to this service log - */ - private Logger processLog = LOG; - - /** - * Exit code set when the spawned process exits - */ - private AtomicInteger exitCode = new AtomicInteger(0); - - /** - * Create an instance of the service - * @param name a name - */ - public ForkedProcessService(String name) { - super(name); - } - - /** - * Create an instance of the service, set up the process - * @param name a name - * @param commandList list of commands is inserted on the front - * @param env environment variables above those generated by - * @throws IOException IO problems - */ - public ForkedProcessService(String name, - Map env, - List commandList) throws IOException { - super(name); - build(env, commandList); - } - - @Override //AbstractService - protected void serviceStart() throws Exception { - if (process == null) { - throw new ServiceStateException("Process not yet configured"); - } - //now spawn the process -expect updates via callbacks - process.start(); - } - - @Override //AbstractService - protected void serviceStop() throws Exception { - completed(); - stopForkedProcess(); - } - - private void stopForkedProcess() { - if (process != null) { - process.stop(); - } - } - - /** - * Set the process log. This may be null for "do not log" - * @param processLog process log - */ - public void setProcessLog(Logger processLog) { - this.processLog = processLog; - process.setProcessLog(processLog); - } - - /** - * Set the timeout by which time a process must have finished -or -1 for forever - * @param timeout timeout in milliseconds - */ - public void setTimeout(int timeout, int code) { - this.executionTimeout = timeout; - this.timeoutCode = code; - } - - /** - * Build the process to execute when the service is started - * @param commandList list of commands is inserted on the front - * @param env environment variables above those generated by - * @throws IOException IO problems - */ - public void build(Map env, - List commandList) - throws IOException { - assert process == null; - - process = new LongLivedProcess(getName(), processLog, commandList); - process.setLifecycleCallback(this); - //set the env variable mapping - process.putEnvMap(env); - } - - @Override // notification from executed process - public synchronized void onProcessStarted(LongLivedProcess process) { - LOG.debug("Process has started"); - processStarted = true; - if (executionTimeout > 0) { - setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true)); - execute(this); - } - } - - @Override // notification from executed process - public void onProcessExited(LongLivedProcess process, - int uncorrected, - int code) { - try { - synchronized (this) { - completed(); - //note whether or not the service had already stopped - LOG.debug("Process has exited with exit code {}", code); - if (code != 0) { - reportFailure(code, getName() + " failed with code " + code); - } - } - } finally { - stop(); - } - } - - private void reportFailure(int code, String text) { - //error - ServiceLaunchException execEx = new ServiceLaunchException(code, text); - LOG.debug("Noting failure", execEx); - noteFailure(execEx); - } - - /** - * handle timeout response by escalating it to a failure - */ - @Override - public void run() { - try { - synchronized (processTerminated) { - if (!processTerminated.get()) { - processTerminated.wait(executionTimeout); - } - } - - } catch (InterruptedException e) { - //assume signalled; exit - } - //check the status; if the marker isn't true, bail - if (!processTerminated.getAndSet(true)) { - LOG.info("process timeout: reporting error code {}", timeoutCode); - - //timeout - if (isInState(STATE.STARTED)) { - //trigger a failure - stopForkedProcess(); - } - reportFailure(timeoutCode, getName() + ": timeout after " + executionTimeout - + " millis: exit code =" + timeoutCode); - } - } - - /** - * Note the process as having completed. - * The process marked as terminated - * -and anything synchronized on processTerminated - * is notified - */ - protected void completed() { - processTerminated.set(true); - synchronized (processTerminated) { - processTerminated.notify(); - } - } - - public boolean isProcessTerminated() { - return processTerminated.get(); - } - - public synchronized boolean isProcessStarted() { - return processStarted; - } - - /** - * Is a process running: between started and terminated - * @return true if the process is up. - */ - public synchronized boolean isProcessRunning() { - return processStarted && !isProcessTerminated(); - } - - - public Integer getExitCode() { - return process.getExitCode(); - } - - public int getExitCodeSignCorrected() { - Integer exitCode = process.getExitCodeSignCorrected(); - if (exitCode == null) return -1; - return exitCode; - } - - /** - * Get the recent output from the process, or [] if not defined - * @return a possibly empty list - */ - public List getRecentOutput() { - return process != null - ? process.getRecentOutput() - : new LinkedList(); - } - - /** - * Get the recent output from the process, or [] if not defined - * - * @param finalOutput flag to indicate "wait for the final output of the process" - * @param duration the duration, in ms, - * to wait for recent output to become non-empty - * @return a possibly empty list - */ - public List getRecentOutput(boolean finalOutput, int duration) { - if (process == null) { - return new LinkedList<>(); - } - return process.getRecentOutput(finalOutput, duration); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java deleted file mode 100644 index 90a8d40..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java +++ /dev/null @@ -1,599 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Execute a long-lived process. - * - *

- * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing - * a short lived application; this class allows for the process to run for the - * life of the Java process that forked it. - * It is designed to be embedded inside a YARN service, though this is not - * the sole way that it can be used - *

- * Key Features: - *

    - *
  1. Output is streamed to the output logger provided
  2. . - *
  3. the input stream is closed as soon as the process starts.
  4. - *
  5. The most recent lines of output are saved to a linked list
  6. . - *
  7. A synchronous callback, {@link LongLivedProcessLifecycleEvent}, - * is raised on the start and finish of a process.
  8. - *
- * - */ -public class LongLivedProcess implements Runnable { - /** - * Limit on number of lines to retain in the "recent" line list:{@value} - */ - public static final int RECENT_LINE_LOG_LIMIT = 64; - - /** - * Const defining the time in millis between polling for new text. - */ - private static final int STREAM_READER_SLEEP_TIME = 200; - - /** - * limit on the length of a stream before it triggers an automatic newline. - */ - private static final int LINE_LENGTH = 256; - private final ProcessBuilder processBuilder; - private Process process; - private Integer exitCode = null; - private final String name; - private final ExecutorService processExecutor; - private final ExecutorService logExecutor; - - private ProcessStreamReader processStreamReader; - //list of recent lines, recorded for extraction into reports - private final List recentLines = new LinkedList<>(); - private int recentLineLimit = RECENT_LINE_LOG_LIMIT; - private LongLivedProcessLifecycleEvent lifecycleCallback; - private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false); - - /** - * Log supplied in the constructor for the spawned process -accessible - * to inner classes - */ - private Logger processLog; - - /** - * Class log -accessible to inner classes - */ - private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class); - - /** - * flag to indicate that the process is done - */ - private final AtomicBoolean finished = new AtomicBoolean(false); - - /** - * Create an instance - * @param name process name - * @param processLog log for output (or null) - * @param commands command list - */ - public LongLivedProcess(String name, - Logger processLog, - List commands) { - Preconditions.checkArgument(commands != null, "commands"); - - this.name = name; - this.processLog = processLog; - ServiceThreadFactory factory = new ServiceThreadFactory(name, true); - processExecutor = Executors.newSingleThreadExecutor(factory); - logExecutor = Executors.newSingleThreadExecutor(factory); - processBuilder = new ProcessBuilder(commands); - processBuilder.redirectErrorStream(false); - } - - /** - * Set the limit on recent lines to retain - * @param recentLineLimit size of rolling list of recent lines. - */ - public void setRecentLineLimit(int recentLineLimit) { - this.recentLineLimit = recentLineLimit; - } - - /** - * Set an optional application exit callback - * @param lifecycleCallback callback to notify on application exit - */ - public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) { - this.lifecycleCallback = lifecycleCallback; - } - - /** - * Add an entry to the environment - * @param envVar envVar -must not be null - * @param val value - */ - public void setEnv(String envVar, String val) { - Preconditions.checkArgument(envVar != null, "envVar"); - Preconditions.checkArgument(val != null, "val"); - processBuilder.environment().put(envVar, val); - } - - /** - * Bulk set the environment from a map. This does - * not replace the existing environment, just extend it/overwrite single - * entries. - * @param map map to add - */ - public void putEnvMap(Map map) { - for (Map.Entry entry : map.entrySet()) { - String val = entry.getValue(); - String key = entry.getKey(); - setEnv(key, val); - } - } - - /** - * Get the process environment - * @param variable environment variable - * @return the value or null if there is no match - */ - public String getEnv(String variable) { - return processBuilder.environment().get(variable); - } - - /** - * Set the process log. Ignored once the process starts - * @param processLog new log ... may be null - */ - public void setProcessLog(Logger processLog) { - this.processLog = processLog; - } - - /** - * Get the process reference - * @return the process -null if the process is not started - */ - public Process getProcess() { - return process; - } - - /** - * Get the process builder -this can be manipulated - * up to the start() operation. As there is no synchronization - * around it, it must only be used in the same thread setting up the commmand. - * @return the process builder - */ - public ProcessBuilder getProcessBuilder() { - return processBuilder; - } - - /** - * Get the command list - * @return the comands - */ - public List getCommands() { - return processBuilder.command(); - } - - public String getCommand() { - return getCommands().get(0); - } - - /** - * probe to see if the process is running - * @return true iff the process has been started and is not yet finished - */ - public boolean isRunning() { - return process != null && !finished.get(); - } - - /** - * Get the exit code: null until the process has finished - * @return the exit code or null - */ - public Integer getExitCode() { - return exitCode; - } - - /** - * Get the exit code sign corrected: null until the process has finished - * @return the exit code or null - */ - public Integer getExitCodeSignCorrected() { - Integer result; - if (exitCode != null) { - result = (exitCode << 24) >> 24; - } else { - result = null; - } - return result; - } - - /** - * Stop the process if it is running. - * This will trigger an application completion event with the given exit code - */ - public void stop() { - if (!isRunning()) { - return; - } - process.destroy(); - } - - /** - * Get a text description of the builder suitable for log output - * @return a multiline string - */ - protected String describeBuilder() { - StringBuilder buffer = new StringBuilder(); - for (String arg : processBuilder.command()) { - buffer.append('"').append(arg).append("\" "); - } - return buffer.toString(); - } - - /** - * Dump the environment to a string builder - * @param buffer the buffer to append to - */ - public void dumpEnv(StringBuilder buffer) { - buffer.append("\nEnvironment\n-----------"); - Map env = processBuilder.environment(); - Set keys = env.keySet(); - List sortedKeys = new ArrayList(keys); - Collections.sort(sortedKeys); - for (String key : sortedKeys) { - buffer.append(key).append("=").append(env.get(key)).append('\n'); - } - } - - /** - * Exec the process - * @return the process - * @throws IOException on aany failure to start the process - * @throws FileNotFoundException if the process could not be found - */ - private Process spawnChildProcess() throws IOException { - if (process != null) { - throw new IOException("Process already started"); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Spawning process:\n " + describeBuilder()); - } - try { - process = processBuilder.start(); - } catch (IOException e) { - // on windows, upconvert DOS error 2 from ::CreateProcess() - // to its real meaning: FileNotFound - if (e.toString().contains("CreateProcess error=2")) { - FileNotFoundException fnfe = - new FileNotFoundException(e.toString()); - fnfe.initCause(e); - throw fnfe; - } else { - throw e; - } - } - return process; - } - - /** - * Entry point for waiting for the program to finish - */ - @Override // Runnable - public void run() { - Preconditions.checkNotNull(process, "null process"); - LOG.debug("Lifecycle callback thread running"); - //notify the callback that the process has started - if (lifecycleCallback != null) { - lifecycleCallback.onProcessStarted(this); - } - try { - //close stdin for the process - IOUtils.closeStream(process.getOutputStream()); - exitCode = process.waitFor(); - } catch (InterruptedException e) { - LOG.debug("Process wait interrupted -exiting thread", e); - } finally { - //here the process has finished - LOG.debug("process {} has finished", name); - //tell the logger it has to finish too - finished.set(true); - - // shut down the threads - logExecutor.shutdown(); - try { - logExecutor.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - //ignored - } - - //now call the callback if it is set - if (lifecycleCallback != null) { - lifecycleCallback.onProcessExited(this, exitCode, - getExitCodeSignCorrected()); - } - } - } - - /** - * Spawn the application - * @throws IOException IO problems - */ - public void start() throws IOException { - - spawnChildProcess(); - processStreamReader = - new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME); - logExecutor.submit(processStreamReader); - processExecutor.submit(this); - } - - /** - * Get the lines of recent output - * @return the last few lines of output; an empty list if there are none - * or the process is not actually running - */ - public synchronized List getRecentOutput() { - return new ArrayList(recentLines); - } - - /** - * @return whether lines of recent output are empty - */ - public synchronized boolean isRecentOutputEmpty() { - return recentLines.isEmpty(); - } - - /** - * Query to see if the final output has been processed - * @return - */ - public boolean isFinalOutputProcessed() { - return finalOutputProcessed.get(); - } - - /** - * Get the recent output from the process, or [] if not defined - * - * @param finalOutput flag to indicate "wait for the final output of the process" - * @param duration the duration, in ms, - * ro wait for recent output to become non-empty - * @return a possibly empty list - */ - public List getRecentOutput(boolean finalOutput, int duration) { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start <= duration) { - boolean finishedOutput; - if (finalOutput) { - // final flag means block until all data is done - finishedOutput = isFinalOutputProcessed(); - } else { - // there is some output - finishedOutput = !isRecentOutputEmpty(); - } - if (finishedOutput) { - break; - } - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - return getRecentOutput(); - } - - /** - * add the recent line to the list of recent lines; deleting - * an earlier on if the limit is reached. - * - * Implementation note: yes, a circular array would be more - * efficient, especially with some power of two as the modulo, - * but is it worth the complexity and risk of errors for - * something that is only called once per line of IO? - * @param line line to record - * @param isErrorStream is the line from the error stream - * @param logger logger to log to - null for no logging - */ - private synchronized void recordRecentLine(String line, - boolean isErrorStream, - Logger logger) { - if (line == null) { - return; - } - String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line; - recentLines.add(entry); - if (recentLines.size() > recentLineLimit) { - recentLines.remove(0); - } - if (logger != null) { - if (isErrorStream) { - logger.warn(line); - } else { - logger.info(line); - } - } - } - - /** - * Class to read data from the two process streams, and, when run in a thread - * to keep running until the done flag is set. - * Lines are fetched from stdout and stderr and logged at info and error - * respectively. - */ - - private class ProcessStreamReader implements Runnable { - private final Logger streamLog; - private final int sleepTime; - - /** - * Create an instance - * @param streamLog log -or null to disable logging (recent entries - * will still be retained) - * @param sleepTime time to sleep when stopping - */ - private ProcessStreamReader(Logger streamLog, int sleepTime) { - this.streamLog = streamLog; - this.sleepTime = sleepTime; - } - - /** - * Return a character if there is one, -1 if nothing is ready yet - * @param reader reader - * @return the value from the reader, or -1 if it is not ready - * @throws IOException IO problems - */ - private int readCharNonBlocking(BufferedReader reader) throws IOException { - if (reader.ready()) { - return reader.read(); - } else { - return -1; - } - } - - /** - * Read in a line, or, if the limit has been reached, the buffer - * so far - * @param reader source of data - * @param line line to build - * @param limit limit of line length - * @return true if the line can be printed - * @throws IOException IO trouble - */ - @SuppressWarnings("NestedAssignment") - private boolean readAnyLine(BufferedReader reader, - StringBuilder line, - int limit) - throws IOException { - int next; - while ((-1 != (next = readCharNonBlocking(reader)))) { - if (next != '\n') { - line.append((char) next); - limit--; - if (line.length() > limit) { - //enough has been read in to print it any - return true; - } - } else { - //line end return flag to say so - return true; - } - } - //here the end of the stream is hit, or the limit - return false; - } - - - @Override //Runnable - @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") - public void run() { - BufferedReader errReader = null; - BufferedReader outReader = null; - StringBuilder outLine = new StringBuilder(LINE_LENGTH); - StringBuilder errorLine = new StringBuilder(LINE_LENGTH); - try { - errReader = new BufferedReader( - new InputStreamReader(process.getErrorStream(), "UTF-8")); - outReader = new BufferedReader( - new InputStreamReader(process.getInputStream(), "UTF-8")); - while (!finished.get()) { - boolean processed = false; - if (readAnyLine(errReader, errorLine, LINE_LENGTH)) { - recordRecentLine(errorLine.toString(), true, streamLog); - errorLine.setLength(0); - processed = true; - } - if (readAnyLine(outReader, outLine, LINE_LENGTH)) { - recordRecentLine(outLine.toString(), false, streamLog); - outLine.setLength(0); - processed |= true; - } - if (!processed && !finished.get()) { - //nothing processed: wait a bit for data. - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - //ignore this, rely on the done flag - LOG.debug("Ignoring ", e); - } - } - } - // finished: cleanup - - //print the current error line then stream through the rest - recordFinalOutput(errReader, errorLine, true, streamLog); - //now do the info line - recordFinalOutput(outReader, outLine, false, streamLog); - - } catch (Exception ignored) { - LOG.warn("encountered {}", ignored, ignored); - //process connection has been torn down - } finally { - // close streams - IOUtils.closeStream(errReader); - IOUtils.closeStream(outReader); - //mark output as done - finalOutputProcessed.set(true); - } - } - - /** - * Record the final output of a process stream - * @param reader reader of output - * @param lineBuilder string builder into which line is built - * @param isErrorStream flag to indicate whether or not this is the - * is the line from the error stream - * @param logger logger to log to - * @throws IOException - */ - protected void recordFinalOutput(BufferedReader reader, - StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws - IOException { - String line = lineBuilder.toString(); - recordRecentLine(line, isErrorStream, logger); - line = reader.readLine(); - while (line != null) { - recordRecentLine(line, isErrorStream, logger); - line = reader.readLine(); - if (Thread.interrupted()) { - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java deleted file mode 100644 index a13b508..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -/** - * Callback when a long-lived application exits - */ -public interface LongLivedProcessLifecycleEvent { - - /** - * Callback when a process is started - * @param process the process invoking the callback - */ - void onProcessStarted(LongLivedProcess process); - - /** - * Callback when a process has finished - * @param process the process invoking the callback - * @param exitCode exit code from the process - * @param signCorrectedCode the code- as sign corrected - */ - void onProcessExited(LongLivedProcess process, - int exitCode, - int signCorrectedCode); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java deleted file mode 100644 index a123584..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import org.apache.hadoop.service.Service; - -import java.util.List; - -/** - * Interface for accessing services that contain one or more child - * services. - */ -public interface ServiceParent extends Service { - - /** - * Add a child service. It must be in a consistent state with the - * service to which it is being added. - * @param service the service to add. - */ - void addService(Service service); - - /** - * Get an unmodifiable list of services - * @return a list of child services at the time of invocation - - * added services will not be picked up. - */ - List getServices(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java deleted file mode 100644 index 5ebf77c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.service.Service; - -import java.util.concurrent.Callable; - -/** - * A runnable which terminates its owner; it also catches any - * exception raised and can serve it back. - * - */ -public class ServiceTerminatingCallable implements Callable { - - private final Service owner; - private Exception exception; - /** - * This is the callback - */ - private final Callable callable; - - - /** - * Create an instance. If the owner is null, the owning service - * is not terminated. - * @param owner owning service -can be null - * @param callable callback. - */ - public ServiceTerminatingCallable(Service owner, - Callable callable) { - Preconditions.checkArgument(callable != null, "null callable"); - this.owner = owner; - this.callable = callable; - } - - - /** - * Get the owning service - * @return the service to receive notification when - * the runnable completes. - */ - public Service getOwner() { - return owner; - } - - /** - * Any exception raised by inner action's run. - * @return an exception or null. - */ - public Exception getException() { - return exception; - } - - /** - * Delegates the call to the callable supplied in the constructor, - * then calls the stop() operation on its owner. Any exception - * is caught, noted and rethrown - * @return the outcome of the delegated call operation - * @throws Exception if one was raised. - */ - @Override - public V call() throws Exception { - try { - return callable.call(); - } catch (Exception e) { - exception = e; - throw e; - } finally { - if (owner != null) { - owner.stop(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java deleted file mode 100644 index dc591df..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.service.Service; - -/** - * A runnable which terminates its after running; it also catches any - * exception raised and can serve it back. - */ -public class ServiceTerminatingRunnable implements Runnable { - - private final Service owner; - private final Runnable action; - private Exception exception; - - /** - * Create an instance. - * @param owner owning service - * @param action action to execute before terminating the service - */ - public ServiceTerminatingRunnable(Service owner, Runnable action) { - Preconditions.checkArgument(owner != null, "null owner"); - Preconditions.checkArgument(action != null, "null action"); - this.owner = owner; - this.action = action; - } - - /** - * Get the owning service. - * @return the service to receive notification when - * the runnable completes. - */ - public Service getOwner() { - return owner; - } - - /** - * Any exception raised by inner action's run. - * @return an exception or null. - */ - public Exception getException() { - return exception; - } - - @Override - public void run() { - try { - action.run(); - } catch (Exception e) { - exception = e; - } - owner.stop(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java deleted file mode 100644 index 737197b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import com.google.common.base.Preconditions; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A thread factory that creates threads (possibly daemon threads) - * using the name and naming policy supplied. - * The thread counter starts at 1, increments atomically, - * and is supplied as the second argument in the format string. - * - * A static method, {@link #singleThreadExecutor(String, boolean)}, - * exists to simplify the construction of an executor with a single well-named - * threads. - * - * Example - *
- *  ExecutorService exec = ServiceThreadFactory.newSingleThreadExecutor("live", true)
- * 
- */ -public class ServiceThreadFactory implements ThreadFactory { - - private static final AtomicInteger counter = new AtomicInteger(1); - - /** - * Default format for thread names: {@value}. - */ - public static final String DEFAULT_NAMING_FORMAT = "%s-%03d"; - private final String name; - private final boolean daemons; - private final String namingFormat; - - /** - * Create an instance - * @param name base thread name - * @param daemons flag to indicate the threads should be marked as daemons - * @param namingFormat format string to generate thread names from - */ - public ServiceThreadFactory(String name, - boolean daemons, - String namingFormat) { - Preconditions.checkArgument(name != null, "null name"); - Preconditions.checkArgument(namingFormat != null, "null naming format"); - this.name = name; - this.daemons = daemons; - this.namingFormat = namingFormat; - } - - /** - * Create an instance with the default naming format. - * @param name base thread name - * @param daemons flag to indicate the threads should be marked as daemons - */ - public ServiceThreadFactory(String name, - boolean daemons) { - this(name, daemons, DEFAULT_NAMING_FORMAT); - } - - @Override - public Thread newThread(Runnable r) { - Preconditions.checkArgument(r != null, "null runnable"); - String threadName = - String.format(namingFormat, name, counter.getAndIncrement()); - Thread thread = new Thread(r, threadName); - thread.setDaemon(daemons); - return thread; - } - - /** - * Create a single thread executor using this naming policy. - * @param name base thread name - * @param daemons flag to indicate the threads should be marked as daemons - * @return an executor - */ - public static ExecutorService singleThreadExecutor(String name, - boolean daemons) { - return Executors.newSingleThreadExecutor( - new ServiceThreadFactory(name, daemons)); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf581071/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java deleted file mode 100644 index 65d14b7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.services.workflow; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -/** - * A service that calls the supplied callback when it is started -after the - * given delay. - * - * It can be configured to stop itself after the callback has - * completed, marking any exception raised as the exception of this service. - * The notifications come in on a callback thread -a thread that is only - * started in this service's start() operation. - */ -public class WorkflowCallbackService extends - WorkflowScheduledExecutorService { - protected static final Logger LOG = - LoggerFactory.getLogger(WorkflowCallbackService.class); - - /** - * This is the callback. - */ - private final Callable callback; - private final int delay; - private final ServiceTerminatingCallable command; - - private ScheduledFuture scheduledFuture; - - /** - * Create an instance of the service - * @param name service name - * @param callback callback to invoke - * @param delay delay -or 0 for no delay - * @param terminate terminate this service after the callback? - */ - public WorkflowCallbackService(String name, - Callable callback, - int delay, - boolean terminate) { - super(name); - Preconditions.checkNotNull(callback, "Null callback argument"); - this.callback = callback; - this.delay = delay; - command = new ServiceTerminatingCallable( - terminate ? this : null, - callback); - } - - public ScheduledFuture getScheduledFuture() { - return scheduledFuture; - } - - @Override - protected void serviceStart() throws Exception { - LOG.debug("Notifying {} after a delay of {} millis", callback, delay); - ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor( - new ServiceThreadFactory(getName(), true)); - setExecutor(executorService); - scheduledFuture = - executorService.schedule(command, delay, TimeUnit.MILLISECONDS); - } - - /** - * Stop the service. - * If there is any exception noted from any executed notification, - * note the exception in this class - * @throws Exception exception. - */ - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - // propagate any failure - if (getCallbackException() != null) { - throw getCallbackException(); - } - } - - /** - * Get the exception raised by a callback. Will always be null if the - * callback has not been executed; will only be non-null after any success. - * @return a callback - */ - public Exception getCallbackException() { - return command.getException(); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org