Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3C283200BD8 for ; Wed, 7 Dec 2016 22:10:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3ADCC160B2F; Wed, 7 Dec 2016 21:10:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DDA7E160AF9 for ; Wed, 7 Dec 2016 22:10:14 +0100 (CET) Received: (qmail 38697 invoked by uid 500); 7 Dec 2016 21:10:03 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 37242 invoked by uid 99); 7 Dec 2016 21:10:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 21:10:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14550F2151; Wed, 7 Dec 2016 21:10:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Wed, 07 Dec 2016 21:10:23 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. 
(jianhe) archived-at: Wed, 07 Dec 2016 21:10:17 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/RunService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/RunService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/RunService.java new file mode 100644 index 0000000..c3a1d0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/RunService.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.main; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; + +/** + * An interface which services can implement to have their + * execution managed by the ServiceLauncher. 
+ * The command line options will be passed down before the + * {@link Service#init(Configuration)} operation is invoked via an + * invocation of {@link RunService#bindArgs(Configuration, String...)}. + * After the service has been successfully started via {@link Service#start()} + * the {@link RunService#runService()} method is called to execute the + * service. When this method returns, the service launcher will exit, using + * the return code from the method as its exit code. + */ +public interface RunService extends Service { + + /** + * Propagate the command line arguments. + * This method is called before {@link Service#init(Configuration)}; + * the configuration that is returned from this operation + * is the one that is passed on to the init operation. + * This permits implementations to change the configuration before + * the init operation. + * + * + * @param config the initial configuration built up by the + * service launcher. + * @param args list of arguments passed to the command line + * after any launcher-specific commands have been stripped. + * @return the configuration to init the service with. This MUST NOT be null. + * Recommended: pass down the config parameter with any changes + * @throws Exception any problem + */ + Configuration bindArgs(Configuration config, String... args) throws Exception; + + /** + * Run a service. 
This called after {@link Service#start()} + * @return the exit code + * @throws Throwable any exception to report + */ + int runService() throws Throwable ; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java new file mode 100644 index 0000000..27813b7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.main; + + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A service launch exception that includes an exit code; + * when caught by the ServiceLauncher, it will convert that + * into a process exit code. + */ +public class ServiceLaunchException extends YarnException + implements ExitCodeProvider, LauncherExitCodes { + + private final int exitCode; + + /** + * Create an exception with the specific exit code + * @param exitCode exit code + * @param cause cause of the exception + */ + public ServiceLaunchException(int exitCode, Throwable cause) { + super(cause); + this.exitCode = exitCode; + } + + /** + * Create an exception with the specific exit code and text + * @param exitCode exit code + * @param message message to use in exception + */ + public ServiceLaunchException(int exitCode, String message) { + super(message); + this.exitCode = exitCode; + } + + /** + * Create an exception with the specific exit code, text and cause + * @param exitCode exit code + * @param message message to use in exception + * @param cause cause of the exception + */ + public ServiceLaunchException(int exitCode, String message, Throwable cause) { + super(message, cause); + this.exitCode = exitCode; + } + + /** + * Get the exit code + * @return the exit code + */ + @Override + public int getExitCode() { + return exitCode; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java new file mode 100644 index 0000000..f192ec8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.main; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A class to launch any service by name. + * + * It's designed to be subclassed for custom entry points. + * + * + * Workflow + *
    + *   1. An instance of the class is created
    + *   2. If it implements RunService, it is given the binding args off the CLI
    + *   3. Its service.init() and service.start() methods are called.
    + *   4. If it implements RunService, runService() is called and its return
    + *      code used as the exit code.
    + *   5. Otherwise: it waits for the service to stop, assuming in its start() method
    + *      it begins work
    + *   6. If an exception returned an exit code, that becomes the exit code of the
    + *      command.
    + *
+ * Error and warning messages are logged to stderr. Why? If the classpath + * is wrong & logger configurations not on it, then no error messages by + * the started app will be seen and the caller is left trying to debug + * using exit codes. + * + */ +@SuppressWarnings("UseOfSystemOutOrSystemErr") +public class ServiceLauncher + implements LauncherExitCodes, IrqHandler.Interrupted, + Thread.UncaughtExceptionHandler { + private static final Logger LOG = LoggerFactory.getLogger( + ServiceLauncher.class); + + protected static final int PRIORITY = 30; + + public static final String NAME = "ServiceLauncher"; + + /** + * Name of the "--conf" argument. + */ + public static final String ARG_CONF = "--conf"; + + public static final String USAGE_MESSAGE = + "Usage: " + NAME + " classname [" + ARG_CONF + + "] | "; + static final int SHUTDOWN_TIME_ON_INTERRUPT = 30 * 1000; + + private volatile S service; + private int serviceExitCode; + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final List interruptHandlers = new ArrayList<>(1); + private Configuration configuration; + private String serviceClassName; + private static AtomicBoolean signalAlreadyReceived = new AtomicBoolean(false); + + + /** + * Create an instance of the launcher + * @param serviceClassName classname of the service + */ + public ServiceLauncher(String serviceClassName) { + this.serviceClassName = serviceClassName; + } + + /** + * Get the service. Null until and unless + * {@link #launchService(Configuration, String[], boolean)} has completed + * @return the service + */ + public S getService() { + return service; + } + + /** + * Get the configuration constructed from the command line arguments + * @return the configuration used to create the service + */ + public Configuration getConfiguration() { + return configuration; + } + + /** + * The exit code from a successful service execution + * @return the exit code. 
+ */ + public int getServiceExitCode() { + return serviceExitCode; + } + + @Override + public String toString() { + return "ServiceLauncher for " + serviceClassName; + } + + /** + * Launch the service, by creating it, initing it, starting it and then + * maybe running it. {@link RunService#bindArgs(Configuration, String...)} is invoked + * on the service between creation and init. + * + * All exceptions that occur are propagated upwards. + * + * If the method returns a status code, it means that it got as far starting + * the service, and if it implements {@link RunService}, that the + * method {@link RunService#runService()} has completed. + * + * At this point, the service is returned by {@link #getService()}. + * + * @param conf configuration + * @param processedArgs arguments after the configuration parameters + * have been stripped out. + * @param addProcessHooks should process failure handlers be added to + * terminate this service on shutdown. Tests should set this to false. + * @throws ClassNotFoundException classname not on the classpath + * @throws IllegalAccessException not allowed at the class + * @throws InstantiationException not allowed to instantiate it + * @throws InterruptedException thread interrupted + * @throws Throwable any other failure + */ + public int launchService(Configuration conf, + String[] processedArgs, + boolean addProcessHooks) + throws Throwable { + + instantiateService(conf); + + // add any process shutdown hooks + if (addProcessHooks) { + ServiceShutdownHook shutdownHook = new ServiceShutdownHook(service); + ShutdownHookManager.get().addShutdownHook(shutdownHook, PRIORITY); + } + RunService runService = null; + + if (service instanceof RunService) { + //if its a runService, pass in the conf and arguments before init) + runService = (RunService) service; + configuration = runService.bindArgs(configuration, processedArgs); + Preconditions.checkNotNull(configuration, + "null configuration returned by bindArgs()"); + } + + //some 
class constructors init; here this is picked up on. + if (!service.isInState(Service.STATE.INITED)) { + service.init(configuration); + } + service.start(); + int exitCode = EXIT_SUCCESS; + if (runService != null) { + //assume that runnable services are meant to run from here + exitCode = runService.runService(); + LOG.debug("Service exited with exit code {}", exitCode); + + } else { + //run the service until it stops or an interrupt happens on a different thread. + LOG.debug("waiting for service threads to terminate"); + service.waitForServiceToStop(0); + } + //exit + serviceExitCode = exitCode; + return serviceExitCode; + } + + /** + * Instantiate the service defined in serviceClassName + * . Sets the configuration field + * to the configuration, and service to the service. + * + * @param conf configuration to use + * @throws ClassNotFoundException classname not on the classpath + * @throws IllegalAccessException not allowed at the class + * @throws InstantiationException not allowed to instantiate it + */ + @SuppressWarnings("unchecked") + public Service instantiateService(Configuration conf) + throws ClassNotFoundException, InstantiationException, IllegalAccessException, + ExitUtil.ExitException, NoSuchMethodException, InvocationTargetException { + Preconditions.checkArgument(conf != null, "null conf"); + configuration = conf; + + //Instantiate the class -this requires the service to have a public + // zero-argument constructor + Class serviceClass = + this.getClass().getClassLoader().loadClass(serviceClassName); + Object instance = serviceClass.getConstructor().newInstance(); + if (!(instance instanceof Service)) { + //not a service + throw new ExitUtil.ExitException(EXIT_COMMAND_ARGUMENT_ERROR, + "Not a Service class: " + serviceClassName); + } + + service = (S) instance; + return service; + } + + /** + * Register this class as the handler for the control-C interrupt. + * Can be overridden for testing. 
+ */ + protected void registerInterruptHandler() { + try { + interruptHandlers.add(new IrqHandler(IrqHandler.CONTROL_C, this)); + interruptHandlers.add(new IrqHandler(IrqHandler.SIGTERM, this)); + } catch (IOException e) { + error("Signal handler setup failed : {}" + e, e); + } + } + + /** + * The service has been interrupted -try to shut down the service. + * Give the service time to do this before the exit operation is called + * @param interruptData the interrupted data. + */ + @Override + public void interrupted(IrqHandler.InterruptData interruptData) { + String message = "Service interrupted by " + interruptData.toString(); + warn(message); + if (!signalAlreadyReceived.compareAndSet(false, true)) { + warn("Repeated interrupt: escalating to a JVM halt"); + // signal already received. On a second request to a hard JVM + // halt and so bypass any blocking shutdown hooks. + ExitUtil.halt(EXIT_INTERRUPTED, message); + } + int shutdownTimeMillis = SHUTDOWN_TIME_ON_INTERRUPT; + //start an async shutdown thread with a timeout + ServiceForcedShutdown forcedShutdown = + new ServiceForcedShutdown(shutdownTimeMillis); + Thread thread = new Thread(forcedShutdown); + thread.setDaemon(true); + thread.start(); + //wait for that thread to finish + try { + thread.join(shutdownTimeMillis); + } catch (InterruptedException ignored) { + //ignored + } + if (!forcedShutdown.isServiceStopped()) { + warn("Service did not shut down in time"); + } + exit(EXIT_INTERRUPTED, message); + } + + /** + * Uncaught exception handler. + * If an error is raised: shutdown + * The state of the system is unknown at this point -attempting + * a clean shutdown is dangerous. 
Instead: exit + * @param thread thread that failed + * @param exception exception + */ + @Override + public void uncaughtException(Thread thread, Throwable exception) { + if (ShutdownHookManager.get().isShutdownInProgress()) { + LOG.error("Thread {} threw an error during shutdown: {}.", + thread.toString(), + exception, + exception); + } else if (exception instanceof Error) { + try { + LOG.error("Thread {} threw an error: {}. Shutting down", + thread.toString(), + exception, + exception); + } catch (Throwable err) { + // We don't want to not exit because of an issue with logging + } + if (exception instanceof OutOfMemoryError) { + // After catching an OOM java says it is undefined behavior, so don't + // even try to clean up or we can get stuck on shutdown. + try { + System.err.println("Halting due to Out Of Memory Error..."); + } catch (Throwable err) { + // Again we don't want to exit because of logging issues. + } + ExitUtil.halt(EXIT_EXCEPTION_THROWN); + } else { + // error other than OutOfMemory + exit(convertToExitException(exception)); + } + } else { + // simple exception in a thread. There's a policy decision here: + // terminate the service vs. keep going after a thread has failed + LOG.error("Thread {} threw an exception: {}", + thread.toString(), + exception, + exception); + } + } + + /** + * Print a warning: currently this goes to stderr + * @param text + */ + protected void warn(String text) { + System.err.println(text); + } + + /** + * Report an error. The message is printed to stderr; the exception + * is logged via the current logger. + * @param message message for the user + * @param thrown the exception thrown + */ + protected void error(String message, Throwable thrown) { + String text = "Exception: " + message; + warn(text); + LOG.error(text, thrown); + } + + /** + * Exit the code. + * This is method can be overridden for testing, throwing an + * exception instead. Any subclassed method MUST raise an + * ExitUtil.ExitException instance. 
+ * The service launcher code assumes that after this method is invoked, + * no other code in the same method is called. + * @param exitCode code to exit + */ + protected void exit(int exitCode, String message) { + ExitUtil.terminate(exitCode, message); + } + + /** + * Exit off an exception. This can be subclassed for testing + * @param ee exit exception + */ + protected void exit(ExitUtil.ExitException ee) { + ExitUtil.terminate(ee.status, ee); + } + + /** + * Get the service name via {@link Service#getName()}. + * If the service is not instantiated, the classname is returned instead. + * @return the service name + */ + public String getServiceName() { + Service s = service; + String name = null; + if (s != null) { + try { + name = s.getName(); + } catch (Exception ignored) { + // ignored + } + } + if (name != null) { + return "service " + name; + } else { + return "service classname " + serviceClassName; + } + } + + /** + * Parse the command line, building a configuration from it, then + * launch the service and wait for it to finish. finally, exit + * passing the status code to the #exit(int) method. + * @param args arguments to the service. arg[0] is + * assumed to be the service classname and is automatically + */ + public void launchServiceAndExit(List args) { + + registerInterruptHandler(); + //Currently the config just the default + Configuration conf = new Configuration(); + String[] processedArgs = extractConfigurationArgs(conf, args); + ExitUtil.ExitException ee = launchServiceRobustly(conf, processedArgs); + System.out.flush(); + System.err.flush(); + exit(ee); + } + + /** + * Extract the configuration arguments and apply them to the configuration, + * building an array of processed arguments to hand down to the service. + * + * @param conf configuration to update + * @param args main arguments. args[0] is assumed to be the service + * classname and is skipped + * @return the processed list. 
+ */ + public static String[] extractConfigurationArgs(Configuration conf, + List args) { + + //convert args to a list + int argCount = args.size(); + if (argCount <= 1 ) { + return new String[0]; + } + List argsList = new ArrayList(argCount); + ListIterator arguments = args.listIterator(); + //skip that first entry + arguments.next(); + while (arguments.hasNext()) { + String arg = arguments.next(); + if (arg.equals(ARG_CONF)) { + //the argument is a --conf file tuple: extract the path and load + //it in as a configuration resource. + + //increment the loop iterator + if (!arguments.hasNext()) { + //overshot the end of the file + exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR, + ARG_CONF + ": missing configuration file after "); + } + File file = new File(arguments.next()); + if (!file.exists()) { + exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR, + ARG_CONF + ": configuration file not found: " + file); + } + try { + conf.addResource(file.toURI().toURL()); + } catch (MalformedURLException e) { + LOG.debug("File {} cannot be converted to URL", file, e); + exitWithMessage(EXIT_COMMAND_ARGUMENT_ERROR, + ARG_CONF + ": configuration file path invalid: " + file); + } + } else { + argsList.add(arg); + } + } + String[] processedArgs = new String[argsList.size()]; + argsList.toArray(processedArgs); + return processedArgs; + } + + /** + * Launch a service catching all exceptions and downgrading them to exit codes + * after logging. 
+ * @param conf configuration to use + * @param processedArgs command line after the launcher-specific arguments have + * been stripped out + * @return an exit exception, which will have a status code of 0 if it worked + */ + public ExitUtil.ExitException launchServiceRobustly(Configuration conf, + String[] processedArgs) { + ExitUtil.ExitException exitException; + try { + int exitCode = launchService(conf, processedArgs, true); + if (service != null) { + Throwable failure = service.getFailureCause(); + if (failure != null) { + //the service exited with a failure. + //check what state it is in + Service.STATE failureState = service.getFailureState(); + if (failureState == Service.STATE.STOPPED) { + //the failure occurred during shutdown, not important enough to bother + //the user as it may just scare them + LOG.debug("Failure during shutdown:{} ", failure, failure); + } else { + //throw it for the catch handlers to deal with + throw failure; + } + } + } + exitException = new ExitUtil.ExitException(exitCode, + "In " + serviceClassName); + // either the service succeeded, or an error raised during shutdown, + // which we don't worry that much about + } catch (ExitUtil.ExitException ee) { + exitException = ee; + } catch (Throwable thrown) { + exitException = convertToExitException(thrown); + } + return exitException; + } + + /** + * Convert the exception to one that can be handed off to ExitUtils; + * if it is of the write type it is passed throw as is. 
If not, a + * new exception with the exit code {@link #EXIT_EXCEPTION_THROWN} + * is created, with the argument thrown as the inner cause + * @param thrown the exception thrown + * @return an exception to terminate the process with + */ + protected ExitUtil.ExitException convertToExitException(Throwable thrown) { + ExitUtil.ExitException exitException; + int exitCode; + String message = thrown.getMessage(); + if (message == null) { + message = thrown.toString(); + } + if (thrown instanceof ExitCodeProvider) { + exitCode = ((ExitCodeProvider) thrown).getExitCode(); + if (LOG.isDebugEnabled()) { + LOG.debug("While running {}: {}", getServiceName(), message, thrown); + } + LOG.error(message); + } else { + // not any of the service launcher exceptions -assume something worse + error(message, thrown); + exitCode = EXIT_EXCEPTION_THROWN; + } + exitException = new ExitUtil.ExitException(exitCode, message); + exitException.initCause(thrown); + return exitException; + } + + + /** + * Build a log message for starting up and shutting down. + * This was grabbed from the ToolRunner code. 
+ * @param classname the class of the server + * @param args arguments + */ + public static String startupShutdownMessage(String classname, + List args) { + final String hostname = NetUtils.getHostname(); + + return toStartupShutdownString("STARTUP_MSG: ", new String[]{ + "Starting " + classname, + " host = " + hostname, + " args = " + args, + " version = " + VersionInfo.getVersion(), + " classpath = " + System.getProperty("java.class.path"), + " build = " + VersionInfo.getUrl() + " -r " + + VersionInfo.getRevision() + + "; compiled by '" + VersionInfo.getUser() + + "' on " + VersionInfo.getDate(), + " java = " + System.getProperty("java.version") + }); + } + + /** + * Exit with a printed message + * @param status status code + * @param message message + */ + private static void exitWithMessage(int status, String message) { + System.err.println(message); + ExitUtil.terminate(status); + } + + private static String toStartupShutdownString(String prefix, String[] msg) { + StringBuilder b = new StringBuilder(prefix); + b.append("\n/************************************************************"); + for (String s : msg) { + b.append("\n").append(prefix).append(s); + } + b.append("\n************************************************************/"); + return b.toString(); + } + + /** + * forced shutdown runnable. 
+ */ + protected class ServiceForcedShutdown implements Runnable { + + private final int shutdownTimeMillis; + private boolean serviceStopped; + + public ServiceForcedShutdown(int shutdownTimeoutMillis) { + this.shutdownTimeMillis = shutdownTimeoutMillis; + } + + @Override + public void run() { + if (service != null) { + service.stop(); + serviceStopped = service.waitForServiceToStop(shutdownTimeMillis); + } else { + serviceStopped = true; + } + } + + private boolean isServiceStopped() { + return serviceStopped; + } + } + + /** + * The real main function, which takes the arguments as a list + * arg 0 must be the service classname + * @param argsList the list of arguments + */ + public static void serviceMain(List argsList) { + if (argsList.isEmpty()) { + exitWithMessage(EXIT_USAGE, USAGE_MESSAGE); + } else { + String serviceClassName = argsList.get(0); + + if (LOG.isDebugEnabled()) { + LOG.debug(startupShutdownMessage(serviceClassName, argsList)); + StringBuilder builder = new StringBuilder(); + for (String arg : argsList) { + builder.append('"').append(arg).append("\" "); + } + LOG.debug(builder.toString()); + } + Thread.setDefaultUncaughtExceptionHandler( + new YarnUncaughtExceptionHandler()); + + ServiceLauncher serviceLauncher = new ServiceLauncher<>(serviceClassName); + serviceLauncher.launchServiceAndExit(argsList); + } + } + + /** + * This is the main entry point for the service launcher. + * @param args command line arguments. 
+ */ + public static void main(String[] args) { + List argsList = Arrays.asList(args); + serviceMain(argsList); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java new file mode 100644 index 0000000..de55789 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.main; + +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.ShutdownHookManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.ref.WeakReference; + +/** + * JVM Shutdown hook for Service which will stop the + * Service gracefully in case of JVM shutdown. + * This hook uses a weak reference to the service, so + * does not cause services to be retained after they have + * been stopped and deferenced elsewhere. + */ +public class ServiceShutdownHook implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger( + ServiceShutdownHook.class); + + private final WeakReference serviceRef; + private Runnable hook; + + public ServiceShutdownHook(Service service) { + serviceRef = new WeakReference<>(service); + } + + public void register(int priority) { + unregister(); + hook = this; + ShutdownHookManager.get().addShutdownHook(hook, priority); + } + + public synchronized void unregister() { + if (hook != null) { + try { + ShutdownHookManager.get().removeShutdownHook(hook); + } catch (IllegalStateException e) { + LOG.info("Failed to unregister shutdown hook: {}", e, e); + } + hook = null; + } + } + + @Override + public void run() { + Service service; + synchronized (this) { + service = serviceRef.get(); + serviceRef.clear(); + } + if (service == null) { + return; + } + try { + // Stop the Service + service.stop(); + } catch (Throwable t) { + LOG.info("Error stopping {}", service.getName(), t); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AggregateConfSerDeser.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AggregateConfSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AggregateConfSerDeser.java new file mode 100644 index 0000000..90537b6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AggregateConfSerDeser.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.persist; + +import org.apache.slider.core.conf.AggregateConf; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; + +import java.io.IOException; + +/** + * Conf tree to JSON binding + */ +public class AggregateConfSerDeser extends JsonSerDeser { + public AggregateConfSerDeser() { + super(AggregateConf.class); + } + + + private static final AggregateConfSerDeser + staticinstance = new AggregateConfSerDeser(); + + /** + * Convert a tree instance to a JSON string -sync access to a shared ser/deser + * object instance + * @param instance object to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public static String toString(AggregateConf instance) throws IOException, + JsonGenerationException, + JsonMappingException { + synchronized (staticinstance) { + return staticinstance.toJson(instance); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AppDefinitionPersister.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AppDefinitionPersister.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AppDefinitionPersister.java new file mode 100644 index 0000000..7fb3158 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/AppDefinitionPersister.java @@ -0,0 +1,260 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Files; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.params.AbstractClusterBuildingActionArgs; +import org.apache.slider.common.params.Arguments; +import org.apache.slider.common.tools.SliderFileSystem; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.ConfTreeOperations; +import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; +import org.apache.slider.providers.agent.AgentKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Class to prepare and persist app and add-on definitions. 
+ * + * In this case, the app definition and add-on definitions are auto-inferred from the user input rather than explicit + * inclusion of application package in the config. + * + * Processing an app definition involves one or more of the following: - modify appConfig - package definition into a + * temporary folder - upload to HDFS + * + * This class keeps track of all the required operations and allows them to be invoked by build operation + */ +public class AppDefinitionPersister { + private static final Logger log = + LoggerFactory.getLogger(AppDefinitionPersister.class); + + private final SliderFileSystem sliderFileSystem; + private List appDefinitions; + + public AppDefinitionPersister(SliderFileSystem sliderFileSystem) { + this.sliderFileSystem = sliderFileSystem; + appDefinitions = new ArrayList<>(); + } + + + /** + * Process the application package or folder by copying it to the cluster path + * + * @param appDefinition details of application package + * + * @throws BadConfigException + * @throws IOException + */ + private void persistDefinitionPackageOrFolder(AppDefinition appDefinition) + throws BadConfigException, IOException { + if (!appDefinition.appDefPkgOrFolder.canRead()) { + throw new BadConfigException("Pkg/Folder cannot be accessed - " + + appDefinition.appDefPkgOrFolder.getAbsolutePath()); + } + + File src = appDefinition.appDefPkgOrFolder; + String targetName = appDefinition.pkgName; + log.debug("Package name: " + targetName); + if (appDefinition.appDefPkgOrFolder.isDirectory()) { + log.info("Processing app package/folder {} for {}", + appDefinition.appDefPkgOrFolder.getAbsolutePath(), + appDefinition.pkgName); + File tmpDir = Files.createTempDir(); + File zipFile = new File(tmpDir.getCanonicalPath(), File.separator + appDefinition.pkgName); + SliderUtils.zipFolder(appDefinition.appDefPkgOrFolder, zipFile); + src = zipFile; + } + + sliderFileSystem.getFileSystem().copyFromLocalFile( + false, + false, + new Path(src.toURI()), + new 
Path(appDefinition.targetFolderInFs, targetName)); + } + + public void persistPackages() throws BadConfigException, IOException { + for (AppDefinition appDefinition : appDefinitions) { + persistDefinitionPackageOrFolder(appDefinition); + } + } + + public void processSuppliedDefinitions(String clustername, + AbstractClusterBuildingActionArgs buildInfo, + ConfTreeOperations appConf) + throws BadConfigException, IOException, BadCommandArgumentsException { + // if metainfo is provided add to the app instance + if (buildInfo.appMetaInfo != null || buildInfo.appMetaInfoJson != null) { + if (buildInfo.appMetaInfo != null && buildInfo.appMetaInfoJson != null) { + throw new BadConfigException("Both %s and %s cannot be specified", + Arguments.ARG_METAINFO, Arguments.ARG_METAINFO_JSON); + } + + // Now we know that only one of either file or JSON is used + boolean isFileUsed = buildInfo.appMetaInfo != null ? true : false; + String argUsed = isFileUsed ? Arguments.ARG_METAINFO + : Arguments.ARG_METAINFO_JSON; + + if (buildInfo.appDef != null) { + throw new BadConfigException("Both %s and %s cannot be specified", + argUsed, Arguments.ARG_APPDEF); + } + if (SliderUtils.isSet(appConf.getGlobalOptions().get(AgentKeys.APP_DEF))) { + throw new BadConfigException( + "%s cannot not be set if %s is specified in the cmd line ", + AgentKeys.APP_DEF, argUsed); + } + + if (isFileUsed) { + if (!buildInfo.appMetaInfo.canRead() || !buildInfo.appMetaInfo.isFile()) { + throw new BadConfigException( + "Path specified with %s either cannot be read or is not a file", + Arguments.ARG_METAINFO); + } + } else { + if (StringUtils.isEmpty(buildInfo.appMetaInfoJson.trim())) { + throw new BadConfigException("Empty string specified with %s", + Arguments.ARG_METAINFO_JSON); + } + } + + File tempDir = Files.createTempDir(); + File pkgSrcDir = new File(tempDir, "default"); + pkgSrcDir.mkdirs(); + File destMetaInfo = new File(pkgSrcDir, "metainfo.json"); + if (isFileUsed) { + if 
(buildInfo.appMetaInfo.getName().endsWith(".xml")) { + Files.copy(buildInfo.appMetaInfo, new File(pkgSrcDir, "metainfo.xml")); + } else { + Files.copy(buildInfo.appMetaInfo, destMetaInfo); + } + } else { + Files.write( + buildInfo.appMetaInfoJson.getBytes(Charset.forName("UTF-8")), + destMetaInfo); + } + + Path appDirPath = sliderFileSystem.buildAppDefDirPath(clustername); + log.info("Using default app def path {}", appDirPath.toString()); + + appDefinitions.add(new AppDefinition(appDirPath, pkgSrcDir, + SliderKeys.DEFAULT_APP_PKG)); + Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG); + appConf.getGlobalOptions().set(AgentKeys.APP_DEF, appDefPath); + log.info("Setting app package to {}.", appDefPath); + } + + if (buildInfo.appDef != null) { + if (SliderUtils.isSet(appConf.getGlobalOptions().get(AgentKeys.APP_DEF))) { + throw new BadConfigException("application.def must not be set if --appdef is provided."); + } + + if (!buildInfo.appDef.exists()) { + throw new BadConfigException("--appdef is not a valid path."); + } + + Path appDirPath = sliderFileSystem.buildAppDefDirPath(clustername); + appDefinitions.add(new AppDefinition(appDirPath, buildInfo.appDef, SliderKeys.DEFAULT_APP_PKG)); + Path appDefPath = new Path(appDirPath, SliderKeys.DEFAULT_APP_PKG); + appConf.getGlobalOptions().set(AgentKeys.APP_DEF, appDefPath); + log.info("Setting app package to {}.", appDefPath); + } + + if (buildInfo.addonDelegate.getAddonMap().size() > 0) { + if (SliderUtils.isUnset(appConf.getGlobalOptions().get(AgentKeys.APP_DEF))) { + throw new BadConfigException("addon package can only be specified if main app package is specified."); + } + + List addons = new ArrayList(); + Map addonMap = buildInfo.addonDelegate.getAddonMap(); + for (String key : addonMap.keySet()) { + File defPath = new File(addonMap.get(key)); + if (SliderUtils.isUnset(addonMap.get(key))) { + throw new BadConfigException("Invalid path for addon package " + key); + } + + if (!defPath.exists()) { + 
throw new BadConfigException("addon folder or package path is not valid."); + } + + Path addonPath = sliderFileSystem.buildAddonDirPath(clustername, key); + String addonPkgName = "addon_" + key + ".zip"; + + log.debug( + "addonMap.get(key): {} addonPath: {} defPath: {} addonPkgName: {}", + addonMap.get(key), addonPath, defPath, addonPkgName); + + appDefinitions.add(new AppDefinition(addonPath, defPath, addonPkgName)); + String addOnKey = AgentKeys.ADDON_PREFIX + key; + Path addonPkgPath = new Path(addonPath, addonPkgName); + log.info("Setting addon package {} to {}.", addOnKey, addonPkgPath); + appConf.getGlobalOptions().set(addOnKey, addonPkgPath); + addons.add(addOnKey); + } + + String existingList = appConf.getGlobalOptions().get(AgentKeys.ADDONS); + if (SliderUtils.isUnset(existingList)) { + existingList = ""; + } + appConf.getGlobalOptions().set(AgentKeys.ADDONS, existingList + StringUtils.join(addons, ",")); + } + } + + + @VisibleForTesting + public List getAppDefinitions() { + return appDefinitions; + } + + // Helper class to hold details for the app and addon packages + public class AppDefinition { + // The target folder where the package will be stored + public Path targetFolderInFs; + // The on disk location of the app def package or folder + public File appDefPkgOrFolder; + // Package name + public String pkgName; + + public AppDefinition(Path targetFolderInFs, File appDefPkgOrFolder, String pkgName) { + this.targetFolderInFs = targetFolderInFs; + this.appDefPkgOrFolder = appDefPkgOrFolder; + this.pkgName = pkgName; + } + + @Override + public String toString() { + return new StringBuilder().append("targetFolderInFs").append(" : ").append(targetFolderInFs.toString()) + .append(", ") + .append("appDefPkgOrFolder").append(" : ").append(appDefPkgOrFolder.toString()) + .append(", ") + .append("pkgName").append(" : ").append(pkgName).toString(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ApplicationReportSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ApplicationReportSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ApplicationReportSerDeser.java new file mode 100644 index 0000000..a8c72ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ApplicationReportSerDeser.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.persist; + +import org.apache.slider.core.launch.SerializedApplicationReport; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; + +import java.io.IOException; + +/** + * Persistence of {@link SerializedApplicationReport} + * + */ +public class ApplicationReportSerDeser + extends JsonSerDeser { + public ApplicationReportSerDeser() { + super(SerializedApplicationReport.class); + } + + + private static final ApplicationReportSerDeser + staticinstance = new ApplicationReportSerDeser(); + + /** + * Convert an instance to a JSON string -sync access to a shared ser/deser + * object instance + * @param instance object to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public static String toString(SerializedApplicationReport instance) + throws IOException, JsonGenerationException, JsonMappingException { + synchronized (staticinstance) { + return staticinstance.toJson(instance); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfPersister.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfPersister.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfPersister.java new file mode 100644 index 0000000..9759205 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfPersister.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.slider.common.tools.CoreFileSystem; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.exceptions.SliderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Date; + +/** + * Class to implement persistence of a configuration. + * + * This code contains the logic to acquire and release locks. + * # writelock MUST be acquired exclusively for writes. This is done + * by creating the file with no overwrite + * # shared readlock MUST be acquired for reads. 
This is done by creating the readlock + * file with overwrite forbidden -but treating a failure as a sign that + * the lock exists, and therefore the operation can continue. + * # releaselock is only released if the client created it. + * # after acquiring either lock, client must check for the alternate lock + * existing. If it is, release lock and fail. + * + * There's one small race here: multiple readers; first reader releases lock + * while second is in use. + * + * Strict Fix: client checks for readlock after read completed. + * If it is not there, problem: fail. But this massively increases the risk of + * false negatives. + * + * This isn't 100% perfect, because of the condition where the owner releases + * a lock, a writer grabs its lock & writes to it, the reader gets slightly + * contaminated data: + * own-share-delete-write-own-release(shared)-delete + * + * We are assuming that the rate of change is low enough that this is rare, and + * of limited damage. + * + * ONCE A CLUSTER IS RUNNING, ONLY THE AM MAY PERSIST UPDATES VIA ITS APIs + * + * That is: outside the AM, a writelock MUST only be acquired after verifying there is no + * running application. 
+ */ +public class ConfPersister { + private static final Logger log = + LoggerFactory.getLogger(ConfPersister.class); + + + private final ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser(); + + private final CoreFileSystem coreFS; + private final FileSystem fileSystem; + private final Path persistDir; + private final Path internal, resources, app_conf; + private final Path writelock, readlock; + + public ConfPersister(CoreFileSystem coreFS, Path persistDir) { + this.coreFS = coreFS; + this.persistDir = persistDir; + internal = new Path(persistDir, Filenames.INTERNAL); + resources = new Path(persistDir, Filenames.RESOURCES); + app_conf = new Path(persistDir, Filenames.APPCONF); + writelock = new Path(persistDir, Filenames.WRITELOCK); + readlock = new Path(persistDir, Filenames.READLOCK); + fileSystem = coreFS.getFileSystem(); + } + + /** + * Get the target directory + * @return the directory for persistence + */ + public Path getPersistDir() { + return persistDir; + } + + /** + * Make the persistent directory + * @throws IOException IO failure + */ + public void mkPersistDir() throws IOException { + coreFS.getFileSystem().mkdirs(persistDir); + } + + @Override + public String toString() { + return "Persister to " + persistDir; + } + + /** + * Acquire the writelock + * @throws IOException IO + * @throws LockAcquireFailedException + */ + @VisibleForTesting + void acquireWritelock() throws IOException, + LockAcquireFailedException { + mkPersistDir(); + long now = System.currentTimeMillis(); + try { + coreFS.cat(writelock, false, new Date(now).toGMTString()); + } catch (FileAlreadyExistsException e) { + // filesystems should raise this (HDFS does) + throw new LockAcquireFailedException(writelock); + } catch (IOException e) { + // some filesystems throw a generic IOE + throw new LockAcquireFailedException(writelock, e); + } + //here the lock is acquired, but verify there is no readlock + boolean lockFailure; + try { + lockFailure = readLockExists(); + } catch 
(IOException e) { + lockFailure = true; + } + if (lockFailure) { + releaseWritelock(); + throw new LockAcquireFailedException(readlock); + } + } + + @VisibleForTesting + boolean readLockExists() throws IOException { + return fileSystem.exists(readlock); + } + + /** + * Release the writelock if it is present. + * IOExceptions are logged + */ + @VisibleForTesting + boolean releaseWritelock() { + try { + return fileSystem.delete(writelock, false); + } catch (IOException e) { + log.warn("IOException releasing writelock {} ", writelock, e); + } + return false; + } + + /** + * Acquire the writelock + * @throws IOException IO + * @throws LockAcquireFailedException + * @throws FileNotFoundException if the target dir does not exist. + */ + @VisibleForTesting + boolean acquireReadLock() throws FileNotFoundException, + IOException, + LockAcquireFailedException { + if (!coreFS.getFileSystem().exists(persistDir)) { + // the dir is not there, so the data is not there, so there + // is nothing to read + throw new FileNotFoundException(persistDir.toString()); + } + long now = System.currentTimeMillis(); + boolean owner; + try { + coreFS.cat(readlock, false, new Date(now).toGMTString()); + owner = true; + } catch (IOException e) { + owner = false; + } + //here the lock is acquired, but verify there is no readlock + boolean lockFailure; + try { + lockFailure = writelockExists(); + } catch (IOException e) { + lockFailure = true; + } + if (lockFailure) { + releaseReadlock(owner); + throw new LockAcquireFailedException(writelock); + } + return owner; + } + + @VisibleForTesting + boolean writelockExists() throws IOException { + return fileSystem.exists(writelock); + } + + /** + * Release the writelock if it is present. 
+ * IOExceptions are downgraded to failures + * @return true if the lock was present and then released + */ + @VisibleForTesting + boolean releaseReadlock(boolean owner) { + if (owner) { + try { + return fileSystem.delete(readlock, false); + } catch (IOException e) { + log.warn("IOException releasing writelock {} ", readlock, e); + } + } + return false; + } + + private void saveConf(AggregateConf conf) throws IOException { + confTreeSerDeser.save(fileSystem, internal, conf.getInternal(), true); + confTreeSerDeser.save(fileSystem, resources, conf.getResources(), true); + confTreeSerDeser.save(fileSystem, app_conf, conf.getAppConf(), true); + } + + private void loadConf(AggregateConf conf) throws IOException { + conf.setInternal(confTreeSerDeser.load(fileSystem, internal)); + conf.setResources(confTreeSerDeser.load(fileSystem, resources)); + conf.setAppConf(confTreeSerDeser.load(fileSystem, app_conf)); + } + + + private void maybeExecLockHeldAction(LockHeldAction action) throws + IOException, + SliderException { + if (action != null) { + action.execute(); + } + } + + /** + * Save the configuration + * @param conf configuration to fill in + * @param action + * @throws IOException IO problems + * @throws LockAcquireFailedException the lock could not be acquired + */ + public void save(AggregateConf conf, LockHeldAction action) throws + IOException, + SliderException, + LockAcquireFailedException { + acquireWritelock(); + try { + saveConf(conf); + maybeExecLockHeldAction(action); + } finally { + releaseWritelock(); + } + } + + /** + * Load the configuration. 
If a lock failure is raised, the + * contents of the configuration MAY have changed -lock race conditions + * are looked for on exit + * @param conf configuration to fill in + * @throws IOException IO problems + * @throws LockAcquireFailedException the lock could not be acquired + */ + public void load(AggregateConf conf) throws + FileNotFoundException, + IOException, + SliderException, + LockAcquireFailedException { + boolean owner = acquireReadLock(); + try { + loadConf(conf); + } finally { + releaseReadlock(owner); + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfTreeSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfTreeSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfTreeSerDeser.java new file mode 100644 index 0000000..8271ef1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/ConfTreeSerDeser.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +import org.apache.slider.core.conf.ConfTree; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; + +import java.io.IOException; + +/** + * Conf tree to JSON binding + */ +public class ConfTreeSerDeser extends JsonSerDeser { + public ConfTreeSerDeser() { + super(ConfTree.class); + } + + + private static final ConfTreeSerDeser staticinstance = new ConfTreeSerDeser(); + + /** + * Convert a tree instance to a JSON string -sync access to a shared ser/deser + * object instance + * @param instance object to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public static String toString(ConfTree instance) throws IOException, + JsonGenerationException, + JsonMappingException { + synchronized (staticinstance) { + return staticinstance.toJson(instance); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/Filenames.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/Filenames.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/Filenames.java new file mode 100644 index 0000000..06ecc51 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/Filenames.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.slider.core.persist;
+
+public interface Filenames { // names of the files ConfPersister keeps in its persistence directory
+
+  String RESOURCES = "resources.json"; // holds the resources tree of an AggregateConf
+  String APPCONF = "app_config.json"; // holds the application configuration tree
+  String INTERNAL = "internal.json"; // holds the internal tree
+  String WRITELOCK = "writelock"; // lock file created, without overwrite, before a save
+  String READLOCK = "readlock"; // lock file created by the first reader before a load
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/InstancePaths.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/InstancePaths.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/InstancePaths.java new file mode 100644 index 0000000..3505ac3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/InstancePaths.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.persist; + +import org.apache.hadoop.fs.Path; +import org.apache.slider.common.SliderKeys; + +/** + * Build up all the paths of an instance relative to the supplied instance + * directory. + */ +public class InstancePaths { + + public final Path instanceDir; + public final Path snapshotConfPath; + public final Path generatedConfPath; + public final Path historyPath; + public final Path dataPath; + public final Path tmpPath; + public final Path tmpPathAM; + public final Path appDefPath; + public final Path addonsPath; + + public InstancePaths(Path instanceDir) { + this.instanceDir = instanceDir; + snapshotConfPath = + new Path(instanceDir, SliderKeys.SNAPSHOT_CONF_DIR_NAME); + generatedConfPath = + new Path(instanceDir, SliderKeys.GENERATED_CONF_DIR_NAME); + historyPath = new Path(instanceDir, SliderKeys.HISTORY_DIR_NAME); + dataPath = new Path(instanceDir, SliderKeys.DATA_DIR_NAME); + tmpPath = new Path(instanceDir, SliderKeys.TMP_DIR_PREFIX); + tmpPathAM = new Path(tmpPath, SliderKeys.AM_DIR_PREFIX); + appDefPath = new Path(tmpPath, SliderKeys.APP_DEF_DIR); + addonsPath = new Path(tmpPath, SliderKeys.ADDONS_DIR); + } + + @Override + public String toString() { + return "instance at " + instanceDir; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/JsonSerDeser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/JsonSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/JsonSerDeser.java new file mode 100644 index 0000000..4f60c06 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/JsonSerDeser.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Support for marshalling objects to and from JSON. 
+ * This class is NOT thread safe; it constructs an object mapper + * as an instance field. + * @param + */ +public class JsonSerDeser { + + private static final Logger log = LoggerFactory.getLogger(JsonSerDeser.class); + private static final String UTF_8 = "UTF-8"; + + private final Class classType; + private final ObjectMapper mapper; + + /** + * Create an instance bound to a specific type + * @param classType class type + */ + public JsonSerDeser(Class classType) { + this.classType = classType; + this.mapper = new ObjectMapper(); + mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + /** + * Convert from JSON + * @param json input + * @return the parsed JSON + * @throws IOException IO + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromJson(String json) + throws IOException, JsonParseException, JsonMappingException { + try { + return mapper.readValue(json, classType); + } catch (IOException e) { + log.error("Exception while parsing json : " + e + "\n" + json, e); + throw e; + } + } + + /** + * Convert from a JSON file + * @param jsonFile input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromFile(File jsonFile) + throws IOException, JsonParseException, JsonMappingException { + File absoluteFile = jsonFile.getAbsoluteFile(); + try { + return mapper.readValue(absoluteFile, classType); + } catch (IOException e) { + log.error("Exception while parsing json file {}", absoluteFile, e); + throw e; + } + } + + /** + * Convert from a JSON file + * @param resource input file + * @return the parsed JSON + * @throws IOException IO problems + * @throws JsonMappingException failure to map from the JSON to this class + */ + public T fromResource(String resource) + throws IOException, JsonParseException, JsonMappingException { + try(InputStream resStream = 
this.getClass().getResourceAsStream(resource)) { + if (resStream == null) { + throw new FileNotFoundException(resource); + } + return (T) (mapper.readValue(resStream, classType)); + } catch (IOException e) { + log.error("Exception while parsing json resource {}", resource, e); + throw e; + } + } + + /** + * Convert from an input stream, closing the stream afterwards. + * @param stream + * @return the parsed JSON + * @throws IOException IO problems + */ + public T fromStream(InputStream stream) throws IOException { + try { + return (T) (mapper.readValue(stream, classType)); + } catch (IOException e) { + log.error("Exception while parsing json input stream", e); + throw e; + } finally { + IOUtils.closeStream(stream); + } + } + + /** + * clone by converting to JSON and back again. + * This is much less efficient than any Java clone process. + * @param instance instance to duplicate + * @return a new instance + * @throws IOException problems. + */ + public T fromInstance(T instance) throws IOException { + return fromJson(toJson(instance)); + } + + /** + * Deserialize from a byte array + * @param b + * @return the deserialized value + * @throws IOException parse problems + */ + public T fromBytes(byte[] b) throws IOException { + String json = new String(b, 0, b.length, UTF_8); + return fromJson(json); + } + + /** + * Load from a Hadoop filesystem + * @param fs filesystem + * @param path path + * @return a loaded CD + * @throws IOException IO problems + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public T load(FileSystem fs, Path path) + throws IOException, JsonParseException, JsonMappingException { + FileStatus status = fs.getFileStatus(path); + long len = status.getLen(); + byte[] b = new byte[(int) len]; + FSDataInputStream dataInputStream = fs.open(path); + int count = dataInputStream.read(b); + if (count != len) { + throw new EOFException("Read of " + path +" finished prematurely"); + } + return 
fromBytes(b); + } + + + /** + * Save to a hadoop filesystem + * @param fs filesystem + * @param path path + * @param instance instance to save + * @param overwrite should any existing file be overwritten + * @throws IOException IO exception + */ + public void save(FileSystem fs, Path path, T instance, + boolean overwrite) throws + IOException { + FSDataOutputStream dataOutputStream = fs.create(path, overwrite); + writeJsonAsBytes(instance, dataOutputStream); + } + + /** + * Save an instance to a file + * @param instance instance to save + * @param file file + * @throws IOException + */ + public void save(T instance, File file) throws + IOException { + writeJsonAsBytes(instance, new FileOutputStream(file.getAbsoluteFile())); + } + + /** + * Write the json as bytes -then close the file + * @param dataOutputStream an outout stream that will always be closed + * @throws IOException on any failure + */ + private void writeJsonAsBytes(T instance, + OutputStream dataOutputStream) throws IOException { + try { + String json = toJson(instance); + byte[] b = json.getBytes(UTF_8); + dataOutputStream.write(b); + dataOutputStream.flush(); + dataOutputStream.close(); + } finally { + IOUtils.closeStream(dataOutputStream); + } + } + + /** + * Convert an object to a JSON string + * @param instance instance to convert + * @return a JSON string description + * @throws JsonParseException parse problems + * @throws JsonMappingException O/J mapping problems + */ + public String toJson(T instance) throws IOException, + JsonGenerationException, + JsonMappingException { + mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + return mapper.writeValueAsString(instance); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockAcquireFailedException.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockAcquireFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockAcquireFailedException.java new file mode 100644 index 0000000..da58520 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockAcquireFailedException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.core.persist; + +import org.apache.hadoop.fs.Path; + +public class LockAcquireFailedException extends Exception { + + private final Path path; + + public LockAcquireFailedException(Path path) { + super("Failed to acquire lock " +path); + this.path = path; + } + + public LockAcquireFailedException(Path path, Throwable cause) { + super("Failed to acquire lock " + path, cause); + this.path = path; + } + + public Path getPath() { + return path; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockHeldAction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockHeldAction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockHeldAction.java new file mode 100644 index 0000000..6659687 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/LockHeldAction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +import org.apache.slider.core.exceptions.SliderException; + +import java.io.IOException; + +/** + * Optional action to add while the lock is held; this is needed to execute + * some other persistent operations within the scope at the same lock + * without inserting too much code into the persister + */ +public interface LockHeldAction { + + /** + * Execute the action + * @throws IOException on any failure + */ + public void execute() throws IOException, SliderException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/PersistKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/PersistKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/PersistKeys.java new file mode 100644 index 0000000..1964459 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/persist/PersistKeys.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.persist; + +public class PersistKeys { + + public static final String SCHEMA = + "http://example.org/specification/v2.0.0"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/SliderRegistryUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/SliderRegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/SliderRegistryUtils.java new file mode 100644 index 0000000..37b36ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/registry/SliderRegistryUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.core.registry; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.slider.common.SliderKeys; + +/** + * Miscellaneous methods to assist slider registry work + * + */ +public class SliderRegistryUtils { + + + /** + * Get the registry path for an instance under the user's home node + * @param instanceName application instance + * @return a path to the registry location for this application instance. + */ + public static String registryPathForInstance(String instanceName) { + return RegistryUtils.servicePath( + RegistryUtils.currentUser(), SliderKeys.APP_TYPE, instanceName + ); + } + + /** + * Process a path expanding it if needed. 
+ * Validation is delegated to later as the core registry will need + * to do that anyway + * @param path path + * @return a path maybe with some expansion + */ + public static String resolvePath(String path) { + Preconditions.checkArgument(path!=null, "null path"); + Preconditions.checkArgument(!path.isEmpty(), "empty path"); + String newpath = path; + if (path.startsWith("~/")) { + // add user expansion + newpath = RegistryUtils.homePathForCurrentUser() + path.substring(1); + } else if (path.equals("~")) { + newpath = RegistryUtils.homePathForCurrentUser(); + } + return newpath; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org