hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [3/3] hbase git commit: HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks (Jonathan Lawlor)
Date Fri, 30 Jan 2015 23:27:23 GMT
HBASE-6778 Deprecate Chore; its a thread per task when we should have one thread to do all tasks (Jonathan Lawlor)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/af84b746
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/af84b746
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/af84b746

Branch: refs/heads/branch-1
Commit: af84b746ceab1e4e6ed8a37ce8f1f4546ad3df5c
Parents: b9f5c6b
Author: stack <stack@apache.org>
Authored: Fri Jan 30 15:27:08 2015 -0800
Committer: stack <stack@apache.org>
Committed: Fri Jan 30 15:27:08 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  |  79 +-
 .../java/org/apache/hadoop/hbase/AuthUtil.java  |  12 +-
 .../java/org/apache/hadoop/hbase/Chore.java     | 142 ----
 .../org/apache/hadoop/hbase/ChoreService.java   | 368 ++++++++
 .../org/apache/hadoop/hbase/ScheduledChore.java | 330 ++++++++
 .../apache/hadoop/hbase/TestChoreService.java   | 844 +++++++++++++++++++
 .../apache/hadoop/hbase/rest/RESTServlet.java   |  14 +-
 .../apache/hadoop/hbase/HealthCheckChore.java   |   8 +-
 .../java/org/apache/hadoop/hbase/Server.java    |   5 +
 .../hadoop/hbase/master/CatalogJanitor.java     |   9 +-
 .../hbase/master/ClusterStatusPublisher.java    |  37 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  32 +-
 .../hadoop/hbase/master/SplitLogManager.java    |  23 +-
 .../hbase/master/balancer/BalancerChore.java    |  13 +-
 .../master/balancer/ClusterStatusChore.java     |  13 +-
 .../hbase/master/cleaner/CleanerChore.java      |   8 +-
 .../hbase/regionserver/HRegionServer.java       | 109 ++-
 .../hbase/regionserver/HeapMemoryManager.java   |  48 +-
 .../regionserver/RegionServerServices.java      |   6 +-
 .../hbase/regionserver/ServerNonceManager.java  |  15 +-
 .../regionserver/StorefileRefresherChore.java   |   8 +-
 .../regionserver/ReplicationSyncUp.java         |   6 +
 .../org/apache/hadoop/hbase/tool/Canary.java    |  11 +-
 .../hadoop/hbase/util/ConnectionCache.java      |  19 +-
 .../hadoop/hbase/MockRegionServerServices.java  |   7 +-
 .../hadoop/hbase/backup/TestHFileArchiving.java |  11 +-
 .../TestZooKeeperTableArchiveClient.java        |   7 +-
 .../mapreduce/TestLoadIncrementalHFiles.java    |  30 +-
 .../hadoop/hbase/master/MockRegionServer.java   |   8 +-
 .../hbase/master/TestActiveMasterManager.java   |   6 +
 .../hadoop/hbase/master/TestCatalogJanitor.java |  19 +-
 .../hbase/master/TestClockSkewDetection.java    |   6 +
 .../hbase/master/TestSplitLogManager.java       |   6 +
 .../hbase/master/TestTableLockManager.java      |  15 +-
 .../hbase/master/cleaner/TestHFileCleaner.java  |   7 +-
 .../master/cleaner/TestHFileLinkCleaner.java    |  16 +-
 .../hbase/master/cleaner/TestLogsCleaner.java   |   6 +
 .../TestEndToEndSplitTransaction.java           |  21 +-
 .../regionserver/TestHeapMemoryManager.java     |  24 +-
 .../regionserver/TestServerNonceManager.java    |  13 +-
 .../hbase/regionserver/TestSplitLogWorker.java  |   6 +
 .../replication/TestReplicationStateZKImpl.java |   6 +
 .../TestReplicationTrackerZKImpl.java           |   6 +
 .../TestReplicationSourceManager.java           |  14 +-
 .../security/token/TestTokenAuthentication.java |   6 +
 .../apache/hadoop/hbase/util/MockServer.java    |   6 +
 46 files changed, 1931 insertions(+), 474 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index b22d456..63094da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -42,9 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -56,11 +54,11 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
@@ -159,8 +157,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
@@ -563,8 +561,6 @@ class ConnectionManager {
     private final Object masterAndZKLock = new Object();
 
     private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
-    private final DelayedClosing delayedClosing =
-      DelayedClosing.createAndStart(this);
 
     // thread executor shared by all HTableInterface instances created
     // by this connection
@@ -1370,7 +1366,6 @@ class ConnectionManager {
       HConnection connection;
       MasterService.BlockingInterface stub;
       int userCount;
-      long keepAliveUntil = Long.MAX_VALUE;
 
       MasterServiceState (final HConnection connection) {
         super();
@@ -1616,71 +1611,6 @@ class ConnectionManager {
       }
     }
 
-    /**
-     * Creates a Chore thread to check the connections to master & zookeeper
-     *  and close them when they reach their closing time (
-     *  {@link MasterServiceState#keepAliveUntil} and
-     *  {@link #keepZooKeeperWatcherAliveUntil}). Keep alive time is
-     *  managed by the release functions and the variable {@link #keepAlive}
-     */
-    private static class DelayedClosing extends Chore implements Stoppable {
-      private HConnectionImplementation hci;
-      Stoppable stoppable;
-
-      private DelayedClosing(
-        HConnectionImplementation hci, Stoppable stoppable){
-        super(
-          "ZooKeeperWatcher and Master delayed closing for connection "+hci,
-          60*1000, // We check every minutes
-          stoppable);
-        this.hci = hci;
-        this.stoppable = stoppable;
-      }
-
-      static DelayedClosing createAndStart(HConnectionImplementation hci){
-        Stoppable stoppable = new Stoppable() {
-              private volatile boolean isStopped = false;
-              @Override public void stop(String why) { isStopped = true;}
-              @Override public boolean isStopped() {return isStopped;}
-            };
-
-        return new DelayedClosing(hci, stoppable);
-      }
-
-      protected void closeMasterProtocol(MasterServiceState protocolState) {
-        if (System.currentTimeMillis() > protocolState.keepAliveUntil) {
-          hci.closeMasterService(protocolState);
-          protocolState.keepAliveUntil = Long.MAX_VALUE;
-        }
-      }
-
-      @Override
-      protected void chore() {
-        synchronized (hci.masterAndZKLock) {
-          if (hci.canCloseZKW) {
-            if (System.currentTimeMillis() >
-              hci.keepZooKeeperWatcherAliveUntil) {
-
-              hci.closeZooKeeperWatcher();
-              hci.keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
-            }
-          }
-          closeMasterProtocol(hci.masterServiceState);
-          closeMasterProtocol(hci.masterServiceState);
-        }
-      }
-
-      @Override
-      public void stop(String why) {
-        stoppable.stop(why);
-      }
-
-      @Override
-      public boolean isStopped() {
-        return stoppable.isStopped();
-      }
-    }
-
     private void closeZooKeeperWatcher() {
       synchronized (masterAndZKLock) {
         if (keepAliveZookeeper != null) {
@@ -1703,7 +1633,6 @@ class ConnectionManager {
 
     private void resetMasterServiceState(final MasterServiceState mss) {
       mss.userCount++;
-      mss.keepAliveUntil = Long.MAX_VALUE;
     }
 
     @Override
@@ -2054,9 +1983,6 @@ class ConnectionManager {
       if (mss.getStub() == null) return;
       synchronized (masterAndZKLock) {
         --mss.userCount;
-        if (mss.userCount <= 0) {
-          mss.keepAliveUntil = System.currentTimeMillis() + keepAlive;
-        }
       }
     }
 
@@ -2356,7 +2282,6 @@ class ConnectionManager {
       if (this.closed) {
         return;
       }
-      delayedClosing.stop("Closing connection");
       closeMaster();
       shutdownBatchPool();
       this.closed = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
index 282b5e3..f597935 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/AuthUtil.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -47,12 +46,12 @@ public class AuthUtil {
   /**
    * Checks if security is enabled and if so, launches chore for refreshing kerberos ticket.
    */
-  public static void launchAuthChore(Configuration conf) throws IOException {
+  public static ScheduledChore getAuthChore(Configuration conf) throws IOException {
     UserProvider userProvider = UserProvider.instantiate(conf);
     // login the principal (if using secure Hadoop)
     boolean securityEnabled =
         userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled();
-    if (!securityEnabled) return;
+    if (!securityEnabled) return null;
     String host = null;
     try {
       host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
@@ -87,7 +86,8 @@ public class AuthUtil {
     // e.g. 5min tgt * 0.8 = 4min refresh so interval is better be way less than 1min
     final int CHECK_TGT_INTERVAL = 30 * 1000; // 30sec
 
-    Chore refreshCredentials = new Chore("RefreshCredentials", CHECK_TGT_INTERVAL, stoppable) {
+    ScheduledChore refreshCredentials =
+        new ScheduledChore("RefreshCredentials", stoppable, CHECK_TGT_INTERVAL) {
       @Override
       protected void chore() {
         try {
@@ -97,7 +97,7 @@ public class AuthUtil {
         }
       }
     };
-    // Start the chore for refreshing credentials
-    Threads.setDaemonThreadRunning(refreshCredentials.getThread());
+
+    return refreshCredentials;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java
deleted file mode 100644
index c2c7964..0000000
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/Chore.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Sleeper;
-
-/**
- * Chore is a task performed on a period in hbase.  The chore is run in its own
- * thread. This base abstract class provides while loop and sleeping facility.
- * If an unhandled exception, the threads exit is logged.
- * Implementers just need to add checking if there is work to be done and if
- * so, do it.  Its the base of most of the chore threads in hbase.
- *
- * <p>Don't subclass Chore if the task relies on being woken up for something to
- * do, such as an entry being added to a queue, etc.
- */
-@InterfaceAudience.Private
-public abstract class Chore extends HasThread {
-  private final Log LOG = LogFactory.getLog(this.getClass());
-  private final Sleeper sleeper;
-  protected final Stoppable stopper;
-
-  /**
-   * @param p Period at which we should run.  Will be adjusted appropriately
-   * should we find work and it takes time to complete.
-   * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
-   * cleanup and exit cleanly.
-   */
-  public Chore(String name, final int p, final Stoppable stopper) {
-    super(name);
-    if (stopper == null){
-      throw new NullPointerException("stopper cannot be null");
-    }
-    this.sleeper = new Sleeper(p, stopper);
-    this.stopper = stopper;
-  }
-
-  /**
-   * This constructor is for test only. It allows to create an object and to call chore() on
-   *  it. There is no sleeper nor stoppable.
-   */
-  protected Chore(){
-    sleeper = null;
-    stopper = null;
-  }
-
-  /**
-   * @see java.lang.Thread#run()
-   */
-  @Override
-  public void run() {
-    try {
-      boolean initialChoreComplete = false;
-      while (!this.stopper.isStopped()) {
-        long startTime = System.currentTimeMillis();
-        try {
-          if (!initialChoreComplete) {
-            initialChoreComplete = initialChore();
-          } else {
-            chore();
-          }
-        } catch (Exception e) {
-          LOG.error("Caught exception", e);
-          if (this.stopper.isStopped()) {
-            continue;
-          }
-        }
-        this.sleeper.sleep(startTime);
-      }
-    } catch (Throwable t) {
-      LOG.fatal(getName() + "error", t);
-    } finally {
-      LOG.info(getName() + " exiting");
-      cleanup();
-    }
-  }
-
-  /**
-   * If the thread is currently sleeping, trigger the core to happen immediately.
-   * If it's in the middle of its operation, will begin another operation
-   * immediately after finishing this one.
-   */
-  public void triggerNow() {
-    this.sleeper.skipSleepCycle();
-  }
-
-  /*
-   * Exposed for TESTING!
-   * calls directly the chore method, from the current thread.
-   */
-  public void choreForTesting() {
-    chore();
-  }
-
-  /**
-   * Override to run a task before we start looping.
-   * @return true if initial chore was successful
-   */
-  protected boolean initialChore() {
-    // Default does nothing.
-    return true;
-  }
-
-  /**
-   * Look for chores.  If any found, do them else just return.
-   */
-  protected abstract void chore();
-
-  /**
-   * Sleep for period.
-   */
-  protected void sleep() {
-    this.sleeper.sleep();
-  }
-
-  /**
-   * Called when the chore has completed, allowing subclasses to cleanup any
-   * extra overhead
-   */
-  protected void cleanup() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
new file mode 100644
index 0000000..fd6cbc9
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ChoreService.java
@@ -0,0 +1,368 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
+ * periodically while sharing threads. The ChoreService is backed by a
+ * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
+ * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
+ * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
+ * <p>
+ * The ChoreService provides the ability to schedule, cancel, and trigger instances of
+ * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
+ * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
+ * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
+ * there may be a need to increase the number of threads if it is noticed that chores are no longer
+ * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
+ * made to reduce the number of running threads to see if chores can still meet their start times
+ * with a smaller thread pool.
+ * <p>
+ * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
+ * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
+ */
+@InterfaceAudience.Private
+public class ChoreService implements ChoreServicer {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  /**
+   * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
+   */
+  public final static int MIN_CORE_POOL_SIZE = 1;
+
+  /**
+   * This thread pool is used to schedule all of the Chores
+   */
+  private final ScheduledThreadPoolExecutor scheduler;
+
+  /**
+   * Maps chores to their futures. Futures are used to control a chore's schedule
+   */
+  private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
+
+  /**
+   * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
+   * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
+   * increase the core pool size by 1 (otherwise a single long running chore whose execution is
+   * longer than its period would be able to spawn too many threads).
+   */
+  private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
+
+  /**
+   * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
+   * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
+   * running on. The prefix is useful because it allows us to monitor how the thread pool of a
+   * particular service changes over time VIA thread dumps.
+   */
+  private final String coreThreadPoolPrefix;
+
+  /**
+   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
+   *          spawned by this service
+   */
+  public ChoreService(final String coreThreadPoolPrefix) {
+    this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE);
+  }
+
+  /**
+   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
+   *          spawned by this service
+   * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor 
+   *          to during initialization. The default size is 1, but specifying a larger size may be
+   *          beneficial if you know that 1 thread will not be enough.
+   */
+  public ChoreService(final String coreThreadPoolPrefix, int corePoolSize) {
+    this.coreThreadPoolPrefix = coreThreadPoolPrefix;
+    if (corePoolSize < MIN_CORE_POOL_SIZE) corePoolSize = MIN_CORE_POOL_SIZE;
+    final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
+    scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
+    scheduler.setRemoveOnCancelPolicy(true);
+    scheduledChores = new HashMap<ScheduledChore, ScheduledFuture<?>>();
+    choresMissingStartTime = new HashMap<ScheduledChore, Boolean>();
+  }
+
+  /**
+   * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
+   *          spawned by this service
+   */
+  public static ChoreService getInstance(final String coreThreadPoolPrefix) {
+    return new ChoreService(coreThreadPoolPrefix);
+  }
+
+  /**
+   * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
+   *          instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
+   *          with a single ChoreService instance).
+   * @return true when the chore was successfully scheduled. false when the scheduling failed
+   *         (typically occurs when a chore is scheduled during shutdown of service)
+   */
+  public synchronized boolean scheduleChore(ScheduledChore chore) {
+    if (chore == null) return false;
+
+    try {
+      ScheduledFuture<?> future =
+          scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
+            chore.getTimeUnit());
+      chore.setChoreServicer(this);
+      scheduledChores.put(chore, future);
+      return true;
+    } catch (Exception exception) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Could not successfully schedule chore: " + chore.getName());
+      }
+      return false;
+    }
+  }
+
+  /**
+   * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
+   *          yet then this call is equivalent to a call to scheduleChore.
+   */
+  private synchronized void rescheduleChore(ScheduledChore chore) {
+    if (chore == null) return;
+
+    if (scheduledChores.containsKey(chore)) {
+      ScheduledFuture<?> future = scheduledChores.get(chore);
+      future.cancel(false);
+    }
+    scheduleChore(chore);
+  }
+
+  @Override
+  public synchronized void cancelChore(ScheduledChore chore) {
+    cancelChore(chore, false);
+  }
+
+  @Override
+  public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
+    if (chore != null && scheduledChores.containsKey(chore)) {
+      ScheduledFuture<?> future = scheduledChores.get(chore);
+      future.cancel(mayInterruptIfRunning);
+      scheduledChores.remove(chore);
+
+      // Removing a chore that was missing its start time means it may be possible
+      // to reduce the number of threads
+      if (choresMissingStartTime.containsKey(chore)) {
+        choresMissingStartTime.remove(chore);
+        requestCorePoolDecrease();
+      }
+    }
+  }
+
+  @Override
+  public synchronized boolean isChoreScheduled(ScheduledChore chore) {
+    return chore != null && scheduledChores.containsKey(chore)
+        && !scheduledChores.get(chore).isDone();
+  }
+
+  @Override
+  public synchronized boolean triggerNow(ScheduledChore chore) {
+    if (chore == null) {
+      return false;
+    } else {
+      rescheduleChore(chore);
+      return true;
+    }
+  }
+
+  /**
+   * @return number of chores that this service currently has scheduled
+   */
+  int getNumberOfScheduledChores() {
+    return scheduledChores.size();
+  }
+
+  /**
+   * @return number of chores that this service currently has scheduled that are missing their
+   *         scheduled start time
+   */
+  int getNumberOfChoresMissingStartTime() {
+    return choresMissingStartTime.size();
+  }
+
+  /**
+   * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
+   */
+  int getCorePoolSize() {
+    return scheduler.getCorePoolSize();
+  }
+
+  /**
+   * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
+   * daemon threads, and thus, don't prevent the JVM from shutting down
+   */
+  static class ChoreServiceThreadFactory implements ThreadFactory {
+    private final String threadPrefix;
+    private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
+    private AtomicInteger threadNumber = new AtomicInteger(1);
+
+    /**
+     * @param threadPrefix The prefix given to all threads created by this factory
+     */
+    public ChoreServiceThreadFactory(final String threadPrefix) {
+      this.threadPrefix = threadPrefix;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread =
+          new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+
+  /**
+   * Represents a request to increase the number of core pool threads. Typically a request
+   * originates from the fact that the current core pool size is not sufficient to service all of
+   * the currently running Chores
+   * @return true when the request to increase the core pool size succeeds
+   */
+  private synchronized boolean requestCorePoolIncrease() {
+    // There is no point in creating more threads than scheduledChores.size since scheduled runs
+    // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
+    // amongst occurrences of the same chore).
+    if (scheduler.getCorePoolSize() < scheduledChores.size()) {
+      scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
+      printChoreServiceDetails("requestCorePoolIncrease");
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Represents a request to decrease the number of core pool threads. Typically a request
+   * originates from the fact that the current core pool size is more than sufficient to service the
+   * running Chores.
+   */
+  private synchronized void requestCorePoolDecrease() {
+    if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
+      scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
+      printChoreServiceDetails("requestCorePoolDecrease");
+    }
+  }
+
+  @Override
+  public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
+    if (chore == null || !scheduledChores.containsKey(chore)) return;
+
+    // If the chore has not caused an increase in the size of the core thread pool then request an
+    // increase. This allows each chore missing its start time to increase the core pool size by
+    // at most 1.
+    if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
+      choresMissingStartTime.put(chore, requestCorePoolIncrease());
+    }
+
+    // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
+    // the chore is NOT rescheduled, future executions of this chore will be delayed more and
+    // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
+    // idle threads to chores based on how delayed they are.
+    rescheduleChore(chore);
+    printChoreDetails("onChoreMissedStartTime", chore);
+  }
+
+  /**
+   * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
+   * in the middle of execution will be interrupted and shutdown. This service will be unusable
+   * after this method has been called (i.e. future scheduling attempts will fail).
+   */
+  public void shutdown() {
+    List<Runnable> ongoing = scheduler.shutdownNow();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + ongoing + " on shutdown");
+    }
+    cancelAllChores(true);
+    scheduledChores.clear();
+    choresMissingStartTime.clear();
+  }
+  
+  /**
+   * @return true when the service is shutdown and thus cannot be used anymore
+   */
+  public boolean isShutdown() {
+    return scheduler.isShutdown();
+  }
+
+  /**
+   * @return true when the service is shutdown and all threads have terminated
+   */
+  public boolean isTerminated() {
+    return scheduler.isTerminated();
+  }
+
+  private void cancelAllChores(final boolean mayInterruptIfRunning) {
+    ArrayList<ScheduledChore> choresToCancel = new ArrayList<ScheduledChore>();
+    // Build list of chores to cancel so we can iterate through a set that won't change
+    // as chores are cancelled. If we tried to cancel each chore while iterating through
+    // keySet the results would be undefined because the keySet would be changing
+    for (ScheduledChore chore : scheduledChores.keySet()) {
+      choresToCancel.add(chore);
+    }
+    for (ScheduledChore chore : choresToCancel) {
+      chore.cancel(mayInterruptIfRunning);
+    }
+    choresToCancel.clear();
+  }
+
+  /**
+   * Prints a summary of important details about the chore. Used for debugging purposes
+   */
+  private void printChoreDetails(final String header, ScheduledChore chore) {
+    LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
+    output.put(header, "");
+    output.put("Chore name: ", chore.getName());
+    output.put("Chore period: ", Integer.toString(chore.getPeriod()));
+    output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
+
+    for (Entry<String, String> entry : output.entrySet()) {
+      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
+    }
+  }
+
+  /**
+   * Prints a summary of important details about the service. Used for debugging purposes
+   */
+  private void printChoreServiceDetails(final String header) {
+    LinkedHashMap<String, String> output = new LinkedHashMap<String, String>();
+    output.put(header, "");
+    output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
+    output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
+    output.put("ChoreService missingStartTimeCount: ",
+      Integer.toString(getNumberOfChoresMissingStartTime()));
+
+    for (Entry<String, String> entry : output.entrySet()) {
+      if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
new file mode 100644
index 0000000..84002c5
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
@@ -0,0 +1,330 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
+ * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
+ * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
+ * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
+ * cancellation is logged. Implementers should consider whether or not the Chore will be able to
+ * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
+ * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
+ * thread pool.
+ * <p>
+ * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
+ * an entry being added to a queue, etc.
+ */
+@InterfaceAudience.Private
+public abstract class ScheduledChore implements Runnable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  private final String name;
+
+  /**
+   * Default values for scheduling parameters should they be excluded during construction
+   */
+  private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
+  private final static long DEFAULT_INITIAL_DELAY = 0;
+
+  /**
+   * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
+   */
+  private final int period;
+  private final TimeUnit timeUnit;
+  private final long initialDelay;
+
+  /**
+   * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
+   * not scheduled.
+   */
+  private ChoreServicer choreServicer;
+
+  /**
+   * Variables that encapsulate the meaningful state information
+   */
+  private long timeOfLastRun = -1;
+  private long timeOfThisRun = -1;
+  private boolean initialChoreComplete = false;
+
+  /**
+   * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
+   * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
+   * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
+   * command can cause many chores to stop together.
+   */
+  private final Stoppable stopper;
+
+  interface ChoreServicer {
+    /**
+     * Cancel any ongoing schedules that this chore has with the implementer of this interface.
+     */
+    public void cancelChore(ScheduledChore chore);
+    public void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning);
+
+    /**
+     * @return true when the chore is scheduled with the implementer of this interface
+     */
+    public boolean isChoreScheduled(ScheduledChore chore);
+
+    /**
+     * This method tries to execute the chore immediately. If the chore is executing at the time of
+     * this call, the chore will begin another execution as soon as the current execution finishes
+     * <p>
+     * If the chore is not scheduled with a ChoreService, this call will fail.
+     * @return false when the chore could not be triggered immediately
+     */
+    public boolean triggerNow(ScheduledChore chore);
+
+    /**
+     * A callback that tells the implementer of this interface that one of the scheduled chores is
+     * missing its start time. The implication of a chore missing its start time is that the
+     * service's current means of scheduling may not be sufficient to handle the number of ongoing
+     * chores (the other explanation is that the chore's execution time is greater than its
+     * scheduled period). The service should try to increase its concurrency when this callback is
+     * received.
+     * @param chore The chore that missed its start time
+     */
+    public void onChoreMissedStartTime(ScheduledChore chore);
+  }
+
+  /**
+   * This constructor is for test only. It allows us to create an object and to call chore() on it.
+   */
+  protected ScheduledChore() {
+    this.name = null;
+    this.stopper = null;
+    this.period = 0;
+    this.initialDelay = DEFAULT_INITIAL_DELAY;
+    this.timeUnit = DEFAULT_TIME_UNIT;
+  }
+
+  /**
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period) {
+    this(name, stopper, period, DEFAULT_INITIAL_DELAY);
+  }
+
+  /**
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
+   *          value of 0 means the chore will begin to execute immediately. Negative delays are
+   *          invalid and will be corrected to a value of 0.
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period,
+      final long initialDelay) {
+    this(name, stopper, period, initialDelay, DEFAULT_TIME_UNIT);
+  }
+
+  /**
+   * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
+   * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
+   * @param period Period with which this Chore repeats execution when scheduled.
+   * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
+   *          value of 0 means the chore will begin to execute immediately. Negative delays are
+   *          invalid and will be corrected to a value of 0.
+   * @param unit The unit that is used to measure period and initialDelay
+   */
+  public ScheduledChore(final String name, Stoppable stopper, final int period,
+      final long initialDelay, final TimeUnit unit) {
+    this.name = name;
+    this.stopper = stopper;
+    this.period = period;
+    this.initialDelay = initialDelay < 0 ? 0 : initialDelay;
+    this.timeUnit = unit;
+  }
+
+  synchronized void resetState() {
+    timeOfLastRun = -1;
+    timeOfThisRun = -1;
+    initialChoreComplete = false;
+  }
+
+  /**
+   * @see java.lang.Thread#run()
+   */
+  @Override
+  public synchronized void run() {
+    timeOfLastRun = timeOfThisRun;
+    timeOfThisRun = System.currentTimeMillis();
+    if (missedStartTime() && choreServicer != null) {
+      choreServicer.onChoreMissedStartTime(this);
+      if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
+    } else if (stopper.isStopped()) {
+      cancel();
+      cleanup();
+      LOG.info("Chore: " + getName() + " was stopped");
+    } else {
+      try {
+        if (!initialChoreComplete) {
+          initialChoreComplete = initialChore();
+        } else {
+          chore();
+        }
+      } catch (Throwable t) {
+        LOG.error("Caught error", t);
+        if (this.stopper.isStopped()) {
+          cancel();
+          cleanup();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return How long has it been since this chore last run. Useful for checking if the chore has
+   *         missed its scheduled start time by too large of a margin
+   */
+  synchronized long getTimeBetweenRuns() {
+    return timeOfThisRun - timeOfLastRun;
+  }
+
+  /**
+   * @return true when the time between runs exceeds the acceptable threshold
+   */
+  private synchronized boolean missedStartTime() {
+    return isValidTime(timeOfLastRun) && isValidTime(timeOfThisRun)
+        && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
+  }
+
+  private synchronized double getMaximumAllowedTimeBetweenRuns() {
+    // Threshold used to determine if the Chore's current run started too late
+    return 1.5 * period;
+  }
+
+  private synchronized boolean isValidTime(final long time) {
+    return time > 0 && time <= System.currentTimeMillis();
+  }
+
+  /**
+   * @return false when the Chore is not currently scheduled with a ChoreService
+   */
+  public synchronized boolean triggerNow() {
+    if (choreServicer != null) {
+      return choreServicer.triggerNow(this);
+    } else {
+      return false;
+    }
+  }
+
+  synchronized void setChoreServicer(ChoreServicer service) {
+    // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
+    // is changing, cancel any existing schedules of this chore.
+    if (choreServicer != null && choreServicer != service) {
+      choreServicer.cancelChore(this, false);
+    }
+    choreServicer = service;
+    timeOfThisRun = System.currentTimeMillis();
+  }
+
+  public synchronized void cancel() {
+    cancel(false);
+  }
+
+  public synchronized void cancel(boolean mayInterruptIfRunning) {
+    if (choreServicer != null) choreServicer.cancelChore(this, mayInterruptIfRunning);
+
+    choreServicer = null;
+  }
+
+  public synchronized String getName() {
+    return name;
+  }
+
+  public synchronized Stoppable getStopper() {
+    return stopper;
+  }
+
+  public synchronized int getPeriod() {
+    return period;
+  }
+
+  public synchronized long getInitialDelay() {
+    return initialDelay;
+  }
+
+  public final synchronized TimeUnit getTimeUnit() {
+    return timeUnit;
+  }
+
+  public synchronized boolean isInitialChoreComplete() {
+    return initialChoreComplete;
+  }
+
+  @VisibleForTesting
+  synchronized ChoreServicer getChoreServicer() {
+    return choreServicer;
+  }
+
+  @VisibleForTesting
+  synchronized long getTimeOfLastRun() {
+    return timeOfLastRun;
+  }
+
+  @VisibleForTesting
+  synchronized long getTimeOfThisRun() {
+    return timeOfThisRun;
+  }
+
+  /**
+   * @return true when this Chore is scheduled with a ChoreService
+   */
+  public synchronized boolean isScheduled() {
+    return choreServicer != null && choreServicer.isChoreScheduled(this);
+  }
+
+  @VisibleForTesting
+  public synchronized void choreForTesting() {
+    chore();
+  }
+
+  /**
+   * The task to execute on each scheduled execution of the Chore
+   */
+  protected abstract void chore();
+
+  /**
+   * Override to run a task before we start looping.
+   * @return true if initial chore was successful
+   */
+  protected synchronized boolean initialChore() {
+    // Default does nothing
+    return true;
+  }
+
+  /**
+   * Override to run cleanup tasks when the Chore encounters an error and must stop running
+   */
+  protected synchronized void cleanup() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
new file mode 100644
index 0000000..35cc530
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestChoreService.java
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.CountingChore;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.DoNothingChore;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.FailInitialChore;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SampleStopper;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SleepingChore;
+import org.apache.hadoop.hbase.TestChoreService.ScheduledChoreSamples.SlowChore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestChoreService {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  private final String TEST_SERVER_NAME = "testServerName";
+
+  /**
+   * A few ScheduledChore samples that are useful for testing with ChoreService
+   */
+  public static class ScheduledChoreSamples {
+    /**
+     * Straight forward stopper implementation that is used by default when one is not provided
+     */
+    public static class SampleStopper implements Stoppable {
+      private boolean stopped = false;
+
+      @Override
+      public void stop(String why) {
+        stopped = true;
+      }
+
+      @Override
+      public boolean isStopped() {
+        return stopped;
+      }
+    }
+
+    /**
+     * Sleeps for longer than the scheduled period. This chore always misses its scheduled periodic
+     * executions
+     */
+    public static class SlowChore extends ScheduledChore {
+      public SlowChore(String name, int period) {
+        this(name, new SampleStopper(), period);
+      }
+      
+      public SlowChore(String name, Stoppable stopper, int period) {
+        super(name, stopper, period);
+      }
+
+      @Override
+      protected boolean initialChore() {
+        try {
+          Thread.sleep(getPeriod() * 2);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        return true;
+      }
+
+      @Override
+      protected void chore() {
+        try {
+          Thread.sleep(getPeriod() * 2);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    /**
+     * Lightweight ScheduledChore used primarily to fill the scheduling queue in tests
+     */
+    public static class DoNothingChore extends ScheduledChore {
+      public DoNothingChore(String name, int period) {
+        super(name, new SampleStopper(), period);
+      }
+      
+      public DoNothingChore(String name, Stoppable stopper, int period) {
+        super(name, stopper, period);
+      }
+
+      @Override
+      protected void chore() {
+        // DO NOTHING
+      }
+
+    }
+
+    public static class SleepingChore extends ScheduledChore {
+      private int sleepTime;
+
+      public SleepingChore(String name, int chorePeriod, int sleepTime) {
+        this(name, new SampleStopper(), chorePeriod, sleepTime);
+      }
+
+      public SleepingChore(String name, Stoppable stopper, int period, int sleepTime) {
+        super(name, stopper, period);
+        this.sleepTime = sleepTime;
+      }
+      
+      @Override
+      protected boolean initialChore() {
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        return true;
+      }
+
+      @Override
+      protected void chore() {
+        try {
+          Thread.sleep(sleepTime);
+        } catch (Exception e) {
+          System.err.println(e.getStackTrace());
+        }
+      }
+    }
+
+    public static class CountingChore extends ScheduledChore {
+      private int countOfChoreCalls;
+      private boolean outputOnTicks = false;
+
+      public CountingChore(String name, int period) {
+        this(name, new SampleStopper(), period);
+      }
+      
+      public CountingChore(String name, Stoppable stopper, int period) {
+        this(name, stopper, period, false);
+      }
+
+      public CountingChore(String name, Stoppable stopper, int period, 
+          final boolean outputOnTicks) {
+        super(name, stopper, period);
+        this.countOfChoreCalls = 0;
+        this.outputOnTicks = outputOnTicks;
+      }
+
+      @Override
+      protected boolean initialChore() {
+        countOfChoreCalls++;
+        if (outputOnTicks) outputTickCount();
+        return true;
+      }
+
+      @Override
+      protected void chore() {
+        countOfChoreCalls++;
+        if (outputOnTicks) outputTickCount();
+      }
+
+      private void outputTickCount() {
+        System.out.println("Chore: " + getName() + ". Count of chore calls: " + countOfChoreCalls);
+      }
+
+      public int getCountOfChoreCalls() {
+        return countOfChoreCalls;
+      }
+
+      public boolean isOutputtingOnTicks() {
+        return outputOnTicks;
+      }
+
+      public void setOutputOnTicks(boolean o) {
+        outputOnTicks = o;
+      }
+    }
+
+    /**
+     * A Chore that will try to execute the initial chore a few times before succeeding. Once the
+     * initial chore is complete the chore cancels itself
+     */
+    public static class FailInitialChore extends ScheduledChore {
+      private int numberOfFailures;
+      private int failureThreshold;
+      
+      /**
+       * @param failThreshold Number of times the Chore fails when trying to execute initialChore
+       *          before succeeding.
+       */
+      public FailInitialChore(String name, int period, int failThreshold) {
+        this(name, new SampleStopper(), period, failThreshold);
+      }
+      
+      public FailInitialChore(String name, Stoppable stopper, int period, int failThreshold) {
+        super(name, stopper, period);
+        numberOfFailures = 0;
+        failureThreshold = failThreshold;
+      }
+
+      @Override
+      protected boolean initialChore() {
+        if (numberOfFailures < failureThreshold) {
+          numberOfFailures++;
+          return false;
+        } else {
+          return true;
+        }
+      }
+
+      @Override
+      protected void chore() {
+        assertTrue(numberOfFailures == failureThreshold);
+        cancel(false);
+      }
+
+    }
+  }
+
+  @Test
+  public void testInitialChorePrecedence() throws InterruptedException {
+    ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
+
+    final int period = 100;
+    final int failureThreshold = 5;
+    ScheduledChore chore = new FailInitialChore("chore", period, failureThreshold);
+    service.scheduleChore(chore);
+
+    int loopCount = 0;
+    boolean brokeOutOfLoop = false;
+    
+    while (!chore.isInitialChoreComplete() && chore.isScheduled()) {
+      Thread.sleep(failureThreshold * period);
+      loopCount++;
+      if (loopCount > 3) {
+        brokeOutOfLoop = true;
+        break;
+      }
+    }
+
+    assertFalse(brokeOutOfLoop);
+  }
+
+  @Test
+  public void testCancelChore() throws InterruptedException {
+    final int period = 100;
+    ScheduledChore chore1 = new DoNothingChore("chore1", period);
+    ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
+
+    service.scheduleChore(chore1);
+    assertTrue(chore1.isScheduled());
+
+    chore1.cancel(true);
+    assertFalse(chore1.isScheduled());
+    assertTrue(service.getNumberOfScheduledChores() == 0);
+  }
+
+  @Test
+  public void testScheduledChoreConstruction() {
+    final String NAME = "chore";
+    final int PERIOD = 100;
+    final long VALID_DELAY = 0;
+    final long INVALID_DELAY = -100;
+    final TimeUnit UNIT = TimeUnit.NANOSECONDS;
+
+    ScheduledChore chore1 =
+        new ScheduledChore(NAME, new SampleStopper(), PERIOD, VALID_DELAY, UNIT) {
+      @Override
+      protected void chore() {
+        // DO NOTHING
+      }
+    };
+
+    assertEquals("Name construction failed", chore1.getName(), NAME);
+    assertEquals("Period construction failed", chore1.getPeriod(), PERIOD);
+    assertEquals("Initial Delay construction failed", chore1.getInitialDelay(), VALID_DELAY);
+    assertEquals("TimeUnit construction failed", chore1.getTimeUnit(), UNIT);
+
+    ScheduledChore invalidDelayChore =
+        new ScheduledChore(NAME, new SampleStopper(), PERIOD, INVALID_DELAY, UNIT) {
+      @Override
+      protected void chore() {
+        // DO NOTHING
+      }
+    };
+
+    assertEquals("Initial Delay should be set to 0 when invalid", 0,
+      invalidDelayChore.getInitialDelay());
+  }
+
+  @Test
+  public void testChoreServiceConstruction() {
+    final int corePoolSize = 10;
+    final int defaultCorePoolSize = ChoreService.MIN_CORE_POOL_SIZE;
+
+    ChoreService customInit = new ChoreService(TEST_SERVER_NAME, corePoolSize);
+    assertEquals(corePoolSize, customInit.getCorePoolSize());
+
+    ChoreService defaultInit = new ChoreService(TEST_SERVER_NAME);
+    assertEquals(defaultCorePoolSize, defaultInit.getCorePoolSize());
+
+    ChoreService invalidInit = new ChoreService(TEST_SERVER_NAME, -10);
+    assertEquals(defaultCorePoolSize, invalidInit.getCorePoolSize());
+  }
+
+  @Test 
+  public void testFrequencyOfChores() throws InterruptedException {
+    final int period = 100;
+    // Small delta that acts as time buffer (allowing chores to complete if running slowly)
+    final int delta = 5;
+    ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
+    CountingChore chore = new CountingChore("countingChore", period);
+    service.scheduleChore(chore);
+
+    Thread.sleep(10 * period + delta);
+    assertTrue(chore.getCountOfChoreCalls() == 11);
+
+    Thread.sleep(10 * period);
+    assertTrue(chore.getCountOfChoreCalls() == 21);
+  }
+
+  @Test
+  public void testForceTrigger() throws InterruptedException {
+    final int period = 100;
+    final int delta = 5;
+    ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
+    CountingChore chore = new CountingChore("countingChore", period);
+    service.scheduleChore(chore);
+    Thread.sleep(10 * period + delta);
+
+    assertTrue(chore.getCountOfChoreCalls() == 11);
+
+    // Force five runs of the chore to occur, sleeping between triggers to ensure the
+    // chore has time to run
+    chore.triggerNow();
+    Thread.sleep(delta);
+    chore.triggerNow();
+    Thread.sleep(delta);
+    chore.triggerNow();
+    Thread.sleep(delta);
+    chore.triggerNow();
+    Thread.sleep(delta);
+    chore.triggerNow();
+    Thread.sleep(delta);
+
+    assertTrue(chore.getCountOfChoreCalls() == 16);
+
+    Thread.sleep(10 * period + delta);
+
+    assertTrue(chore.getCountOfChoreCalls() == 26);
+  }
+
+  @Test
+  public void testCorePoolIncrease() throws InterruptedException {
+    final int initialCorePoolSize = 3;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize);
+    assertEquals("Should have a core pool of size: " + initialCorePoolSize, initialCorePoolSize,
+        service.getCorePoolSize());
+    
+    final int slowChorePeriod = 100;
+    SlowChore slowChore1 = new SlowChore("slowChore1", slowChorePeriod);
+    SlowChore slowChore2 = new SlowChore("slowChore2", slowChorePeriod);
+    SlowChore slowChore3 = new SlowChore("slowChore3", slowChorePeriod);
+
+    service.scheduleChore(slowChore1);
+    service.scheduleChore(slowChore2);
+    service.scheduleChore(slowChore3);
+    
+    Thread.sleep(slowChorePeriod * 10);
+    assertEquals("Should not create more pools than scheduled chores", 3, 
+      service.getCorePoolSize());
+
+    SlowChore slowChore4 = new SlowChore("slowChore4", slowChorePeriod);
+    service.scheduleChore(slowChore4);
+
+    Thread.sleep(slowChorePeriod * 10);
+    assertEquals("Chores are missing their start time. Should expand core pool size", 4,
+      service.getCorePoolSize());
+
+    SlowChore slowChore5 = new SlowChore("slowChore5", slowChorePeriod);
+    service.scheduleChore(slowChore5);
+
+    Thread.sleep(slowChorePeriod * 10);
+    assertEquals("Chores are missing their start time. Should expand core pool size", 5,
+      service.getCorePoolSize());
+  }
+
+  @Test
+  public void testCorePoolDecrease() throws InterruptedException {
+    final int initialCorePoolSize = 3;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME, initialCorePoolSize);
+    final int chorePeriod = 10;
+
+    // Slow chores always miss their start time and thus the core pool size should be at least as
+    // large as the number of running slow chores
+    SlowChore slowChore1 = new SlowChore("slowChore1", chorePeriod);
+    SlowChore slowChore2 = new SlowChore("slowChore2", chorePeriod);
+    SlowChore slowChore3 = new SlowChore("slowChore3", chorePeriod);
+
+    service.scheduleChore(slowChore1);
+    service.scheduleChore(slowChore2);
+    service.scheduleChore(slowChore3);
+
+    Thread.sleep(chorePeriod * 10);
+    assertEquals("Should not create more pools than scheduled chores",
+      service.getNumberOfScheduledChores(), service.getCorePoolSize());
+
+    SlowChore slowChore4 = new SlowChore("slowChore4", chorePeriod);
+    service.scheduleChore(slowChore4);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals("Chores are missing their start time. Should expand core pool size",
+      service.getNumberOfScheduledChores(), service.getCorePoolSize());
+
+    SlowChore slowChore5 = new SlowChore("slowChore5", chorePeriod);
+    service.scheduleChore(slowChore5);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals("Chores are missing their start time. Should expand core pool size",
+      service.getNumberOfScheduledChores(), service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 5);
+
+    slowChore5.cancel();
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 4);
+
+    slowChore4.cancel();
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 3);
+
+    slowChore3.cancel();
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+
+    slowChore2.cancel();
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 1);
+
+    slowChore1.cancel();
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 0);
+
+    slowChore1.resetState();
+    service.scheduleChore(slowChore1);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 1);
+
+    slowChore2.resetState();
+    service.scheduleChore(slowChore2);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(Math.max(ChoreService.MIN_CORE_POOL_SIZE, service.getNumberOfScheduledChores()),
+      service.getCorePoolSize());
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+
+    DoNothingChore fastChore1 = new DoNothingChore("fastChore1", chorePeriod);
+    service.scheduleChore(fastChore1);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+    assertEquals("Should increase", 3, service.getCorePoolSize());
+
+    DoNothingChore fastChore2 = new DoNothingChore("fastChore2", chorePeriod);
+    service.scheduleChore(fastChore2);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+    assertEquals("Should increase", 3, service.getCorePoolSize());
+
+    DoNothingChore fastChore3 = new DoNothingChore("fastChore3", chorePeriod);
+    service.scheduleChore(fastChore3);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+    assertEquals("Should not change", 3, service.getCorePoolSize());
+
+    DoNothingChore fastChore4 = new DoNothingChore("fastChore4", chorePeriod);
+    service.scheduleChore(fastChore4);
+    Thread.sleep(chorePeriod * 10);
+    assertEquals(service.getNumberOfChoresMissingStartTime(), 2);
+    assertEquals("Should not change", 3, service.getCorePoolSize());
+  }
+
+  @Test
+  public void testNumberOfRunningChores() throws InterruptedException {
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+
+    final int period = 100;
+    final int sleepTime = 5;
+
+    DoNothingChore dn1 = new DoNothingChore("dn1", period);
+    DoNothingChore dn2 = new DoNothingChore("dn2", period);
+    DoNothingChore dn3 = new DoNothingChore("dn3", period);
+    DoNothingChore dn4 = new DoNothingChore("dn4", period);
+    DoNothingChore dn5 = new DoNothingChore("dn5", period);
+
+    service.scheduleChore(dn1);
+    service.scheduleChore(dn2);
+    service.scheduleChore(dn3);
+    service.scheduleChore(dn4);
+    service.scheduleChore(dn5);
+
+    Thread.sleep(sleepTime);
+    assertEquals("Scheduled chore mismatch", 5, service.getNumberOfScheduledChores());
+
+    dn1.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals("Scheduled chore mismatch", 4, service.getNumberOfScheduledChores());
+
+    dn2.cancel();
+    dn3.cancel();
+    dn4.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals("Scheduled chore mismatch", 1, service.getNumberOfScheduledChores());
+
+    dn5.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals("Scheduled chore mismatch", 0, service.getNumberOfScheduledChores());
+  }
+
+  @Test
+  public void testNumberOfChoresMissingStartTime() throws InterruptedException {
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+
+    final int period = 100;
+    final int sleepTime = 5 * period;
+
+    // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
+    // ALWAYS miss their start time since their execution takes longer than their period
+    SlowChore sc1 = new SlowChore("sc1", period);
+    SlowChore sc2 = new SlowChore("sc2", period);
+    SlowChore sc3 = new SlowChore("sc3", period);
+    SlowChore sc4 = new SlowChore("sc4", period);
+    SlowChore sc5 = new SlowChore("sc5", period);
+
+    service.scheduleChore(sc1);
+    service.scheduleChore(sc2);
+    service.scheduleChore(sc3);
+    service.scheduleChore(sc4);
+    service.scheduleChore(sc5);
+
+    Thread.sleep(sleepTime);
+    assertEquals(5, service.getNumberOfChoresMissingStartTime());
+
+    sc1.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals(4, service.getNumberOfChoresMissingStartTime());
+
+    sc2.cancel();
+    sc3.cancel();
+    sc4.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals(1, service.getNumberOfChoresMissingStartTime());
+
+    sc5.cancel();
+    Thread.sleep(sleepTime);
+    assertEquals(0, service.getNumberOfChoresMissingStartTime());
+  }
+
+  /**
+   * ChoreServices should never have a core pool size that exceeds the number of chores that have
+   * been scheduled with the service. For example, if 4 ScheduledChores are scheduled with a
+   * ChoreService, the number of threads in the ChoreService's core pool should never exceed 4
+   */
+  @Test
+  public void testMaximumChoreServiceThreads() throws InterruptedException {
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+
+    final int period = 10;
+    final int sleepTime = 5 * period;
+
+    // Slow chores sleep for a length of time LONGER than their period. Thus, SlowChores
+    // ALWAYS miss their start time since their execution takes longer than their period.
+    // Chores that miss their start time will trigger the onChoreMissedStartTime callback
+    // in the ChoreService. This callback will try to increase the number of core pool
+    // threads.
+    SlowChore sc1 = new SlowChore("sc1", period);
+    SlowChore sc2 = new SlowChore("sc2", period);
+    SlowChore sc3 = new SlowChore("sc3", period);
+    SlowChore sc4 = new SlowChore("sc4", period);
+    SlowChore sc5 = new SlowChore("sc5", period);
+
+    service.scheduleChore(sc1);
+    service.scheduleChore(sc2);
+    service.scheduleChore(sc3);
+    service.scheduleChore(sc4);
+    service.scheduleChore(sc5);
+    
+    Thread.sleep(sleepTime);
+    assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
+
+    SlowChore sc6 = new SlowChore("sc6", period);
+    SlowChore sc7 = new SlowChore("sc7", period);
+    SlowChore sc8 = new SlowChore("sc8", period);
+    SlowChore sc9 = new SlowChore("sc9", period);
+    SlowChore sc10 = new SlowChore("sc10", period);
+
+    service.scheduleChore(sc6);
+    service.scheduleChore(sc7);
+    service.scheduleChore(sc8);
+    service.scheduleChore(sc9);
+    service.scheduleChore(sc10);
+
+    Thread.sleep(sleepTime);
+    assertTrue(service.getCorePoolSize() <= service.getNumberOfScheduledChores());
+  }
+
+  @Test
+  public void testScheduledChoreReset() throws InterruptedException {
+    final int period = 100;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+    ScheduledChore chore = new DoNothingChore("sampleChore", period);
+
+    // TRUE
+    assertTrue(!chore.isInitialChoreComplete());
+    assertTrue(chore.getTimeOfLastRun() == -1);
+    assertTrue(chore.getTimeOfThisRun() == -1);
+
+    service.scheduleChore(chore);
+    Thread.sleep(5 * period);
+
+    // FALSE
+    assertFalse(!chore.isInitialChoreComplete());
+    assertFalse(chore.getTimeOfLastRun() == -1);
+    assertFalse(chore.getTimeOfThisRun() == -1);
+
+    chore.resetState();
+
+    // TRUE
+    assertTrue(!chore.isInitialChoreComplete());
+    assertTrue(chore.getTimeOfLastRun() == -1);
+    assertTrue(chore.getTimeOfThisRun() == -1);
+  }
+
+  @Test
+  public void testChangingChoreServices() throws InterruptedException {
+    final int period = 100;
+    final int sleepTime = 10;
+    ChoreService service1 = new ChoreService(TEST_SERVER_NAME);
+    ChoreService service2 = new ChoreService(TEST_SERVER_NAME);
+    ScheduledChore chore = new DoNothingChore("sample", period);
+
+    assertFalse(chore.isScheduled());
+    assertFalse(service1.isChoreScheduled(chore));
+    assertFalse(service2.isChoreScheduled(chore));
+    assertTrue(chore.getChoreServicer() == null);
+
+    service1.scheduleChore(chore);
+    Thread.sleep(sleepTime);
+    assertTrue(chore.isScheduled());
+    assertTrue(service1.isChoreScheduled(chore));
+    assertFalse(service2.isChoreScheduled(chore));
+    assertFalse(chore.getChoreServicer() == null);
+
+    service2.scheduleChore(chore);
+    Thread.sleep(sleepTime);
+    assertTrue(chore.isScheduled());
+    assertFalse(service1.isChoreScheduled(chore));
+    assertTrue(service2.isChoreScheduled(chore));
+    assertFalse(chore.getChoreServicer() == null);
+
+    chore.cancel();
+    assertFalse(chore.isScheduled());
+    assertFalse(service1.isChoreScheduled(chore));
+    assertFalse(service2.isChoreScheduled(chore));
+    assertTrue(chore.getChoreServicer() == null);
+  }
+
+  @Test
+  public void testTriggerNowFailsWhenNotScheduled() throws InterruptedException {
+    final int period = 100;
+    // Small sleep time buffer to allow CountingChore to complete
+    final int sleep = 5;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+    CountingChore chore = new CountingChore("dn", period);
+
+    assertFalse(chore.triggerNow());
+    assertTrue(chore.getCountOfChoreCalls() == 0);
+
+    service.scheduleChore(chore);
+    Thread.sleep(sleep);
+    assertEquals(1, chore.getCountOfChoreCalls());
+    Thread.sleep(period);
+    assertEquals(2, chore.getCountOfChoreCalls());
+    assertTrue(chore.triggerNow());
+    Thread.sleep(sleep);
+    assertTrue(chore.triggerNow());
+    Thread.sleep(sleep);
+    assertTrue(chore.triggerNow());
+    Thread.sleep(sleep);
+    assertEquals(5, chore.getCountOfChoreCalls());
+  }
+
+  @Test
+  public void testStopperForScheduledChores() throws InterruptedException {
+    ChoreService service = ChoreService.getInstance(TEST_SERVER_NAME);
+    Stoppable stopperForGroup1 = new SampleStopper();
+    Stoppable stopperForGroup2 = new SampleStopper();
+    final int period = 100;
+    final int delta = 10;
+
+    ScheduledChore chore1_group1 = new DoNothingChore("c1g1", stopperForGroup1, period);
+    ScheduledChore chore2_group1 = new DoNothingChore("c2g1", stopperForGroup1, period);
+    ScheduledChore chore3_group1 = new DoNothingChore("c3g1", stopperForGroup1, period);
+
+    ScheduledChore chore1_group2 = new DoNothingChore("c1g2", stopperForGroup2, period);
+    ScheduledChore chore2_group2 = new DoNothingChore("c2g2", stopperForGroup2, period);
+    ScheduledChore chore3_group2 = new DoNothingChore("c3g2", stopperForGroup2, period);
+
+    service.scheduleChore(chore1_group1);
+    service.scheduleChore(chore2_group1);
+    service.scheduleChore(chore3_group1);
+    service.scheduleChore(chore1_group2);
+    service.scheduleChore(chore2_group2);
+    service.scheduleChore(chore3_group2);
+
+    Thread.sleep(delta);
+    Thread.sleep(10 * period);
+    assertTrue(chore1_group1.isScheduled());
+    assertTrue(chore2_group1.isScheduled());
+    assertTrue(chore3_group1.isScheduled());
+    assertTrue(chore1_group2.isScheduled());
+    assertTrue(chore2_group2.isScheduled());
+    assertTrue(chore3_group2.isScheduled());
+
+    stopperForGroup1.stop("test stopping group 1");
+    Thread.sleep(period);
+    assertFalse(chore1_group1.isScheduled());
+    assertFalse(chore2_group1.isScheduled());
+    assertFalse(chore3_group1.isScheduled());
+    assertTrue(chore1_group2.isScheduled());
+    assertTrue(chore2_group2.isScheduled());
+    assertTrue(chore3_group2.isScheduled());
+
+    stopperForGroup2.stop("test stopping group 2");
+    Thread.sleep(period);
+    assertFalse(chore1_group1.isScheduled());
+    assertFalse(chore2_group1.isScheduled());
+    assertFalse(chore3_group1.isScheduled());
+    assertFalse(chore1_group2.isScheduled());
+    assertFalse(chore2_group2.isScheduled());
+    assertFalse(chore3_group2.isScheduled());
+  }
+
+  @Test
+  public void testShutdownCancelsScheduledChores() throws InterruptedException {
+    final int period = 100;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
+    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
+    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
+
+    assertTrue(service.scheduleChore(successChore1));
+    assertTrue(successChore1.isScheduled());
+    assertTrue(service.scheduleChore(successChore2));
+    assertTrue(successChore2.isScheduled());
+    assertTrue(service.scheduleChore(successChore3));
+    assertTrue(successChore3.isScheduled());
+
+    service.shutdown();
+
+    assertFalse(successChore1.isScheduled());
+    assertFalse(successChore2.isScheduled());
+    assertFalse(successChore3.isScheduled());
+  }
+
+  @Test
+  public void testShutdownWorksWhileChoresAreExecuting() throws InterruptedException {
+    final int period = 100;
+    final int sleep = 5 * period;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+    ScheduledChore slowChore1 = new SleepingChore("sc1", period, sleep);
+    ScheduledChore slowChore2 = new SleepingChore("sc2", period, sleep);
+    ScheduledChore slowChore3 = new SleepingChore("sc3", period, sleep);
+
+    assertTrue(service.scheduleChore(slowChore1));
+    assertTrue(service.scheduleChore(slowChore2));
+    assertTrue(service.scheduleChore(slowChore3));
+
+    Thread.sleep(sleep / 2);
+    service.shutdown();
+
+    assertFalse(slowChore1.isScheduled());
+    assertFalse(slowChore2.isScheduled());
+    assertFalse(slowChore3.isScheduled());
+    assertTrue(service.isShutdown());
+
+    Thread.sleep(5);
+    assertTrue(service.isTerminated());
+  }
+
+  @Test
+  public void testShutdownRejectsNewSchedules() throws InterruptedException {
+    final int period = 100;
+    ChoreService service = new ChoreService(TEST_SERVER_NAME);
+    ScheduledChore successChore1 = new DoNothingChore("sc1", period);
+    ScheduledChore successChore2 = new DoNothingChore("sc2", period);
+    ScheduledChore successChore3 = new DoNothingChore("sc3", period);
+    ScheduledChore failChore1 = new DoNothingChore("fc1", period);
+    ScheduledChore failChore2 = new DoNothingChore("fc2", period);
+    ScheduledChore failChore3 = new DoNothingChore("fc3", period);
+
+    assertTrue(service.scheduleChore(successChore1));
+    assertTrue(successChore1.isScheduled());
+    assertTrue(service.scheduleChore(successChore2));
+    assertTrue(successChore2.isScheduled());
+    assertTrue(service.scheduleChore(successChore3));
+    assertTrue(successChore3.isScheduled());
+
+    service.shutdown();
+
+    assertFalse(service.scheduleChore(failChore1));
+    assertFalse(failChore1.isScheduled());
+    assertFalse(service.scheduleChore(failChore2));
+    assertFalse(failChore2.isScheduled());
+    assertFalse(service.scheduleChore(failChore3));
+    assertFalse(failChore3.isScheduled());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java
index ff42271..bb93bc8 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.rest;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.ParseFilter;
@@ -74,7 +74,10 @@ public class RESTServlet implements Constants {
   }
 
   public synchronized static void stop() {
-    if (INSTANCE != null)  INSTANCE = null;
+    if (INSTANCE != null) {
+      INSTANCE.shutdown();
+      INSTANCE = null;
+    }
   }
 
   /**
@@ -130,6 +133,13 @@ public class RESTServlet implements Constants {
     connectionCache.setEffectiveUser(effectiveUser);
   }
 
+  /**
+   * Shutdown any services that need to stop
+   */
+  void shutdown() {
+    if (connectionCache != null) connectionCache.shutdown();
+  }
+
   boolean supportsProxyuser() {
     return conf.getBoolean(HBASE_REST_SUPPORT_PROXYUSER, false);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java
index 4226c3f..fc70ca4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HealthCheckChore.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.util.StringUtils;
 /**
  * The Class HealthCheckChore for running health checker regularly.
  */
- public class HealthCheckChore extends Chore {
+public class HealthCheckChore extends ScheduledChore {
   private static Log LOG = LogFactory.getLog(HealthCheckChore.class);
   private HealthChecker healthChecker;
   private Configuration config;
@@ -38,7 +38,7 @@ import org.apache.hadoop.util.StringUtils;
   private long startWindow;
 
   public HealthCheckChore(int sleepTime, Stoppable stopper, Configuration conf) {
-    super("HealthChecker", sleepTime, stopper);
+    super("HealthChecker", stopper, sleepTime);
     LOG.info("Health Check Chore runs every " + StringUtils.formatTime(sleepTime));
     this.config = conf;
     String healthCheckScript = this.config.get(HConstants.HEALTH_SCRIPT_LOC);
@@ -58,8 +58,8 @@ import org.apache.hadoop.util.StringUtils;
     if (!isHealthy) {
       boolean needToStop = decideToStop();
       if (needToStop) {
-        this.stopper.stop("The  node reported unhealthy " + threshold
-            + " number of times consecutively.");
+        getStopper().stop(
+          "The  node reported unhealthy " + threshold + " number of times consecutively.");
       }
       // Always log health report.
       LOG.info("Health status at " + StringUtils.formatTime(System.currentTimeMillis()) + " : "

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
index 6b79f80..85f8471 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java
@@ -65,4 +65,9 @@ public interface Server extends Abortable, Stoppable {
    * Get CoordinatedStateManager instance for this server.
    */
   CoordinatedStateManager getCoordinatedStateManager();
+
+  /**
+   * @return The {@link ChoreService} instance for this server
+   */
+  ChoreService getChoreService();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
index 9f71b90..9d18c98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java
@@ -31,12 +31,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Triple;
  * table on a period looking for unused regions to garbage collect.
  */
 @InterfaceAudience.Private
-public class CatalogJanitor extends Chore {
+public class CatalogJanitor extends ScheduledChore {
   private static final Log LOG = LogFactory.getLog(CatalogJanitor.class.getName());
   private final Server server;
   private final MasterServices services;
@@ -66,9 +66,8 @@ public class CatalogJanitor extends Chore {
   private final Connection connection;
 
   CatalogJanitor(final Server server, final MasterServices services) {
-    super("CatalogJanitor-" + server.getServerName().toShortString(),
-      server.getConfiguration().getInt("hbase.catalogjanitor.interval", 300000),
-      server);
+    super("CatalogJanitor-" + server.getServerName().toShortString(), server, server
+        .getConfiguration().getInt("hbase.catalogjanitor.interval", 300000));
     this.server = server;
     this.services = services;
     this.connection = server.getConnection();

http://git-wip-us.apache.org/repos/asf/hbase/blob/af84b746/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index 6e7024c..e90aae6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -35,22 +35,7 @@ import io.netty.channel.socket.DatagramPacket;
 import io.netty.channel.socket.InternetProtocolFamily;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.handler.codec.MessageToMessageEncoder;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import io.netty.util.internal.StringUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.VersionInfo;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -67,6 +52,22 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.VersionInfo;
+
 
 /**
  * Class to publish the cluster status to the client. This allows them to know immediately
@@ -75,7 +76,7 @@ import java.util.concurrent.ConcurrentMap;
  *  on the client the different timeouts, as the dead servers will be detected separately.
  */
 @InterfaceAudience.Private
-public class ClusterStatusPublisher extends Chore {
+public class ClusterStatusPublisher extends ScheduledChore {
   /**
    * The implementation class used to publish the status. Default is null (no publish).
    * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@@ -115,8 +116,8 @@ public class ClusterStatusPublisher extends Chore {
   public ClusterStatusPublisher(HMaster master, Configuration conf,
                                 Class<? extends Publisher> publisherClass)
       throws IOException {
-    super("HBase clusterStatusPublisher for " + master.getName(),
-        conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD), master);
+    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
+      STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
     this.master = master;
     this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
     try {


Mime
View raw message