aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jsir...@apache.org
Subject [2/2] aurora git commit: Remove `-zk_use_curator` and unused code.
Date Tue, 27 Sep 2016 21:38:26 GMT
Remove `-zk_use_curator` and unused code.

Some portions of the commons zookeeper package remain to be moved in a
follow-up change.

Bugs closed: AURORA-1669

Reviewed at https://reviews.apache.org/r/52312/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/69cba786
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/69cba786
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/69cba786

Branch: refs/heads/master
Commit: 69cba786efc2628eab566201dfea46836a1d9af5
Parents: f559e93
Author: John Sirois <jsirois@apache.org>
Authored: Tue Sep 27 15:38:20 2016 -0600
Committer: John Sirois <john.sirois@gmail.com>
Committed: Tue Sep 27 15:38:20 2016 -0600

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   8 +
 .../aurora/common/zookeeper/Candidate.java      |  78 ---
 .../aurora/common/zookeeper/CandidateImpl.java  | 127 ----
 .../apache/aurora/common/zookeeper/Group.java   | 674 -------------------
 .../aurora/common/zookeeper/JsonCodec.java      |  12 +-
 .../aurora/common/zookeeper/ServerSet.java      |  74 --
 .../aurora/common/zookeeper/ServerSetImpl.java  | 349 ----------
 .../aurora/common/zookeeper/ServerSets.java     | 118 ----
 .../common/zookeeper/SingletonServiceImpl.java  | 122 ----
 .../common/zookeeper/CandidateImplTest.java     | 165 -----
 .../aurora/common/zookeeper/GroupTest.java      | 321 ---------
 .../aurora/common/zookeeper/JsonCodecTest.java  |  38 +-
 .../common/zookeeper/ServerSetImplTest.java     | 258 -------
 .../aurora/common/zookeeper/ServerSetsTest.java |  44 --
 .../zookeeper/SingletonServiceImplTest.java     | 243 -------
 docs/reference/scheduler-configuration.md       |   2 -
 .../CommonsServiceDiscoveryModule.java          | 102 ---
 .../discovery/CommonsServiceGroupMonitor.java   |  59 --
 .../CuratorServiceDiscoveryModule.java          |   4 +-
 .../discovery/FlaggedZooKeeperConfig.java       |  13 -
 .../discovery/ServiceDiscoveryModule.java       |  13 +-
 .../scheduler/discovery/ZooKeeperConfig.java    |  12 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  45 +-
 .../discovery/AbstractDiscoveryModuleTest.java  |  77 ---
 .../discovery/BaseCuratorDiscoveryTest.java     |   8 +-
 .../discovery/CommonsDiscoveryModuleTest.java   |  29 -
 .../CommonsServiceGroupMonitorTest.java         | 137 ----
 .../discovery/CuratorDiscoveryModuleTest.java   |  66 +-
 .../discovery/CuratorSingletonServiceTest.java  |   3 +-
 .../discovery/ZooKeeperConfigTest.java          |   5 +-
 30 files changed, 119 insertions(+), 3087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 82c9a1c..49c03e8 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,3 +1,11 @@
+0.17.0 (Not yet released)
+=========================
+
+### Deprecations and removals:
+
+- The scheduler flag `-zk_use_curator` has been removed. If you have never set the flag and are
+  upgrading, you should take care as described in the [note](#zk_use_curator_upgrade) below.
+
 0.16.0
 ======
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
deleted file mode 100644
index 75c1b14..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Candidate.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Interface definition for becoming or querying for a ZooKeeper-based group leader.
- */
-public interface Candidate {
-
-  /**
-   * Returns the current group leader by querying ZooKeeper synchronously.
-   *
-   * @return the current group leader's identifying data or {@link Optional#absent()} if there is
-   *     no leader
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading the leader information
-   * @throws InterruptedException if this thread is interrupted getting the leader
-   */
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException;
-
-  /**
-   * Encapsulates a leader that can be elected and subsequently defeated.
-   */
-  interface Leader {
-
-    /**
-     * Called when this leader has been elected.
-     *
-     * @param abdicate a command that can be used to abdicate leadership and force a new election
-     */
-    void onElected(ExceptionalCommand<JoinException> abdicate);
-
-    /**
-     * Called when the leader has been ousted.  Can occur either if the leader abdicates or if an
-     * external event causes the leader to lose its leadership role (session expiration).
-     */
-    void onDefeated();
-  }
-
-  /**
-   * Offers this candidate in leadership elections for as long as the current jvm process is alive.
-   * Upon election, the {@code onElected} callback will be executed and a command that can be used
-   * to abdicate leadership will be passed in.  If the elected leader jvm process dies or the
-   * elected leader successfully abdicates then a new leader will be elected.  Leaders that
-   * successfully abdicate are removed from the group and will not be eligible for leadership
-   * election unless {@link #offerLeadership(Leader)} is called again.
-   *
-   * @param leader the leader to notify of election and defeat events
-   * @throws JoinException if there was a problem joining the group
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to join the group and determine initial
-   *     election results
-   * @return a supplier that can be queried to find out if this leader is currently elected
-   */
-  public Supplier<Boolean> offerLeadership(Leader leader)
-        throws JoinException, WatchException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
deleted file mode 100644
index 98b5ee4..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/CandidateImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements leader election for small groups of candidates.  This implementation is subject to the
- * <a href="http://hadoop.apache.org/zookeeper/docs/r3.2.1/recipes.html#sc_leaderElection">
- * herd effect</a> for a given group and should only be used for small (~10 member) candidate pools.
- */
-public class CandidateImpl implements Candidate {
-  private static final Logger LOG = LoggerFactory.getLogger(CandidateImpl.class);
-
-  private static final byte[] UNKNOWN_CANDIDATE_DATA = "<unknown>".getBytes(Charsets.UTF_8);
-
-  private static final Supplier<byte[]> IP_ADDRESS_DATA_SUPPLIER = () -> {
-    try {
-      return InetAddress.getLocalHost().getHostAddress().getBytes();
-    } catch (UnknownHostException e) {
-      LOG.warn("Failed to determine local address!", e);
-      return UNKNOWN_CANDIDATE_DATA;
-    }
-  };
-
-  private static final Function<Iterable<String>, String> MOST_RECENT_JUDGE =
-      candidates -> Ordering.natural().min(candidates);
-
-  private final Group group;
-
-  /**
-   * Creates a candidate that can be used to offer leadership for the given {@code group}.
-   */
-  public CandidateImpl(Group group) {
-    this.group = Preconditions.checkNotNull(group);
-  }
-
-  @Override
-  public Optional<byte[]> getLeaderData()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-
-    String leaderId = getLeader(group.getMemberIds());
-    return leaderId == null
-        ? Optional.<byte[]>absent()
-        : Optional.of(group.getMemberData(leaderId));
-  }
-
-  @Override
-  public Supplier<Boolean> offerLeadership(final Leader leader)
-      throws JoinException, WatchException, InterruptedException {
-
-    final Membership membership = group.join(IP_ADDRESS_DATA_SUPPLIER, leader::onDefeated);
-
-    final AtomicBoolean elected = new AtomicBoolean(false);
-    final AtomicBoolean abdicated = new AtomicBoolean(false);
-    group.watch(memberIds -> {
-      boolean noCandidates = Iterables.isEmpty(memberIds);
-      String memberId = membership.getMemberId();
-
-      if (noCandidates) {
-        LOG.warn("All candidates have temporarily left the group: " + group);
-      } else if (!Iterables.contains(memberIds, memberId)) {
-        LOG.error(
-            "Current member ID {} is not a candidate for leader, current voting: {}",
-            memberId, memberIds);
-      } else {
-        boolean electedLeader = memberId.equals(getLeader(memberIds));
-        boolean previouslyElected = elected.getAndSet(electedLeader);
-
-        if (!previouslyElected && electedLeader) {
-          LOG.info("Candidate {} is now leader of group: {}",
-              membership.getMemberPath(), memberIds);
-
-          leader.onElected(() -> {
-            membership.cancel();
-            abdicated.set(true);
-          });
-        } else if (!electedLeader) {
-          if (previouslyElected) {
-            leader.onDefeated();
-          }
-          LOG.info(
-              "Candidate {} waiting for the next leader election, current voting: {}",
-              membership.getMemberPath(), memberIds);
-        }
-      }
-    });
-
-    return () -> !abdicated.get() && elected.get();
-  }
-
-  @Nullable
-  private String getLeader(Iterable<String> memberIds) {
-    return Iterables.isEmpty(memberIds) ? null : MOST_RECENT_JUDGE.apply(memberIds);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
deleted file mode 100644
index 2720dd1..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Group.java
+++ /dev/null
@@ -1,674 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.Commands;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class exposes methods for joining and monitoring distributed groups.  The groups this class
- * monitors are realized as persistent paths in ZooKeeper with ephemeral child nodes for
- * each member of a group.
- */
-public class Group {
-  private static final Logger LOG = LoggerFactory.getLogger(Group.class);
-
-  private static final Supplier<byte[]> NO_MEMBER_DATA = Suppliers.ofInstance(null);
-  private static final String DEFAULT_NODE_NAME_PREFIX = "member_";
-
-  private final ZooKeeperClient zkClient;
-  private final ImmutableList<ACL> acl;
-  private final String path;
-
-  private final NodeScheme nodeScheme;
-  private final Predicate<String> nodeNameFilter;
-
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a group rooted at the given {@code path}.  Paths must be absolute and trailing or
-   * duplicate slashes will be normalized.  For example, all the following paths would create a
-   * group at the normalized path /my/distributed/group:
-   * <ul>
-   *   <li>/my/distributed/group
-   *   <li>/my/distributed/group/
-   *   <li>/my/distributed//group
-   * </ul>
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the absolute persistent path that represents this group
-   * @param nodeScheme the scheme that defines how nodes are created
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, NodeScheme nodeScheme) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.acl = ImmutableList.copyOf(acl);
-    this.path = ZooKeeperUtils.normalizePath(Preconditions.checkNotNull(path));
-
-    this.nodeScheme = Preconditions.checkNotNull(nodeScheme);
-    nodeNameFilter = Group.this.nodeScheme::isMember;
-
-    backoffHelper = new BackoffHelper();
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, String)} with a
-   * {@code namePrefix} of 'member_'.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, acl, path, DEFAULT_NODE_NAME_PREFIX);
-  }
-
-  /**
-   * Equivalent to {@link #Group(ZooKeeperClient, Iterable, String, NodeScheme)} with a
-   * {@link DefaultScheme} using {@code namePrefix}.
-   */
-  public Group(ZooKeeperClient zkClient, Iterable<ACL> acl, String path, String namePrefix) {
-    this(zkClient, acl, path, new DefaultScheme(namePrefix));
-  }
-
-  public String getMemberPath(String memberId) {
-    return path + "/" + MorePreconditions.checkNotBlank(memberId);
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public String getMemberId(String nodePath) {
-    MorePreconditions.checkNotBlank(nodePath);
-    Preconditions.checkArgument(nodePath.startsWith(path + "/"),
-        "Not a member of this group[%s]: %s", path, nodePath);
-
-    String memberId = StringUtils.substringAfterLast(nodePath, "/");
-    Preconditions.checkArgument(nodeScheme.isMember(memberId),
-        "Not a group member: %s", memberId);
-    return memberId;
-  }
-
-  /**
-   * Returns the current list of group member ids by querying ZooKeeper synchronously.
-   *
-   * @return the ids of all the present members of this group
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this group's member ids
-   * @throws InterruptedException if this thread is interrupted listing the group members
-   */
-  public Iterable<String> getMemberIds()
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return Iterables.filter(zkClient.get().getChildren(path, false), nodeNameFilter);
-  }
-
-  /**
-   * Gets the data for one of this groups members by querying ZooKeeper synchronously.
-   *
-   * @param memberId the id of the member whose data to retrieve
-   * @return the data associated with the {@code memberId}
-   * @throws ZooKeeperConnectionException if there was a problem connecting to ZooKeeper
-   * @throws KeeperException if there was a problem reading this member's data
-   * @throws InterruptedException if this thread is interrupted retrieving the member data
-   */
-  public byte[] getMemberData(String memberId)
-      throws ZooKeeperConnectionException, KeeperException, InterruptedException {
-    return zkClient.get().getData(getMemberPath(memberId), false, null);
-  }
-
-  /**
-   * Represents membership in a distributed group.
-   */
-  public interface Membership {
-
-    /**
-     * Returns the persistent ZooKeeper path that represents this group.
-     */
-    String getGroupPath();
-
-    /**
-     * Returns the id (ZooKeeper node name) of this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberId();
-
-    /**
-     * Returns the full ZooKeeper path to this group member.  May change over time if the
-     * ZooKeeper session expires.
-     */
-    String getMemberPath();
-
-    /**
-     * Updates the membership data synchronously using the {@code Supplier<byte[]>} passed to
-     * {@link Group#join()}.
-     *
-     * @return the new membership data
-     * @throws UpdateException if there was a problem updating the membership data
-     */
-    byte[] updateMemberData() throws UpdateException;
-
-    /**
-     * Cancels group membership by deleting the associated ZooKeeper member node.
-     *
-     * @throws JoinException if there is a problem deleting the node
-     */
-    void cancel() throws JoinException;
-  }
-
-  /**
-   * Indicates an error joining a group.
-   */
-  public static class JoinException extends Exception {
-    public JoinException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Indicates an error updating a group member's data.
-   */
-  public static class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, null)}.
-   */
-  public final Membership join() throws JoinException, InterruptedException {
-    return join(NO_MEMBER_DATA, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(memberData, null)}.
-   */
-  public final Membership join(Supplier<byte[]> memberData)
-      throws JoinException, InterruptedException {
-
-    return join(memberData, null);
-  }
-
-  /**
-   * Equivalent to calling {@code join(null, onLoseMembership)}.
-   */
-  public final Membership join(@Nullable final Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    return join(NO_MEMBER_DATA, onLoseMembership);
-  }
-
-  /**
-   * Joins this group and returns the resulting Membership when successful.  Membership will be
-   * automatically cancelled when the current jvm process dies; however the returned Membership
-   * object can be used to cancel membership earlier.  Unless
-   * {@link Group.Membership#cancel()} is called the membership will
-   * be maintained by re-establishing it silently in the background.
-   *
-   * <p>Any {@code memberData} given is persisted in the member node in ZooKeeper.  If an
-   * {@code onLoseMembership} callback is supplied, it will be notified each time this member loses
-   * membership in the group.
-   *
-   * @param memberData a supplier of the data to store in the member node
-   * @param onLoseMembership a callback to notify when membership is lost
-   * @return a Membership object with the member details
-   * @throws JoinException if there was a problem joining the group
-   * @throws InterruptedException if this thread is interrupted awaiting completion of the join
-   */
-  public final Membership join(Supplier<byte[]> memberData, @Nullable Command onLoseMembership)
-      throws JoinException, InterruptedException {
-
-    Preconditions.checkNotNull(memberData);
-    ensurePersistentGroupPath();
-
-    final ActiveMembership groupJoiner = new ActiveMembership(memberData, onLoseMembership);
-    return backoffHelper.doUntilResult(() -> {
-      try {
-        return groupJoiner.join();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to join group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to join group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to join group at path: " + path, e);
-          return null;
-        } else {
-          throw new JoinException("Problem joining partition group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private void ensurePersistentGroupPath() throws JoinException, InterruptedException {
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        ZooKeeperUtils.ensurePath(zkClient, acl, path);
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new JoinException("Interrupted trying to ensure group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-        return false;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error ensuring path: " + path, e);
-          return false;
-        } else {
-          throw new JoinException("Problem ensuring group at path: " + path, e);
-        }
-      }
-    });
-  }
-
-  private class ActiveMembership implements Membership {
-    private final Supplier<byte[]> memberData;
-    private final Command onLoseMembership;
-    private String nodePath;
-    private String memberId;
-    private volatile boolean cancelled;
-    private byte[] membershipData;
-
-    public ActiveMembership(Supplier<byte[]> memberData, @Nullable Command onLoseMembership) {
-      this.memberData = memberData;
-      this.onLoseMembership = (onLoseMembership == null) ? Commands.NOOP : onLoseMembership;
-    }
-
-    @Override
-    public String getGroupPath() {
-      return path;
-    }
-
-    @Override
-    public synchronized String getMemberId() {
-      return memberId;
-    }
-
-    @Override
-    public synchronized String getMemberPath() {
-      return nodePath;
-    }
-
-    @Override
-    public synchronized byte[] updateMemberData() throws UpdateException {
-      byte[] membershipData = memberData.get();
-      if (!ArrayUtils.isEquals(this.membershipData, membershipData)) {
-        try {
-          zkClient.get().setData(nodePath, membershipData, ZooKeeperUtils.ANY_VERSION);
-          this.membershipData = membershipData;
-        } catch (KeeperException e) {
-          throw new UpdateException("Problem updating membership data.", e);
-        } catch (InterruptedException e) {
-          throw new UpdateException("Interrupted attempting to update membership data.", e);
-        } catch (ZooKeeperConnectionException e) {
-          throw new UpdateException(
-              "Could not connect to the ZooKeeper cluster to update membership data.", e);
-        }
-      }
-      return membershipData;
-    }
-
-    @Override
-    public synchronized void cancel() throws JoinException {
-      if (!cancelled) {
-        try {
-          backoffHelper.doUntilSuccess(() -> {
-            try {
-              zkClient.get().delete(nodePath, ZooKeeperUtils.ANY_VERSION);
-              return true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              throw new JoinException("Interrupted trying to cancel membership: " + nodePath, e);
-            } catch (ZooKeeperConnectionException e) {
-              LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-              return false;
-            } catch (NoNodeException e) {
-              LOG.info("Membership already cancelled, node at path: " + nodePath +
-                       " has been deleted");
-              return true;
-            } catch (KeeperException e) {
-              if (zkClient.shouldRetry(e)) {
-                LOG.warn("Temporary error cancelling membership: " + nodePath, e);
-                return false;
-              } else {
-                throw new JoinException("Problem cancelling membership: " + nodePath, e);
-              }
-            }
-          });
-          cancelled = true; // Prevent auto-re-join logic from undoing this cancel.
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new JoinException("Problem cancelling membership: " + nodePath, e);
-        }
-      }
-    }
-
-    private class CancelledException extends IllegalStateException { /* marker */ }
-
-    synchronized Membership join()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (cancelled) {
-        throw new CancelledException();
-      }
-
-      if (nodePath == null) {
-        // Re-join if our ephemeral node goes away due to session expiry - only needs to be
-        // registered once.
-        zkClient.registerExpirationHandler(this::tryJoin);
-      }
-
-      byte[] membershipData = memberData.get();
-      String nodeName = nodeScheme.createName(membershipData);
-      CreateMode createMode = nodeScheme.isSequential()
-          ? CreateMode.EPHEMERAL_SEQUENTIAL
-          : CreateMode.EPHEMERAL;
-      nodePath = zkClient.get().create(path + "/" + nodeName, membershipData, acl, createMode);
-      memberId = Group.this.getMemberId(nodePath);
-      LOG.info("Set group member ID to " + memberId);
-      this.membershipData = membershipData;
-
-      // Re-join if our ephemeral node goes away due to maliciousness.
-      zkClient.get().exists(nodePath, event -> {
-        if (event.getType() == EventType.NodeDeleted) {
-          tryJoin();
-        }
-      });
-
-      return this;
-    }
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryJoin =
-        () -> {
-          try {
-            join();
-            return true;
-          } catch (CancelledException e) {
-            // Lost a cancel race - that's ok.
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-joining group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-joining group: " + path, e);
-            }
-          }
-        };
-
-    private synchronized void tryJoin() {
-      onLoseMembership.execute();
-      try {
-        backoffHelper.doUntilSuccess(tryJoin);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-join group: %s, giving up", path), e);
-      }
-    }
-  }
-
-  /**
-   * An interface to an object that listens for changes to a group's membership.
-   */
-  public interface GroupChangeListener {
-
-    /**
-     * Called whenever group membership changes with the new list of member ids.
-     *
-     * @param memberIds the current member ids
-     */
-    void onGroupChange(Iterable<String> memberIds);
-  }
-
-  /**
-   * An interface that dictates the scheme to use for storing and filtering nodes that represent
-   * members of a distributed group.
-   */
-  public interface NodeScheme {
-    /**
-     * Determines if a child node is a member of a group by examining the node's name.
-     *
-     * @param nodeName the name of a child node found in a group
-     * @return {@code true} if {@code nodeName} identifies a group member in this scheme
-     */
-    boolean isMember(String nodeName);
-
-    /**
-     * Generates a node name for the node representing this process in the distributed group.
-     *
-     * @param membershipData the data that will be stored in this node
-     * @return the name for the node that will represent this process in the group
-     */
-    String createName(byte[] membershipData);
-
-    /**
-     * Indicates whether this scheme needs ephemeral sequential nodes or just ephemeral nodes.
-     *
-     * @return {@code true} if this scheme requires sequential node names; {@code false} otherwise
-     */
-    boolean isSequential();
-  }
-
-  /**
-   * Indicates an error watching a group.
-   */
-  public static class WatchException extends Exception {
-    public WatchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  /**
-   * Watches this group for the lifetime of this jvm process.  This method will block until the
-   * current group members are available, notify the {@code groupChangeListener} and then return.
-   * All further changes to the group membership will cause notifications on a background thread.
-   *
-   * @param groupChangeListener the listener to notify of group membership change events
-   * @return A command which, when executed, will stop watching the group.
-   * @throws WatchException if there is a problem generating the 1st group membership list
-   * @throws InterruptedException if interrupted waiting to gather the 1st group membership list
-   */
-  public final Command watch(final GroupChangeListener groupChangeListener)
-      throws WatchException, InterruptedException {
-    Preconditions.checkNotNull(groupChangeListener);
-
-    try {
-      ensurePersistentGroupPath();
-    } catch (JoinException e) {
-      throw new WatchException("Failed to create group path: " + path, e);
-    }
-
-    final GroupMonitor groupMonitor = new GroupMonitor(groupChangeListener);
-    backoffHelper.doUntilSuccess(() -> {
-      try {
-        groupMonitor.watchGroup();
-        return true;
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new WatchException("Interrupted trying to watch group at path: " + path, e);
-      } catch (ZooKeeperConnectionException e) {
-        LOG.warn("Temporary error trying to watch group at path: " + path, e);
-        return null;
-      } catch (KeeperException e) {
-        if (zkClient.shouldRetry(e)) {
-          LOG.warn("Temporary error trying to watch group at path: " + path, e);
-          return null;
-        } else {
-          throw new WatchException("Problem trying to watch group at path: " + path, e);
-        }
-      }
-    });
-    return groupMonitor::stopWatching;
-  }
-
-  /**
-   * Helps continuously monitor a group for membership changes.
-   */
-  private class GroupMonitor {
-    private final GroupChangeListener groupChangeListener;
-    private volatile boolean stopped = false;
-    private Set<String> members;
-
-    GroupMonitor(GroupChangeListener groupChangeListener) {
-      this.groupChangeListener = groupChangeListener;
-    }
-
-    private final Watcher groupWatcher = event -> {
-      if (event.getType() == EventType.NodeChildrenChanged) {
-        tryWatchGroup();
-      }
-    };
-
-    private final ExceptionalSupplier<Boolean, InterruptedException> tryWatchGroup =
-        () -> {
-          try {
-            watchGroup();
-            return true;
-          } catch (ZooKeeperConnectionException e) {
-            LOG.warn("Problem connecting to ZooKeeper, retrying", e);
-            return false;
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error re-watching group: " + path, e);
-              return false;
-            } else {
-              throw new IllegalStateException("Permanent problem re-watching group: " + path, e);
-            }
-          }
-        };
-
-    private void tryWatchGroup() {
-      if (stopped) {
-        return;
-      }
-
-      try {
-        backoffHelper.doUntilSuccess(tryWatchGroup);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(
-            String.format("Interrupted while trying to re-watch group: %s, giving up", path), e);
-      }
-    }
-
-    private void watchGroup()
-        throws ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-      if (stopped) {
-        return;
-      }
-
-      List<String> children = zkClient.get().getChildren(path, groupWatcher);
-      setMembers(Iterables.filter(children, nodeNameFilter));
-    }
-
-    private void stopWatching() {
-      // TODO(William Farner): Cancel the watch when
-      // https://issues.apache.org/jira/browse/ZOOKEEPER-442 is resolved.
-      LOG.info("Stopping watch on " + this);
-      stopped = true;
-    }
-
-    synchronized void setMembers(Iterable<String> members) {
-      if (stopped) {
-        LOG.info("Suppressing membership update, no longer watching " + this);
-        return;
-      }
-
-      if (this.members == null) {
-        // Reset our watch on the group if session expires - only needs to be registered once.
-        zkClient.registerExpirationHandler(this::tryWatchGroup);
-      }
-
-      Set<String> membership = ImmutableSet.copyOf(members);
-      if (!membership.equals(this.members)) {
-        groupChangeListener.onGroupChange(members);
-        this.members = membership;
-      }
-    }
-  }
-
-  /**
-   * Default naming scheme implementation. Stores nodes at [given path] + "/" + [given prefix] +
-   * ZooKeeper-generated member ID. For example, if the path is "/discovery/servicename", and the
-   * prefix is "member_", the node's full path will look something like
-   * {@code /discovery/servicename/member_0000000007}.
-   */
-  public static class DefaultScheme implements NodeScheme {
-    private final String namePrefix;
-    private final Pattern namePattern;
-
-    /**
-     * Creates a sequential node scheme based on the given node name prefix.
-     *
-     * @param namePrefix the prefix for the names of the member nodes
-     */
-    public DefaultScheme(String namePrefix) {
-      this.namePrefix = MorePreconditions.checkNotBlank(namePrefix);
-      namePattern = Pattern.compile("^" + Pattern.quote(namePrefix) + "-?[0-9]+$");
-    }
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return namePattern.matcher(nodeName).matches();
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return namePrefix;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return true;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Group " + path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
index 9d31608..45e789b 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/JsonCodec.java
@@ -38,7 +38,10 @@ import org.apache.aurora.common.thrift.Status;
 
 import static java.util.Objects.requireNonNull;
 
-class JsonCodec implements Codec<ServiceInstance> {
+/**
+ * Encodes a {@link ServiceInstance} as a JSON object.
+ */
+public class JsonCodec implements Codec<ServiceInstance> {
 
   private static void assertRequiredField(String fieldName, Object fieldValue) {
     if (fieldValue == null) {
@@ -100,11 +103,16 @@ class JsonCodec implements Codec<ServiceInstance> {
     }
   }
 
+  /**
+   * The encoding for service instance data in ZooKeeper expected by Aurora clients.
+   */
+  public static final Codec<ServiceInstance> INSTANCE = new JsonCodec();
+
   private static final Charset ENCODING = Charsets.UTF_8;
 
   private final Gson gson;
 
-  JsonCodec() {
+  private JsonCodec() {
     this(new Gson());
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
deleted file mode 100644
index aeea02d..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-
-/**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
- * common service to advertise their presence to server-set protocol-aware clients.
- *
- * Standard implementations should use the {@link #JSON_CODEC} to serialize the service instance
- * rendezvous data to zookeeper so that standard clients can interoperate.
- */
-public interface ServerSet {
-
-  /**
-   * Encodes a {@link ServiceInstance} as a JSON object.
-   *
-   * This is the default encoding for service instance data in ZooKeeper.
-   */
-  Codec<ServiceInstance> JSON_CODEC = new JsonCodec();
-
-  /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their logical name
-   * @return an EndpointStatus object that allows the endpoint to adjust its status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws JoinException, InterruptedException;
-
-  /**
-   * A handle to a service endpoint's status data that allows updating it to track current events.
-   */
-  interface EndpointStatus {
-
-    /**
-     * Removes the endpoint from the server set.
-     *
-     * @throws UpdateException if there was a problem leaving the ServerSet.
-     */
-    void leave() throws UpdateException;
-  }
-
-  /**
-   * Indicates an error updating a service's status information.
-   */
-  class UpdateException extends Exception {
-    public UpdateException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
deleted file mode 100644
index ace4980..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
-import com.google.common.util.concurrent.UncheckedExecutionException;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
- */
-public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
-  private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
-
-  private final ZooKeeperClient zkClient;
-  private final Group group;
-  private final Codec<ServiceInstance> codec;
-  private final BackoffHelper backoffHelper;
-
-  /**
-   * Creates a new ServerSet using open ZooKeeper node ACLs.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, String path) {
-    this(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, path);
-  }
-
-  /**
-   * Creates a new ServerSet for the given service {@code path}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param acl the ACL to use for creating the persistent group path if it does not already exist
-   * @param path the name-service path of the service to connect to
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Iterable<ACL> acl, String path) {
-    this(zkClient, new Group(zkClient, acl, path), JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group) {
-    this(zkClient, group, JSON_CODEC);
-  }
-
-  /**
-   * Creates a new ServerSet using the given service {@code group} and a custom {@code codec}.
-   *
-   * @param zkClient the client to use for interactions with ZooKeeper
-   * @param group the server group
-   * @param codec a codec to use for serializing and de-serializing the ServiceInstance data to and
-   *     from a byte array
-   */
-  public ServerSetImpl(ZooKeeperClient zkClient, Group group, Codec<ServiceInstance> codec) {
-    this.zkClient = checkNotNull(zkClient);
-    this.group = checkNotNull(group);
-    this.codec = checkNotNull(codec);
-
-    // TODO(John Sirois): Inject the helper so that backoff strategy can be configurable.
-    backoffHelper = new BackoffHelper();
-  }
-
-  @VisibleForTesting
-  ZooKeeperClient getZkClient() {
-    return zkClient;
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints)
-      throws Group.JoinException, InterruptedException {
-
-    checkNotNull(endpoint);
-    checkNotNull(additionalEndpoints);
-
-    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
-    Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
-    Group.Membership membership = group.join(serviceInstanceSupplier);
-
-    return () -> memberStatus.leave(membership);
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-    ServerSetWatcher serverSetWatcher = new ServerSetWatcher(zkClient, monitor);
-    try {
-      return serverSetWatcher.watch();
-    } catch (Group.WatchException e) {
-      throw new MonitorException("ZooKeeper watch failed.", e);
-    } catch (InterruptedException e) {
-      throw new MonitorException("Interrupted while watching ZooKeeper.", e);
-    }
-  }
-
-  private class MemberStatus {
-    private final InetSocketAddress endpoint;
-    private final Map<String, InetSocketAddress> additionalEndpoints;
-
-    private MemberStatus(
-        InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints) {
-
-      this.endpoint = endpoint;
-      this.additionalEndpoints = additionalEndpoints;
-    }
-
-    synchronized void leave(Group.Membership membership) throws UpdateException {
-      try {
-        membership.cancel();
-      } catch (Group.JoinException e) {
-        throw new UpdateException(
-            "Failed to auto-cancel group membership on transition to DEAD status", e);
-      }
-    }
-
-    byte[] serializeServiceInstance() {
-      ServiceInstance serviceInstance = new ServiceInstance(
-          ServerSets.toEndpoint(endpoint),
-          Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
-          Status.ALIVE);
-
-      LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
-      try {
-        return ServerSets.serializeServiceInstance(serviceInstance, codec);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unexpected problem serializing thrift struct " +
-            serviceInstance + "to a byte[]", e);
-      }
-    }
-  }
-
-  private static class ServiceInstanceFetchException extends RuntimeException {
-    ServiceInstanceFetchException(String message, Throwable cause) {
-      super(message, cause);
-    }
-  }
-
-  private static class ServiceInstanceDeletedException extends RuntimeException {
-    ServiceInstanceDeletedException(String path) {
-      super(path);
-    }
-  }
-
-  private class ServerSetWatcher {
-    private final ZooKeeperClient zkClient;
-    private final HostChangeMonitor<ServiceInstance> monitor;
-    @Nullable private ImmutableSet<ServiceInstance> serverSet;
-
-    ServerSetWatcher(ZooKeeperClient zkClient, HostChangeMonitor<ServiceInstance> monitor) {
-      this.zkClient = zkClient;
-      this.monitor = monitor;
-    }
-
-    public Command watch() throws Group.WatchException, InterruptedException {
-      Watcher onExpirationWatcher = zkClient.registerExpirationHandler(this::rebuildServerSet);
-
-      try {
-        return group.watch(this::notifyGroupChange);
-      } catch (Group.WatchException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      } catch (InterruptedException e) {
-        zkClient.unregister(onExpirationWatcher);
-        throw e;
-      }
-    }
-
-    private ServiceInstance getServiceInstance(final String nodePath) {
-      try {
-        return backoffHelper.doUntilResult(() -> {
-          try {
-            byte[] data = zkClient.get().getData(nodePath, false, null);
-            return ServerSets.deserializeServiceInstance(data, codec);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new ServiceInstanceFetchException(
-                "Interrupted updating service data for: " + nodePath, e);
-          } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-            LOG.warn("Temporary error trying to updating service data for: " + nodePath, e);
-            return null;
-          } catch (NoNodeException e) {
-            invalidateNodePath(nodePath);
-            throw new ServiceInstanceDeletedException(nodePath);
-          } catch (KeeperException e) {
-            if (zkClient.shouldRetry(e)) {
-              LOG.warn("Temporary error trying to update service data for: " + nodePath, e);
-              return null;
-            } else {
-              throw new ServiceInstanceFetchException(
-                  "Failed to update service data for: " + nodePath, e);
-            }
-          } catch (IOException e) {
-            throw new ServiceInstanceFetchException(
-                "Failed to deserialize the ServiceInstance data for: " + nodePath, e);
-          }
-        });
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceInstanceFetchException(
-            "Interrupted trying to update service data for: " + nodePath, e);
-      }
-    }
-
-    private final LoadingCache<String, ServiceInstance> servicesByMemberId =
-        CacheBuilder.newBuilder().build(new CacheLoader<String, ServiceInstance>() {
-          @Override public ServiceInstance load(String memberId) {
-            return getServiceInstance(group.getMemberPath(memberId));
-          }
-        });
-
-    private void rebuildServerSet() {
-      Set<String> memberIds = ImmutableSet.copyOf(servicesByMemberId.asMap().keySet());
-      servicesByMemberId.invalidateAll();
-      notifyGroupChange(memberIds);
-    }
-
-    private String invalidateNodePath(String deletedPath) {
-      String memberId = group.getMemberId(deletedPath);
-      servicesByMemberId.invalidate(memberId);
-      return memberId;
-    }
-
-    private final Function<String, ServiceInstance> MAYBE_FETCH_NODE =
-        memberId -> {
-          // This get will trigger a fetch
-          try {
-            return servicesByMemberId.getUnchecked(memberId);
-          } catch (UncheckedExecutionException e) {
-            Throwable cause = e.getCause();
-            if (!(cause instanceof ServiceInstanceDeletedException)) {
-              Throwables.propagateIfInstanceOf(cause, ServiceInstanceFetchException.class);
-              throw new IllegalStateException(
-                  "Unexpected error fetching member data for: " + memberId, e);
-            }
-            return null;
-          }
-        };
-
-    private synchronized void notifyGroupChange(Iterable<String> memberIds) {
-      ImmutableSet<String> newMemberIds = ImmutableSortedSet.copyOf(memberIds);
-      Set<String> existingMemberIds = servicesByMemberId.asMap().keySet();
-
-      // Ignore no-op state changes except for the 1st when we've seen no group yet.
-      if ((serverSet == null) || !newMemberIds.equals(existingMemberIds)) {
-        SetView<String> deletedMemberIds = Sets.difference(existingMemberIds, newMemberIds);
-        // Implicit removal from servicesByMemberId.
-        existingMemberIds.removeAll(ImmutableSet.copyOf(deletedMemberIds));
-
-        Iterable<ServiceInstance> serviceInstances = Iterables.filter(
-            Iterables.transform(newMemberIds, MAYBE_FETCH_NODE), Predicates.notNull());
-
-        notifyServerSetChange(ImmutableSet.copyOf(serviceInstances));
-      }
-    }
-
-    private void notifyServerSetChange(ImmutableSet<ServiceInstance> currentServerSet) {
-      // ZK nodes may have changed if there was a session expiry for a server in the server set, but
-      // if the server's status has not changed, we can skip any onChange updates.
-      if (!currentServerSet.equals(serverSet)) {
-        if (currentServerSet.isEmpty()) {
-          LOG.warn("server set empty for path " + group.getPath());
-        } else {
-          if (serverSet == null) {
-            LOG.info("received initial membership {}", currentServerSet);
-          } else {
-            logChange(currentServerSet);
-          }
-        }
-        serverSet = currentServerSet;
-        monitor.onChange(serverSet);
-      }
-    }
-
-    private void logChange(ImmutableSet<ServiceInstance> newServerSet) {
-      StringBuilder message = new StringBuilder("server set " + group.getPath() + " change: ");
-      if (serverSet.size() != newServerSet.size()) {
-        message.append("from ").append(serverSet.size())
-            .append(" members to ").append(newServerSet.size());
-      }
-
-      Joiner joiner = Joiner.on("\n\t\t");
-
-      SetView<ServiceInstance> left = Sets.difference(serverSet, newServerSet);
-      if (!left.isEmpty()) {
-        message.append("\n\tleft:\n\t\t").append(joiner.join(left));
-      }
-
-      SetView<ServiceInstance> joined = Sets.difference(newServerSet, serverSet);
-      if (!joined.isEmpty()) {
-        message.append("\n\tjoined:\n\t\t").append(joiner.join(joined));
-      }
-
-      LOG.info(message.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
deleted file mode 100644
index 01a54a5..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.io.Codec;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * Common ServerSet related functions
- */
-public class ServerSets {
-
-  private ServerSets() {
-    // Utility class.
-  }
-
-  /**
-   * A function that invokes {@link #toEndpoint(InetSocketAddress)}.
-   */
-  public static final Function<InetSocketAddress, Endpoint> TO_ENDPOINT =
-      ServerSets::toEndpoint;
-
-  /**
-   * Creates a server set that registers at a single path applying the given ACL to all nodes
-   * created in the path.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
-   * @param zkPath Path to register at.  @see #create(ZooKeeperClient, java.util.Set)
-   * @return A server set that registers at {@code zkPath}.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
-    Preconditions.checkNotNull(zkClient);
-    MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPath);
-
-    return new ServerSetImpl(zkClient, acl, zkPath);
-  }
-
-  /**
-   * Returns a serialized Thrift service instance object, with given endpoints and codec.
-   *
-   * @param serviceInstance the Thrift service instance object to be serialized
-   * @param codec the codec to use to serialize a Thrift service instance object
-   * @return byte array that contains a serialized Thrift service instance
-   */
-  public static byte[] serializeServiceInstance(
-      ServiceInstance serviceInstance, Codec<ServiceInstance> codec) throws IOException {
-
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    codec.serialize(serviceInstance, output);
-    return output.toByteArray();
-  }
-
-  /**
-   * Serializes a service instance based on endpoints.
-   * @see #serializeServiceInstance(ServiceInstance, Codec)
-   *
-   * @param address the target address of the service instance
-   * @param additionalEndpoints additional endpoints of the service instance
-   * @param status service status
-   */
-  public static byte[] serializeServiceInstance(
-      InetSocketAddress address,
-      Map<String, Endpoint> additionalEndpoints,
-      Status status,
-      Codec<ServiceInstance> codec) throws IOException {
-
-    ServiceInstance serviceInstance =
-        new ServiceInstance(toEndpoint(address), additionalEndpoints, status);
-    return serializeServiceInstance(serviceInstance, codec);
-  }
-
-  /**
-   * Creates a service instance object deserialized from byte array.
-   *
-   * @param data the byte array contains a serialized Thrift service instance
-   * @param codec the codec to use to deserialize the byte array
-   */
-  public static ServiceInstance deserializeServiceInstance(
-      byte[] data, Codec<ServiceInstance> codec) throws IOException {
-
-    return codec.deserialize(new ByteArrayInputStream(data));
-  }
-
-  /**
-   * Creates an endpoint for the given InetSocketAddress.
-   *
-   * @param address the target address to create the endpoint for
-   */
-  public static Endpoint toEndpoint(InetSocketAddress address) {
-    return new Endpoint(address.getHostName(), address.getPort());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
deleted file mode 100644
index d9978a9..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/SingletonServiceImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.zookeeper.Candidate.Leader;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.zookeeper.data.ACL;
-
-public class SingletonServiceImpl implements SingletonService {
-  @VisibleForTesting
-  static final String LEADER_ELECT_NODE_PREFIX = "singleton_candidate_";
-
-  /**
-   * Creates a candidate that can be combined with an existing server set to form a singleton
-   * service using {@link #SingletonServiceImpl(ServerSet, Candidate)}.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param servicePath The path where service nodes live.
-   * @param acl The acl to apply to newly created candidate nodes and serverset nodes.
-   * @return A candidate that can be housed with a standard server set under a single zk path.
-   */
-  public static Candidate createSingletonCandidate(
-      ZooKeeperClient zkClient,
-      String servicePath,
-      Iterable<ACL> acl) {
-
-    return new CandidateImpl(new Group(zkClient, acl, servicePath, LEADER_ELECT_NODE_PREFIX));
-  }
-
-  private final ServerSet serverSet;
-  private final Candidate candidate;
-
-  /**
-   * Creates a new singleton service that uses the supplied candidate to vie for leadership and then
-   * advertises itself in the given server set once elected.
-   *
-   * @param serverSet The server set to advertise in on election.
-   * @param candidate The candidacy to use to vie for election.
-   */
-  public SingletonServiceImpl(ServerSet serverSet, Candidate candidate) {
-    this.serverSet = Preconditions.checkNotNull(serverSet);
-    this.candidate = Preconditions.checkNotNull(candidate);
-  }
-
-  @Override
-  public void lead(final InetSocketAddress endpoint,
-                   final Map<String, InetSocketAddress> additionalEndpoints,
-                   final LeadershipListener listener)
-                   throws LeadException, InterruptedException {
-
-    Preconditions.checkNotNull(listener);
-
-    try {
-      candidate.offerLeadership(new Leader() {
-        @Override public void onElected(final ExceptionalCommand<JoinException> abdicate) {
-          listener.onLeading(new LeaderControl() {
-            ServerSet.EndpointStatus endpointStatus = null;
-            final AtomicBoolean left = new AtomicBoolean(false);
-
-            // Methods are synchronized to prevent simultaneous invocations.
-            @Override public synchronized void advertise()
-                throws AdvertiseException, InterruptedException {
-
-              Preconditions.checkState(!left.get(), "Cannot advertise after leaving.");
-              Preconditions.checkState(endpointStatus == null, "Cannot advertise more than once.");
-              try {
-                endpointStatus = serverSet.join(endpoint, additionalEndpoints);
-              } catch (JoinException e) {
-                throw new AdvertiseException("Problem advertising endpoint " + endpoint, e);
-              }
-            }
-
-            @Override public synchronized void leave() throws LeaveException {
-              Preconditions.checkState(left.compareAndSet(false, true),
-                  "Cannot leave more than once.");
-              if (endpointStatus != null) {
-                try {
-                  endpointStatus.leave();
-                } catch (ServerSet.UpdateException e) {
-                  throw new LeaveException("Problem updating endpoint status for abdicating leader " +
-                      "at endpoint " + endpoint, e);
-                }
-              }
-              try {
-                abdicate.execute();
-              } catch (JoinException e) {
-                throw new LeaveException("Problem abdicating leadership for endpoint " + endpoint, e);
-              }
-            }
-          });
-        }
-
-        @Override public void onDefeated() {
-          listener.onDefeated();
-        }
-      });
-    } catch (JoinException e) {
-      throw new LeadException("Problem joining leadership group for endpoint " + endpoint, e);
-    } catch (Group.WatchException e) {
-      throw new LeadException("Problem getting initial membership list for leadership group.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
deleted file mode 100644
index 9c0cebe..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-
-import org.apache.aurora.common.base.ExceptionalCommand;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-public class CandidateImplTest extends BaseZooKeeperClientTest {
-  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
-  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
-  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
-
-  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
-
-  @Before
-  public void mySetUp() throws IOException {
-    candidateBuffer = new LinkedBlockingDeque<>();
-  }
-
-  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
-    return new Group(zkClient, ACL, SERVICE);
-  }
-
-  private class Reign implements Candidate.Leader {
-    private ExceptionalCommand<Group.JoinException> abdicate;
-    private final CandidateImpl candidate;
-    private final String id;
-    private CountDownLatch defeated = new CountDownLatch(1);
-
-    Reign(String id, CandidateImpl candidate) {
-      this.id = id;
-      this.candidate = candidate;
-    }
-
-    @Override
-    public void onElected(ExceptionalCommand<Group.JoinException> abdicate) {
-      candidateBuffer.offerFirst(candidate);
-      this.abdicate = abdicate;
-    }
-
-    @Override
-    public void onDefeated() {
-      defeated.countDown();
-    }
-
-    public void abdicate() throws Group.JoinException {
-      Preconditions.checkState(abdicate != null);
-      abdicate.execute();
-    }
-
-    public void expectDefeated() throws InterruptedException {
-      defeated.await();
-    }
-
-    @Override
-    public String toString() {
-      return id;
-    }
-  }
-
-  @Test
-  public void testOfferLeadership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
-      @Override public String toString() {
-        return "Leader1";
-      }
-    };
-    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
-      @Override public String toString() {
-        return "Leader2";
-      }
-    };
-    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
-      @Override public String toString() {
-        return "Leader3";
-      }
-    };
-
-    Reign candidate1Reign = new Reign("1", candidate1);
-    Reign candidate2Reign = new Reign("2", candidate2);
-    Reign candidate3Reign = new Reign("3", candidate3);
-
-    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
-    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
-    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
-
-    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
-        candidate1Leader.get());
-
-    shutdownNetwork();
-    restartNetwork();
-
-    assertTrue("A re-connect without a session expiration should leave the leader elected",
-        candidate1Leader.get());
-
-    candidate1Reign.abdicate();
-    assertSame(candidate1, candidateBuffer.takeLast());
-    assertFalse(candidate1Leader.get());
-    // Active abdication should trigger defeat.
-    candidate1Reign.expectDefeated();
-
-    CandidateImpl secondCandidate = candidateBuffer.takeLast();
-    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
-               + candidateBuffer,
-        candidate2Leader.get() ^ candidate3Leader.get());
-
-    if (secondCandidate == candidate2) {
-      expireSession(zkClient2);
-      assertSame(candidate3, candidateBuffer.takeLast());
-      assertTrue(candidate3Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate2Reign.expectDefeated();
-    } else {
-      expireSession(zkClient3);
-      assertSame(candidate2, candidateBuffer.takeLast());
-      assertTrue(candidate2Leader.get());
-      // Passive expiration should trigger defeat.
-      candidate3Reign.expectDefeated();
-    }
-  }
-
-  @Test
-  public void testEmptyMembership() throws Exception {
-    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
-    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
-    Reign candidate1Reign = new Reign("1", candidate1);
-
-    candidate1.offerLeadership(candidate1Reign);
-    assertSame(candidate1, candidateBuffer.takeLast());
-    candidate1Reign.abdicate();
-    assertFalse(candidate1.getLeaderData().isPresent());
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/69cba786/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
deleted file mode 100644
index 97a42d1..0000000
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.NodeScheme;
-import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
-
-public class GroupTest extends BaseZooKeeperClientTest {
-
-  private ZooKeeperClient zkClient;
-  private Group joinGroup;
-  private Group watchGroup;
-  private Command stopWatching;
-  private Command onLoseMembership;
-
-  private RecordingListener listener;
-
-  public GroupTest() {
-    super(Amount.of(1, Time.DAYS));
-  }
-
-  @Before
-  public void mySetUp() throws Exception {
-    onLoseMembership = createMock(Command.class);
-
-    zkClient = createZkClient("group", "test");
-    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
-
-    listener = new RecordingListener();
-    stopWatching = watchGroup.watch(listener);
-  }
-
-  private static class RecordingListener implements GroupChangeListener {
-    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
-        new LinkedBlockingQueue<Iterable<String>>();
-
-    @Override
-    public void onGroupChange(Iterable<String> memberIds) {
-      membershipChanges.add(memberIds);
-    }
-
-    public Iterable<String> take() throws InterruptedException {
-      return membershipChanges.take();
-    }
-
-    public void assertEmpty() {
-      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
-    }
-
-    @Override
-    public String toString() {
-      return membershipChanges.toString();
-    }
-  }
-
-  private static class CustomScheme implements NodeScheme {
-    static final String NODE_NAME = "custom_name";
-
-    @Override
-    public boolean isMember(String nodeName) {
-      return NODE_NAME.equals(nodeName);
-    }
-
-    @Override
-    public String createName(byte[] membershipData) {
-      return NODE_NAME;
-    }
-
-    @Override
-    public boolean isSequential() {
-      return false;
-    }
-  }
-
-  @Test
-  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    expireSession(zkClient);
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
-    final CountDownLatch lostMembership = new CountDownLatch(1);
-    Command onLoseMembership = lostMembership::countDown;
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    assertMembershipObserved(membership.getMemberId());
-    membership.cancel();
-
-    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join();
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    shutdownNetwork();
-    restartNetwork();
-
-    // The member should still be present under existing ephemeral node since session did not
-    // expire.
-    watchGroup.watch(listener);
-    assertMembershipObserved(originalMemberId);
-
-    membership.cancel();
-
-    assertEmptyMembershipObserved();
-    assertEmptyMembershipObserved(); // and again for 2nd listener
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
-    onLoseMembership.execute();
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership membership = joinGroup.join(onLoseMembership);
-    String originalMemberId = membership.getMemberId();
-    assertMembershipObserved(originalMemberId);
-
-    expireSession(zkClient);
-
-    // We should have lost our group membership and then re-gained it with a new ephemeral node.
-    // We may or may-not see the intermediate state change but we must see the final state
-    Iterable<String> members = listener.take();
-    if (Iterables.isEmpty(members)) {
-      members = listener.take();
-    }
-    assertEquals(1, Iterables.size(members));
-    assertNotEquals(originalMemberId, Iterables.getOnlyElement(members));
-    assertNotEquals(originalMemberId, membership.getMemberId());
-
-    listener.assertEmpty();
-
-    verify(onLoseMembership);
-    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
-  }
-
-  @Test
-  public void testJoinCustomNamingScheme() throws Exception {
-    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
-        new CustomScheme());
-
-    listener = new RecordingListener();
-    group.watch(listener);
-    assertEmptyMembershipObserved();
-
-    Membership membership = group.join();
-    String memberId = membership.getMemberId();
-
-    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
-    assertMembershipObserved(memberId);
-
-    expireSession(zkClient);
-  }
-
-  @Test
-  public void testUpdateMembershipData() throws Exception {
-    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
-
-    byte[] initial = "start".getBytes();
-    expect(dataSupplier.get()).andReturn(initial);
-
-    byte[] second = "update".getBytes();
-    expect(dataSupplier.get()).andReturn(second);
-
-    replay(dataSupplier);
-
-    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
-    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
-        .getData(membership.getMemberPath(), false, null));
-
-    assertArrayEquals("Updating supplier should not change membership data",
-        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    membership.updateMemberData();
-    assertArrayEquals("Updating membership should change data",
-        second, zkClient.get().getData(membership.getMemberPath(), false, null));
-
-    verify(dataSupplier);
-  }
-
-  @Test
-  public void testAcls() throws Exception {
-    Group securedMembership =
-        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
-            "/secured/group/membership");
-
-    String memberId = securedMembership.join().getMemberId();
-
-    Group unauthenticatedObserver =
-        new Group(createZkClient(),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthenticatedListener = new RecordingListener();
-    unauthenticatedObserver.watch(unauthenticatedListener);
-
-    assertMembershipObserved(unauthenticatedListener, memberId);
-
-    try {
-      unauthenticatedObserver.join();
-      fail("Expected join exception for unauthenticated observer");
-    } catch (JoinException e) {
-      // expected
-    }
-
-    Group unauthorizedObserver =
-        new Group(createZkClient("joe", "schmoe"),
-            Ids.READ_ACL_UNSAFE,
-            "/secured/group/membership");
-    RecordingListener unauthorizedListener = new RecordingListener();
-    unauthorizedObserver.watch(unauthorizedListener);
-
-    assertMembershipObserved(unauthorizedListener, memberId);
-
-    try {
-      unauthorizedObserver.join();
-      fail("Expected join exception for unauthorized observer");
-    } catch (JoinException e) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testStopWatching() throws Exception {
-    replay(onLoseMembership);
-
-    assertEmptyMembershipObserved();
-
-    Membership member1 = joinGroup.join();
-    String memberId1 = member1.getMemberId();
-    assertMembershipObserved(memberId1);
-
-    Membership member2 = joinGroup.join();
-    String memberId2 = member2.getMemberId();
-    assertMembershipObserved(memberId1, memberId2);
-
-    stopWatching.execute();
-
-    member1.cancel();
-    Membership member3 = joinGroup.join();
-    member2.cancel();
-    member3.cancel();
-
-    listener.assertEmpty();
-  }
-
-  private void assertEmptyMembershipObserved() throws InterruptedException {
-    assertMembershipObserved();
-  }
-
-  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
-    assertMembershipObserved(listener, expectedMemberIds);
-  }
-
-  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
-      throws InterruptedException {
-
-    assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take()));
-  }
-}


Mime
View raw message