aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject [16/51] [partial] aurora git commit: Move packages from com.twitter.common to org.apache.aurora.common
Date Wed, 26 Aug 2015 21:00:06 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java
new file mode 100644
index 0000000..7aad9ef
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ExecutorServiceShutdown.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * An implementation of the graceful shutdown sequence recommended by {@link ExecutorService}.
+ *
+ * @author John Sirois
+ */
+public class ExecutorServiceShutdown implements Command {
+  private static final Logger LOG = Logger.getLogger(ExecutorServiceShutdown.class.getName());
+
+  private final ExecutorService executor;
+  private final Amount<Long, Time> gracePeriod;
+
+  /**
+   * Creates a new {@code ExecutorServiceShutdown} command that will try to gracefully shut down the
+   * given {@code executor} when executed.  If the supplied grace period is less than or equal to
+   * zero the executor service will be asked to shut down but no waiting will be done after these
+   * requests.
+   *
+   * @param executor The executor service this command should shut down when executed.
+   * @param gracePeriod The maximum time to wait after a shutdown request before continuing to the
+   *     next shutdown phase.
+   */
+  public ExecutorServiceShutdown(ExecutorService executor, Amount<Long, Time> gracePeriod) {
+    this.executor = Preconditions.checkNotNull(executor);
+    this.gracePeriod = Preconditions.checkNotNull(gracePeriod);
+  }
+
+  @Override
+  public void execute() {
+    executor.shutdown(); // Disable new tasks from being submitted.
+    try {
+       // Wait a while for existing tasks to terminate.
+      if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+        executor.shutdownNow(); // Cancel currently executing tasks.
+        // Wait a while for tasks to respond to being cancelled.
+        if (!executor.awaitTermination(gracePeriod.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS)) {
+          LOG.warning("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted.
+      executor.shutdownNow();
+      // Preserve interrupt status.
+      Thread.currentThread().interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java
new file mode 100644
index 0000000..b8a0fd9
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/ForwardingExecutorService.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.concurrent;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor service that forwards all calls to another executor service. Subclasses should
+ * override one or more methods to modify the behavior of the backing executor service as desired
+ * per the <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ *
+ * @author John Sirois
+ */
+public class ForwardingExecutorService<T extends ExecutorService> implements ExecutorService {
+  protected final T delegate;
+
+  public ForwardingExecutorService(T delegate) {
+    Preconditions.checkNotNull(delegate);
+    this.delegate = delegate;
+  }
+
+  public void shutdown() {
+    delegate.shutdown();
+  }
+
+  public List<Runnable> shutdownNow() {
+    return delegate.shutdownNow();
+  }
+
+  public boolean isShutdown() {
+    return delegate.isShutdown();
+  }
+
+  public boolean isTerminated() {
+    return delegate.isTerminated();
+  }
+
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return delegate.awaitTermination(timeout, unit);
+  }
+
+  public <T> Future<T> submit(Callable<T> task) {
+    return delegate.submit(task);
+  }
+
+  public <T> Future<T> submit(Runnable task, T result) {
+    return delegate.submit(task, result);
+  }
+
+  public Future<?> submit(Runnable task) {
+    return delegate.submit(task);
+  }
+
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+
+    return delegate.invokeAll(tasks);
+  }
+
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
+      TimeUnit unit) throws InterruptedException {
+
+    return delegate.invokeAll(tasks, timeout, unit);
+  }
+
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+
+    return delegate.invokeAny(tasks);
+  }
+
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+
+    return delegate.invokeAny(tasks, timeout, unit);
+  }
+
+  public void execute(Runnable command) {
+    delegate.execute(command);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java
new file mode 100644
index 0000000..630b9aa
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/MoreExecutors.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Utility class that provides factory functions to decorate
+ * {@link java.util.concurrent.ExecutorService}s.
+ */
+public final class MoreExecutors {
+  /**
+   * Supplies the uncaught exception handler of whichever thread calls {@link Supplier#get()}.
+   * {@link Thread#getUncaughtExceptionHandler()} returns the thread's explicitly installed
+   * handler, or the thread's {@link ThreadGroup} when none has been installed.
+   */
+  private static final Supplier<Thread.UncaughtExceptionHandler> CURRENT_THREAD_HANDLER =
+      new Supplier<Thread.UncaughtExceptionHandler>() {
+        @Override
+        public Thread.UncaughtExceptionHandler get() {
+          return Thread.currentThread().getUncaughtExceptionHandler();
+        }
+      };
+
+  private MoreExecutors() {
+    // utility
+  }
+
+  /**
+   * Returns an {@link ExecutorService} that passes uncaught exceptions to
+   * {@link java.lang.Thread.UncaughtExceptionHandler}.
+   * <p>
+   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
+   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
+   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
+   * even the default uncaught exception handler is not invoked.
+   *
+   * @param executorService delegate
+   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
+   *                                 from executor tasks.
+   * @return a decorated executor service
+   * @throws NullPointerException if {@code uncaughtExceptionHandler} is {@code null}
+   */
+  public static ExecutorService exceptionHandlingExecutor(
+      ExecutorService executorService,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+
+    Preconditions.checkNotNull(uncaughtExceptionHandler);
+    return new ExceptionHandlingExecutorService(
+        executorService, Suppliers.ofInstance(uncaughtExceptionHandler));
+  }
+
+  /**
+   * Returns an {@link ExecutorService} that passes uncaught exceptions to the handler returned by
+   * {@code Thread.currentThread().getUncaughtExceptionHandler()}.  The handler is looked up each
+   * time it is needed rather than once, when this method is called.
+   *
+   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ExecutorService,
+   *                                              Thread.UncaughtExceptionHandler)
+   * @param executorService delegate
+   * @return a decorated executor service
+   */
+  public static ExecutorService exceptionHandlingExecutor(ExecutorService executorService) {
+    return new ExceptionHandlingExecutorService(executorService, CURRENT_THREAD_HANDLER);
+  }
+
+  /**
+   * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to
+   * {@link java.lang.Thread.UncaughtExceptionHandler}.
+   * <p>
+   * This may be useful because {@link java.util.concurrent.ThreadPoolExecutor} and
+   * {@link java.util.concurrent.ScheduledThreadPoolExecutor} provide no built-in propagation of
+   * unchecked exceptions thrown from submitted work. Some users are surprised to find that
+   * even the default uncaught exception handler is not invoked.
+   *
+   * @param executorService delegate
+   * @param uncaughtExceptionHandler exception handler that will receive exceptions generated
+   *                                 from executor tasks.
+   * @return a decorated executor service
+   * @throws NullPointerException if {@code uncaughtExceptionHandler} is {@code null}
+   */
+  public static ScheduledExecutorService exceptionHandlingExecutor(
+      ScheduledExecutorService executorService,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+
+    Preconditions.checkNotNull(uncaughtExceptionHandler);
+    return new ExceptionHandlingScheduledExecutorService(
+        executorService,
+        Suppliers.ofInstance(uncaughtExceptionHandler));
+  }
+
+  /**
+   * Returns a {@link ScheduledExecutorService} that passes uncaught exceptions to the handler
+   * returned by {@code Thread.currentThread().getUncaughtExceptionHandler()}.  The handler is
+   * looked up each time it is needed rather than once, when this method is called.
+   *
+   * @see MoreExecutors#exceptionHandlingExecutor(java.util.concurrent.ScheduledExecutorService,
+   *                                              Thread.UncaughtExceptionHandler)
+   * @param executorService delegate
+   * @return a decorated executor service
+   */
+  public static ScheduledExecutorService exceptionHandlingExecutor(
+      ScheduledExecutorService executorService) {
+
+    return new ExceptionHandlingScheduledExecutorService(executorService, CURRENT_THREAD_HANDLER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java
new file mode 100644
index 0000000..7448dc1
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.concurrent;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A future task that supports retries by resubmitting itself to an {@link ExecutorService}.
+ * <p>
+ * The future completes with {@code true} once the wrapped callable succeeds, or with
+ * {@code false} once the retry limit is exhausted, so callers blocked in {@link #get()} are
+ * always released.  If the future is cancelled, pending retries do no further work.
+ *
+ * @author William Farner
+ */
+public class RetryingFutureTask extends FutureTask<Boolean> {
+  private static final Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName());
+
+  protected final ExecutorService executor;
+  protected final int maxRetries;
+  protected int numRetries = 0;
+  protected final Callable<Boolean> callable;
+
+  /**
+   * Creates a new retrying future task that will execute a unit of work until successfully
+   * completed, or the retry limit has been reached.
+   *
+   * @param executor The executor service to resubmit the task to upon failure.
+   * @param callable The unit of work.  The work is considered successful when {@code true} is
+   *    returned.  It may return {@code false} or throw an exception when unsuccessful.
+   * @param maxRetries The maximum number of times to retry the task.
+   */
+  public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) {
+    super(callable);
+    this.callable = Preconditions.checkNotNull(callable);
+    this.executor = Preconditions.checkNotNull(executor);
+    this.maxRetries = maxRetries;
+  }
+
+  /**
+   * Invokes a retry of this task.
+   */
+  protected void retry() {
+    executor.execute(this);
+  }
+
+  /**
+   * Runs one attempt of the unit of work.  On success, the future completes with {@code true}.
+   * On failure (a {@code false} result or an exception), the task is resubmitted via
+   * {@link #retry()} until {@code maxRetries} retries have been made.  After that, the future
+   * completes with {@code false}.
+   */
+  @Override
+  public void run() {
+    // A cancelled (or already completed) task must not keep running or resubmitting itself.
+    if (isDone()) {
+      return;
+    }
+
+    boolean success = false;
+    try {
+      success = callable.call();
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "Exception while executing task.", e);
+    }
+
+    if (success) {
+      set(true);
+      return;
+    }
+
+    numRetries++;
+    if (numRetries > maxRetries) {
+      LOG.severe("Task did not complete after " + maxRetries + " retries, giving up.");
+      // Complete the future so that callers blocked in get() are released rather than hanging.
+      set(false);
+    } else {
+      LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")");
+      retry();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java
new file mode 100644
index 0000000..5971e37
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/concurrent/TaskConverter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.concurrent;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
+
+/**
+ * Package-private helpers that wrap tasks so that anything they throw is also reported to an
+ * {@link Thread.UncaughtExceptionHandler} before being rethrown.
+ */
+final class TaskConverter {
+  private TaskConverter() {
+    // utility
+  }
+
+  /**
+   * Returns a wrapped {@link Runnable} that passes uncaught exceptions thrown from the
+   * original Runnable to {@link Thread.UncaughtExceptionHandler}.  The exception is then
+   * rethrown.
+   *
+   * @param runnable runnable to be wrapped
+   * @param handler supplies the exception handler; consulted on the thread running the task at
+   *     the moment an exception is caught
+   * @return wrapped runnable
+   */
+  static Runnable alertingRunnable(
+      final Runnable runnable,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          runnable.run();
+        } catch (Throwable t) {
+          handler.get().uncaughtException(Thread.currentThread(), t);
+          throw Throwables.propagate(t);
+        }
+      }
+    };
+  }
+
+  /**
+   * Returns a wrapped {@link java.util.concurrent.Callable} that passes uncaught exceptions
+   * thrown from the original Callable to {@link Thread.UncaughtExceptionHandler}.  The exception
+   * is then rethrown unchanged.  Checked exceptions are not wrapped, so an executor reports them
+   * as the cause of an {@link java.util.concurrent.ExecutionException} exactly as it would for
+   * the unwrapped callable.
+   *
+   * @param callable callable to be wrapped
+   * @param handler supplies the exception handler; consulted on the thread running the task at
+   *     the moment an exception is caught
+   * @return wrapped callable
+   */
+  static <V> Callable<V> alertingCallable(
+      final Callable<V> callable,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    return new Callable<V>() {
+      @Override
+      public V call() throws Exception {
+        try {
+          return callable.call();
+        } catch (Throwable t) {
+          handler.get().uncaughtException(Thread.currentThread(), t);
+          // Precise rethrow: t can only be an Exception (declared by call()) or an unchecked
+          // throwable, so it is rethrown as-is instead of being wrapped in a RuntimeException.
+          throw t;
+        }
+      }
+    };
+  }
+
+  /*
+   * Calls #alertingCallable on a collection of callables.  The result is a lazy view: each
+   * element is wrapped as it is read.
+   */
+  static <V> Collection<? extends Callable<V>> alertingCallables(
+      Collection<? extends Callable<V>> callables,
+      final Supplier<Thread.UncaughtExceptionHandler> handler) {
+
+    return Collections2.transform(callables, new Function<Callable<V>, Callable<V>>() {
+      @Override
+      public Callable<V> apply(Callable<V> callable) {
+        return alertingCallable(callable, handler);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java b/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java
new file mode 100644
index 0000000..927fb2b
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/logging/ResourceLoggingConfigurator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.logging;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.logging.LogManager;
+
+/**
+ * A custom java.util.logging configuration class that loads the logging configuration from a
+ * properties file resource (as opposed to a file as natively supported by LogManager via
+ * java.util.logging.config.file).  By default this configurator will look for the resource at
+ * /logging.properties but the resource path can be overridden by setting the system property with
+ * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}.  To install this
+ * configurator you must specify the following system property:
+ * java.util.logging.config.class=org.apache.aurora.common.util.logging.ResourceLoggingConfigurator
+ *
+ * @author John Sirois
+ */
+public class ResourceLoggingConfigurator {
+
+  /**
+   * A system property that controls where ResourceLoggingConfigurator looks for the logging
+   * configuration on the process classpath.
+   */
+  public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";
+
+  /**
+   * Reads the logging configuration resource and installs it into the global
+   * {@link LogManager}.
+   *
+   * @throws IOException if the resource cannot be read
+   * @throws NullPointerException if no resource exists at the configured path
+   */
+  public ResourceLoggingConfigurator() throws IOException {
+    String loggingPropertiesResourcePath =
+        System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
+    // LogManager.readConfiguration does not close the stream it is given, so close it here.
+    try (InputStream loggingConfig =
+             getClass().getResourceAsStream(loggingPropertiesResourcePath)) {
+      Preconditions.checkNotNull(loggingConfig,
+          "Could not locate logging config file at resource path: %s",
+          loggingPropertiesResourcePath);
+      LogManager.getLogManager().readConfiguration(loggingConfig);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java b/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java
new file mode 100644
index 0000000..66bbb37
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/logging/UnresettableLogManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.logging;
+
+import java.util.logging.LogManager;
+
+/**
+ * A LogManager which by default ignores calls to {@link #reset()}.  This is useful to avoid missing
+ * log statements that occur during vm shutdown.  The standard LogManager installs a
+ * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging and this subclass
+ * nullifies that shutdown hook by disabling any reset of the LogManager by default.
+ *
+ * @author John Sirois
+ */
+public class UnresettableLogManager extends LogManager {
+
+  /**
+   * The system property that controls which LogManager the java.util.logging subsystem should load.
+   */
+  public static final String LOGGING_MANAGER = "java.util.logging.manager";
+
+  /**
+   * A system property which can be used to control an {@code UnresettableLogManager}'s behavior.
+   * If the UnresettableLogManager is installed, but an application still wants
+   * {@link LogManager#reset()} behavior, they can set this property to "false".
+   */
+  private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";
+
+  @Override
+  public void reset() throws SecurityException {
+    if (Boolean.parseBoolean(System.getProperty(LOGGING_MANAGER_IGNORERESET, "true"))) {
+      System.err.println("UnresettableLogManager is ignoring a reset() request.");
+    } else {
+      super.reset();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
new file mode 100644
index 0000000..2756af4
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.templating;
+
+import java.io.IOException;
+import java.io.Writer;
+
+import com.google.common.base.Preconditions;
+
+import org.antlr.stringtemplate.AutoIndentWriter;
+import org.antlr.stringtemplate.StringTemplate;
+import org.antlr.stringtemplate.StringTemplateGroup;
+
+import org.apache.aurora.common.base.Closure;
+import org.apache.aurora.common.base.MorePreconditions;
+
+/**
+ * Loads a stringtemplate template file from the classpath and renders it to a writer after
+ * letting the caller populate its attributes.
+ */
+public class StringTemplateHelper {
+
+  private final StringTemplateGroup group;
+  private final String templatePath;
+
+  /**
+   * Creates a new template helper.  The template is loaded once up front to verify that it
+   * exists.
+   *
+   * @param templateContextClass Class whose package names the classpath directory containing the
+   *     template file.
+   * @param templateName Template file name (excluding .st suffix), resolved inside the package
+   *     of {@code templateContextClass}.
+   * @param cacheTemplates Whether the template should be cached.  When {@code false}, the
+   *     template group's refresh interval is set to zero.
+   */
+  public StringTemplateHelper(
+      Class<?> templateContextClass,
+      String templateName,
+      boolean cacheTemplates) {
+
+    MorePreconditions.checkNotBlank(templateName);
+    String packageDir = templateContextClass.getPackage().getName().replace('.', '/');
+    this.templatePath = packageDir + "/" + templateName;
+    this.group = new StringTemplateGroup(templateName);
+
+    // Fail fast if the template cannot be found on the classpath.
+    Preconditions.checkNotNull(group.getInstanceOf(templatePath),
+        "Failed to load template at: %s", templatePath);
+
+    if (!cacheTemplates) {
+      group.setRefreshInterval(0);
+    }
+  }
+
+  /**
+   * Renders the template to {@code out}.  A fresh template instance is handed to
+   * {@code parameterSetter} so that it can set attributes before rendering.
+   *
+   * @param out Destination for the rendered template.
+   * @param parameterSetter Closure that populates the template's attributes.
+   * @throws TemplateException If an I/O error occurs while writing the template.
+   */
+  public void writeTemplate(
+      Writer out,
+      Closure<StringTemplate> parameterSetter) throws TemplateException {
+
+    Preconditions.checkNotNull(out);
+    Preconditions.checkNotNull(parameterSetter);
+
+    StringTemplate template = group.getInstanceOf(templatePath);
+    try {
+      parameterSetter.execute(template);
+      template.write(new AutoIndentWriter(out));
+    } catch (IOException ioe) {
+      throw new TemplateException("Failed to write template: " + ioe, ioe);
+    }
+  }
+
+  /**
+   * Signals a failure while populating or writing a template.
+   */
+  public static class TemplateException extends Exception {
+    public TemplateException(String msg, Throwable cause) {
+      super(msg, cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
new file mode 100644
index 0000000..2ed8b15
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeClock.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.testing;
+
+import com.google.common.base.Preconditions;
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.util.Clock;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock for use in testing with a configurable value for {@link #nowMillis()}.
+ * <p>
+ * Time is stored with nanosecond resolution, and {@link #nowMillis()} truncates it to
+ * milliseconds.  The clock may be read and advanced from multiple threads.
+ *
+ * @author John Sirois
+ */
+public class FakeClock implements Clock {
+  // Tests may need to use the clock from multiple threads.  volatile makes every read see the
+  // latest value, and the mutators are synchronized so concurrent advances are not lost.
+  private volatile long nowNanos;
+
+  /**
+   * Sets what {@link #nowMillis()} will return until the time is set or advanced again.
+   *
+   * @param nowMillis the current time in milliseconds; must not be negative
+   * @throws IllegalArgumentException if {@code nowMillis} is negative
+   */
+  public synchronized void setNowMillis(long nowMillis) {
+    Preconditions.checkArgument(nowMillis >= 0);
+    this.nowNanos = TimeUnit.MILLISECONDS.toNanos(nowMillis);
+  }
+
+  /**
+   * Advances the current time by {@code period}.  Time can be retarded by passing a negative
+   * period.
+   *
+   * @param period the amount of time to advance the current time by
+   * @throws NullPointerException if {@code period} is {@code null}
+   * @throws IllegalArgumentException if the resulting time would be negative, including when a
+   *     large positive period overflows
+   */
+  public synchronized void advance(Amount<Long, Time> period) {
+    Preconditions.checkNotNull(period);
+    // Read-modify-write of nowNanos: synchronized so that concurrent callers cannot lose updates.
+    long newNanos = nowNanos + period.as(Time.NANOSECONDS);
+    Preconditions.checkArgument(newNanos >= 0,
+        "invalid period %s - would move current time to a negative value: %sns", period, newNanos);
+    nowNanos = newNanos;
+  }
+
+  @Override
+  public long nowMillis() {
+    return TimeUnit.NANOSECONDS.toMillis(nowNanos);
+  }
+
+  @Override
+  public long nowNanos() {
+    return nowNanos;
+  }
+
+  /**
+   * Waits in fake time, immediately returning in real time.  A check of {@link #nowMillis} after
+   * this method completes will consistently show that {@code millis} did in fact pass while
+   * waiting.
+   *
+   * @param millis the amount of time to wait in milliseconds
+   */
+  @Override
+  public void waitFor(long millis) {
+    advance(Amount.of(millis, Time.MILLISECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java
new file mode 100644
index 0000000..68247ad
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/util/testing/FakeTicker.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.util.testing;
+
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+/**
+ * A {@link Ticker} for use in tests whose {@link #read()} value is controlled explicitly by the
+ * test instead of by the passage of real time.
+ */
+public class FakeTicker extends Ticker {
+  private long nowNanos;
+
+  /**
+   * Sets what {@link #read()} will return until this method is called again with a new value
+   * for {@code nowNanos}.
+   *
+   * @param nowNanos the current time in nanoseconds
+   */
+  public void setNowNanos(long nowNanos) {
+    this.nowNanos = nowNanos;
+  }
+
+  /**
+   * Returns the fake time most recently set or advanced to, in nanoseconds.
+   */
+  @Override
+  public long read() {
+    return nowNanos;
+  }
+
+  /**
+   * Advances the current time by the given {@code period}.  Time can be retarded by passing a
+   * negative value.  No bounds or overflow checks are applied.
+   *
+   * @param period the amount of time to advance the current time by
+   * @throws NullPointerException if {@code period} is null
+   */
+  public void advance(Amount<Long, Time> period) {
+    Preconditions.checkNotNull(period);
+    nowNanos = nowNanos + period.as(Time.NANOSECONDS);
+  }
+
+  /**
+   * Waits in fake time, immediately returning in real time; however a check of {@link #read()}
+   * after this method completes will consistently reveal that {@code nanos} did in fact pass while
+   * waiting.
+   *
+   * @param nanos the amount of time to wait in nanoseconds
+   */
+  public void waitNanos(long nanos) {
+    advance(Amount.of(nanos, Time.NANOSECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
new file mode 100644
index 0000000..f679d92
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Contract for taking part in, or observing, leader election within a ZooKeeper-backed group.
+ */
+public interface Candidate {
+
+  /**
+   * Synchronously reads the current group leader from ZooKeeper.
+   *
+   * @return the identifying data published by the current leader, or {@link Optional#absent()}
+   *     when the group currently has no leader
+   * @throws ZooKeeperConnectionException if ZooKeeper could not be reached
+   * @throws KeeperException if reading the leader's data failed
+   * @throws InterruptedException if the calling thread is interrupted while fetching the leader
+   */
+  Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
+
+  /**
+   * Callbacks for a participant that can win leadership and later lose it.
+   */
+  interface Leader {
+
+    /**
+     * Invoked once this participant wins the election.
+     *
+     * @param abdicate a command that gives up leadership and forces a new election
+     */
+    void onElected(ExceptionalCommand<JoinException> abdicate);
+
+    /**
+     * Invoked once this participant is no longer leader.  This happens either because it
+     * abdicated or because an external event (session expiration) removed its leadership role.
+     */
+    void onDefeated();
+  }
+
+  /**
+   * Enters this candidate into leadership elections for as long as the current JVM process is
+   * alive.  On election, {@link Leader#onElected} is called with a command that abdicates
+   * leadership.  A new leader is chosen if the elected leader's process dies or it successfully
+   * abdicates.  A candidate that abdicates is removed from the group and is not eligible again
+   * until {@link #offerLeadership(Leader)} is called once more.
+   *
+   * @param leader receives election and defeat notifications
+   * @return a supplier reporting whether this candidate is currently the elected leader
+   * @throws JoinException if joining the group failed
+   * @throws WatchException if the initial group membership list could not be obtained
+   * @throws InterruptedException if interrupted while joining the group or while determining the
+   *     initial election result
+   */
+  Supplier<Boolean> offerLeadership(Leader leader)
+      throws JoinException, WatchException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
new file mode 100644
index 0000000..e16a64d
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.Nullable;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.ExceptionalCommand;
+import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.zookeeper.Group.Membership;
+import org.apache.aurora.common.zookeeper.Group.WatchException;
+import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+
+/**
+ * Implements leader election for small groups of candidates.  This implementation is subject to the
+ * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
+ * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools:
+ * every candidate watches the whole group and re-runs the judge on each membership change.
+ */
+public class CandidateImpl implements Candidate {
+  private static final Logger LOG = Logger.getLogger(CandidateImpl.class.getName());
+
+  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
+
+  /**
+   * Default leader data: this host's IP address as UTF-8 text, or {@code "<unknown>"} when the
+   * local host cannot be resolved.
+   */
+  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = new Supplier<byte[]>() {
+    @Override public byte[] get() {
+      try {
+        // Encode explicitly so the published bytes do not depend on the JVM's default charset and
+        // use the same encoding as UNKNOWN_CANDIDATE_DATA.
+        return InetAddress.getLocalHost().getHostAddress().getBytes(Charsets.UTF_8);
+      } catch (UnknownHostException e) {
+        LOG.log(Level.WARNING, "Failed to determine local address!", e);
+        // Hand out a copy so callers cannot corrupt the shared constant.
+        return UNKNOWN_CANDIDATE_DATA.clone();
+      }
+    }
+  };
+
+  /**
+   * Picks the member id that sorts first in natural order.  That is the lowest-numbered candidate
+   * ephemeral node, which serves as a proxy for the oldest (first-joined) candidate.
+   */
+  private static final Function<Iterable<String>, String> OLDEST_MEMBER_JUDGE =
+      new Function<Iterable<String>, String>() {
+        @Override public String apply(Iterable<String> candidates) {
+          return Ordering.natural().min(candidates);
+        }
+      };
+
+  private final Group group;
+  private final Function<Iterable<String>, String> judge;
+  private final Supplier<byte[]> dataSupplier;
+
+  /**
+   * Equivalent to {@link #CandidateImpl(Group, com.google.common.base.Function, Supplier)} using a
+   * judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest or
+   * 1st candidate and a default supplier that provides the ip address of this host according to
+   * {@link java.net.InetAddress#getLocalHost()} as the leader identifying data.
+   */
+  public CandidateImpl(Group group) {
+    this(group, OLDEST_MEMBER_JUDGE, IP_ADDRESS_DATA_SUPPLIER);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group} using
+   * a judge that always picks the lowest numbered candidate ephemeral node - by proxy the oldest
+   * or 1st. The dataSupplier should produce bytes that identify this process as leader. These bytes
+   * will become available to all participants via the {@link Candidate#getLeaderData()} method.
+   */
+  public CandidateImpl(Group group, Supplier<byte[]> dataSupplier) {
+    this(group, OLDEST_MEMBER_JUDGE, dataSupplier);
+  }
+
+  /**
+   * Creates a candidate that can be used to offer leadership for the given {@code group}.  The
+   * {@code judge} is used to pick the current leader from all group members whenever the group
+   * membership changes. To form a well-behaved election group with one leader, all candidates
+   * should use the same judge. The dataSupplier should produce bytes that identify this process
+   * as leader. These bytes will become available to all participants via the
+   * {@link Candidate#getLeaderData()} method.
+   *
+   * @throws NullPointerException if any argument is null
+   */
+  public CandidateImpl(
+      Group group,
+      Function<Iterable<String>, String> judge,
+      Supplier<byte[]> dataSupplier) {
+    this.group = Preconditions.checkNotNull(group);
+    this.judge = Preconditions.checkNotNull(judge);
+    this.dataSupplier = Preconditions.checkNotNull(dataSupplier);
+  }
+
+  @Override
+  public Optional<byte[]> getLeaderData()
+      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
+
+    String leaderId = getLeader(group.getMemberIds());
+    return leaderId == null
+        ? Optional.<byte[]>absent()
+        : Optional.of(group.getMemberData(leaderId));
+  }
+
+  @Override
+  public Supplier<Boolean> offerLeadership(final Leader leader)
+      throws JoinException, WatchException, InterruptedException {
+
+    // The command passed to join() runs when the group membership is lost.
+    final Membership membership = group.join(dataSupplier, new Command() {
+      @Override public void execute() {
+        leader.onDefeated();
+      }
+    });
+
+    final AtomicBoolean elected = new AtomicBoolean(false);
+    final AtomicBoolean abdicated = new AtomicBoolean(false);
+    group.watch(new GroupChangeListener() {
+        @Override public void onGroupChange(Iterable<String> memberIds) {
+          boolean noCandidates = Iterables.isEmpty(memberIds);
+          String memberId = membership.getMemberId();
+
+          if (noCandidates) {
+            LOG.warning("All candidates have temporarily left the group: " + group);
+          } else if (!Iterables.contains(memberIds, memberId)) {
+            // NOTE(review): presumably also reached after a successful abdication, once
+            // membership.cancel() has removed this member from the group.
+            LOG.severe(String.format(
+                "Current member ID %s is not a candidate for leader, current voting: %s",
+                memberId, memberIds));
+          } else {
+            boolean electedLeader = memberId.equals(getLeader(memberIds));
+            boolean previouslyElected = elected.getAndSet(electedLeader);
+
+            if (!previouslyElected && electedLeader) {
+              LOG.info(String.format("Candidate %s is now leader of group: %s",
+                  membership.getMemberPath(), memberIds));
+
+              leader.onElected(new ExceptionalCommand<JoinException>() {
+                @Override public void execute() throws JoinException {
+                  membership.cancel();
+                  abdicated.set(true);
+                }
+              });
+            } else if (!electedLeader) {
+              // Only announce a defeat when this candidate actually held leadership before.
+              if (previouslyElected) {
+                leader.onDefeated();
+              }
+              LOG.info(String.format(
+                  "Candidate %s waiting for the next leader election, current voting: %s",
+                  membership.getMemberPath(), memberIds));
+            }
+          }
+        }
+      });
+
+    return new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return !abdicated.get() && elected.get();
+        }
+      };
+  }
+
+  /**
+   * Applies the judge to {@code memberIds}, or returns null when there are no members.
+   */
+  @Nullable
+  private String getLeader(Iterable<String> memberIds) {
+    return Iterables.isEmpty(memberIds) ? null : judge.apply(memberIds);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
new file mode 100644
index 0000000..42732db
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/CompoundServerSet.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.common.base.Command;
+import org.apache.aurora.common.base.Commands;
+import org.apache.aurora.common.base.MorePreconditions;
+import org.apache.aurora.common.zookeeper.Group.JoinException;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+
+/**
+ * A ServerSet that delegates all calls to other ServerSets.
+ */
+public class CompoundServerSet implements ServerSet {
+  private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n');
+
+  private final List<ServerSet> serverSets;
+  // Latest host set reported by each underlying ServerSet; only touched while holding this.
+  private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap();
+  // Monitors notified when the union of hosts changes; only touched while holding this.
+  private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList();
+  // Stops the underlying watches; null until watch() has succeeded once.
+  private Command stopWatching = null;
+  // Union of hosts most recently computed by handleChange.
+  private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of();
+
+  /**
+   * Create new ServerSet from a list of serverSets.
+   *
+   * @param serverSets serverSets to which the calls will be delegated.
+   */
+  public CompoundServerSet(Iterable<ServerSet> serverSets) {
+    MorePreconditions.checkNotBlank(serverSets);
+    this.serverSets = ImmutableList.copyOf(serverSets);
+  }
+
+  private interface JoinOp {
+    EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException;
+  }
+
+  private interface StatusOp {
+    void changeStatus(EndpointStatus status) throws UpdateException;
+  }
+
+  // Applies statusOp to every status, collecting all failures into a single UpdateException.
+  private void changeStatus(
+      ImmutableList<EndpointStatus> statuses,
+      StatusOp statusOp) throws UpdateException {
+
+    ImmutableList.Builder<String> builder = ImmutableList.builder();
+    int errorIdx = 1;
+    for (EndpointStatus endpointStatus : statuses) {
+      try {
+        statusOp.changeStatus(endpointStatus);
+      } catch (UpdateException exception) {
+        builder.add(String.format("[%d] %s", errorIdx++,
+            Throwables.getStackTraceAsString(exception)));
+      }
+    }
+    if (errorIdx > 1) {
+      throw new UpdateException(
+          "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build()));
+    }
+  }
+
+  // Joins every underlying ServerSet and returns a status that fans leave/update out to all.
+  private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException {
+    // Get the list of endpoint status from the serverSets.
+    ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder();
+    for (ServerSet serverSet : serverSets) {
+      builder.add(joiner.doJoin(serverSet));
+    }
+
+    final ImmutableList<EndpointStatus> statuses = builder.build();
+
+    return new EndpointStatus() {
+      @Override public void leave() throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.leave();
+          }
+        });
+      }
+
+      @Override public void update(final Status newStatus) throws UpdateException {
+        changeStatus(statuses, new StatusOp() {
+          @Override public void changeStatus(EndpointStatus status) throws UpdateException {
+            status.update(newStatus);
+          }
+        });
+      }
+    };
+  }
+
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints)
+      throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+        return serverSet.join(endpoint, additionalEndpoints);
+      }
+    });
+  }
+
+  /**
+   * If any one of the serverSets throws an exception during its respective join, the exception is
+   * propagated. Join is successful only if all the joins are successful.
+   *
+   * <p>NOTE: If an exception occurs during the join, the serverSets in the composite can be in a
+   * partially joined state.
+   *
+   * @see ServerSet#join(InetSocketAddress, Map, Status)
+   */
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final Status status) throws Group.JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, status);
+      }
+    });
+  }
+
+  @Override
+  public EndpointStatus join(
+      final InetSocketAddress endpoint,
+      final Map<String, InetSocketAddress> additionalEndpoints,
+      final int shardId) throws JoinException, InterruptedException {
+
+    return doJoin(new JoinOp() {
+      @Override public EndpointStatus doJoin(ServerSet serverSet)
+          throws JoinException, InterruptedException {
+
+        return serverSet.join(endpoint, additionalEndpoints, shardId);
+      }
+    });
+  }
+
+  // Handles changes to the union of hosts.
+  private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) {
+    instanceCache.put(serverSet, hosts);
+
+    // Get the union of hosts.
+    ImmutableSet<ServiceInstance> currentHosts =
+        ImmutableSet.copyOf(Iterables.concat(instanceCache.values()));
+
+    // Check if the hosts have changed.
+    if (!currentHosts.equals(allHosts)) {
+      allHosts = currentHosts;
+
+      // Notify the monitors.
+      for (HostChangeMonitor<ServiceInstance> monitor : monitors) {
+        monitor.onChange(allHosts);
+      }
+    }
+  }
+
+  /**
+   * Monitor the CompoundServerSet.
+   *
+   * <p>The first successful call starts watching every underlying serverSet.  Later calls register
+   * the additional monitor against the same underlying watches and immediately deliver the current
+   * union of hosts to it.  All monitors share the returned command: executing it stops the
+   * underlying watches, and therefore notifications to every monitor.
+   *
+   * <p>If any one of the watch calls to the underlying serverSets raises a MonitorException (or a
+   * RuntimeException), the watches already registered by this call are stopped and the exception
+   * is propagated. The call is successful only if all the watch calls to the underlying
+   * serverSets are successful.
+   *
+   * <p>NOTE: If an exception occurs during the watch call, the serverSets in the composite will not
+   * be monitored, and a later call may retry.
+   *
+   * @param monitor HostChangeMonitor instance used to monitor host changes.
+   * @return A command that, when executed, will stop monitoring all underlying server sets.
+   * @throws MonitorException If there was a problem monitoring any of the underlying server sets.
+   */
+  @Override
+  public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor)
+      throws MonitorException {
+    monitors.add(monitor);
+
+    if (stopWatching != null) {
+      // The underlying server sets are already watched; bring the new monitor up to date.  It
+      // receives every later change through handleChange.
+      monitor.onChange(allHosts);
+      return stopWatching;
+    }
+
+    List<Command> registered = Lists.newArrayList();
+    try {
+      for (final ServerSet serverSet : serverSets) {
+        registered.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() {
+          @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+            handleChange(serverSet, hostSet);
+          }
+        }));
+      }
+    } catch (MonitorException | RuntimeException e) {
+      // Roll back to the unwatched state.  Otherwise the partially registered watches would keep
+      // running, and a retry would register them a second time.
+      monitors.remove(monitor);
+      for (Command stop : registered) {
+        try {
+          stop.execute();
+        } catch (RuntimeException cleanupFailure) {
+          e.addSuppressed(cleanupFailure);
+        }
+      }
+      instanceCache.clear();
+      allHosts = ImmutableSet.of();
+      throw e;
+    }
+
+    stopWatching = Commands.compound(ImmutableList.copyOf(registered));
+    return stopWatching;
+  }
+
+  @Override
+  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
+    watch(monitor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
new file mode 100644
index 0000000..1e8fc48
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A mutual-exclusion lock whose ownership is coordinated across processes.
+ *
+ * @author Florian Leibert
+ */
+public interface DistributedLock {
+
+  /**
+   * Signals a failure to acquire or release a {@link DistributedLock}.  It is unchecked, so
+   * callers are not forced to handle it.
+   */
+  class LockingException extends RuntimeException {
+    public LockingException(String msg) {
+      super(msg);
+    }
+
+    public LockingException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
+  /**
+   * Acquires the lock, blocking until it is held.
+   *
+   * @throws LockingException if the lock could not be acquired
+   */
+  void lock() throws LockingException;
+
+  /**
+   * Attempts to acquire the lock, waiting at most {@code timeout} in the given {@code unit}.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the unit of {@code timeout}
+   * @return {@code true} if the lock was acquired, {@code false} otherwise
+   */
+  boolean tryLock(long timeout, TimeUnit unit);
+
+  /**
+   * Releases the lock.  Implementations may also use this to abandon an acquisition attempt that
+   * is still in progress.
+   *
+   * @throws LockingException if there is nothing to release
+   */
+  void unlock() throws LockingException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/06ddaadb/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
new file mode 100644
index 0000000..99a5774
--- /dev/null
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.aurora.common.base.MorePreconditions;
+
+/**
+ * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
+ * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
+ * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
+ * in order, therefore the client with the least ID according to natural ordering will hold the
+ * lock. Every other client watches the id immediately preceding its own id and checks for the lock
+ * in case of notification. The client holding the lock does the work and finally deletes the node,
+ * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
+ * avoided in most cases because if a client drops dead while holding the lock, the ZK session
+ * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
+ * could occur if the worker thread on a client hangs but the zk-client thread is still alive.
+ * There could be an external monitor client that ensures that alerts are triggered if the least-id
+ * ephemeral node is present past a time-out.
+ * <p>
+ * Note: Locking attempts will fail in case session expires!
+ *
+ * @author Florian Leibert
+ */
+@ThreadSafe
+public class DistributedLockImpl implements DistributedLock {
+
+  private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
+
+  private final ZooKeeperClient zkClient;
+  // Parent znode under which each acquisition attempt creates its EPHEMERAL_SEQUENTIAL member node.
+  private final String lockPath;
+  // ACL applied to the lock path and to member nodes created under it.
+  private final ImmutableList<ACL> acl;
+
+  // Set by unlock() when it is called before the lock is held; reset by cleanup().  Presumably
+  // consulted by LockWatcher to abandon the pending attempt -- TODO confirm.
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
+  // Latch that lock()/tryLock() wait on for the current attempt.  cancelAttempt() counts it down,
+  // and cleanup() replaces it with a fresh latch.
+  private CountDownLatch syncPoint;
+  private boolean holdsLock = false;
+  // Last path segment of this attempt's member node; null when no attempt is in progress.
+  private String currentId;
+  // Full path of this attempt's member node; null when no attempt is in progress.
+  private String currentNode;
+  // NOTE(review): presumably the path of the preceding member node that LockWatcher watches;
+  // it is not assigned in the code visible here.
+  private String watchedNode;
+  private LockWatcher watcher;
+
+  /**
+   * Creates a distributed lock that coordinates locking through {@code zkClient}.
+   *
+   * @param zkClient The ZooKeeper client to use.
+   * @param lockPath The path used to manage the lock under.
+   * @param acl The acl to apply to newly created lock nodes.
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
+    this.syncPoint = new CountDownLatch(1);
+    this.zkClient = Preconditions.checkNotNull(zkClient);
+    this.lockPath = MorePreconditions.checkNotBlank(lockPath);
+    this.acl = ImmutableList.copyOf(acl);
+  }
+
+  /**
+   * Same as {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)}, but applies the
+   * wide-open {@link ZooDefs.Ids#OPEN_ACL_UNSAFE} acl to lock nodes.
+   */
+  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
+    this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Starts a new acquisition attempt.  It calls {@code ZooKeeperUtils.ensurePath} for
+   * {@code lockPath}, creates this attempt's EPHEMERAL_SEQUENTIAL member node beneath it, records
+   * the node's path and name, and installs a fresh {@link LockWatcher}.
+   */
+  private synchronized void prepare()
+    throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
+
+    ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
+    LOG.log(Level.FINE, "Working with locking path:" + lockPath);
+
+    String memberPrefix = lockPath + "/member_";
+    currentNode = zkClient.get().create(memberPrefix, null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+    // Siblings are compared by node name alone, so keep only what follows the last separator.
+    int lastSlash = currentNode.lastIndexOf('/');
+    if (lastSlash >= 0) {
+      currentId = currentNode.substring(lastSlash + 1);
+    }
+    LOG.log(Level.FINE, "Received ID from zk:" + currentId);
+    watcher = new LockWatcher();
+  }
+
+  /**
+   * Blocks until this client holds the lock.
+   *
+   * <p>If the waiting thread is interrupted, the pending attempt is cancelled, the thread's
+   * interrupt status is restored, and a {@link LockingException} is thrown.
+   *
+   * @throws LockingException if this instance already holds the lock, if the attempt was
+   *     cancelled or interrupted, or if ZooKeeper reported an error or could not be reached
+   */
+  @Override
+  public synchronized void lock() throws LockingException {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      watcher.checkForLock();
+      syncPoint.await();
+      // The latch is also released when the attempt is cancelled, so check the outcome.
+      if (!holdsLock) {
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      try {
+        cancelAttempt();
+      } finally {
+        // Restore the interrupt only after cleanup.  ZooKeeper calls made while the flag is set
+        // could themselves fail with InterruptedException.
+        Thread.currentThread().interrupt();
+      }
+      throw new LockingException("InterruptedException while trying to acquire lock!", e);
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+  }
+
+  /**
+   * Attempts to acquire the lock, waiting at most {@code timeout} in the given {@code unit}.
+   *
+   * <p>NOTE(review): on timeout this returns {@code false} without cancelling the attempt, so the
+   * member node created for it stays in place.  Callers presumably need to call {@link #unlock()}
+   * to abandon it -- confirm against LockWatcher.
+   *
+   * @return {@code true} if the lock was acquired.  Returns {@code false} on timeout, or if the
+   *     waiting thread was interrupted; in that case the attempt is cancelled and the thread's
+   *     interrupt status is restored.
+   * @throws LockingException if this instance already holds the lock, if the attempt was
+   *     cancelled, or if ZooKeeper reported an error or could not be reached
+   */
+  @Override
+  public synchronized boolean tryLock(long timeout, TimeUnit unit) {
+    if (holdsLock) {
+      throw new LockingException("Error, already holding a lock. Call unlock first!");
+    }
+    try {
+      prepare();
+      watcher.checkForLock();
+      boolean success = syncPoint.await(timeout, unit);
+      if (!success) {
+        return false;
+      }
+      // The latch is also released when the attempt is cancelled, so check the outcome.
+      if (!holdsLock) {
+        throw new LockingException("Error, couldn't acquire the lock!");
+      }
+    } catch (InterruptedException e) {
+      try {
+        cancelAttempt();
+      } finally {
+        // Restore the interrupt only after cleanup.  ZooKeeper calls made while the flag is set
+        // could themselves fail with InterruptedException.
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    } catch (KeeperException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("KeeperException while trying to acquire lock!", e);
+    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+      // No need to clean up since the node wasn't created yet.
+      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
+    }
+    return true;
+  }
+
+  /**
+   * Releases the lock if it is held.  Otherwise it marks the in-progress acquisition attempt as
+   * aborted.
+   *
+   * @throws LockingException if there is neither a pending attempt nor a held lock
+   */
+  @Override
+  public synchronized void unlock() throws LockingException {
+    if (currentId == null) {
+      throw new LockingException("Error, neither attempting to lock nor holding a lock!");
+    }
+
+    if (holdsLock) {
+      LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
+      cleanup();
+      return;
+    }
+
+    // Still waiting for the lock: flag the pending attempt as aborted instead of deleting it here.
+    aborted.set(true);
+    LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
+  }
+
+  //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
+
+  /**
+   * Abandons the current acquisition attempt. It deletes this instance's node and resets state
+   * via {@code cleanup()}, marks the lock as not held, and counts down {@code syncPoint}.
+   *
+   * <p>NOTE(review): {@code cleanup()} replaces {@code syncPoint} with a fresh latch before the
+   * {@code countDown()} below. So it is the NEW latch that is counted down, not one a waiter may
+   * already be blocked on. This works for the synchronous failure path in {@code lock()} and
+   * {@code tryLock()}, which read {@code syncPoint} only after {@code checkForLock()} returns.
+   * However, it leaves the next attempt's latch already at zero. Confirm that {@code prepare()}
+   * resets it; the alternative is capturing the old latch, which requires changing the waiters
+   * too.
+   */
+  private synchronized void cancelAttempt() {
+    LOG.log(Level.INFO, "Cancelling lock attempt!");
+    cleanup();
+    // Bubble up failure: a waiter released by the countDown() sees holdsLock == false and
+    // reports the attempt as failed (cleanup() has already cleared holdsLock as well).
+    holdsLock = false;
+    syncPoint.countDown();
+  }
+
+  /**
+   * Deletes this instance's lock node, if it still exists, and resets all per-attempt state. It
+   * clears {@code holdsLock}, {@code aborted}, {@code currentId}, {@code currentNode} and
+   * {@code watcher}, and leaves a fresh, un-signalled {@code syncPoint}.
+   *
+   * <p>This method is not synchronized itself; its visible callers ({@code unlock()} and
+   * {@code cancelAttempt()}) are.
+   *
+   * @throws RuntimeException wrapping any ZooKeeper or connection failure other than the node
+   *     already being gone; the per-attempt state is not reset in that case
+   */
+  private void cleanup() {
+    LOG.info("Cleaning up!");
+    Preconditions.checkNotNull(currentId);
+    try {
+      // Delete directly rather than exists() followed by delete(). The node can vanish between
+      // those two calls (e.g. on session expiry), and the resulting NoNodeException used to
+      // escape as a RuntimeException before the state below was reset.
+      zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION);
+    } catch (KeeperException.NoNodeException e) {
+      LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for callers even though we surface it as an unchecked exception.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    holdsLock = false;
+    aborted.set(false);
+    currentId = null;
+    currentNode = null;
+    watcher = null;
+    syncPoint = new CountDownLatch(1);
+  }
+
+  /**
+   * Tracks this instance's position among the lock path's children. It claims the lock when our
+   * node is the lowest, and otherwise watches the next-lowest node and re-checks when it is
+   * deleted.
+   */
+  class LockWatcher implements Watcher {
+
+    /**
+     * Checks whether this instance's node is the lowest child of {@code lockPath}.
+     *
+     * <p>If it is, the method sets {@code holdsLock} and counts down {@code syncPoint}. Otherwise
+     * it sets a watch on the next-lowest node, re-checking immediately if that node is already
+     * gone. Interrupts, ZooKeeper errors and connection errors cancel the attempt.
+     *
+     * @throws LockingException if the member list is empty or no longer contains our node
+     */
+    public synchronized void checkForLock() {
+      MorePreconditions.checkNotBlank(currentId);
+
+      try {
+        List<String> candidates = zkClient.get().getChildren(lockPath, null);
+        ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
+
+        // Unexpected behavior if there are no children!
+        if (sortedMembers.isEmpty()) {
+          throw new LockingException("Error, member list is empty!");
+        }
+
+        int memberIndex = sortedMembers.indexOf(currentId);
+        if (memberIndex < 0) {
+          // Our own node is no longer listed. Without this check, memberIndex - 1 below would be
+          // -2 and fail with an opaque IndexOutOfBoundsException.
+          throw new LockingException(String.format(
+              "Error, ephemeral node [%s] is no longer a member of [%s]!", currentId, lockPath));
+        }
+
+        // If we hold the lock
+        if (memberIndex == 0) {
+          holdsLock = true;
+          syncPoint.countDown();
+        } else {
+          final String nextLowestNode = sortedMembers.get(memberIndex - 1);
+          LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " +
+              "waiting for [%s] to release lock.", currentId, nextLowestNode));
+
+          watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
+          Stat stat = zkClient.get().exists(watchedNode, this);
+          if (stat == null) {
+            // The predecessor vanished before the watch was set, so no event will fire for it:
+            // re-evaluate our position right away.
+            checkForLock();
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got interrupted. Trying to cancel lock acquisition.", currentId), e);
+        cancelAttempt();
+      } catch (KeeperException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got a KeeperException. Trying to cancel lock acquisition.", currentId), e);
+        cancelAttempt();
+      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
+            "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e);
+        cancelAttempt();
+      }
+    }
+
+    /**
+     * Handles a ZooKeeper notification. A deletion of the watched node triggers a fresh
+     * {@link #checkForLock()}; events for any other path are ignored.
+     */
+    @Override
+    public synchronized void process(WatchedEvent event) {
+      // this handles the case where we have aborted a lock and deleted ourselves but still have a
+      // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
+      // NOTE(review): ZooKeeper delivers connection-state events (type None) with a null path.
+      // If this watcher receives them, the equals() call below throws NullPointerException, and
+      // the None branch is never reached. Handling Expired there is not a drop-in fix:
+      // cancelAttempt() blocks on the outer monitor while lock() is waiting. TODO confirm and
+      // redesign.
+      if (!event.getPath().equals(watchedNode)) {
+        LOG.log(Level.INFO, "Ignoring call for node:" + event.getPath());
+        return;
+      }
+      //TODO(Florian Leibert): Pull this into the outer class.
+      if (event.getType() == Watcher.Event.EventType.None) {
+        switch (event.getState()) {
+          case SyncConnected:
+            // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
+            LOG.info("Reconnected...");
+            break;
+          case Expired:
+            LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId));
+            cancelAttempt();
+            break;
+        }
+      } else if (event.getType() == Event.EventType.NodeDeleted) {
+        checkForLock();
+      } else {
+        LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
+      }
+    }
+  }
+}


Mime
View raw message