aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [2/3] aurora git commit: Remove legacy commons ZK code
Date Thu, 19 Oct 2017 00:34:05 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
index 93ddd89..f091384 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java
@@ -13,29 +13,19 @@
  */
 package org.apache.aurora.common.zookeeper;
 
-import java.util.List;
-
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Utilities for dealing with zoo keeper.
+ * Utilities for dealing with ZooKeeper.
  */
 public final class ZooKeeperUtils {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class);
-
   /**
    * An appropriate default session timeout for Twitter ZooKeeper clusters.
    */
@@ -44,12 +34,6 @@ public final class ZooKeeperUtils {
   public static final Amount<Integer,Time> DEFAULT_ZK_CONNECTION_TIMEOUT = Amount.of(10, Time.SECONDS);
 
   /**
-   * The magic version number that allows any mutation to always succeed regardless of actual
-   * version number.
-   */
-  public static final int ANY_VERSION = -1;
-
-  /**
    * An ACL that gives all permissions any user authenticated or not.
    */
   public static final ImmutableList<ACL> OPEN_ACL_UNSAFE =
@@ -65,99 +49,13 @@ public final class ZooKeeperUtils {
           .build();
 
   /**
-   * Returns true if the given exception indicates an error that can be resolved by retrying the
-   * operation without modification.
-   *
-   * @param e the exception to check
-   * @return true if the causing operation is strictly retryable
-   */
-  public static boolean isRetryable(KeeperException e) {
-    Preconditions.checkNotNull(e);
-
-    switch (e.code()) {
-      case CONNECTIONLOSS:
-      case SESSIONEXPIRED:
-      case SESSIONMOVED:
-      case OPERATIONTIMEOUT:
-        return true;
-
-      case RUNTIMEINCONSISTENCY:
-      case DATAINCONSISTENCY:
-      case MARSHALLINGERROR:
-      case BADARGUMENTS:
-      case NONODE:
-      case NOAUTH:
-      case BADVERSION:
-      case NOCHILDRENFOREPHEMERALS:
-      case NODEEXISTS:
-      case NOTEMPTY:
-      case INVALIDCALLBACK:
-      case INVALIDACL:
-      case AUTHFAILED:
-      case UNIMPLEMENTED:
-
-      // These two should not be encountered - they are used internally by ZK to specify ranges
-      case SYSTEMERROR:
-      case APIERROR:
-
-      case OK: // This is actually an invalid ZK exception code
-
-      default:
-        return false;
-    }
-  }
-
-  /**
-   * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}.  If the
-   * path already exists, nothing is done; however if any portion of the path is missing, it will be
-   * created with the given {@code acl} as a persistent zookeeper node.  The given {@code path} must
-   * be a valid zookeeper absolute path.
-   *
-   * @param zkClient the client to use to access the ZK cluster
-   * @param acl the acl to use if creating path nodes
-   * @param path the path to ensure exists
-   * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster
-   * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster
-   * @throws KeeperException if there was a problem in ZK
-   */
-  public static void ensurePath(ZooKeeperClient zkClient, List<ACL> acl, String path)
-      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-    Preconditions.checkNotNull(zkClient);
-    Preconditions.checkNotNull(path);
-    Preconditions.checkArgument(path.startsWith("/"));
-
-    ensurePathInternal(zkClient, acl, path);
-  }
-
-  private static void ensurePathInternal(ZooKeeperClient zkClient, List<ACL> acl, String path)
-      throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-    if (zkClient.get().exists(path, false) == null) {
-      // The current path does not exist; so back up a level and ensure the parent path exists
-      // unless we're already a root-level path.
-      int lastPathIndex = path.lastIndexOf('/');
-      if (lastPathIndex > 0) {
-        ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex));
-      }
-
-      // We've ensured our parent path (if any) exists so we can proceed to create our path.
-      try {
-        zkClient.get().create(path, null, acl, CreateMode.PERSISTENT);
-      } catch (KeeperException.NodeExistsException e) {
-        // This ensures we don't die if a race condition was met between checking existence and
-        // trying to create the node.
-        LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?");
-      }
-    }
-  }
-
-  /**
    * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and
    * never ends with a slash (except for root path).
    *
    * @param path the path to be normalized
    * @return normalized path string
    */
-  public static String normalizePath(String path) {
+  static String normalizePath(String path) {
     String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1");
     PathUtils.validatePath(normalizedPath);
     return normalizedPath;

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
deleted file mode 100644
index ba09279..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper.testing;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-
-/**
- * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient.
- */
-public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest {
-
-  private final Amount<Integer, Time> defaultSessionTimeout;
-
-  /**
-   * Creates a test case where the test server uses its
-   * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit
-   * session timeout.
-   */
-  public BaseZooKeeperClientTest() {
-    this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT);
-  }
-
-  /**
-   * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for
-   * clients created without an explicit session timeout.
-   */
-  public BaseZooKeeperClientTest(Amount<Integer, Time> defaultSessionTimeout) {
-    this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout);
-  }
-
-
-  /**
-   * Starts zookeeper back up on the last used port.
-   */
-  protected final void restartNetwork() throws IOException, InterruptedException {
-    getServer().restartNetwork();
-  }
-
-  /**
-   * Shuts down the in-process zookeeper network server.
-   */
-  protected final void shutdownNetwork() {
-    getServer().shutdownNetwork();
-  }
-
-  /**
-   * Expires the active session for the given client.  The client should be one returned from
-   * {@link #createZkClient}.
-   *
-   * @param zkClient the client to expire
-   * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to
-   *    the local zk server while trying to expire the session
-   * @throws InterruptedException if interrupted while requesting expiration
-   */
-  protected final void expireSession(ZooKeeperClient zkClient)
-      throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException {
-    getServer().expireClientSession(zkClient.get().getSessionId());
-  }
-
-  /**
-   * Returns the current port to connect to the in-process zookeeper instance.
-   */
-  protected final int getPort() {
-    return getServer().getPort();
-  }
-
-  /**
-   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
-   * with the default session timeout.
-   */
-  protected final ZooKeeperClient createZkClient() {
-    return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout.
-   */
-  protected final ZooKeeperClient createZkClient(Credentials credentials) {
-    return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout.  The client is authenticated in the digest authentication scheme
-   * with the given {@code username} and {@code password}.
-   */
-  protected final ZooKeeperClient createZkClient(String username, String password) {
-    return createZkClient(Credentials.digestCredentials(username, password));
-  }
-
-  /**
-   * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server
-   * with a custom {@code sessionTimeout}.
-   */
-  protected final ZooKeeperClient createZkClient(Amount<Integer, Time> sessionTimeout) {
-    return createZkClient(sessionTimeout, Optional.absent(), Optional.absent());
-  }
-
-  /**
-   * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with
-   * the default session timeout and the custom chroot path.
-   */
-  protected final ZooKeeperClient createZkClient(String chrootPath) {
-    return createZkClient(defaultSessionTimeout, Optional.absent(),
-        Optional.of(chrootPath));
-  }
-
-  private ZooKeeperClient createZkClient(
-      Amount<Integer, Time> sessionTimeout,
-      Optional<Credentials> credentials,
-      Optional<String> chrootPath) {
-
-    ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath,
-        ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort())));
-    addTearDown(client::close);
-    return client;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
index 29204cd..a4504b8 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java
@@ -19,8 +19,6 @@ import java.net.InetSocketAddress;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -34,8 +32,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
  */
 public class ZooKeeperTestServer {
 
-  static final Amount<Integer, Time> DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS);
-
   private final File dataDir;
   private final File snapDir;
 
@@ -91,7 +87,7 @@ public class ZooKeeperTestServer {
   /**
    * Shuts down the in-process zookeeper network server.
    */
-  final void shutdownNetwork() {
+  private void shutdownNetwork() {
     if (connectionFactory != null) {
       connectionFactory.shutdown(); // Also shuts down zooKeeperServer.
       connectionFactory = null;

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
deleted file mode 100644
index 9c0cebe..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-public class CandidateImplTest extends BaseZooKeeperClientTest {
-  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
-  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
-
-  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
-
-  @Before
-  public void mySetUp() throws IOException {
-    candidateBuffer = new LinkedBlockingDeque<>();
-  }
-
-  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
-    return new Group(zkClient, ACL, SERVICE);
-  }
-
-  private class Reign implements Candidate.Leader {
-    private ExceptionalCommand<Group.JoinException> abdicate;
-    private final CandidateImpl candidate;
-    private final String id;
-    private CountDownLatch defeated = new CountDownLatch(1);
-
-    Reign(String id, CandidateImpl candidate) {
-      this.id = id;
-      this.candidate = candidate;
-    }
-
-    @Override
-    public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
-      candidateBuffer.offerFirst(candidate);
-      this.abdicate = abdicate;
-    }
-
-    @Override
-    public void onDefeated() {
-      defeated.countDown();
-    }
-
-    public void abdicate() throws Group.JoinException {
-      Preconditions.checkState(abdicate != null);
-      abdicate.execute();
-    }
-
-    public void expectDefeated() throws InterruptedException {
-      defeated.await();
-    }
-
-    @Override
-    public String toString() {
-      return id;
-    }
-  }
-
-  @Test
-  public void testOfferLeadership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
-      @Override public String toString() {
-        return "Leader1";
-      }
-    };
-    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
-      @Override public String toString() {
-        return "Leader2";
-      }
-    };
-    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
-      @Override public String toString() {
-        return "Leader3";
-      }
-    };
-
-    Reign candidate1Reign = new Reign("1", candidate1);
-    Reign candidate2Reign = new Reign("2", candidate2);
-    Reign candidate3Reign = new Reign("3", candidate3);
-
-    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
-    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
-    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
-
-    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
-        candidate1Leader.get());
-
-    shutdownNetwork();
-    restartNetwork();
-
-    assertTrue("A re-connect without a session expiration should leave the leader elected",
-        candidate1Leader.get());
-
-    candidate1Reign.abdicate();
-    assertSame(candidate1, candidateBuffer.takeLast());
-    assertFalse(candidate1Leader.get());
-    // Active abdication should trigger defeat.
-    candidate1Reign.expectDefeated();
-
-    CandidateImpl secondCandidate = candidateBuffer.takeLast();
-    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
-               + candidateBuffer,
-        candidate2Leader.get() ^ candidate3Leader.get());
-
-    if (secondCandidate == candidate2) {
-      expireSession(zkClient2);
-      assertSame(candidate3, candidateBuffer.takeLast());
-      assertTrue(candidate3Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate2Reign.expectDefeated();
-    } else {
-      expireSession(zkClient3);
-      assertSame(candidate2, candidateBuffer.takeLast());
-      assertTrue(candidate2Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate3Reign.expectDefeated();
-    }
-  }
-
-  @Test
-  public void testEmptyMembership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
-    Reign candidate1Reign = new Reign("1", candidate1);
-
-    candidate1.offerLeadership(candidate1Reign);
-    assertSame(candidate1, candidateBuffer.takeLast());
-    candidate1Reign.abdicate();
-    assertFalse(candidate1.getLeaderData().isPresent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
new file mode 100644
index 0000000..16c0171
--- /dev/null
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.aurora.common.thrift.Endpoint;
+import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.common.thrift.Status;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EncodingTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    InetSocketAddress endpoint = new InetSocketAddress(12345);
+    Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
+    Status status = Status.ALIVE;
+
+    byte[] data = Encoding.serializeServiceInstance(
+        endpoint, additionalEndpoints, status, Encoding.JSON_CODEC);
+
+    ServiceInstance instance = Encoding.deserializeServiceInstance(data, Encoding.JSON_CODEC);
+
+    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
+    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
+    assertEquals(Status.ALIVE, instance.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
deleted file mode 100644
index 97a42d1..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.NodeScheme;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
-
-public class GroupTest extends BaseZooKeeperClientTest {
-
-  private ZooKeeperClient zkClient;
-  private Group joinGroup;
-  private Group watchGroup;
-  private Command stopWatching;
-  private Command onLoseMembership;
-
-  private RecordingListener listener;
-
-  public GroupTest() {
-    super(Amount.of(1, Time.DAYS));
-  }
-
-  @Before
-  public void mySetUp() throws Exception {
-    onLoseMembership = createMock(Command.class);
-
-    zkClient = createZkClient("group", "test");
-    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-
-    listener = new RecordingListener();
-    stopWatching = watchGroup.watch(listener);
-  }
-
-  private static class RecordingListener implements GroupChangeListener {
-    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
-        new LinkedBlockingQueue<Iterable<String>>();
-
-    @Override
-    public void onGroupChange(Iterable<String> memberIds) {
-      membershipChanges.add(memberIds);
-    }
-
-    public Iterable<String> take() throws InterruptedException {
-      return membershipChanges.take();
-    }
-
-    public void assertEmpty() {
-      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
-    }
-
-    @Override
-    public String toString() {
-      return membershipChanges.toString();
-    }
-  }
-
-  private static class CustomScheme implements NodeScheme {
-    static final String NODE_NAME = "custom_name";
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return NODE_NAME.equals(nodeName);
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return NODE_NAME;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return false;
-    }
-  }
-
-  @Test
-  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    expireSession(zkClient);
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    membership.cancel();
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join();
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    shutdownNetwork();
-    restartNetwork();
-
-    // The member should still be present under existing ephemeral node since session did not
-    // expire.
-    watchGroup.watch(listener);
-    assertMembershipObserved(originalMemberId);
-
-    membership.cancel();
-
-    assertEmptyMembershipObserved();
-    assertEmptyMembershipObserved(); // and again for 2nd listener
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
-    onLoseMembership.execute();
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    expireSession(zkClient);
-
-    // We should have lost our group membership and then re-gained it with a new ephemeral node.
-    // We may or may-not see the intermediate state change but we must see the final state
-    Iterable<String> members = listener.take();
-    if (Iterables.isEmpty(members)) {
-      members = listener.take();
-    }
-    assertEquals(1, Iterables.size(members));
-    assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
-    assertNotEquals(originalMemberId, membership.getMemberId());
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinCustomNamingScheme() throws Exception {
-    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
-        new CustomScheme());
-
-    listener = new RecordingListener();
-    group.watch(listener);
-    assertEmptyMembershipObserved();
-
-    Membership membership = group.join();
-    String memberId = membership.getMemberId();
-
-    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
-    assertMembershipObserved(memberId);
-
-    expireSession(zkClient);
-  }
-
-  @Test
-  public void testUpdateMembershipData() throws Exception {
-    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
-
-    byte[] initial = "start".getBytes();
-    expect(dataSupplier.get()).andReturn(initial);
-
-    byte[] second = "update".getBytes();
-    expect(dataSupplier.get()).andReturn(second);
-
-    replay(dataSupplier);
-
-    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
-    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
-        .getData(membership.getMemberPath(), false, null));
-
-    assertArrayEquals("Updating supplier should not change membership data",
-        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    membership.updateMemberData();
-    assertArrayEquals("Updating membership should change data",
-        second, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    verify(dataSupplier);
-  }
-
-  @Test
-  public void testAcls() throws Exception {
-    Group securedMembership =
-        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
-            "/secured/group/membership");
-
-    String memberId = securedMembership.join().getMemberId();
-
-    Group unauthenticatedObserver =
-        new Group(createZkClient(),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthenticatedListener = new RecordingListener();
-    unauthenticatedObserver.watch(unauthenticatedListener);
-
-    assertMembershipObserved(unauthenticatedListener, memberId);
-
-    try {
-      unauthenticatedObserver.join();
-      fail("Expected join exception for unauthenticated observer");
-    } catch (JoinException e) {
-      // expected
-    }
-
-    Group unauthorizedObserver =
-        new Group(createZkClient("joe", "schmoe"),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthorizedListener = new RecordingListener();
-    unauthorizedObserver.watch(unauthorizedListener);
-
-    assertMembershipObserved(unauthorizedListener, memberId);
-
-    try {
-      unauthorizedObserver.join();
-      fail("Expected join exception for unauthorized observer");
-    } catch (JoinException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testStopWatching() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership member1 = joinGroup.join();
-    String memberId1 = member1.getMemberId();
-    assertMembershipObserved(memberId1);
-
-    Membership member2 = joinGroup.join();
-    String memberId2 = member2.getMemberId();
-    assertMembershipObserved(memberId1, memberId2);
-
-    stopWatching.execute();
-
-    member1.cancel();
-    Membership member3 = joinGroup.join();
-    member2.cancel();
-    member3.cancel();
-
-    listener.assertEmpty();
-  }
-
-  private void assertEmptyMembershipObserved() throws InterruptedException {
-    assertMembershipObserved();
-  }
-
-  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
-    assertMembershipObserved(listener, expectedMemberIds);
-  }
-
-  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
-      throws InterruptedException {
-
-    assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
index 2166123..6cf335d 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java
@@ -52,25 +52,25 @@ public class JsonCodecTest {
         ImmutableMap.of("http", new Endpoint("foo", 8080)),
         Status.ALIVE)
         .setShard(0);
-    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    byte[] data = Encoding.serializeServiceInstance(instance1, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).isSetShard());
 
     ServiceInstance instance2 = new ServiceInstance(
         new Endpoint("foo", 1000),
         ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
         Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance2, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    data = Encoding.serializeServiceInstance(instance2, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
 
     ServiceInstance instance3 = new ServiceInstance(
         new Endpoint("foo", 1000),
         ImmutableMap.<String, Endpoint>of(),
         Status.ALIVE);
-    data = ServerSets.serializeServiceInstance(instance3, codec);
-    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
-    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+    data = Encoding.serializeServiceInstance(instance3, codec);
+    assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
deleted file mode 100644
index f0c0cb4..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- *
- * TODO(William Farner): Change this to remove thrift dependency.
- */
-public class ServerSetImplTest extends BaseZooKeeperClientTest {
-  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-  private static final String SERVICE = "/twitter/services/puffin_hosebird";
-
-  private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
-  private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
-
-  @Before
-  public void mySetUp() throws IOException {
-    serverSetBuffer = new LinkedBlockingQueue<>();
-    serverSetMonitor = serverSetBuffer::offer;
-  }
-
-  private ServerSetImpl createServerSet() throws IOException {
-    return new ServerSetImpl(createZkClient(), ACL, SERVICE);
-  }
-
-  @Test
-  public void testLifecycle() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-    ServerSet.EndpointStatus status = server.join(
-        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
-
-    ServiceInstance serviceInstance = new ServiceInstance(
-        new Endpoint("foo", 1234),
-        ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-
-    assertChangeFired(serviceInstance);
-
-    status.leave();
-    assertChangeFiredEmpty();
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testMembershipChanges() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-
-    ServerSet.EndpointStatus foo = join(server, "foo");
-    assertChangeFired("foo");
-
-    expireSession(client.getZkClient());
-
-    ServerSet.EndpointStatus bar = join(server, "bar");
-
-    // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a
-    // change, just "foo", "bar" since this was an addition.
-    assertChangeFired("foo", "bar");
-
-    foo.leave();
-    assertChangeFired("bar");
-
-    ServerSet.EndpointStatus baz = join(server, "baz");
-    assertChangeFired("bar", "baz");
-
-    baz.leave();
-    assertChangeFired("bar");
-
-    bar.leave();
-    assertChangeFiredEmpty();
-
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testStopMonitoring() throws Exception {
-    ServerSetImpl client = createServerSet();
-    Command stopMonitoring = client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    ServerSetImpl server = createServerSet();
-
-    ServerSet.EndpointStatus foo = join(server, "foo");
-    assertChangeFired("foo");
-    ServerSet.EndpointStatus bar = join(server, "bar");
-    assertChangeFired("foo", "bar");
-
-    stopMonitoring.execute();
-
-    // No new updates should be received since monitoring has stopped.
-    foo.leave();
-    assertTrue(serverSetBuffer.isEmpty());
-
-    // Expiration event.
-    assertTrue(serverSetBuffer.isEmpty());
-  }
-
-  @Test
-  public void testOrdering() throws Exception {
-    ServerSetImpl client = createServerSet();
-    client.watch(serverSetMonitor);
-    assertChangeFiredEmpty();
-
-    Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
-    Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
-    Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
-
-    ServerSetImpl server1 = createServerSet();
-    ServerSetImpl server2 = createServerSet();
-    ServerSetImpl server3 = createServerSet();
-
-    ServiceInstance instance1 = new ServiceInstance(
-        new Endpoint("foo", 1000),
-        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
-        Status.ALIVE);
-    ServiceInstance instance2 = new ServiceInstance(
-        new Endpoint("foo", 1001),
-        ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
-        Status.ALIVE);
-    ServiceInstance instance3 = new ServiceInstance(
-        new Endpoint("foo", 1002),
-        ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
-        Status.ALIVE);
-
-    server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
-    assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
-
-    ServerSet.EndpointStatus status2 = server2.join(
-        InetSocketAddress.createUnresolved("foo", 1001),
-        server2Ports);
-    assertEquals(ImmutableList.of(instance1, instance2),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-
-    server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
-    assertEquals(ImmutableList.of(instance1, instance2, instance3),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-
-    status2.leave();
-    assertEquals(ImmutableList.of(instance1, instance3),
-        ImmutableList.copyOf(serverSetBuffer.take()));
-  }
-
-  @Test
-  public void testUnwatchOnException() throws Exception {
-    IMocksControl control = createControl();
-
-    ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
-    Watcher onExpirationWatcher = control.createMock(Watcher.class);
-
-    expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
-        .andReturn(onExpirationWatcher);
-
-    expect(zkClient.get()).andThrow(new InterruptedException());  // See interrupted() note below.
-    expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
-    control.replay();
-
-    Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
-    ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
-
-    try {
-      serverset.watch(hostSet -> {});
-      fail("Expected MonitorException");
-    } catch (DynamicHostSet.MonitorException e) {
-      // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is.
-      // That call both returns the current interrupted status as well as clearing it.  The clearing
-      // is crucial depending on the order tests are run in this class.  If this test runs before
-      // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail
-      // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in
-      // the interruption mechanism and so immediately throws `InterruptedException` based on the
-      // un-cleared interrupted bit.
-      assertTrue(Thread.interrupted());
-    }
-    control.verify();
-  }
-
-  private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
-    return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
-  }
-
-  private ServerSet.EndpointStatus join(ServerSet serverSet, String host)
-      throws JoinException, InterruptedException {
-
-    return serverSet.join(
-        InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
-  }
-
-  private void assertChangeFired(String... serviceHosts)
-      throws InterruptedException {
-
-    assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts),
-        serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42),
-            ImmutableMap.<String, Endpoint>of(), Status.ALIVE))));
-  }
-
-  protected void assertChangeFiredEmpty() throws InterruptedException {
-    assertChangeFired(ImmutableSet.<ServiceInstance>of());
-  }
-
-  protected void assertChangeFired(ServiceInstance... serviceInstances)
-      throws InterruptedException {
-    assertChangeFired(ImmutableSet.copyOf(serviceInstances));
-  }
-
-  protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
-      throws InterruptedException {
-    assertEquals(serviceInstances, serverSetBuffer.take());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
deleted file mode 100644
index 0e67191..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class ServerSetsTest {
-  @Test
-  public void testSimpleSerialization() throws Exception {
-    InetSocketAddress endpoint = new InetSocketAddress(12345);
-    Map<String, Endpoint > additionalEndpoints = ImmutableMap.of();
-    Status status = Status.ALIVE;
-
-    byte[] data = ServerSets.serializeServiceInstance(
-        endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC);
-
-    ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC);
-
-    assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort());
-    assertEquals(additionalEndpoints, instance.getAdditionalEndpoints());
-    assertEquals(Status.ALIVE, instance.getStatus());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
deleted file mode 100644
index 5f6cdd8..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl;
-import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.easymock.Capture;
-import org.easymock.IExpectationSetters;
-import org.easymock.IMocksControl;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.createControl;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.fail;
-
-public class SingletonServiceImplTest extends BaseZooKeeperClientTest {
-  private static final int PORT_A = 1234;
-  private static final int PORT_B = 8080;
-  private static final InetSocketAddress PRIMARY_ENDPOINT =
-      InetSocketAddress.createUnresolved("foo", PORT_A);
-  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
-      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
-
-  private IMocksControl control;
-  private SingletonServiceImpl.LeadershipListener listener;
-  private ServerSet serverSet;
-  private ServerSet.EndpointStatus endpointStatus;
-  private Candidate candidate;
-  private ExceptionalCommand<Group.JoinException> abdicate;
-
-  private SingletonService service;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void mySetUp() throws IOException {
-    control = createControl();
-    addTearDown(control::verify);
-    listener = control.createMock(SingletonServiceImpl.LeadershipListener.class);
-    serverSet = control.createMock(ServerSet.class);
-    candidate = control.createMock(Candidate.class);
-    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
-    abdicate = control.createMock(ExceptionalCommand.class);
-
-    service = new SingletonServiceImpl(serverSet, candidate);
-  }
-
-  private void newLeader(
-      final String hostName,
-      Capture<Leader> leader,
-      LeadershipListener listener) throws Exception {
-
-    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
-        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
-        listener);
-
-    // This actually elects the leader.
-    leader.getValue().onElected(abdicate);
-  }
-
-  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
-    newLeader(hostName, leader, listener);
-  }
-
-  private IExpectationSetters<ServerSet.EndpointStatus> expectJoin() throws Exception {
-    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
-  }
-
-  @Test
-  public void testLeadAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void teatLeadLeaveNoAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    abdicate.execute();
-
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-  }
-
-  @Test
-  public void testLeadJoinFailure() throws Exception {
-    Capture<Leader> leaderCapture = new Capture<Leader>();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-
-    try {
-      controlCapture.getValue().advertise();
-      fail("Join should have failed.");
-    } catch (SingletonService.AdvertiseException e) {
-      // Expected.
-    }
-
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleAdvertise() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testMultipleLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    expectJoin().andReturn(endpointStatus);
-    endpointStatus.leave();
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().advertise();
-    controlCapture.getValue().leave();
-    controlCapture.getValue().leave();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testAdvertiseAfterLeave() throws Exception {
-    Capture<Leader> leaderCapture = createCapture();
-
-    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-    Capture<LeaderControl> controlCapture = createCapture();
-    listener.onLeading(capture(controlCapture));
-
-    abdicate.execute();
-
-    control.replay();
-
-    newLeader("foo", leaderCapture);
-    controlCapture.getValue().leave();
-    controlCapture.getValue().advertise();
-  }
-
-  @Test
-  public void testLeadMulti() throws Exception {
-    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
-    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
-
-    for (int i = 0; i < 5; i++) {
-      Capture<Leader> leaderCapture = new Capture<Leader>();
-      leaderCaptures.add(leaderCapture);
-      Capture<LeaderControl> controlCapture = createCapture();
-      leaderControlCaptures.add(controlCapture);
-
-      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
-      listener.onLeading(capture(controlCapture));
-      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
-      Map<String, InetSocketAddress> aux =
-          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
-      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
-      endpointStatus.leave();
-      abdicate.execute();
-    }
-
-    control.replay();
-
-    for (int i = 0; i < 5; i++) {
-      final String leaderName = "foo" + i;
-      newLeader(leaderName, leaderCaptures.get(i));
-      leaderControlCaptures.get(i).getValue().advertise();
-      leaderControlCaptures.get(i).getValue().leave();
-    }
-  }
-
-  @Test
-  public void testLeaderLeaves() throws Exception {
-    control.replay();
-    shutdownNetwork();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
deleted file mode 100644
index 5eee235..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Test;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * @author John Sirois
- */
-public class ZooKeeperClientTest extends BaseZooKeeperClientTest {
-
-  public ZooKeeperClientTest() {
-    super(Amount.of(1, Time.DAYS));
-  }
-
-  @Test
-  public void testGet() throws Exception {
-    final ZooKeeperClient zkClient = createZkClient();
-    shutdownNetwork();
-    try {
-      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
-      fail("Expected client connection to timeout while network down");
-    } catch (TimeoutException e) {
-      assertTrue(zkClient.isClosed());
-    }
-    assertNull(zkClient.getZooKeeperClientForTests());
-
-    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
-    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
-    new Thread(() -> {
-      try {
-        client.set(zkClient.get());
-      } catch (ZooKeeperConnectionException e) {
-        throw new RuntimeException(e);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      } finally {
-        blockingGetComplete.countDown();
-      }
-    }).start();
-
-    restartNetwork();
-
-    // Hung blocking connects should succeed when server connection comes up
-    blockingGetComplete.await();
-    assertNotNull(client.get());
-
-    // New connections should succeed now that network is back up
-    long sessionId = zkClient.get().getSessionId();
-
-    // While connected the same client should be reused (no new connections while healthy)
-    assertSame(client.get(), zkClient.get());
-
-    shutdownNetwork();
-    // Our client doesn't know the network is down yet so we should be able to get()
-    ZooKeeper zooKeeper = zkClient.get();
-    try {
-      zooKeeper.exists("/", false);
-      fail("Expected client operation to fail while network down");
-    } catch (ConnectionLossException e) {
-      // expected
-    }
-
-    restartNetwork();
-    assertEquals("Expected connection to be re-established with existing session",
-        sessionId, zkClient.get().getSessionId());
-  }
-
-  /**
-   * Test that if a blocking get() call gets interrupted, after a connection has been created
-   * but before it's connected, the zk connection gets closed.
-   */
-  @Test
-  public void testGetInterrupted() throws Exception {
-    final ZooKeeperClient zkClient = createZkClient();
-    shutdownNetwork();
-
-    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
-    final AtomicBoolean interrupted = new AtomicBoolean();
-    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
-    Thread getThread = new Thread(() -> {
-      try {
-        client.set(zkClient.get());
-      } catch (ZooKeeperConnectionException e) {
-        throw new RuntimeException(e);
-      } catch (InterruptedException e) {
-        interrupted.set(true);
-        throw new RuntimeException(e);
-      } finally {
-        blockingGetComplete.countDown();
-      }
-    });
-    getThread.start();
-
-    while (zkClient.getZooKeeperClientForTests() == null) {
-      Thread.sleep(100);
-    }
-
-    getThread.interrupt();
-    blockingGetComplete.await();
-
-    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
-    assertTrue("The waiter thread should have been interrupted", interrupted.get());
-    assertTrue(zkClient.isClosed());
-  }
-
-  @Test
-  public void testClose() throws Exception {
-    ZooKeeperClient zkClient = createZkClient();
-    zkClient.close();
-
-    // Close should be idempotent
-    zkClient.close();
-
-    long firstSessionId = zkClient.get().getSessionId();
-
-    // Close on an open client should force session re-establishment
-    zkClient.close();
-
-    assertNotEquals(firstSessionId, zkClient.get().getSessionId());
-  }
-
-  @Test
-  public void testCredentials() throws Exception {
-    String path = "/test";
-    ZooKeeperClient authenticatedClient = createZkClient("creator", "creator");
-    assertEquals(path,
-        authenticatedClient.get().create(path, "42".getBytes(),
-            ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT));
-
-    ZooKeeperClient unauthenticatedClient = createZkClient();
-    assertEquals("42", getData(unauthenticatedClient, path));
-    try {
-      setData(unauthenticatedClient, path, "37");
-      fail("Expected unauthenticated write attempt to fail");
-    } catch (NoAuthException e) {
-      assertEquals("42", getData(unauthenticatedClient, path));
-    }
-
-    ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner");
-    assertEquals("42", getData(nonOwnerClient, path));
-    try {
-      setData(nonOwnerClient, path, "37");
-      fail("Expected non owner write attempt to fail");
-    } catch (NoAuthException e) {
-      assertEquals("42", getData(nonOwnerClient, path));
-    }
-
-    ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator");
-    setData(authenticatedClient2, path, "37");
-    assertEquals("37", getData(authenticatedClient2, path));
-  }
-
-  @Test
-  public void testChrootPath() throws Exception {
-    ZooKeeperClient rootClient = createZkClient();
-    String rootPath = "/test";
-    String subPath = "/test/subtest";
-    assertEquals(rootPath,
-            rootClient.get().create(rootPath, "42".getBytes(),
-                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-    assertEquals(subPath,
-            rootClient.get().create(subPath, "37".getBytes(),
-                ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
-
-    ZooKeeperClient chrootedClient = createZkClient(rootPath);
-    assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
-  }
-
-  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
-    zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
-  }
-
-  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
-    return new String(zkClient.get().getData(path, false, null));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
index 9e482a6..5eb3c5e 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java
@@ -13,87 +13,16 @@
  */
 package org.apache.aurora.common.zookeeper;
 
-import com.google.common.base.Charsets;
-
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.BadVersionException;
-import org.apache.zookeeper.KeeperException.NoAuthException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.Stat;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
 import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
  * @author John Sirois
  */
-public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
-  @Test
-  public void testEnsurePath() throws Exception {
-    ZooKeeperClient zkClient = createZkClient();
-    zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8));
-
-    assertNull(zkClient.get().exists("/foo", false));
-    ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz");
-
-    zkClient = createZkClient();
-    zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8));
-
-    // Anyone can check for existence in ZK
-    assertNotNull(zkClient.get().exists("/foo", false));
-    assertNotNull(zkClient.get().exists("/foo/bar", false));
-    assertNotNull(zkClient.get().exists("/foo/bar/baz", false));
-
-    try {
-      zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */);
-      fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be "
-           + "rejected");
-    } catch (NoAuthException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception {
-    String nodePath = "/foo";
-    ZooKeeperClient zkClient = createZkClient();
-
-    zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-
-    Stat initStat = new Stat();
-    byte[] initialData = zkClient.get().getData(nodePath, false, initStat);
-    assertArrayEquals("init".getBytes(), initialData);
-
-    // bump the version
-    Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion());
-
-    try {
-      zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion());
-      fail("expected correct version to be required");
-    } catch (BadVersionException e) {
-      // expected
-    }
-
-    // expect using the correct version to work
-    Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion());
-    assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion());
-
-    zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION);
-    Stat forceWriteStat = new Stat();
-    byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat);
-    assertArrayEquals("force-write".getBytes(), forceWriteData);
-
-    assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion());
-    assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion());
-  }
+public class ZooKeeperUtilsTest extends BaseZooKeeperTest {
 
   @Test
   public void testNormalizingPath() throws Exception {
@@ -135,5 +64,4 @@ public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest {
       // expected
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
deleted file mode 100644
index 339f63b..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import javax.inject.Singleton;
-
-import com.google.inject.Exposed;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.SingletonService;
-import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.ZooKeeperUtils;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-import org.apache.zookeeper.data.ACL;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Binding module for utilities to advertise the network presence of the scheduler.
- *
- * Uses a fork of Twitter commons/zookeeper.
- */
-class CommonsServiceDiscoveryModule extends PrivateModule {
-
-  private final String discoveryPath;
-  private final ZooKeeperConfig zooKeeperConfig;
-
-  CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) {
-    this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath);
-    this.zooKeeperConfig = requireNonNull(zooKeeperConfig);
-  }
-
-  @Override
-  protected void configure() {
-    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
-    requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
-
-    bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class);
-    expose(ServiceGroupMonitor.class);
-  }
-
-  @Provides
-  @Singleton
-  ZooKeeperClient provideZooKeeperClient(
-      @ServiceDiscoveryBindings.ZooKeeper Iterable<InetSocketAddress> zooKeeperCluster) {
-
-    return new ZooKeeperClient(
-        zooKeeperConfig.getSessionTimeout(),
-        zooKeeperConfig.getCredentials(),
-        zooKeeperConfig.getChrootPath(),
-        zooKeeperCluster);
-  }
-
-  @Provides
-  @Singleton
-  ServerSetImpl provideServerSet(
-      ZooKeeperClient client,
-      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zooKeeperAcls) {
-
-    return new ServerSetImpl(client, zooKeeperAcls, discoveryPath);
-  }
-
-  @Provides
-  @Singleton
-  DynamicHostSet<ServiceInstance> provideServerSet(ServerSetImpl serverSet) {
-    // Used for a type re-binding of the server set.
-    return serverSet;
-  }
-
-  // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
-  @Provides
-  @Singleton
-  @Exposed
-  SingletonService provideSingletonService(
-      ZooKeeperClient client,
-      ServerSetImpl serverSet,
-      @ServiceDiscoveryBindings.ZooKeeper List<ACL> zookeeperAcls) {
-
-    return new SingletonServiceImpl(
-        serverSet,
-        SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
deleted file mode 100644
index 9161455..0000000
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.discovery;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
-
-import static java.util.Objects.requireNonNull;
-
-class CommonsServiceGroupMonitor implements ServiceGroupMonitor {
-  private Optional<Command> closeCommand = Optional.empty();
-  private final DynamicHostSet<ServiceInstance> serverSet;
-  private final AtomicReference<ImmutableSet<ServiceInstance>> services =
-      new AtomicReference<>(ImmutableSet.of());
-
-  @Inject
-  CommonsServiceGroupMonitor(DynamicHostSet<ServiceInstance> serverSet) {
-    this.serverSet = requireNonNull(serverSet);
-  }
-
-  @Override
-  public void start() throws MonitorException {
-    try {
-      closeCommand = Optional.of(serverSet.watch(services::set));
-    } catch (DynamicHostSet.MonitorException e) {
-      throw new MonitorException("Unable to watch scheduler host set.", e);
-    }
-  }
-
-  @Override
-  public void close() {
-    closeCommand.ifPresent(Command::execute);
-  }
-
-  @Override
-  public ImmutableSet<ServiceInstance> get() {
-    return services.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
index 40cda8c..77f90ee 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java
@@ -37,7 +37,7 @@ import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSet;
+import org.apache.aurora.common.zookeeper.Encoding;
 import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.curator.RetryPolicy;
@@ -76,7 +76,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule {
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY);
     requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY);
 
-    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(ServerSet.JSON_CODEC);
+    bind(new TypeLiteral<Codec<ServiceInstance>>() { }).toInstance(Encoding.JSON_CODEC);
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
index 1e7b9ce..48c7bfd 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java
@@ -40,13 +40,6 @@ public final class FlaggedZooKeeperConfig {
 
   @Parameters(separators = "=")
   public static class Options {
-    @Parameter(names = "-zk_use_curator",
-        description =
-            "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter "
-                + "commons/zookeeper (the legacy library) is used.",
-        arity = 1)
-    public boolean useCurator = true;
-
     @Parameter(names = "-zk_in_proc",
         description =
             "Launches an embedded zookeeper server for local testing causing -zk_endpoints "
@@ -87,7 +80,6 @@ public final class FlaggedZooKeeperConfig {
    */
   public static ZooKeeperConfig create(Options opts) {
     return new ZooKeeperConfig(
-        opts.useCurator,
         opts.zkEndpoints,
         Optional.fromNullable(opts.chrootPath),
         opts.inProcess,

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
index 917a567..7e3b6c4 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java
@@ -28,7 +28,6 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
-import com.google.inject.Module;
 import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.binder.LinkedBindingBuilder;
@@ -85,15 +84,7 @@ public class ServiceDiscoveryModule extends AbstractModule {
       clusterBinder.toInstance(zooKeeperConfig.getServers());
     }
 
-    install(discoveryModule());
-  }
-
-  private Module discoveryModule() {
-    if (zooKeeperConfig.isUseCurator()) {
-      return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-    } else {
-      return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig);
-    }
+    install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig));
   }
 
   @Provides

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
index 433ed31..1a7e8cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
+++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java
@@ -44,13 +44,11 @@ public class ZooKeeperConfig {
   /**
    * Creates a new client configuration with defaults for the session timeout and credentials.
    *
-   * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used.
    * @param servers ZooKeeper server addresses.
    * @return A new configuration.
    */
-  public static ZooKeeperConfig create(boolean useCurator, Iterable<InetSocketAddress> servers) {
+  public static ZooKeeperConfig create(Iterable<InetSocketAddress> servers) {
     return new ZooKeeperConfig(
-        useCurator,
         servers,
         Optional.absent(), // chrootPath
         false,
@@ -59,7 +57,6 @@ public class ZooKeeperConfig {
         Optional.absent()); // credentials
   }
 
-  private final boolean useCurator;
   private final Iterable<InetSocketAddress> servers;
   private final boolean inProcess;
   private final Amount<Integer, Time> sessionTimeout;
@@ -77,7 +74,6 @@ public class ZooKeeperConfig {
    * @param credentials ZooKeeper authentication credentials.
    */
   ZooKeeperConfig(
-      boolean useCurator,
       Iterable<InetSocketAddress> servers,
       Optional<String> chrootPath,
       boolean inProcess,
@@ -85,7 +81,6 @@ public class ZooKeeperConfig {
       Amount<Integer, Time> connectionTimeout,
       Optional<Credentials> credentials) {
 
-    this.useCurator = useCurator;
     this.servers = MorePreconditions.checkNotBlank(servers);
     this.chrootPath = requireNonNull(chrootPath);
     this.inProcess = inProcess;
@@ -103,7 +98,6 @@ public class ZooKeeperConfig {
    */
   public ZooKeeperConfig withCredentials(Credentials newCredentials) {
     return new ZooKeeperConfig(
-        useCurator,
         servers,
         chrootPath,
         inProcess,
@@ -112,10 +106,6 @@ public class ZooKeeperConfig {
         Optional.of(newCredentials));
   }
 
-  boolean isUseCurator() {
-    return useCurator;
-  }
-
   Iterable<InetSocketAddress> getServers() {
     return servers;
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index a363e70..8e3c1de 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.app;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Set;
@@ -47,9 +48,7 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.stats.Stats;
 import org.apache.aurora.common.zookeeper.Credentials;
-import org.apache.aurora.common.zookeeper.ServerSetImpl;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
+import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest;
 import org.apache.aurora.gen.HostAttributes;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
@@ -110,10 +109,9 @@ import static org.easymock.EasyMock.createControl;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SchedulerIT extends BaseZooKeeperClientTest {
+public class SchedulerIT extends BaseZooKeeperTest {
 
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class);
 
@@ -153,7 +151,6 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
   private Stream logStream;
   private StreamMatcher streamMatcher;
   private EntrySerializer entrySerializer;
-  private ZooKeeperClient zkClient;
   private File backupDir;
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -181,11 +178,9 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     entrySerializer = new EntrySerializer.EntrySerializerImpl(
         Amount.of(512, Data.KB),
         Hashing.md5());
-
-    zkClient = createZkClient();
   }
 
-  private void startScheduler() throws Exception {
+  private Injector startScheduler() throws Exception {
     // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using
     // AppLauncher.
     Module testModule = new AbstractModule() {
@@ -215,8 +210,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     };
     ZooKeeperConfig zkClientConfig =
         ZooKeeperConfig.create(
-            true, // useCurator
-            ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort())))
+            ImmutableList.of(
+                InetSocketAddress.createUnresolved("localhost", getServer().getPort())))
             .withCredentials(Credentials.digestCredentials("mesos", "mesos"));
     SchedulerMain main = SchedulerMain.class.newInstance();
     Injector injector = Guice.createInjector(
@@ -245,21 +240,35 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     });
     injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class))
         .awaitHealthy();
+    return injector;
   }
 
-  private void awaitSchedulerReady() throws Exception {
+  private void awaitSchedulerReady(Injector injector) throws Exception {
     executor.submit(() -> {
-      ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
-      final CountDownLatch schedulerReady = new CountDownLatch(1);
-      schedulerService.watch(hostSet -> {
-        if (!hostSet.isEmpty()) {
-          schedulerReady.countDown();
+      ServiceGroupMonitor groupMonitor = injector.getInstance(ServiceGroupMonitor.class);
+      try {
+        // A timeout is used because certain types of assertion errors (mocks) will not surface
+        // until the main test thread exits this body of code.
+        long waited = 0;
+        while (waited < 5000) {
+          if (groupMonitor.get().isEmpty()) {
+            try {
+              Thread.sleep(100);
+              waited += 100;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          } else {
+            break;
+          }
         }
-      });
-      // A timeout is used because certain types of assertion errors (mocks) will not surface
-      // until the main test thread exits this body of code.
-      assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES));
-      return null;
+      } finally {
+        try {
+          groupMonitor.close();
+        } catch (IOException e) {
+          LOG.info("Failed to close:" + e, e);
+        }
+      }
     }).get();
   }
 
@@ -345,7 +354,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
     expect(driver.stop(true)).andReturn(Protos.Status.DRIVER_STOPPED).anyTimes();
 
     control.replay();
-    startScheduler();
+    Injector injector = startScheduler();
 
     driverStarted.await();
     scheduler.getValue().registered(
@@ -353,7 +362,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest {
         Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
         MASTER);
 
-    awaitSchedulerReady();
+    awaitSchedulerReady(injector);
 
     assertEquals(0L, Stats.<Long>getVariable("task_store_PENDING").read().longValue());
     assertEquals(1L, Stats.<Long>getVariable("task_store_ASSIGNED").read().longValue());


Mime
View raw message