hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1492718 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ had...
Date Thu, 13 Jun 2013 15:54:42 GMT
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java Thu Jun 13 15:54:38 2013
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * This class contains a set of methods to work with services, especially
@@ -33,74 +38,6 @@ public final class ServiceOperations {
   }
 
   /**
-   * Verify that that a service is in a given state.
-   * @param state the actual state a service is in
-   * @param expectedState the desired state
-   * @throws IllegalStateException if the service state is different from
-   * the desired state
-   */
-  public static void ensureCurrentState(Service.STATE state,
-                                        Service.STATE expectedState) {
-    if (state != expectedState) {
-      throw new IllegalStateException("For this operation, the " +
-                                          "current service state must be "
-                                          + expectedState
-                                          + " instead of " + state);
-    }
-  }
-
-  /**
-   * Initialize a service.
-   * <p/>
-   * The service state is checked <i>before</i> the operation begins.
-   * This process is <i>not</i> thread safe.
-   * @param service a service that must be in the state
-   *   {@link Service.STATE#NOTINITED}
-   * @param configuration the configuration to initialize the service with
-   * @throws RuntimeException on a state change failure
-   * @throws IllegalStateException if the service is in the wrong state
-   */
-
-  public static void init(Service service, Configuration configuration) {
-    Service.STATE state = service.getServiceState();
-    ensureCurrentState(state, Service.STATE.NOTINITED);
-    service.init(configuration);
-  }
-
-  /**
-   * Start a service.
-   * <p/>
-   * The service state is checked <i>before</i> the operation begins.
-   * This process is <i>not</i> thread safe.
-   * @param service a service that must be in the state 
-   *   {@link Service.STATE#INITED}
-   * @throws RuntimeException on a state change failure
-   * @throws IllegalStateException if the service is in the wrong state
-   */
-
-  public static void start(Service service) {
-    Service.STATE state = service.getServiceState();
-    ensureCurrentState(state, Service.STATE.INITED);
-    service.start();
-  }
-
-  /**
-   * Initialize then start a service.
-   * <p/>
-   * The service state is checked <i>before</i> the operation begins.
-   * This process is <i>not</i> thread safe.
-   * @param service a service that must be in the state 
-   *   {@link Service.STATE#NOTINITED}
-   * @param configuration the configuration to initialize the service with
-   * @throws RuntimeException on a state change failure
-   * @throws IllegalStateException if the service is in the wrong state
-   */
-  public static void deploy(Service service, Configuration configuration) {
-    init(service, configuration);
-    start(service);
-  }
-
-  /**
    * Stop a service.
    * <p/>Do nothing if the service is null or not
    * in a state in which it can be/needs to be stopped.
@@ -111,10 +48,7 @@ public final class ServiceOperations {
    */
   public static void stop(Service service) {
     if (service != null) {
-      Service.STATE state = service.getServiceState();
-      if (state == Service.STATE.STARTED) {
-        service.stop();
-      }
+      service.stop();
     }
   }
 
@@ -127,14 +61,93 @@ public final class ServiceOperations {
    * @return any exception that was caught; null if none was.
    */
   public static Exception stopQuietly(Service service) {
+    return stopQuietly(LOG, service);
+  }
+
+  /**
+   * Stop a service; if it is null do nothing. Exceptions are caught and
+   * logged at warn level. (but not Throwables). This operation is intended to
+   * be used in cleanup operations
+   *
+   * @param log the log to warn at
+   * @param service a service; may be null
+   * @return any exception that was caught; null if none was.
+   * @see ServiceOperations#stopQuietly(Service)
+   */
+  public static Exception stopQuietly(Log log, Service service) {
     try {
       stop(service);
     } catch (Exception e) {
-      LOG.warn("When stopping the service " + service.getName()
-                   + " : " + e,
+      log.warn("When stopping the service " + service.getName()
+               + " : " + e,
                e);
       return e;
     }
     return null;
   }
+
+
+  /**
+   * Class to manage a list of {@link ServiceStateChangeListener} instances,
+   * including a notification loop that is robust against changes to the list
+   * during the notification process.
+   */
+  public static class ServiceListeners {
+    /**
+     * List of state change listeners; it is final to guarantee
+     * that it will never be null.
+     */
+    private final List<ServiceStateChangeListener> listeners =
+      new ArrayList<ServiceStateChangeListener>();
+
+    /**
+     * Thread-safe addition of a new listener to the end of a list.
+     * Attempts to re-register a listener that is already registered
+     * will be ignored.
+     * @param l listener
+     */
+    public synchronized void add(ServiceStateChangeListener l) {
+      if(!listeners.contains(l)) {
+        listeners.add(l);
+      }
+    }
+
+    /**
+     * Remove any registration of a listener from the listener list.
+     * @param l listener
+     * @return true if the listener was found (and then removed)
+     */
+    public synchronized boolean remove(ServiceStateChangeListener l) {
+      return listeners.remove(l);
+    }
+
+    /**
+     * Reset the listener list
+     */
+    public synchronized void reset() {
+      listeners.clear();
+    }
+
+    /**
+     * Change to a new state and notify all listeners.
+     * This method will block until all notifications have been issued.
+     * It caches the list of listeners before the notification begins,
+     * so additions or removal of listeners will not be visible.
+     * @param service the service that has changed state
+     */
+    public void notifyListeners(Service service) {
+      //take a very fast snapshot of the callback list
+      //very much like CopyOnWriteArrayList, only more minimal
+      ServiceStateChangeListener[] callbacks;
+      synchronized (this) {
+        callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
+      }
+      //iterate through the listeners outside the synchronized method,
+      //ensuring that listener registration/unregistration doesn't break anything
+      for (ServiceStateChangeListener l : callbacks) {
+        l.stateChanged(service);
+      }
+    }
+  }
+
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception that is raised on state change operations.
+ */
+public class ServiceStateException extends YarnRuntimeException {
+
+  public ServiceStateException(String message) {
+    super(message);
+  }
+
+  public ServiceStateException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ServiceStateException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * Convert any exception into a {@link RuntimeException}.
+   * If the caught exception already is of that type -including
+   * a {@link YarnException} it is typecast to a {@link RuntimeException}
+   * and returned.
+   *
+   * All other exception types are wrapped in a new instance of
+   * ServiceStateException
+   * @param fault exception or throwable
+   * @return a ServiceStateException to rethrow
+   */
+  public static RuntimeException convert(Throwable fault) {
+    if (fault instanceof RuntimeException) {
+      return (RuntimeException) fault;
+    } else {
+      return new ServiceStateException(fault);
+    }
+  }
+
+  /**
+   * Convert any exception into a {@link RuntimeException}.
+   * If the caught exception already is of that type -including
+   * a {@link YarnException} it is typecast to a {@link RuntimeException}
+   * and returned.
+   *
+   * All other exception types are wrapped in a new instance of
+   * ServiceStateException
+   * @param text text to use if a new exception is created
+   * @param fault exception or throwable
+   * @return a ServiceStateException to rethrow
+   */
+  public static RuntimeException convert(String text, Throwable fault) {
+    if (fault instanceof RuntimeException) {
+      return (RuntimeException) fault;
+    } else {
+      return new ServiceStateException(text, fault);
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+/**
+ * Implements the service state model for YARN.
+ */
+public class ServiceStateModel {
+
+  /**
+   * Map of all valid state transitions
+   * [current] [proposed1, proposed2, ...]
+   */
+  private static final boolean[][] statemap =
+    {
+      //                uninited inited started stopped
+      /* uninited  */    {false, true,  false,  true},
+      /* inited    */    {false, true,  true,   true},
+      /* started   */    {false, false, true,   true},
+      /* stopped   */    {false, false, false,  true},
+    };
+
+  /**
+   * The state of the service
+   */
+  private volatile Service.STATE state;
+
+  /**
+   * The name of the service: used in exceptions
+   */
+  private String name;
+
+  /**
+   * Create the service state model in the {@link Service.STATE#NOTINITED}
+   * state.
+   */
+  public ServiceStateModel(String name) {
+    this(name, Service.STATE.NOTINITED);
+  }
+
+  /**
+   * Create a service state model instance in the chosen state
+   * @param state the starting state
+   */
+  public ServiceStateModel(String name, Service.STATE state) {
+    this.state = state;
+    this.name = name;
+  }
+
+  /**
+   * Query the service state. This is a non-blocking operation.
+   * @return the state
+   */
+  public Service.STATE getState() {
+    return state;
+  }
+
+  /**
+   * Query that the state is in a specific state
+   * @param proposed proposed new state
+   * @return the state
+   */
+  public boolean isInState(Service.STATE proposed) {
+    return state.equals(proposed);
+  }
+
+  /**
+   * Verify that that a service is in a given state.
+   * @param expectedState the desired state
+   * @throws ServiceStateException if the service state is different from
+   * the desired state
+   */
+  public void ensureCurrentState(Service.STATE expectedState) {
+    if (state != expectedState) {
+      throw new ServiceStateException(name+ ": for this operation, the " +
+                                      "current service state must be "
+                                      + expectedState
+                                      + " instead of " + state);
+    }
+  }
+
+  /**
+   * Enter a state -thread safe.
+   *
+   * @param proposed proposed new state
+   * @return the original state
+   * @throws ServiceStateException if the transition is not permitted
+   */
+  public synchronized Service.STATE enterState(Service.STATE proposed) {
+    checkStateTransition(name, state, proposed);
+    Service.STATE oldState = state;
+    //atomic write of the new state
+    state = proposed;
+    return oldState;
+  }
+
+  /**
+   * Check that a state tansition is valid and
+   * throw an exception if not
+   * @param name name of the service (can be null)
+   * @param state current state
+   * @param proposed proposed new state
+   */
+  public static void checkStateTransition(String name,
+                                          Service.STATE state,
+                                          Service.STATE proposed) {
+    if (!isValidStateTransition(state, proposed)) {
+      throw new ServiceStateException(name + " cannot enter state "
+                                      + proposed + " from state " + state);
+    }
+  }
+
+  /**
+   * Is a state transition valid?
+   * There are no checks for current==proposed
+   * as that is considered a non-transition.
+   *
+   * using an array kills off all branch misprediction costs, at the expense
+   * of cache line misses.
+   *
+   * @param current current state
+   * @param proposed proposed new state
+   * @return true if the transition to a new state is valid
+   */
+  public static boolean isValidStateTransition(Service.STATE current,
+                                               Service.STATE proposed) {
+    boolean[] row = statemap[current.getValue()];
+    return row[proposed.getValue()];
+  }
+
+  /**
+   * return the state text as the toString() value
+   * @return the current state's description
+   */
+  @Override
+  public String toString() {
+    return (name.isEmpty() ? "" : ((name) + ": "))
+            + state.toString();
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java Thu Jun 13 15:54:38 2013
@@ -49,21 +49,21 @@ public abstract class AbstractLiveliness
   }
 
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
     assert !stopped : "starting when already stopped";
     checkerThread = new Thread(new PingChecker());
     checkerThread.setName("Ping Checker");
     checkerThread.start();
-    super.start();
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
+  protected void serviceStop() throws Exception {
     stopped = true;
     if (checkerThread != null) {
       checkerThread.interrupt();
     }
-    super.stop();
+    super.serviceStop();
   }
 
   protected abstract void expire(O ob);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java Thu Jun 13 15:54:38 2013
@@ -55,13 +55,7 @@ public class BreakableService extends Ab
   }
 
   private int convert(STATE state) {
-    switch (state) {
-      case NOTINITED: return 0;
-      case INITED:    return 1;
-      case STARTED:   return 2;
-      case STOPPED:   return 3;
-      default:        return 0;
-    }
+    return state.getValue();
   }
 
   private void inc(STATE state) {
@@ -75,29 +69,27 @@ public class BreakableService extends Ab
 
   private void maybeFail(boolean fail, String action) {
     if (fail) {
-      throw new BrokenLifecycleEvent(action);
+      throw new BrokenLifecycleEvent(this, action);
     }
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     inc(STATE.INITED);
     maybeFail(failOnInit, "init");
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void start() {
+  protected void serviceStart() {
     inc(STATE.STARTED);
     maybeFail(failOnStart, "start");
-    super.start();
   }
 
   @Override
-  public void stop() {
+  protected void serviceStop() {
     inc(STATE.STOPPED);
     maybeFail(failOnStop, "stop");
-    super.stop();
   }
 
   public void setFailOnInit(boolean failOnInit) {
@@ -116,8 +108,13 @@ public class BreakableService extends Ab
    * The exception explicitly raised on a failure
    */
   public static class BrokenLifecycleEvent extends RuntimeException {
-    BrokenLifecycleEvent(String action) {
-      super("Lifecycle Failure during " + action);
+
+    final STATE state;
+
+    public BrokenLifecycleEvent(Service service, String action) {
+      super("Lifecycle Failure during " + action + " state is "
+            + service.getServiceState());
+      state = service.getServiceState();
     }
   }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A state change listener that logs the number of state change events received,
+ * and the last state invoked.
+ *
+ * It can be configured to fail during a state change event
+ */
+public class BreakableStateChangeListener
+    implements ServiceStateChangeListener {
+
+  private final String name;
+
+  private int eventCount;
+  private int failureCount;
+  private Service lastService;
+  private Service.STATE lastState = Service.STATE.NOTINITED;
+  //no callbacks are ever received for this event, so it
+  //can be used as an 'undefined'.
+  private Service.STATE failingState = Service.STATE.NOTINITED;
+  private List<Service.STATE> stateEventList = new ArrayList<Service.STATE>(4);
+
+  public BreakableStateChangeListener() {
+    this( "BreakableStateChangeListener");
+  }
+
+  public BreakableStateChangeListener(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public synchronized void stateChanged(Service service) {
+    eventCount++;
+    lastService = service;
+    lastState = service.getServiceState();
+    stateEventList.add(lastState);
+    if (lastState == failingState) {
+      failureCount++;
+      throw new BreakableService.BrokenLifecycleEvent(service,
+                                                      "Failure entering "
+                                                      + lastState
+                                                      + " for "
+                                                      + service.getName());
+    }
+  }
+
+  public synchronized int getEventCount() {
+    return eventCount;
+  }
+
+  public synchronized Service getLastService() {
+    return lastService;
+  }
+
+  public synchronized Service.STATE getLastState() {
+    return lastState;
+  }
+
+  public synchronized void setFailingState(Service.STATE failingState) {
+    this.failingState = failingState;
+  }
+
+  public synchronized int getFailureCount() {
+    return failureCount;
+  }
+
+  public List<Service.STATE> getStateEventList() {
+    return stateEventList;
+  }
+
+  @Override
+  public synchronized String toString() {
+    String s =
+      name + " - event count = " + eventCount + " last state " + lastState;
+    StringBuilder history = new StringBuilder(stateEventList.size()*10);
+    for (Service.STATE state: stateEventList) {
+      history.append(state).append(" ");
+    }
+    return s + " [ " + history + "]";
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Test global state changes. It is critical for all tests to clean up the
+ * global listener afterwards to avoid interfering with follow-on tests.
+ *
+ * One listener, {@link #listener} is defined which is automatically
+ * unregistered on cleanup. All other listeners must be unregistered in the
+ * finally clauses of the tests.
+ */
+public class TestGlobalStateChangeListener extends ServiceAssert {
+
+  BreakableStateChangeListener listener = new BreakableStateChangeListener("listener");
+
+
+  private void register() {
+    register(listener);
+  }
+
+  private boolean unregister() {
+    return unregister(listener);
+  }
+
+  private void register(ServiceStateChangeListener l) {
+    AbstractService.registerGlobalListener(l);
+  }
+
+  private boolean unregister(ServiceStateChangeListener l) {
+    return AbstractService.unregisterGlobalListener(l);
+  }
+
+  /**
+   * After every test case reset the list of global listeners.
+   */
+  @After
+  public void cleanup() {
+    AbstractService.resetGlobalListeners();
+  }
+
+  /**
+   * Assert that the last state of the listener is that the test expected.
+   * @param breakable a breakable listener
+   * @param state the expected state
+   */
+  public void assertListenerState(BreakableStateChangeListener breakable,
+                                  Service.STATE state) {
+    assertEquals("Wrong state in " + breakable, state, breakable.getLastState());
+  }
+
+  /**
+   * Assert that the number of state change notifications matches expectations.
+   * @param breakable the listener
+   * @param count the expected count.
+   */
+  public void assertListenerEventCount(BreakableStateChangeListener breakable,
+                                       int count) {
+    assertEquals("Wrong event count in " + breakable, count,
+                 breakable.getEventCount());
+  }
+
+  /**
+   * Test that register/unregister works
+   */
+  @Test
+  public void testRegisterListener() {
+    register();
+    assertTrue("listener not registered", unregister());
+  }
+
+  /**
+   * Test that double registration results in one registration only.
+   */
+  @Test
+  public void testRegisterListenerTwice() {
+    register();
+    register();
+    assertTrue("listener not registered", unregister());
+    //there should be no listener to unregister the second time
+    assertFalse("listener double registered", unregister());
+  }
+
+  /**
+   * Test that the {@link BreakableStateChangeListener} is picking up
+   * the state changes and that its last event field is as expected.
+   */
+  @Test
+  public void testEventHistory() {
+    register();
+    BreakableService service = new BreakableService();
+    assertListenerState(listener, Service.STATE.NOTINITED);
+    assertEquals(0, listener.getEventCount());
+    service.init(new Configuration());
+    assertListenerState(listener, Service.STATE.INITED);
+    assertSame(service, listener.getLastService());
+    assertListenerEventCount(listener, 1);
+
+    service.start();
+    assertListenerState(listener, Service.STATE.STARTED);
+    assertListenerEventCount(listener, 2);
+
+    service.stop();
+    assertListenerState(listener, Service.STATE.STOPPED);
+    assertListenerEventCount(listener, 3);
+  }
+
+  /**
+   * This test triggers a failure in the listener - the expectation is that the
+   * service has already reached it's desired state, purely because the
+   * notifications take place afterwards.
+   *
+   */
+  @Test
+  public void testListenerFailure() {
+    listener.setFailingState(Service.STATE.INITED);
+    register();
+    BreakableStateChangeListener l2 = new BreakableStateChangeListener();
+    register(l2);
+    BreakableService service = new BreakableService();
+    service.init(new Configuration());
+    //expected notifications to fail
+
+    //still should record its invocation
+    assertListenerState(listener, Service.STATE.INITED);
+    assertListenerEventCount(listener, 1);
+
+    //and second listener didn't get notified of anything
+    assertListenerEventCount(l2, 0);
+
+    //service should still consider itself started
+    assertServiceStateInited(service);
+    service.start();
+    service.stop();
+  }
+
+  /**
+   * Create a chain of listeners and set one in the middle to fail; verify that
+   * those in front got called, and those after did not.
+   */
+  @Test
+  public void testListenerChain() {
+
+    //create and register the listeners
+    LoggingStateChangeListener logListener = new LoggingStateChangeListener();
+    register(logListener);
+    BreakableStateChangeListener l0 = new BreakableStateChangeListener("l0");
+    register(l0);
+    listener.setFailingState(Service.STATE.STARTED);
+    register();
+    BreakableStateChangeListener l3 = new BreakableStateChangeListener("l3");
+    register(l3);
+
+    //create and init a service.
+    BreakableService service = new BreakableService();
+    service.init(new Configuration());
+    assertServiceStateInited(service);
+    assertListenerState(l0, Service.STATE.INITED);
+    assertListenerState(listener, Service.STATE.INITED);
+    assertListenerState(l3, Service.STATE.INITED);
+
+    service.start();
+    //expect that listener l1 and the failing listener are in start, but
+    //not the final one
+    assertServiceStateStarted(service);
+    assertListenerState(l0, Service.STATE.STARTED);
+    assertListenerEventCount(l0, 2);
+    assertListenerState(listener, Service.STATE.STARTED);
+    assertListenerEventCount(listener, 2);
+    //this is the listener that is not expected to have been invoked
+    assertListenerState(l3, Service.STATE.INITED);
+    assertListenerEventCount(l3, 1);
+
+    //stop the service
+    service.stop();
+    //listeners are all updated
+    assertListenerEventCount(l0, 3);
+    assertListenerEventCount(listener, 3);
+    assertListenerEventCount(l3, 2);
+    //can all be unregistered in any order
+    unregister(logListener);
+    unregister(l0);
+    unregister(l3);
+
+    //check that the listeners are all unregistered, even
+    //though they were registered in a different order.
+    //rather than do this by doing unregister checks, a new service is created
+    service = new BreakableService();
+    //this service is initialized
+    service.init(new Configuration());
+    //it is asserted that the event count has not changed for the unregistered
+    //listeners
+    assertListenerEventCount(l0, 3);
+    assertListenerEventCount(l3, 2);
+    //except for the one listener that was not unregistered, which
+    //has incremented by one
+    assertListenerEventCount(listener, 4);
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java Thu Jun 13 15:54:38 2013
@@ -19,10 +19,13 @@
 
 package org.apache.hadoop.yarn.service;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
 public class TestServiceLifecycle extends ServiceAssert {
+  private static Log LOG = LogFactory.getLog(TestServiceLifecycle.class);
 
   /**
    * Walk the {@link BreakableService} through it's lifecycle, 
@@ -59,13 +62,8 @@ public class TestServiceLifecycle extend
     Configuration conf = new Configuration();
     conf.set("test.init","t");
     svc.init(conf);
-    try {
-      svc.init(new Configuration());
-      fail("Expected a failure, got " + svc);
-    } catch (IllegalStateException e) {
-      //expected
-    }
-    assertStateCount(svc, Service.STATE.INITED, 2);
+    svc.init(new Configuration());
+    assertStateCount(svc, Service.STATE.INITED, 1);
     assertServiceConfigurationContains(svc, "test.init");
   }
 
@@ -78,21 +76,14 @@ public class TestServiceLifecycle extend
     BreakableService svc = new BreakableService();
     svc.init(new Configuration());
     svc.start();
-    try {
-      svc.start();
-      fail("Expected a failure, got " + svc);
-    } catch (IllegalStateException e) {
-      //expected
-    }
-    assertStateCount(svc, Service.STATE.STARTED, 2);
+    svc.start();
+    assertStateCount(svc, Service.STATE.STARTED, 1);
   }
 
 
   /**
    * Verify that when a service is stopped more than once, no exception
-   * is thrown, and the counter is incremented.
-   * This is because the state change operations happen after the counter in
-   * the subclass is incremented, even though stop is meant to be a no-op
+   * is thrown.
    * @throws Throwable if necessary
    */
   @Test
@@ -103,7 +94,7 @@ public class TestServiceLifecycle extend
     svc.stop();
     assertStateCount(svc, Service.STATE.STOPPED, 1);
     svc.stop();
-    assertStateCount(svc, Service.STATE.STOPPED, 2);
+    assertStateCount(svc, Service.STATE.STOPPED, 1);
   }
 
 
@@ -124,12 +115,12 @@ public class TestServiceLifecycle extend
       //expected
     }
     //the service state wasn't passed
-    assertServiceStateCreated(svc);
+    assertServiceStateStopped(svc);
     assertStateCount(svc, Service.STATE.INITED, 1);
+    assertStateCount(svc, Service.STATE.STOPPED, 1);
     //now try to stop
     svc.stop();
-    //even after the stop operation, we haven't entered the state
-    assertServiceStateCreated(svc);
+    assertStateCount(svc, Service.STATE.STOPPED, 1);
   }
 
 
@@ -151,18 +142,12 @@ public class TestServiceLifecycle extend
       //expected
     }
     //the service state wasn't passed
-    assertServiceStateInited(svc);
-    assertStateCount(svc, Service.STATE.INITED, 1);
-    //now try to stop
-    svc.stop();
-    //even after the stop operation, we haven't entered the state
-    assertServiceStateInited(svc);
+    assertServiceStateStopped(svc);
   }
 
   /**
    * verify that when a service fails during its stop operation,
-   * its state does not change, and the subclass invocation counter
-   * increments.
+   * its state does not change.
    * @throws Throwable if necessary
    */
   @Test
@@ -177,42 +162,302 @@ public class TestServiceLifecycle extend
       //expected
     }
     assertStateCount(svc, Service.STATE.STOPPED, 1);
-    assertServiceStateStarted(svc);
-    //now try again, and expect it to happen again
+  }
+
+  /**
+   * verify that when a service that is not started is stopped, the
+   * service enters the stopped state
+   * @throws Throwable on a failure
+   */
+  @Test
+  public void testStopUnstarted() throws Throwable {
+    BreakableService svc = new BreakableService();
+    svc.stop();
+    assertServiceStateStopped(svc);
+    assertStateCount(svc, Service.STATE.INITED, 0);
+    assertStateCount(svc, Service.STATE.STOPPED, 1);
+  }
+
+  /**
+   * Show that if the service failed during an init
+   * operation, stop was called.
+   */
+
+  @Test
+  public void testStopFailingInitAndStop() throws Throwable {
+    BreakableService svc = new BreakableService(true, false, true);
+    svc.register(new LoggingStateChangeListener());
     try {
-      svc.stop();
+      svc.init(new Configuration());
       fail("Expected a failure, got " + svc);
     } catch (BreakableService.BrokenLifecycleEvent e) {
+      assertEquals(Service.STATE.INITED, e.state);
+    }
+    //the service state is stopped
+    assertServiceStateStopped(svc);
+    assertEquals(Service.STATE.INITED, svc.getFailureState());
+
+    Throwable failureCause = svc.getFailureCause();
+    assertNotNull("Null failure cause in " + svc, failureCause);
+    BreakableService.BrokenLifecycleEvent cause =
+      (BreakableService.BrokenLifecycleEvent) failureCause;
+    assertNotNull("null state in " + cause + " raised by " + svc, cause.state);
+    assertEquals(Service.STATE.INITED, cause.state);
+  }
+
+  @Test
+  public void testInitNullConf() throws Throwable {
+    BreakableService svc = new BreakableService(false, false, false);
+    try {
+      svc.init(null);
+      LOG.warn("Null Configurations are permitted ");
+    } catch (ServiceStateException e) {
       //expected
     }
-    assertStateCount(svc, Service.STATE.STOPPED, 2);
+  }
+
+  @Test
+  public void testServiceNotifications() throws Throwable {
+    BreakableService svc = new BreakableService(false, false, false);
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    svc.register(listener);
+    svc.init(new Configuration());
+    assertEventCount(listener, 1);
+    svc.start();
+    assertEventCount(listener, 2);
+    svc.stop();
+    assertEventCount(listener, 3);
+    svc.stop();
+    assertEventCount(listener, 3);
   }
 
   /**
-   * verify that when a service that is not started is stopped, its counter
-   * of stop calls is still incremented-and the service remains in its
-   * original state..
+   * Test that when a service listener is unregistered, it stops being invoked
    * @throws Throwable on a failure
    */
   @Test
-  public void testStopUnstarted() throws Throwable {
-    BreakableService svc = new BreakableService();
+  public void testServiceNotificationsStopOnceUnregistered() throws Throwable {
+    BreakableService svc = new BreakableService(false, false, false);
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    svc.register(listener);
+    svc.init(new Configuration());
+    assertEventCount(listener, 1);
+    svc.unregister(listener);
+    svc.start();
+    assertEventCount(listener, 1);
     svc.stop();
-    assertServiceStateCreated(svc);
-    assertStateCount(svc, Service.STATE.STOPPED, 1);
+    assertEventCount(listener, 1);
+    svc.stop();
+  }
 
-    //stop failed, now it can be initialised
+  /**
+   * This test uses a service listener that unregisters itself during the callbacks.
+   * This a test that verifies the concurrency logic on the listener management
+   * code, that it doesn't throw any immutable state change exceptions
+   * if you change list membership during the notifications.
+   * The standard <code>AbstractService</code> implementation copies the list
+   * to an array in a <code>synchronized</code> block then iterates through
+   * the copy precisely to prevent this problem.
+   * @throws Throwable on a failure
+   */
+  @Test
+  public void testServiceNotificationsUnregisterDuringCallback() throws Throwable {
+    BreakableService svc = new BreakableService(false, false, false);
+    BreakableStateChangeListener listener =
+      new SelfUnregisteringBreakableStateChangeListener();
+    BreakableStateChangeListener l2 =
+      new BreakableStateChangeListener();
+    svc.register(listener);
+    svc.register(l2);
     svc.init(new Configuration());
-
-    //and try to stop again, with no state change but an increment
+    assertEventCount(listener, 1);
+    assertEventCount(l2, 1);
+    svc.unregister(listener);
+    svc.start();
+    assertEventCount(listener, 1);
+    assertEventCount(l2, 2);
     svc.stop();
-    assertServiceStateInited(svc);
-    assertStateCount(svc, Service.STATE.STOPPED, 2);
+    assertEventCount(listener, 1);
+    svc.stop();
+  }
+
+  private static class SelfUnregisteringBreakableStateChangeListener
+    extends BreakableStateChangeListener {
+
+    @Override
+    public synchronized void stateChanged(Service service) {
+      super.stateChanged(service);
+      service.unregister(this);
+    }
+  }
 
-    //once started, the service can be stopped reliably
+  private void assertEventCount(BreakableStateChangeListener listener,
+                                int expected) {
+    assertEquals(listener.toString(), expected, listener.getEventCount());
+  }
+
+  @Test
+  public void testServiceFailingNotifications() throws Throwable {
+    BreakableService svc = new BreakableService(false, false, false);
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    listener.setFailingState(Service.STATE.STARTED);
+    svc.register(listener);
+    svc.init(new Configuration());
+    assertEventCount(listener, 1);
+    //start this; the listener failed but this won't show
     svc.start();
-    ServiceOperations.stop(svc);
-    assertServiceStateStopped(svc);
-    assertStateCount(svc, Service.STATE.STOPPED, 3);
+    //counter went up
+    assertEventCount(listener, 2);
+    assertEquals(1, listener.getFailureCount());
+    //stop the service -this doesn't fail
+    svc.stop();
+    assertEventCount(listener, 3);
+    assertEquals(1, listener.getFailureCount());
+    svc.stop();
+  }
+
+  /**
+   * This test verifies that you can block waiting for something to happen
+   * and use notifications to manage it
+   * @throws Throwable on a failure
+   */
+  @Test
+  public void testListenerWithNotifications() throws Throwable {
+    //this tests that a listener can get notified when a service is stopped
+    AsyncSelfTerminatingService service = new AsyncSelfTerminatingService(2000);
+    NotifyingListener listener = new NotifyingListener();
+    service.register(listener);
+    service.init(new Configuration());
+    service.start();
+    assertServiceInState(service, Service.STATE.STARTED);
+    long start = System.currentTimeMillis();
+    synchronized (listener) {
+      listener.wait(20000);
+    }
+    long duration = System.currentTimeMillis() - start;
+    assertEquals(Service.STATE.STOPPED, listener.notifyingState);
+    assertServiceInState(service, Service.STATE.STOPPED);
+    assertTrue("Duration of " + duration + " too long", duration < 10000);
+  }
+
+  @Test
+  public void testSelfTerminatingService() throws Throwable {
+    SelfTerminatingService service = new SelfTerminatingService();
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    service.register(listener);
+    service.init(new Configuration());
+    assertEventCount(listener, 1);
+    //start the service
+    service.start();
+    //and expect an event count of exactly two
+    assertEventCount(listener, 2);
+  }
+
+  @Test
+  public void testStartInInitService() throws Throwable {
+    Service service = new StartInInitService();
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    service.register(listener);
+    service.init(new Configuration());
+    assertServiceInState(service, Service.STATE.STARTED);
+    assertEventCount(listener, 1);
+  }
+
+  @Test
+  public void testStopInInitService() throws Throwable {
+    Service service = new StopInInitService();
+    BreakableStateChangeListener listener = new BreakableStateChangeListener();
+    service.register(listener);
+    service.init(new Configuration());
+    assertServiceInState(service, Service.STATE.STOPPED);
+    assertEventCount(listener, 1);
+  }
+
+  /**
+   * Listener that wakes up all threads waiting on it
+   */
+  private static class NotifyingListener implements ServiceStateChangeListener {
+    public Service.STATE notifyingState = Service.STATE.NOTINITED;
+
+    public synchronized void stateChanged(Service service) {
+      notifyingState = service.getServiceState();
+      this.notifyAll();
+    }
+  }
+
+  /**
+   * Service that terminates itself after starting and sleeping for a while
+   */
+  private static class AsyncSelfTerminatingService extends AbstractService
+                                               implements Runnable {
+    final int timeout;
+    private AsyncSelfTerminatingService(int timeout) {
+      super("AsyncSelfTerminatingService");
+      this.timeout = timeout;
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      new Thread(this).start();
+      super.serviceStart();
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(timeout);
+      } catch (InterruptedException ignored) {
+
+      }
+      this.stop();
+    }
+  }
+
+  /**
+   * Service that terminates itself in startup
+   */
+  private static class SelfTerminatingService extends AbstractService {
+    private SelfTerminatingService() {
+      super("SelfTerminatingService");
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      //start
+      super.serviceStart();
+      //then stop
+      stop();
+    }
+  }
+
+  /**
+   * Service that starts itself in init
+   */
+  private static class StartInInitService extends AbstractService {
+    private StartInInitService() {
+      super("StartInInitService");
+    }
+
+    @Override
+    protected void serviceInit(Configuration conf) throws Exception {
+      super.serviceInit(conf);
+      start();
+    }
   }
+
+  /**
+   * Service that starts itself in init
+   */
+  private static class StopInInitService extends AbstractService {
+    private StopInInitService() {
+      super("StopInInitService");
+    }
+
+    @Override
+    protected void serviceInit(Configuration conf) throws Exception {
+      super.serviceInit(conf);
+      stop();
+    }
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java Thu Jun 13 15:54:38 2013
@@ -21,10 +21,15 @@ package org.apache.hadoop.yarn.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.BreakableService;
 import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.service.ServiceStateException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -34,6 +39,16 @@ public class TestCompositeService {
 
   private static final int FAILED_SERVICE_SEQ_NUMBER = 2;
 
+  private static final Log LOG  = LogFactory.getLog(TestCompositeService.class);
+
+  /**
+   * flag to state policy of CompositeService, and hence
+   * what to look for after trying to stop a service from another state
+   * (e.g inited)
+   */
+  private static final boolean STOP_ONLY_STARTED_SERVICES =
+    CompositeServiceImpl.isPolicyToStopOnlyStartedServices();
+
   @Before
   public void setup() {
     CompositeServiceImpl.resetCounter();
@@ -59,6 +74,9 @@ public class TestCompositeService {
     // Initialise the composite service
     serviceManager.init(conf);
 
+    //verify they were all inited
+    assertInState(STATE.INITED, services);
+
     // Verify the init() call sequence numbers for every service
     for (int i = 0; i < NUM_OF_SERVICES; i++) {
       assertEquals("For " + services[i]
@@ -67,11 +85,11 @@ public class TestCompositeService {
     }
 
     // Reset the call sequence numbers
-    for (int i = 0; i < NUM_OF_SERVICES; i++) {
-      services[i].reset();
-    }
+    resetServices(services);
 
     serviceManager.start();
+    //verify they were all started
+    assertInState(STATE.STARTED, services);
 
     // Verify the start() call sequence numbers for every service
     for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -79,13 +97,12 @@ public class TestCompositeService {
           + " service, start() call sequence number should have been ", i,
           services[i].getCallSequenceNumber());
     }
+    resetServices(services);
 
-    // Reset the call sequence numbers
-    for (int i = 0; i < NUM_OF_SERVICES; i++) {
-      services[i].reset();
-    }
 
     serviceManager.stop();
+    //verify they were all stopped
+    assertInState(STATE.STOPPED, services);
 
     // Verify the stop() call sequence numbers for every service
     for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -104,6 +121,13 @@ public class TestCompositeService {
     }
   }
 
+  private void resetServices(CompositeServiceImpl[] services) {
+    // Reset the call sequence numbers
+    for (int i = 0; i < NUM_OF_SERVICES; i++) {
+      services[i].reset();
+    }
+  }
+
   @Test
   public void testServiceStartup() {
     ServiceManager serviceManager = new ServiceManager("ServiceManager");
@@ -131,7 +155,7 @@ public class TestCompositeService {
       fail("Exception should have been thrown due to startup failure of last service");
     } catch (YarnRuntimeException e) {
       for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
-        if (i >= FAILED_SERVICE_SEQ_NUMBER) {
+        if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
           // Failed service state should be INITED
           assertEquals("Service state should have been ", STATE.INITED,
               services[NUM_OF_SERVICES - 1].getServiceState());
@@ -171,15 +195,147 @@ public class TestCompositeService {
     try {
       serviceManager.stop();
     } catch (YarnRuntimeException e) {
-      for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
-        assertEquals("Service state should have been ", STATE.STOPPED,
-            services[NUM_OF_SERVICES].getServiceState());
-      }
     }
+    assertInState(STATE.STOPPED, services);
   }
 
+  /**
+   * Assert that all services are in the same expected state
+   * @param expected expected state value
+   * @param services services to examine
+   */
+  private void assertInState(STATE expected, CompositeServiceImpl[] services) {
+    assertInState(expected, services,0, services.length);
+  }
+
+  /**
+   * Assert that all services are in the same expected state
+   * @param expected expected state value
+   * @param services services to examine
+   * @param start start offset
+   * @param finish finish offset: the count stops before this number
+   */
+  private void assertInState(STATE expected,
+                             CompositeServiceImpl[] services,
+                             int start, int finish) {
+    for (int i = start; i < finish; i++) {
+      Service service = services[i];
+      assertInState(expected, service);
+    }
+  }
+
+  private void assertInState(STATE expected, Service service) {
+    assertEquals("Service state should have been " + expected + " in "
+                 + service,
+                 expected,
+                 service.getServiceState());
+  }
+
+  /**
+   * Shut down from not-inited: expect nothing to have happened
+   */
+  @Test
+  public void testServiceStopFromNotInited() {
+    ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+    // Add services
+    for (int i = 0; i < NUM_OF_SERVICES; i++) {
+      CompositeServiceImpl service = new CompositeServiceImpl(i);
+      serviceManager.addTestService(service);
+    }
+
+    CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+      new CompositeServiceImpl[0]);
+    serviceManager.stop();
+    assertInState(STATE.NOTINITED, services);
+  }
+
+  /**
+   * Shut down from inited
+   */
+  @Test
+  public void testServiceStopFromInited() {
+    ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+    // Add services
+    for (int i = 0; i < NUM_OF_SERVICES; i++) {
+      CompositeServiceImpl service = new CompositeServiceImpl(i);
+      serviceManager.addTestService(service);
+    }
+
+    CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+      new CompositeServiceImpl[0]);
+    serviceManager.init(new Configuration());
+    serviceManager.stop();
+    if (STOP_ONLY_STARTED_SERVICES) {
+      //this policy => no services were stopped
+      assertInState(STATE.INITED, services);
+    } else {
+      assertInState(STATE.STOPPED, services);
+    }
+  }
+
+  /**
+   * Use a null configuration & expect a failure
+   * @throws Throwable
+   */
+  @Test
+  public void testInitNullConf() throws Throwable {
+    ServiceManager serviceManager = new ServiceManager("testInitNullConf");
+
+    CompositeServiceImpl service = new CompositeServiceImpl(0);
+    serviceManager.addTestService(service);
+    try {
+      serviceManager.init(null);
+      LOG.warn("Null Configurations are permitted " + serviceManager);
+    } catch (ServiceStateException e) {
+      //expected
+    }
+  }
+
+  /**
+   * Walk the service through their lifecycle without any children;
+   * verify that it all works.
+   */
+  @Test
+  public void testServiceLifecycleNoChildren() {
+    ServiceManager serviceManager = new ServiceManager("ServiceManager");
+    serviceManager.init(new Configuration());
+    serviceManager.start();
+    serviceManager.stop();
+  }
+
+  @Test
+  public void testAddServiceInInit() throws Throwable {
+    BreakableService child = new BreakableService();
+    assertInState(STATE.NOTINITED, child);
+    CompositeServiceAddingAChild composite =
+      new CompositeServiceAddingAChild(child);
+    composite.init(new Configuration());
+    assertInState(STATE.INITED, child);
+  }
+  
+  public static class CompositeServiceAddingAChild extends CompositeService{
+    Service child;
+
+    public CompositeServiceAddingAChild(Service child) {
+      super("CompositeServiceAddingAChild");
+      this.child = child;
+    }
+
+    @Override
+    protected void serviceInit(Configuration conf) throws Exception {
+      addService(child);
+      super.serviceInit(conf);
+    }
+  }
+  
   public static class CompositeServiceImpl extends CompositeService {
 
+    public static boolean isPolicyToStopOnlyStartedServices() {
+      return STOP_ONLY_STARTED_SERVICES;
+    }
+
     private static int counter = -1;
 
     private int callSequenceNumber = -1;
@@ -193,30 +349,30 @@ public class TestCompositeService {
     }
 
     @Override
-    public synchronized void init(Configuration conf) {
+    protected void serviceInit(Configuration conf) throws Exception {
       counter++;
       callSequenceNumber = counter;
-      super.init(conf);
+      super.serviceInit(conf);
     }
 
     @Override
-    public synchronized void start() {
+    protected void serviceStart() throws Exception {
       if (throwExceptionOnStart) {
         throw new YarnRuntimeException("Fake service start exception");
       }
       counter++;
       callSequenceNumber = counter;
-      super.start();
+      super.serviceStart();
     }
 
     @Override
-    public synchronized void stop() {
+    protected void serviceStop() throws Exception {
       counter++;
       callSequenceNumber = counter;
       if (throwExceptionOnStop) {
         throw new YarnRuntimeException("Fake service stop exception");
       }
-      super.stop();
+      super.serviceStop();
     }
 
     public static int getCounter() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Thu Jun 13 15:54:38 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 import org.apache.commons.logging.Log;
@@ -75,7 +74,7 @@ public class DeletionService extends Abs
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("DeletionService #%d")
       .build();
@@ -90,21 +89,23 @@ public class DeletionService extends Abs
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void stop() {
-    sched.shutdown();
-    boolean terminated = false;
-    try {
-      terminated = sched.awaitTermination(10, SECONDS);
-    } catch (InterruptedException e) {
-    }
-    if (terminated != true) {
-      sched.shutdownNow();
+  protected void serviceStop() throws Exception {
+    if (sched != null) {
+      sched.shutdown();
+      boolean terminated = false;
+      try {
+        terminated = sched.awaitTermination(10, SECONDS);
+      } catch (InterruptedException e) {
+      }
+      if (terminated != true) {
+        sched.shutdownNow();
+      }
     }
-    super.stop();
+    super.serviceStop();
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Thu Jun 13 15:54:38 2013
@@ -113,7 +113,7 @@ public class LocalDirsHandlerService ext
    * 
    */
   @Override
-  public void init(Configuration config) {
+  protected void serviceInit(Configuration config) throws Exception {
     // Clone the configuration as we may do modifications to dirs-list
     Configuration conf = new Configuration(config);
     diskHealthCheckInterval = conf.getLong(
@@ -126,7 +126,7 @@ public class LocalDirsHandlerService ext
         YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION,
         YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
     lastDisksCheckTime = System.currentTimeMillis();
-    super.init(conf);
+    super.serviceInit(conf);
 
     FileContext localFs;
     try {
@@ -150,24 +150,24 @@ public class LocalDirsHandlerService ext
    * Method used to start the disk health monitoring, if enabled.
    */
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
     if (isDiskHealthCheckerEnabled) {
       dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
       dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask,
           diskHealthCheckInterval, diskHealthCheckInterval);
     }
-    super.start();
+    super.serviceStart();
   }
 
   /**
    * Method used to terminate the disk health monitoring service.
    */
   @Override
-  public void stop() {
+  protected void serviceStop() throws Exception {
     if (dirsHandlerScheduler != null) {
       dirsHandlerScheduler.cancel();
     }
-    super.stop();
+    super.serviceStop();
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java Thu Jun 13 15:54:38 2013
@@ -39,13 +39,13 @@ public class NodeHealthCheckerService ex
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     if (NodeHealthScriptRunner.shouldRun(conf)) {
       nodeHealthScriptRunner = new NodeHealthScriptRunner();
       addService(nodeHealthScriptRunner);
     }
     addService(dirsHandler);
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   /**

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java Thu Jun 13 15:54:38 2013
@@ -197,7 +197,7 @@ public class NodeHealthScriptRunner exte
    * Method which initializes the values for the script path and interval time.
    */
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     this.conf = conf;
     this.nodeHealthScript = 
         conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
@@ -209,6 +209,7 @@ public class NodeHealthScriptRunner exte
     String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
         new String[] {});
     timer = new NodeHealthMonitorExecutor(args);
+    super.serviceInit(conf);
   }
 
   /**
@@ -216,7 +217,7 @@ public class NodeHealthScriptRunner exte
    * 
    */
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
     // if health script path is not configured don't start the thread.
     if (!shouldRun(conf)) {
       LOG.info("Not starting node health monitor");
@@ -226,6 +227,7 @@ public class NodeHealthScriptRunner exte
     // Start the timer task immediately and
     // then periodically at interval time.
     nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+    super.serviceStart();
   }
 
   /**
@@ -233,11 +235,13 @@ public class NodeHealthScriptRunner exte
    * 
    */
   @Override
-  public void stop() {
+  protected void serviceStop() {
     if (!shouldRun(conf)) {
       return;
     }
-    nodeHealthScriptScheduler.cancel();
+    if (nodeHealthScriptScheduler != null) {
+      nodeHealthScriptScheduler.cancel();
+    }
     if (shexec != null) {
       Process p = shexec.getProcess();
       if (p != null) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Jun 13 15:54:38 2013
@@ -128,7 +128,7 @@ public class NodeManager extends Composi
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
 
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
@@ -192,31 +192,36 @@ public class NodeManager extends Composi
             YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
         SHUTDOWN_CLEANUP_SLOP_MS;
     
-    super.init(conf);
+    super.serviceInit(conf);
     // TODO add local dirs to del
   }
 
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
     try {
       doSecureLogin();
     } catch (IOException e) {
       throw new YarnRuntimeException("Failed NodeManager login", e);
     }
-    super.start();
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
+  protected void serviceStop() throws Exception {
     if (isStopping.getAndSet(true)) {
       return;
     }
-
-    cleanupContainers(NodeManagerEventType.SHUTDOWN);
-    super.stop();
+    if (context != null) {
+      cleanupContainers(NodeManagerEventType.SHUTDOWN);
+    }
+    super.serviceStop();
     DefaultMetricsSystem.shutdown();
   }
 
+  public String getName() {
+    return "NodeManager";
+  }
+
   protected void resyncWithRM() {
     //we do not want to block dispatcher thread here
     new Thread() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Jun 13 15:54:38 2013
@@ -80,7 +80,7 @@ public class NodeStatusUpdaterImpl exten
   private InetSocketAddress rmAddress;
   private Resource totalResource;
   private int httpPort;
-  private boolean isStopped;
+  private volatile boolean isStopped;
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private boolean tokenKeepAliveEnabled;
   private long tokenRemovalDelayMs;
@@ -109,7 +109,7 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @Override
-  public synchronized void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     this.rmAddress = conf.getSocketAddr(
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
         YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
@@ -146,11 +146,11 @@ public class NodeStatusUpdaterImpl exten
         " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
         " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
     
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
 
     // NodeManager is the last service to start, so NodeId is available.
     this.nodeId = this.context.getNodeId();
@@ -159,7 +159,7 @@ public class NodeStatusUpdaterImpl exten
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
       registerWithRM();
-      super.start();
+      super.serviceStart();
       startStatusUpdater();
     } catch (Exception e) {
       String errorMessage = "Unexpected error starting NodeStatusUpdater";
@@ -169,10 +169,10 @@ public class NodeStatusUpdaterImpl exten
   }
 
   @Override
-  public synchronized void stop() {
+  protected void serviceStop() throws Exception {
     // Interrupt the updater.
     this.isStopped = true;
-    super.stop();
+    super.serviceStop();
   }
 
   protected void rebootNodeStatusUpdater() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Thu Jun 13 15:54:38 2013
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -82,7 +81,7 @@ public class AuxServices extends Abstrac
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     Collection<String> auxNames = conf.getStringCollection(
         YarnConfiguration.NM_AUX_SERVICES);
     for (final String sName : auxNames) {
@@ -110,11 +109,11 @@ public class AuxServices extends Abstrac
         throw e;
       }
     }
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     // TODO fork(?) services running as configured user
     //      monitor for health, shutdown/restart(?) if any should die
     for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
@@ -127,11 +126,11 @@ public class AuxServices extends Abstrac
         serviceMeta.put(name, meta);
       }
     }
-    super.start();
+    super.serviceStart();
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     try {
       synchronized (serviceMap) {
         for (Service service : serviceMap.values()) {
@@ -144,7 +143,7 @@ public class AuxServices extends Abstrac
         serviceMeta.clear();
       }
     } finally {
-      super.stop();
+      super.serviceStop();
     }
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu Jun 13 15:54:38 2013
@@ -177,13 +177,13 @@ public class ContainerManagerImpl extend
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     LogHandler logHandler =
       createLogHandler(conf, this.context, this.deletionService);
     addIfService(logHandler);
     dispatcher.register(LogHandlerEventType.class, logHandler);
     
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   private void addIfService(Object object) {
@@ -220,7 +220,7 @@ public class ContainerManagerImpl extend
   }
 
   @Override
-  public void start() {
+  protected void serviceStart() throws Exception {
 
     // Enqueue user dirs in deletion context
 
@@ -254,7 +254,7 @@ public class ContainerManagerImpl extend
       connectAddress.getPort());
     ((NodeManager.NMContext)context).setNodeId(nodeId);
     LOG.info("ContainerManager started at " + connectAddress);
-    super.start();
+    super.serviceStart();
   }
 
   void refreshServiceAcls(Configuration configuration, 
@@ -263,14 +263,14 @@ public class ContainerManagerImpl extend
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if (auxiliaryServices.getServiceState() == STARTED) {
       auxiliaryServices.unregister(this);
     }
     if (server != null) {
       server.stop();
     }
-    super.stop();
+    super.serviceStop();
   }
 
   // Get the remoteUGI corresponding to the api call.

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Thu Jun 13 15:54:38 2013
@@ -91,20 +91,20 @@ public class ContainersLauncher extends 
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     try {
       //TODO Is this required?
       FileContext.getLocalFSFileContext(conf);
     } catch (UnsupportedFileSystemException e) {
       throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
     }
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void stop() {
+  protected  void serviceStop() throws Exception {
     containerLauncher.shutdownNow();
-    super.stop();
+    super.serviceStop();
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Jun 13 15:54:38 2013
@@ -194,7 +194,7 @@ public class ResourceLocalizationService
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     this.validateConf(conf);
     this.publicRsrc =
         new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
@@ -239,7 +239,7 @@ public class ResourceLocalizationService
     localizerTracker = createLocalizerTracker(conf);
     addService(localizerTracker);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
@@ -248,7 +248,7 @@ public class ResourceLocalizationService
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     server = createServer();
@@ -257,7 +257,7 @@ public class ResourceLocalizationService
         getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
                                       server.getListenerAddress());
     LOG.info("Localizer started on port " + server.getPort());
-    super.start();
+    super.serviceStart();
   }
 
   LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -288,12 +288,12 @@ public class ResourceLocalizationService
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if (server != null) {
       server.stop();
     }
     cacheCleanup.shutdown();
-    super.stop();
+    super.serviceStop();
   }
 
   @Override
@@ -536,9 +536,9 @@ public class ResourceLocalizationService
     }
     
     @Override
-    public synchronized void start() {
+    public synchronized void serviceStart() throws Exception {
       publicLocalizer.start();
-      super.start();
+      super.serviceStart();
     }
 
     public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
@@ -559,12 +559,12 @@ public class ResourceLocalizationService
     }
     
     @Override
-    public void stop() {
+    public void serviceStop() throws Exception {
       for (LocalizerRunner localizer : privLocalizers.values()) {
         localizer.interrupt();
       }
       publicLocalizer.interrupt();
-      super.stop();
+      super.serviceStop();
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Thu Jun 13 15:54:38 2013
@@ -114,7 +114,7 @@ public class LogAggregationService exten
           .build());
   }
 
-  public synchronized void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     this.remoteRootLogDir =
         new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -122,22 +122,22 @@ public class LogAggregationService exten
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
 
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public synchronized void start() {
+  protected void serviceStart() throws Exception {
     // NodeId is only available during start, the following cannot be moved
     // anywhere else.
     this.nodeId = this.context.getNodeId();
-    super.start();
+    super.serviceStart();
   }
   
   @Override
-  public synchronized void stop() {
+  protected void serviceStop() throws Exception {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
     stopAggregators();
-    super.stop();
+    super.serviceStop();
   }
    
   private void stopAggregators() {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Thu Jun 13 15:54:38 2013
@@ -69,16 +69,16 @@ public class NonAggregatingLogHandler ex
   }
 
   @Override
-  public void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     // Default 3 hours.
     this.deleteDelaySeconds =
         conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
     sched = createScheduledThreadPoolExecutor(conf);
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void stop() {
+  protected void serviceStop() throws Exception {
     if (sched != null) {
       sched.shutdown();
       boolean isShutdown = false;
@@ -92,7 +92,7 @@ public class NonAggregatingLogHandler ex
         sched.shutdownNow();
       }
     }
-    super.stop();
+    super.serviceStop();
   }
 
   @SuppressWarnings("unchecked")

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Thu Jun 13 15:54:38 2013
@@ -85,7 +85,7 @@ public class ContainersMonitorImpl exten
   }
 
   @Override
-  public synchronized void init(Configuration conf) {
+  protected void serviceInit(Configuration conf) throws Exception {
     this.monitoringInterval =
         conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
             YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);
@@ -151,7 +151,7 @@ public class ContainersMonitorImpl exten
                 1) + "). Thrashing might happen.");
       }
     }
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   private boolean isEnabled() {
@@ -175,15 +175,15 @@ public class ContainersMonitorImpl exten
   }
 
   @Override
-  public synchronized void start() {
+  protected void serviceStart() throws Exception {
     if (this.isEnabled()) {
       this.monitoringThread.start();
     }
-    super.start();
+    super.serviceStart();
   }
 
   @Override
-  public synchronized void stop() {
+  protected void serviceStop() throws Exception {
     if (this.isEnabled()) {
       this.monitoringThread.interrupt();
       try {
@@ -192,7 +192,7 @@ public class ContainersMonitorImpl exten
         ;
       }
     }
-    super.stop();
+    super.serviceStop();
   }
 
   private static class ProcessTreeInfo {



Mime
View raw message