hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [07/50] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)
Date Tue, 11 Oct 2016 20:37:11 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
new file mode 100644
index 0000000..9e9e7ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -0,0 +1,598 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Execute a long-lived process.
+ *
+ * <p>
+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
+ * a short lived application; this class allows for the process to run for the
+ * life of the Java process that forked it.
+ * It is designed to be embedded inside a YARN service, though this is not
+ * the sole way that it can be used
+ * <p>
+ * Key Features:
+ * <ol>
+ *   <li>Output is streamed to the output logger provided.</li>
+ *   <li>the input stream is closed as soon as the process starts.</li>
+ *   <li>The most recent lines of output are saved to a linked list.</li>
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ *   is raised on the start and finish of a process.</li>
+ * </ol>
+ * 
+ */
+public class LongLivedProcess implements Runnable {
+  /**
+   * Limit on number of lines to retain in the "recent" line list:{@value}
+   */
+  public static final int RECENT_LINE_LOG_LIMIT = 64;
+
+  /**
+   * Const defining the time in millis between polling for new text.
+   */
+  private static final int STREAM_READER_SLEEP_TIME = 200;
+  
+  /**
+   * limit on the length of a stream before it triggers an automatic newline.
+   */
+  private static final int LINE_LENGTH = 256;
+  private final ProcessBuilder processBuilder;
+  private Process process;
+  private Integer exitCode = null;
+  private final String name;
+  private final ExecutorService processExecutor;
+  private final ExecutorService logExecutor;
+  
+  private ProcessStreamReader processStreamReader;
+  //list of recent lines, recorded for extraction into reports
+  private final List<String> recentLines = new LinkedList<>();
+  private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+  private LongLivedProcessLifecycleEvent lifecycleCallback;
+  private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
+
+  /**
+   * Log supplied in the constructor for the spawned process -accessible
+   * to inner classes
+   */
+  private Logger processLog;
+  
+  /**
+   * Class log -accessible to inner classes
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
+
+  /**
+   *  flag to indicate that the process is done
+   */
+  private final AtomicBoolean finished = new AtomicBoolean(false);
+
+  /**
+   * Create an instance
+   * @param name process name
+   * @param processLog log for output (or null)
+   * @param commands command list
+   */
+  public LongLivedProcess(String name,
+      Logger processLog,
+      List<String> commands) {
+    Preconditions.checkArgument(commands != null, "commands");
+
+    this.name = name;
+    this.processLog = processLog;
+    ServiceThreadFactory factory = new ServiceThreadFactory(name, true);
+    processExecutor = Executors.newSingleThreadExecutor(factory);
+    logExecutor = Executors.newSingleThreadExecutor(factory);
+    processBuilder = new ProcessBuilder(commands);
+    processBuilder.redirectErrorStream(false);
+  }
+
+  /**
+   * Set the limit on recent lines to retain
+   * @param recentLineLimit size of rolling list of recent lines.
+   */
+  public void setRecentLineLimit(int recentLineLimit) {
+    this.recentLineLimit = recentLineLimit;
+  }
+
+  /**
+   * Set an optional application exit callback
+   * @param lifecycleCallback callback to notify on application exit
+   */
+  public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
+    this.lifecycleCallback = lifecycleCallback;
+  }
+
+  /**
+   * Add an entry to the environment
+   * @param envVar envVar -must not be null
+   * @param val value 
+   */
+  public void setEnv(String envVar, String val) {
+    Preconditions.checkArgument(envVar != null, "envVar");
+    Preconditions.checkArgument(val != null, "val");
+    processBuilder.environment().put(envVar, val);
+  }
+
+  /**
+   * Bulk set the environment from a map. This does
+   * not replace the existing environment, just extend it/overwrite single
+   * entries.
+   * @param map map to add
+   */
+  public void putEnvMap(Map<String, String> map) {
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      String val = entry.getValue();
+      String key = entry.getKey();
+      setEnv(key, val);
+    }
+  }
+
+  /**
+   * Get the process environment
+   * @param variable environment variable
+   * @return the value or null if there is no match
+   */
+  public String getEnv(String variable) {
+    return processBuilder.environment().get(variable);
+  }
+
+  /**
+   * Set the process log. Ignored once the process starts
+   * @param processLog new log ... may be null
+   */
+  public void setProcessLog(Logger processLog) {
+    this.processLog = processLog;
+  }
+
+  /**
+   * Get the process reference
+   * @return the process -null if the process is  not started
+   */
+  public Process getProcess() {
+    return process;
+  }
+
+  /**
+   * Get the process builder -this can be manipulated
+   * up to the start() operation. As there is no synchronization
+   * around it, it must only be used in the same thread setting up the command.
+   * @return the process builder
+   */
+  public ProcessBuilder getProcessBuilder() {
+    return processBuilder;
+  }
+
+  /**
+   * Get the command list
+   * @return the commands
+   */
+  public List<String> getCommands() {
+    return processBuilder.command();
+  }
+
+  public String getCommand() {
+    return getCommands().get(0);
+  }
+
+  /**
+   * probe to see if the process is running
+   * @return true iff the process has been started and is not yet finished
+   */
+  public boolean isRunning() {
+    return process != null && !finished.get();
+  }
+
+  /**
+   * Get the exit code: null until the process has finished
+   * @return the exit code or null
+   */
+  public Integer getExitCode() {
+    return exitCode;
+  }
+  
+    /**
+   * Get the exit code sign corrected: null until the process has finished
+   * @return the exit code or null
+   */
+  public Integer getExitCodeSignCorrected() {
+    Integer result;
+    if (exitCode != null) {
+      result = (exitCode << 24) >> 24;
+    } else {
+      result = null;
+    }
+    return result;
+  }
+
+  /**
+   * Stop the process if it is running.
+   * This triggers the process exit callback with the process's exit code
+   */
+  public void stop() {
+    if (!isRunning()) {
+      return;
+    }
+    process.destroy();
+  }
+
+  /**
+   * Get a text description of the builder suitable for log output
+   * @return a multiline string 
+   */
+  protected String describeBuilder() {
+    StringBuilder buffer = new StringBuilder();
+    for (String arg : processBuilder.command()) {
+      buffer.append('"').append(arg).append("\" ");
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Dump the environment to a string builder
+   * @param buffer the buffer to append to
+   */
+  public void dumpEnv(StringBuilder buffer) {
+    buffer.append("\nEnvironment\n-----------");
+    Map<String, String> env = processBuilder.environment();
+    Set<String> keys = env.keySet();
+    List<String> sortedKeys = new ArrayList<String>(keys);
+    Collections.sort(sortedKeys);
+    for (String key : sortedKeys) {
+      buffer.append(key).append("=").append(env.get(key)).append('\n');
+    }
+  }
+
+  /**
+   * Exec the process
+   * @return the process
+   * @throws IOException on any failure to start the process
+   * @throws FileNotFoundException if the process could not be found
+   */
+  private Process spawnChildProcess() throws IOException {
+    if (process != null) {
+      throw new IOException("Process already started");
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Spawning process:\n " + describeBuilder());
+    }
+    try {
+      process = processBuilder.start();
+    } catch (IOException e) {
+      // on windows, upconvert DOS error 2 from ::CreateProcess()
+      // to its real meaning: FileNotFound
+      if (e.toString().contains("CreateProcess error=2")) {
+        FileNotFoundException fnfe =
+            new FileNotFoundException(e.toString());
+        fnfe.initCause(e);
+        throw fnfe;
+      } else {
+        throw e;
+      }
+    }
+    return process;
+  }
+
+  /**
+   * Wait for the process to finish, then notify the lifecycle callback.
+   */
+  @Override // Runnable
+  public void run() {
+    Preconditions.checkNotNull(process, "null process");
+    LOG.debug("Lifecycle callback thread running");
+    //notify the callback that the process has started
+    if (lifecycleCallback != null) {
+      lifecycleCallback.onProcessStarted(this);
+    }
+    try {
+      //close stdin for the process
+      IOUtils.closeStream(process.getOutputStream());
+      exitCode = process.waitFor();
+    } catch (InterruptedException e) {
+      LOG.debug("Process wait interrupted -exiting thread", e);
+    } finally {
+      //the process has finished, or the wait for it was interrupted
+      LOG.debug("process {} has finished", name);
+      //tell the logger it has to finish too
+      finished.set(true);
+
+      // shut down the log thread, giving it time to drain the final output
+      logExecutor.shutdown();
+      try {
+        logExecutor.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException ignored) {
+        //ignored
+      }
+
+      //exitCode stays null if the wait was interrupted: unboxing it would NPE
+      if (lifecycleCallback != null && exitCode != null) {
+        lifecycleCallback.onProcessExited(this, exitCode,
+            getExitCodeSignCorrected());
+      }
+    }
+  }
+
+  /**
+   * Spawn the application
+   * @throws IOException IO problems
+   */
+  public void start() throws IOException {
+
+    spawnChildProcess();
+    processStreamReader =
+        new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
+    logExecutor.submit(processStreamReader);
+    processExecutor.submit(this);
+  }
+
+  /**
+   * Get the lines of recent output
+   * @return the last few lines of output; an empty list if there are none
+   * or the process is not actually running
+   */
+  public synchronized List<String> getRecentOutput() {
+    return new ArrayList<String>(recentLines);
+  }
+
+  /**
+   * @return whether lines of recent output are empty
+   */
+  public synchronized boolean isRecentOutputEmpty() {
+    return recentLines.isEmpty();
+  }
+
+  /**
+   * Query to see if the final output has been processed
+   * @return true if the final output has been processed
+   */
+  public boolean isFinalOutputProcessed() {
+    return finalOutputProcessed.get();
+  }
+
+  /**
+   * Get the recent output from the process, or [] if not defined
+   *
+   * @param finalOutput flag to indicate "wait for the final output of the process"
+   * @param duration the duration, in ms, 
+   * to wait for recent output to become non-empty
+   * @return a possibly empty list
+   */
+  public List<String> getRecentOutput(boolean finalOutput, int duration) {
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start <= duration) {
+      boolean finishedOutput;
+      if (finalOutput) {
+        // final flag means block until all data is done
+        finishedOutput = isFinalOutputProcessed();
+      } else {
+        // there is some output
+        finishedOutput = !isRecentOutputEmpty();
+      }
+      if (finishedOutput) {
+        break;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+    return getRecentOutput();
+  }
+
+  /**
+   * add the recent line to the list of recent lines; deleting
+   * an earlier one if the limit is reached.
+   *
+   * Implementation note: yes, a circular array would be more
+   * efficient, especially with some power of two as the modulo,
+   * but is it worth the complexity and risk of errors for
+   * something that is only called once per line of IO?
+   * @param line line to record
+   * @param isErrorStream is the line from the error stream
+   * @param logger logger to log to - null for no logging
+   */
+  private synchronized void recordRecentLine(String line,
+      boolean isErrorStream,
+      Logger logger) {
+    if (line == null) {
+      return;
+    }
+    String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
+    recentLines.add(entry);
+    if (recentLines.size() > recentLineLimit) {
+      recentLines.remove(0);
+    }
+    if (logger != null) {
+      if (isErrorStream) {
+        logger.warn(line);
+      } else {
+        logger.info(line);
+      }
+    }
+  }
+
+  /**
+   * Class to read data from the two process streams, and, when run in a thread
+   * to keep running until the <code>finished</code> flag is set.
+   * Lines are fetched from stdout and stderr and logged at info and warn
+   * respectively.
+   */
+
+  private class ProcessStreamReader implements Runnable {
+    private final Logger streamLog;
+    private final int sleepTime;
+
+    /**
+     * Create an instance
+     * @param streamLog log -or null to disable logging (recent entries
+     * will still be retained)
+     * @param sleepTime time in millis to sleep when no output is ready
+     */
+    private ProcessStreamReader(Logger streamLog, int sleepTime) {
+      this.streamLog = streamLog;
+      this.sleepTime = sleepTime;
+    }
+
+    /**
+     * Return a character if there is one, -1 if nothing is ready yet
+     * @param reader reader
+     * @return the value from the reader, or -1 if it is not ready
+     * @throws IOException IO problems
+     */
+    private int readCharNonBlocking(BufferedReader reader) throws IOException {
+      if (reader.ready()) {
+        return reader.read();
+      } else {
+        return -1;
+      }
+    }
+
+    /**
+     * Read in a line, or, if the limit has been reached, the buffer
+     * so far
+     * @param reader source of data
+     * @param line line to build; may hold text left over from an earlier call
+     * @param limit limit of line length
+     * @return true if the line can be printed
+     * @throws IOException IO trouble
+     */
+    @SuppressWarnings("NestedAssignment")
+    private boolean readAnyLine(BufferedReader reader,
+                                StringBuilder line,
+                                int limit)
+      throws IOException {
+      int next;
+      while ((-1 != (next = readCharNonBlocking(reader)))) {
+        if (next != '\n') {
+          line.append((char) next);
+          //compare against the fixed limit: shrinking it per character
+          //made lines break at about half the requested length
+          if (line.length() >= limit) {
+            return true;
+          }
+        } else {
+          //line end return flag to say so
+          return true;
+        }
+      }
+      //nothing more is ready to read and the line is still incomplete
+      return false;
+    }
+
+
+    @Override //Runnable
+    @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
+    public void run() {
+      BufferedReader errReader = null;
+      BufferedReader outReader = null;
+      StringBuilder outLine = new StringBuilder(LINE_LENGTH);
+      StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
+      try {
+        errReader = new BufferedReader(
+            new InputStreamReader(process.getErrorStream()));
+        outReader = new BufferedReader(
+            new InputStreamReader(process.getInputStream()));
+        while (!finished.get()) {
+          boolean processed = false;
+          if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
+            recordRecentLine(errorLine.toString(), true, streamLog);
+            errorLine.setLength(0);
+            processed = true;
+          }
+          if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
+            recordRecentLine(outLine.toString(), false, streamLog);
+            outLine.setLength(0);
+            processed |= true;
+          }
+          if (!processed && !finished.get()) {
+            //nothing processed: wait a bit for data.
+            try {
+              Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+              //ignore this, rely on the done flag
+              LOG.debug("Ignoring ", e);
+            }
+          }
+        }
+        // finished: cleanup
+
+        //print the current error line then stream through the rest
+        recordFinalOutput(errReader, errorLine, true, streamLog);
+        //now do the info line
+        recordFinalOutput(outReader, outLine, false, streamLog);
+
+      } catch (Exception ignored) {
+        LOG.warn("encountered {}", ignored, ignored);
+        //process connection has been torn down
+      } finally {
+        // close streams
+        IOUtils.closeStream(errReader);
+        IOUtils.closeStream(outReader);
+        //mark output as done
+        finalOutputProcessed.set(true);
+      }
+    }
+
+    /**
+     * Record the final output of a process stream
+     * @param reader reader of output
+     * @param lineBuilder string builder into which line is built
+     * @param isErrorStream flag to indicate whether or not this is
+     * the line from the error stream
+     * @param logger logger to log to
+     * @throws IOException
+     */
+    protected void recordFinalOutput(BufferedReader reader,
+        StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws
+        IOException {
+      String line = lineBuilder.toString();
+      recordRecentLine(line, isErrorStream, logger);
+      line = reader.readLine();
+      while (line != null) {
+        recordRecentLine(line, isErrorStream, logger);
+        line = reader.readLine();
+        if (Thread.interrupted()) {
+          break;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
new file mode 100644
index 0000000..a13b508
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -0,0 +1,41 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+ * Callback when a long-lived application exits
+ */
+public interface LongLivedProcessLifecycleEvent {
+
+  /**
+   * Callback when a process is started
+   * @param process the process invoking the callback
+   */
+  void onProcessStarted(LongLivedProcess process);
+
+  /**
+   * Callback when a process has finished
+   * @param process the process invoking the callback
+   * @param exitCode exit code from the process
+   * @param signCorrectedCode the code- as sign corrected
+   */
+  void onProcessExited(LongLivedProcess process,
+      int exitCode,
+      int signCorrectedCode);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
new file mode 100644
index 0000000..a123584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+
+import java.util.List;
+
+/**
+ * Interface for accessing services that contain one or more child
+ * services. 
+ */
+public interface ServiceParent extends Service {
+
+  /**
+   * Add a child service. It must be in a consistent state with the
+   * service to which it is being added.
+   * @param service the service to add.
+   */
+  void addService(Service service);
+
+  /**
+   * Get an unmodifiable list of services
+   * @return a list of child services at the time of invocation -
+   * added services will not be picked up.
+   */
+  List<Service> getServices();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
new file mode 100644
index 0000000..5ebf77c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A callable which terminates its owner; it also catches any
+ * exception raised and can serve it back.  
+ * 
+ */
+public class ServiceTerminatingCallable<V> implements Callable<V> {
+
+  private final Service owner;
+  private Exception exception;
+  /**
+   * This is the callback
+   */
+  private final Callable<V> callable;
+
+
+  /**
+   * Create an instance. If the owner is null, the owning service
+   * is not terminated.
+   * @param owner owning service -can be null
+   * @param callable callback.
+   */
+  public ServiceTerminatingCallable(Service owner,
+      Callable<V> callable) {
+    Preconditions.checkArgument(callable != null, "null callable");
+    this.owner = owner;
+    this.callable = callable;
+  }
+
+
+  /**
+   * Get the owning service
+   * @return the service to receive notification when
+   * the runnable completes.
+   */
+  public Service getOwner() {
+    return owner;
+  }
+
+  /**
+   * Any exception raised by the inner <code>callable</code>'s call.
+   * @return an exception or null.
+   */
+  public Exception getException() {
+    return exception;
+  }
+
+  /**
+   * Delegates the call to the callable supplied in the constructor,
+   * then calls the stop() operation on its owner. Any exception
+   * is caught, noted and rethrown
+   * @return the outcome of the delegated call operation
+   * @throws Exception if one was raised.
+   */
+  @Override
+  public V call() throws Exception {
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      exception = e;
+      throw e;
+    } finally {
+      if (owner != null) {
+        owner.stop();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
new file mode 100644
index 0000000..dc591df
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A runnable which terminates its owner after running; it also catches any
+ * exception raised and can serve it back. 
+ */
+public class ServiceTerminatingRunnable implements Runnable {
+
+  private final Service owner;
+  private final Runnable action;
+  private Exception exception;
+
+  /**
+   * Create an instance.
+   * @param owner owning service
+   * @param action action to execute before terminating the service
+   */
+  public ServiceTerminatingRunnable(Service owner, Runnable action) {
+    Preconditions.checkArgument(owner != null, "null owner");
+    Preconditions.checkArgument(action != null, "null action");
+    this.owner = owner;
+    this.action = action;
+  }
+
+  /**
+   * Get the owning service.
+   * @return the service to receive notification when
+   * the runnable completes.
+   */
+  public Service getOwner() {
+    return owner;
+  }
+
+  /**
+   * Any exception raised by inner <code>action's</code> run.
+   * @return an exception or null.
+   */
+  public Exception getException() {
+    return exception;
+  }
+
+  @Override
+  public void run() {
+    try {
+      action.run();
+    } catch (Exception e) {
+      exception = e;
+    }
+    owner.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
new file mode 100644
index 0000000..737197b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread factory that creates threads (possibly daemon threads)
+ * using the name and naming policy supplied.
+ * The thread counter starts at 1, increments atomically, 
+ * and is supplied as the second argument in the format string.
+ * 
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
+ * exists to simplify the construction of an executor with a single well-named
+ * threads. 
+ * 
+ * Example
+ * <pre>
+ *  ExecutorService exec = ServiceThreadFactory.newSingleThreadExecutor("live", true)
+ * </pre>
+ */
+public class ServiceThreadFactory implements ThreadFactory {
+
+  private static final AtomicInteger counter = new AtomicInteger(1);
+
+  /**
+   * Default format for thread names: {@value}.
+   */
+  public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
+  private final String name;
+  private final boolean daemons;
+  private final String namingFormat;
+
+  /**
+   * Create an instance
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   * @param namingFormat format string to generate thread names from
+   */
+  public ServiceThreadFactory(String name,
+      boolean daemons,
+      String namingFormat) {
+    Preconditions.checkArgument(name != null, "null name");
+    Preconditions.checkArgument(namingFormat != null, "null naming format");
+    this.name = name;
+    this.daemons = daemons;
+    this.namingFormat = namingFormat;
+  }
+
+  /**
+   * Create an instance with the default naming format.
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   */
+  public ServiceThreadFactory(String name,
+      boolean daemons) {
+    this(name, daemons, DEFAULT_NAMING_FORMAT);
+  }
+
+  @Override
+  public Thread newThread(Runnable r) {
+    Preconditions.checkArgument(r != null, "null runnable");
+    String threadName =
+        String.format(namingFormat, name, counter.getAndIncrement());
+    Thread thread = new Thread(r, threadName);
+    thread.setDaemon(daemons);
+    return thread;
+  }
+
+  /**
+   * Create a single thread executor using this naming policy.
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   * @return an executor
+   */
+  public static ExecutorService singleThreadExecutor(String name,
+      boolean daemons) {
+    return Executors.newSingleThreadExecutor(
+        new ServiceThreadFactory(name, daemons));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
new file mode 100644
index 0000000..65d14b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service which, once started, invokes the supplied callback after a
+ * given delay.
+ *
+ * It can be configured to stop itself after the callback has
+ * completed, marking any exception raised as the exception of this service.
+ * The callback runs on a dedicated daemon thread, which is only created
+ * in this service's <code>start()</code> operation.
+ * @param <V> type returned by the callback
+ */
+public class WorkflowCallbackService<V> extends
+    WorkflowScheduledExecutorService<ScheduledExecutorService> {
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(WorkflowCallbackService.class);
+
+  /**
+   * The callback to invoke.
+   */
+  private final Callable<V> callback;
+  /** Delay in milliseconds between service start and the callback. */
+  private final int delay;
+  /** Wrapper around the callback: this is what actually gets scheduled. */
+  private final ServiceTerminatingCallable<V> command;
+
+  /** Future of the scheduled callback; null until the service starts. */
+  private ScheduledFuture<V> scheduledFuture;
+
+  /**
+   * Create an instance of the service
+   * @param name service name
+   * @param callback callback to invoke
+   * @param delay delay in milliseconds -or 0 for no delay
+   * @param terminate terminate this service after the callback?
+   */
+  public WorkflowCallbackService(String name,
+      Callable<V> callback,
+      int delay,
+      boolean terminate) {
+    super(name);
+    Preconditions.checkNotNull(callback, "Null callback argument");
+    this.callback = callback;
+    this.delay = delay;
+    // the wrapper is only given an owner when termination was requested
+    WorkflowCallbackService<V> owner = terminate ? this : null;
+    command = new ServiceTerminatingCallable<V>(owner, callback);
+  }
+
+  /**
+   * Get the future of the scheduled callback.
+   * @return the future, or null if the service has not been started
+   */
+  public ScheduledFuture<V> getScheduledFuture() {
+    return scheduledFuture;
+  }
+
+  /**
+   * Create a single-threaded scheduled executor with a daemon thread named
+   * after this service, register it as this service's executor and
+   * schedule the callback on it.
+   * @throws Exception on any failure
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
+    ServiceThreadFactory threadFactory =
+        new ServiceThreadFactory(getName(), true);
+    ScheduledExecutorService scheduler =
+        Executors.newSingleThreadScheduledExecutor(threadFactory);
+    setExecutor(scheduler);
+    scheduledFuture = scheduler.schedule(command, delay, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Stop the service.
+   * If an exception was recorded for the callback, it is rethrown once
+   * the superclass has completed its own stop operation.
+   * @throws Exception the recorded callback exception, or any exception
+   * raised by the superclass.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    // propagate any failure
+    Exception failure = getCallbackException();
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Get the exception, if any, recorded by the terminating callable which
+   * wraps the callback.
+   * @return the recorded exception, or null if there is none
+   */
+  public Exception getCallbackException() {
+    return command.getException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
new file mode 100644
index 0000000..9c653f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * An extended composite service which stops itself if any child service
+ * fails, or when all its children have successfully stopped without failure.
+ *
+ * Lifecycle
+ * <ol>
+ *   <li>If any child exits with a failure: this service stops, propagating
+ *   the exception.</li>
+ *   <li>When all child services have stopped, this service stops itself</li>
+ * </ol>
+ *
+ */
+public class WorkflowCompositeService extends CompositeService
+    implements ServiceParent, ServiceStateChangeListener {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WorkflowCompositeService.class);
+
+  /**
+   * Deadlock-avoiding overridden config for slider services; see SLIDER-1052
+   */
+  private volatile Configuration configuration;
+
+  /**
+   * Construct an instance with the default name.
+   */
+  public WorkflowCompositeService() {
+    this("WorkflowCompositeService");
+  }
+
+  /**
+   * Construct an instance
+   * @param name name of this service instance
+   */
+  public WorkflowCompositeService(String name) {
+    super(name);
+  }
+
+  /**
+   * Varargs constructor: every child is added via
+   * {@link #addService(Service)}, in the order supplied.
+   * @param name name of this service instance
+   * @param children children
+   */
+  public WorkflowCompositeService(String name, Service... children) {
+    this(name);
+    for (Service child : children) {
+      addService(child);
+    }
+  }
+
+  /**
+   * Construct with a list of children: every child is added via
+   * {@link #addService(Service)}, in list order.
+   * @param name name of this service instance
+   * @param children children to add
+   */
+  public WorkflowCompositeService(String name, List<Service> children) {
+    this(name);
+    for (Service child : children) {
+      addService(child);
+    }
+  }
+
+  /**
+   * Get the configuration from the local volatile field rather than
+   * from the superclass.
+   * @return the configuration last passed to
+   * {@link #setConfig(Configuration)}; null if none has been set
+   */
+  @Override
+  public Configuration getConfig() {
+    return configuration;
+  }
+
+  /**
+   * Set the configuration in the superclass and keep a local reference
+   * for {@link #getConfig()}.
+   * @param conf the new configuration
+   */
+  @Override
+  protected void setConfig(Configuration conf) {
+    super.setConfig(conf);
+    configuration = conf;
+  }
+
+  /**
+   * Add a service, and register this instance as a listener of its
+   * state changes.
+   * @param service the {@link Service} to be added.
+   * Important: do not add a service to a parent during your own serviceInit/start,
+   * in Hadoop 2.2; you will trigger a ConcurrentModificationException.
+   */
+  @Override
+  public synchronized void addService(Service service) {
+    Preconditions.checkArgument(service != null, "null service argument");
+    service.registerServiceListener(this);
+    super.addService(service);
+  }
+
+  /**
+   * React to a child changing state while this service is started.
+   * A child which stopped with a failure cause is converted immediately
+   * into a failure of this service: the failure is noted and this service
+   * stops. A child which stopped without one only stops this service if
+   * every child is now stopped.
+   * @param child the service that has changed.
+   */
+  @Override
+  public void stateChanged(Service child) {
+    //only a child stopping while we are running is of interest
+    if (!isInState(STATE.STARTED) || !child.isInState(STATE.STOPPED)) {
+      return;
+    }
+    Throwable failureCause = child.getFailureCause();
+    if (failureCause == null) {
+      //clean exit of the child: stop only once every child has finished
+      LOG.info("Child service completed {}", child);
+      if (areAllChildrenStopped()) {
+        LOG.info("All children are halted: stopping");
+        stop();
+      }
+      return;
+    }
+    //the child failed: propagate
+    LOG.info("Child service " + child + " failed", failureCause);
+    //noteFailure is given an Exception, so wrap anything else
+    Exception failure;
+    if (failureCause instanceof Exception) {
+      failure = (Exception) failureCause;
+    } else {
+      failure = new Exception(failureCause);
+    }
+    //flip ourselves into the failed state
+    noteFailure(failure);
+    stop();
+  }
+
+  /**
+   * Probe to query if all children are stopped, by enumerating the state
+   * of each service in the list returned by <code>getServices()</code>.
+   * The state of the children may change during this operation -that will
+   * not get picked up.
+   * @return true if all the children are stopped.
+   */
+  private boolean areAllChildrenStopped() {
+    for (Service child : getServices()) {
+      if (!child.isInState(STATE.STOPPED)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
new file mode 100644
index 0000000..7409d32
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * A service that hosts an executor -when the service is stopped,
+ * {@link ExecutorService#shutdownNow()} is invoked.
+ */
+public class WorkflowExecutorService<E extends ExecutorService> extends AbstractService {
+
+  private E executor;
+
+  /**
+   * Construct an instance with the given name -but
+   * no executor
+   * @param name service name
+   */
+  public WorkflowExecutorService(String name) {
+    this(name, null);
+  }
+
+  /**
+   * Construct an instance with the given name and executor
+   * @param name service name
+   * @param executor exectuor
+   */
+  public WorkflowExecutorService(String name,
+      E executor) {
+    super(name);
+    this.executor = executor;
+  }
+
+  /**
+   * Get the executor
+   * @return the executor
+   */
+  public synchronized E getExecutor() {
+    return executor;
+  }
+
+  /**
+   * Set the executor. Only valid if the current one is null
+   * @param executor executor
+   */
+  public synchronized void setExecutor(E executor) {
+    Preconditions.checkState(this.executor == null,
+        "Executor already set");
+    this.executor = executor;
+  }
+
+  /**
+   * Execute the runnable with the executor (which 
+   * must have been created already)
+   * @param runnable runnable to execute
+   */
+  public void execute(Runnable runnable) {
+    getExecutor().execute(runnable);
+  }
+
+  /**
+   * Submit a callable
+   * @param callable callable
+   * @param <V> type of the final get
+   * @return a future to wait on
+   */
+  public <V> Future<V> submit(Callable<V> callable) {
+    return getExecutor().submit(callable);
+  }
+
+  /**
+   * Stop the service: halt the executor. 
+   * @throws Exception exception.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    stopExecutor();
+    super.serviceStop();
+  }
+
+  /**
+   * Stop the executor if it is not null.
+   * This uses {@link ExecutorService#shutdownNow()}
+   * and so does not block until they have completed.
+   */
+  protected synchronized void stopExecutor() {
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
new file mode 100644
index 0000000..b71530f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A YARN service that maps the start/stop lifecycle of an RPC server
+ * to the YARN service lifecycle: the server is started when the service
+ * starts and stopped when the service stops.
+ */
+public class WorkflowRpcService extends AbstractService {
+
+  /** RPC server; never null. */
+  private final Server server;
+
+  /**
+   * Construct an instance
+   * @param name service name
+   * @param server server to start and stop with this service
+   * @throws IllegalArgumentException if the server is null
+   */
+  public WorkflowRpcService(String name, Server server) {
+    super(name);
+    Preconditions.checkArgument(server != null, "Null server");
+    this.server = server;
+  }
+
+  /**
+   * Get the server
+   * @return the server
+   */
+  public Server getServer() {
+    return server;
+  }
+
+  /**
+   * Get the socket address of this server
+   * @return the address this server is listening on
+   */
+  public InetSocketAddress getConnectAddress() {
+    return NetUtils.getConnectAddress(server);
+  }
+
+  /**
+   * Start the superclass, then the RPC server.
+   * @throws Exception on any failure
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    server.start();
+  }
+
+  /**
+   * Stop the RPC server, then the superclass.
+   * No null check is needed: the field is final and the constructor
+   * rejects a null server.
+   * @throws Exception on any failure
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    server.stop();
+    super.serviceStop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
new file mode 100644
index 0000000..e9f53ed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * A {@link WorkflowExecutorService} whose hosted executor is a
+ * {@link ScheduledExecutorService} or a subclass thereof.
+ * @param <E> scheduled executor service type
+ */
+public class WorkflowScheduledExecutorService<E extends ScheduledExecutorService>
+    extends WorkflowExecutorService<E> {
+
+  /**
+   * Construct an instance with the given name; only the name is
+   * passed to the superclass.
+   * @param name service name
+   */
+  public WorkflowScheduledExecutorService(String name) {
+    super(name);
+  }
+
+  /**
+   * Construct an instance with the given name and executor.
+   * @param name service name
+   * @param executor scheduled executor to host
+   */
+  public WorkflowScheduledExecutorService(String name,
+      E executor) {
+    super(name, executor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
new file mode 100644
index 0000000..97f97e8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.service.ServiceStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This resembles the YARN CompositeService, except that it
+ * starts one service after another
+ * 
+ * Workflow
+ * <ol>
+ *   <li>When the <code>WorkflowSequenceService</code> instance is
+ *   initialized, it only initializes itself.</li>
+ *   
+ *   <li>When the <code>WorkflowSequenceService</code> instance is
+ *   started, it initializes then starts the first of its children.
+ *   If there are no children, it immediately stops.</li>
+ *   
+ *   <li>When the active child stops, it did not fail, and the parent has not
+ *   stopped -then the next service is initialized and started. If there is no
+ *   remaining child the parent service stops.</li>
+ *   
+ *   <li>If the active child did fail, the parent service notes the exception
+ *   and stops -effectively propagating up the failure.
+ *   </li>
+ * </ol>
+ * 
+ * New service instances MAY be added to a running instance -but no guarantees
+ * can be made as to whether or not they will be run.
+ */
+
+public class WorkflowSequenceService extends AbstractService implements
+    ServiceParent, ServiceStateChangeListener {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(WorkflowSequenceService.class);
+
+  /**
+   * list of services
+   */
+  private final List<Service> serviceList = new ArrayList<>();
+
+  /**
+   * The currently active service.
+   * Volatile -may change & so should be read into a 
+   * local variable before working with
+   */
+  private volatile Service activeService;
+
+  /**
+   * The previous service -the last one that finished.
+   * Null if no service has finished yet.
+   */
+  private volatile Service previousService;
+  
+  private boolean stopIfNoChildServicesAtStartup = true;
+
+  /**
+   * Construct an instance
+   * @param name service name
+   */
+  public WorkflowSequenceService(String name) {
+    super(name);
+  }
+
+  /**
+   * Construct an instance with the default name
+   */
+  public WorkflowSequenceService() {
+    this("WorkflowSequenceService");
+  }
+
+  /**
+   * Create a service sequence from a varargs array of services.
+   * Each child is passed to <code>addService()</code> in the order
+   * supplied.
+   * @param name service name
+   * @param children initial sequence
+   */
+  public WorkflowSequenceService(String name, Service... children) {
+    super(name);
+    for (Service service : children) {
+      addService(service);
+    }
+  }
+
+  /**
+   * Create a service sequence with the given list of services.
+   * Each child is passed to <code>addService()</code> in list order.
+   * @param name service name
+   * @param children initial sequence
+   */
+  public WorkflowSequenceService(String name, List<Service> children) {
+    super(name);
+    for (Service service : children) {
+      addService(service);
+    }
+  }
+
+  /**
+   * Get the current service -which may be null
+   * @return service running
+   */
+  public Service getActiveService() {
+    return activeService;
+  }
+
+  /**
+   * Get the previously active service
+   * @return the service last run, or null if there is none.
+   */
+  public Service getPreviousService() {
+    return previousService;
+  }
+
+  /**
+   * Set the flag consulted by <code>serviceStart()</code>: when true,
+   * this service stops itself if no child service was started
+   * at startup.
+   * @param stopIfNoChildServicesAtStartup new value of the flag
+   */
+  protected void setStopIfNoChildServicesAtStartup(boolean stopIfNoChildServicesAtStartup) {
+    this.stopIfNoChildServicesAtStartup = stopIfNoChildServicesAtStartup;
+  }
+
+  /**
+   * Start the sequence by attempting to start the next queued child
+   * service. If no child was started and
+   * <code>stopIfNoChildServicesAtStartup</code> is set, this service
+   * stops itself.
+   * @throws Exception on any failure to start the child
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    boolean childStarted = startNextService();
+    if (childStarted) {
+      return;
+    }
+    if (stopIfNoChildServicesAtStartup) {
+      //nothing to start -so stop
+      stop();
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    //stop current service.
+    //this triggers a callback that is caught and ignored
+    Service current = activeService;
+    previousService = current;
+    activeService = null;
+    if (current != null) {
+      current.stop();
+    }
+  }
+
+  /**
+   * Start the next service in the list: remove the head of the queue,
+   * init it, register this service as its listener and start it.
+   * Return false if there are no more services to run, if this
+   * service has stopped, or if the currently active child has
+   * recorded a failure.
+   * @return true if a service was started
+   * @throws RuntimeException from any init or start failure; the failure
+   * is noted on this service before being rethrown
+   * @throws ServiceStateException if this call is made before
+   * the service is started
+   */
+  public synchronized boolean startNextService() {
+    if (isInState(STATE.STOPPED)) {
+      //already stopped: decline quietly instead of raising an exception
+      LOG.debug("Not starting next service -{} is stopped", this);
+      return false;
+    }
+    if (!isInState(STATE.STARTED)) {
+      //neither stopped nor started: reject attempts to start a child too early
+      throw new ServiceStateException(
+        "Cannot start a child service when not started");
+    }
+    if (serviceList.isEmpty()) {
+      //nothing left to run
+      return false;
+    }
+    if (activeService != null && activeService.getFailureCause() != null) {
+      //the active child has a failure recorded (possibly a premature callback)
+      LOG.debug("Not starting next service due to a failure of {}",
+          activeService);
+      return false;
+    }
+    //bear in mind that init & start can fail, which
+    //can trigger re-entrant calls into the state change listener.
+    //stateChanged() only reacts to the service held in activeService, so
+    //clearing that field first means the start-next-service logic is skipped.
+    //open question: what does that mean w.r.t exit states?
+
+    activeService = null;
+    Service head = serviceList.remove(0);
+
+    try {
+      head.init(getConfig());
+      head.registerServiceListener(this);
+      head.start();
+    } catch (RuntimeException e) {
+      noteFailure(e);
+      throw e;
+    }
+    //at this point init() and start() have returned without throwing,
+    //so the child is recorded as the active service
+    activeService = head;
+    return true;
+  }
+
+  /**
+   * Listener callback: when the active child service has entered the
+   * STOPPED state, hand it to {@link #onServiceCompleted(Service)}.
+   * Events from any other service, or for any other state, are ignored.
+   * Subclasses can extend the completion handler with extra logic.
+   * @param service the service that has changed.
+   */
+  @Override
+  public void stateChanged(Service service) {
+    if (service != activeService) {
+      // not the child currently being run: nothing to do
+      return;
+    }
+    if (!service.isInState(STATE.STOPPED)) {
+      // the active child has not stopped: nothing to do
+      return;
+    }
+    onServiceCompleted(service);
+  }
+
+  /**
+   * Handler for service completion. The base class propagates a failure
+   * of the completed child by noting it and stopping this service;
+   * otherwise it starts the next queued service, stopping this service
+   * when nothing could be started.
+   * If this service is not in the STARTED state, the only action is to
+   * forget the active service.
+   * @param service service that has completed
+   */
+  protected synchronized void onServiceCompleted(Service service) {
+    LOG.info("Running service stopped: {}", service);
+    previousService = activeService;
+
+    if (!isInState(STATE.STARTED)) {
+      //not started, so just note that the current service
+      //has gone away
+      activeService = null;
+      return;
+    }
+
+    //did the service fail? if so: propagate
+    Throwable failureCause = service.getFailureCause();
+    if (failureCause != null) {
+      Exception e = (failureCause instanceof Exception) ?
+                    (Exception) failureCause : new Exception(failureCause);
+      noteFailure(e);
+      stop();
+      //stop() has been requested: do not fall through and attempt to
+      //start another child, or to stop a second time
+      return;
+    }
+
+    //start the next service
+    boolean started;
+    try {
+      started = startNextService();
+    } catch (Exception e) {
+      //the next child failed to init or start
+      noteFailure(e);
+      started = false;
+    }
+    if (!started) {
+      //nothing was started (empty list or a failure):
+      //stop and expect the notification to go upstream
+      stop();
+    }
+  }
+
+  /**
+   * Append a {@link Service} to the end of the list of services managed
+   * by this {@link WorkflowSequenceService}.
+   * @param service the {@link Service} to be added; must not be null
+   */
+  @Override
+  public synchronized void addService(Service service) {
+    Preconditions.checkArgument(service != null, "null service argument");
+    final String serviceName = service.getName();
+    LOG.debug("Adding service {} ", serviceName);
+    // the list's own monitor is taken as well as this object's
+    synchronized (serviceList) {
+      serviceList.add(service);
+    }
+  }
+
+  /**
+   * Get an unmodifiable snapshot of the list of services.
+   * @return a list of child services at the time of invocation -
+   * added services will not be picked up, nor will services that are
+   * later removed from the queue when they are started.
+   */
+  @Override //Parent
+  public synchronized List<Service> getServices() {
+    //copy while holding the lock: an unmodifiable *view* of the live list
+    //would track later add/remove calls, contradicting the contract above,
+    //and could fail if iterated while another thread modifies the list
+    return Collections.unmodifiableList(
+        new java.util.ArrayList<Service>(serviceList));
+  }
+
+  @Override // Object
+  public synchronized String toString() {
+    return super.toString() + "; current service " + activeService
+           + "; queued service count=" + serviceList.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
new file mode 100644
index 0000000..36d059a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+
+<p>
+ This package contains classes which can be aggregated to build up
+ complex workflows of services: sequences of operations, callbacks
+ and composite services with a shared lifespan.
+ </p>
+
+<h2>
+ Core concepts:
+</h2>
+
+
+<p>
+The Workflow Services are set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated, to be composed to produce larger
+composite services which then perform ordered operations, notify other services
+when work has completed, and to propagate failure up the service hierarchy.
+</p>
+<p>
+Service instances may have a limited lifespan, and may self-terminate when
+they consider it appropriate.</p>
+<p>
+Workflow Services that have children implement the
+{@link org.apache.slider.server.services.workflow.ServiceParent}
+class, which provides (thread-safe) access to the children -allowing new children
+to be added, and existing children to be enumerated. They implement policies
+on how to react to the termination of children -so can sequence operations
+which terminate themselves when complete.
+</p>
+
+<p>
+Workflow Services may be subclassed to extend their behavior, or to use them
+in specific applications. Just as the standard
+{@link org.apache.hadoop.service.CompositeService}
+is often subclassed to aggregate child services, the
+{@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+can be used instead -adding the feature that failing services trigger automatic
+parent shutdown. If that is the desired operational mode of a class,
+swapping the composite service implementation may be sufficient to adopt it.
+</p>
+
+
+<h2> How do the workflow services differ from the standard YARN services? </h2>
+
+ <p>
+ 
+ There is exactly one standard YARN service for managing children, the
+ {@link org.apache.hadoop.service.CompositeService}.
+ </p><p>
+ The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ shares the same model of "child services, all inited and started together".
+ Where it differs is that if any child service stops -either due to a failure
+ or to an action which invokes that service's
+ {@link org.apache.hadoop.service.Service#stop()} method- the parent stops too.
+ </p>
+ <p>
+
+In contrast, the original <code>CompositeService</code> class starts its children
+in its {@link org.apache.hadoop.service.Service#start()} method, but does not
+listen or react to any child service halting. As a result, changes in child 
+state are not automatically detected or propagated, other than failures in
+the actual init() and start() methods.
+</p>
+
+<p>
+If a child service runs until completed -that is it will not be stopped until
+instructed to do so, and if it is only the parent service that attempts to
+stop the child, then this difference is unimportant. 
+</p>
+<p>
+
+However, if a service depends upon all its child services running -
+and if those child services are written so as to stop when they fail, using
+the <code>WorkflowCompositeService</code> as a base class will enable the 
+parent service to be automatically notified of a child stopping.
+
+</p>
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+resembles the composite service in API, but its workflow is different. It
+initializes and starts its children one-by-one, only starting the second after
+the first one succeeds, the third after the second, etc. If any service in
+the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
+reporting the same exception. 
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+Executes a process when started, and binds to the life of that process. When the
+process terminates, so does the service -and vice versa. This service enables
+external processes to be executed as part of a sequence of operations -or,
+using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+in parallel with other services, terminating the process when the other services
+stop -and vice versa.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowCallbackService}
+executes a {@link java.util.concurrent.Callable} callback a specified delay
+after the service is started, then potentially terminates itself.
+This is useful for callbacks when a workflow  reaches a specific point
+-or simply for executing arbitrary code in the workflow.
+
+ </p>
+
+
+<h2>
+Other Workflow Services
+</h2>
+
+There are some minor services that have proven useful within aggregate workflows,
+and simply in applications which are built from composite YARN services.
+
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
+ Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+ When the service is started, so is the RPC server. Similarly, when the service
+ is stopped, so is the RPC server instance. 
+ </li>
+
+ <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
+ an instance of {@link java.io.Closeable} when the service is stopped. This
+ is purely a housekeeping class.
+ </li>
+
+ </ul>
+
+ Lower-level classes 
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceTerminatingRunnable }:
+ A {@link java.lang.Runnable} which runs the runnable supplied in its constructor
+ then signals its owning service to stop once that runnable is completed. 
+ Any exception raised in the run is stored.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService}:
+ A base class for services that wish to have a {@link java.util.concurrent.ExecutorService}
+ with a lifespan mapped to that of a service. When the service is stopped, the
+ {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+ attempt to shut down all running tasks.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceThreadFactory}:
+ This is a simple {@link java.util.concurrent.ThreadFactory} which generates
+ meaningful thread names. It can be used as a parameter to constructors of 
+ {@link java.util.concurrent.ExecutorService} instances, to ensure that
+ log information can tie back text to the related services</li>
+ </ul>
+
+
+
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
new file mode 100644
index 0000000..254bf27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.yarnregistry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.join;
+
+/**
+ * Registry view for providers. This tracks where the service
+ * is registered, offers access to the record and other things.
+ */
+public class YarnRegistryViewForProviders {
+
+  private final RegistryOperations registryOperations;
+
+  private final String user;
+
+  private final String sliderServiceClass;
+  private final String instanceName;
+  private final ApplicationAttemptId applicationAttemptId;
+  /**
+   * Record used where the service registered itself.
+   * Null until the service is registered
+   */
+  private ServiceRecord selfRegistration;
+
+  /**
+   * Path where record was registered
+   * Null until the service is registered
+   */
+  private String selfRegistrationPath;
+
+  /**
+   * Create a view bound to a user, service class and instance name.
+   * @param registryOperations registry to operate on; must not be null
+   * @param user user whose registry entries are used; must not be null
+   * @param sliderServiceClass service class; must pass
+   * {@code SliderUtils.isSet()}
+   * @param instanceName instance name; must pass {@code SliderUtils.isSet()}
+   * @param applicationAttemptId application attempt; must not be null
+   * @throws IllegalArgumentException if any argument fails its check
+   */
+  public YarnRegistryViewForProviders(RegistryOperations registryOperations,
+      String user,
+      String sliderServiceClass,
+      String instanceName,
+      ApplicationAttemptId applicationAttemptId) {
+    Preconditions.checkArgument(registryOperations != null,
+        "null registry operations");
+    Preconditions.checkArgument(user != null, "null user");
+    Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass),
+        "unset service class");
+    Preconditions.checkArgument(SliderUtils.isSet(instanceName),
+        "instanceName");
+    Preconditions.checkArgument(applicationAttemptId != null,
+        "null applicationAttemptId");
+    this.registryOperations = registryOperations;
+    this.user = user;
+    this.sliderServiceClass = sliderServiceClass;
+    this.instanceName = instanceName;
+    this.applicationAttemptId = applicationAttemptId;
+  }
+
+  /**
+   * @return the application attempt ID supplied at construction
+   */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  /**
+   * @return the user supplied at construction
+   */
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * @return the service class supplied at construction
+   */
+  public String getSliderServiceClass() {
+    return sliderServiceClass;
+  }
+
+  /**
+   * @return the instance name supplied at construction
+   */
+  public String getInstanceName() {
+    return instanceName;
+  }
+
+  /**
+   * @return the registry operations instance supplied at construction
+   */
+  public RegistryOperations getRegistryOperations() {
+    return registryOperations;
+  }
+
+  /**
+   * Get the record with which the service registered itself.
+   * Null until the service is registered
+   * @return the self-registration record, or null
+   */
+  public ServiceRecord getSelfRegistration() {
+    return selfRegistration;
+  }
+
+  private void setSelfRegistration(ServiceRecord selfRegistration) {
+    this.selfRegistration = selfRegistration;
+  }
+
+  /**
+   * Get the path to where the service has registered itself.
+   * Null until the service is registered
+   * @return the service registration path.
+   */
+  public String getSelfRegistrationPath() {
+    return selfRegistrationPath;
+  }
+
+  /**
+   * Get the absolute path to where the service has registered itself.
+   * This includes the base registry path
+   * Null until the service is registered
+   * @return the service registration path.
+   */
+  public String getAbsoluteSelfRegistrationPath() {
+    if (selfRegistrationPath == null) {
+      return null;
+    }
+    String root = registryOperations.getConfig().getTrimmed(
+        RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+        RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+    return RegistryPathUtils.join(root, selfRegistrationPath);
+  }
+
+  /**
+   * Add a component under the slider name/entry
+   * @param componentName component name
+   * @param record record to put
+   * @throws IOException
+   */
+  public void putComponent(String componentName,
+      ServiceRecord record) throws
+      IOException {
+    putComponent(sliderServiceClass, instanceName,
+        componentName,
+        record);
+  }
+
+  /**
+   * Add a component, creating its parent node first and overwriting
+   * any record already bound at the component path.
+   * @param serviceClass service class to use under ~user
+   * @param serviceName name of the service the component belongs to
+   * @param componentName component name
+   * @param record record to put
+   * @throws IOException
+   */
+  public void putComponent(String serviceClass,
+      String serviceName,
+      String componentName,
+      ServiceRecord record) throws IOException {
+    String path = RegistryUtils.componentPath(
+        user, serviceClass, serviceName, componentName);
+    registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+    registryOperations.bind(path, record, BindFlags.OVERWRITE);
+  }
+
+  /**
+   * Add a service under a path, optionally purging any history
+   * @param username user
+   * @param serviceClass service class to use under ~user
+   * @param serviceName name of the service
+   * @param record service record
+   * @param deleteTreeFirst perform recursive delete of the path first.
+   * @return the path the service was created at
+   * @throws IOException
+   */
+  public String putService(String username,
+      String serviceClass,
+      String serviceName,
+      ServiceRecord record,
+      boolean deleteTreeFirst) throws IOException {
+    String path = RegistryUtils.servicePath(
+        username, serviceClass, serviceName);
+    if (deleteTreeFirst) {
+      registryOperations.delete(path, true);
+    }
+    registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+    registryOperations.bind(path, record, BindFlags.OVERWRITE);
+    return path;
+  }
+
+  /**
+   * Add a service under a path for the current user
+   * @param serviceClass service class to use under ~user
+   * @param serviceName name of the service
+   * @param record service record
+   * @param deleteTreeFirst perform recursive delete of the path first
+   * @return the path the service was created at
+   * @throws IOException
+   */
+  public String putService(
+      String serviceClass,
+      String serviceName,
+      ServiceRecord record,
+      boolean deleteTreeFirst) throws IOException {
+    return putService(user, serviceClass, serviceName, record, deleteTreeFirst);
+  }
+
+  /**
+   * Register this service's own record, using the user, service class and
+   * instance name this view was constructed with. The record and the
+   * path it was written to are retained for later retrieval and updates.
+   * @param record service record
+   * @param deleteTreeFirst perform recursive delete of the path first
+   * @return the path the service was created at
+   * @throws IOException
+   */
+  public String registerSelf(
+      ServiceRecord record,
+      boolean deleteTreeFirst) throws IOException {
+    selfRegistrationPath =
+        putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst);
+    setSelfRegistration(record);
+    return selfRegistrationPath;
+  }
+
+  /**
+   * Update the self record by pushing out the latest version of the service
+   * registration record.
+   * @throws IllegalStateException if there is no self-registration record,
+   * that is, the service has not yet been registered
+   * @throws IOException any failure.
+   */
+  public void updateSelf() throws IOException {
+    //fail before touching the registry rather than pushing out a null record
+    Preconditions.checkState(selfRegistration != null,
+        "service has not been registered");
+    putService(user, sliderServiceClass, instanceName, selfRegistration, false);
+  }
+
+  /**
+   * Delete a component
+   * @param componentName component name
+   * @throws IOException
+   */
+  public void deleteComponent(String componentName) throws IOException {
+    String path = RegistryUtils.componentPath(
+        user, sliderServiceClass, instanceName,
+        componentName);
+    registryOperations.delete(path, false);
+  }
+
+  /**
+   * Delete the children of a path -but not the path itself.
+   * It is not an error if the path does not exist
+   * @param path path to delete
+   * @param recursive flag to request recursive deletes
+   * @throws IOException IO problems
+   */
+  public void deleteChildren(String path, boolean recursive) throws IOException {
+    List<String> childNames;
+    try {
+      childNames = registryOperations.list(path);
+    } catch (PathNotFoundException ignored) {
+      //the path does not exist: there is nothing to delete
+      return;
+    }
+    for (String childName : childNames) {
+      String child = join(path, childName);
+      registryOperations.delete(child, recursive);
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message