aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wfar...@apache.org
Subject [13/21] aurora git commit: Remove unused classes from commons fork.
Date Fri, 28 Aug 2015 18:33:31 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
deleted file mode 100644
index 1e8fc48..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLock.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * DistributedLock
- *
- * @author Florian Leibert
- */
-public interface DistributedLock {
-  void lock() throws LockingException;
-
-  boolean tryLock(long timeout, TimeUnit unit);
-
-  void unlock() throws LockingException;
-
-  public static class LockingException extends RuntimeException {
-    public LockingException(String msg, Exception e) {
-      super(msg, e);
-    }
-
-    public LockingException(String msg) {
-      super(msg);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
deleted file mode 100644
index 99a5774..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/DistributedLockImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Ordering;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.aurora.common.base.MorePreconditions;
-
-/**
- * Distributed locking via ZooKeeper. Assuming there are N clients that all try to acquire a lock,
- * the algorithm works as follows. Each host creates an ephemeral|sequential node, and requests a
- * list of children for the lock node. Due to the nature of sequential, all the ids are increasing
- * in order, therefore the client with the least ID according to natural ordering will hold the
- * lock. Every other client watches the id immediately preceding its own id and checks for the lock
- * in case of notification. The client holding the lock does the work and finally deletes the node,
- * thereby triggering the next client in line to acquire the lock. Deadlocks are possible but
- * avoided in most cases because if a client drops dead while holding the lock, the ZK session
- * should timeout and since the node is ephemeral, it will be removed in such a case. Deadlocks
- * could occur if the the worker thread on a client hangs but the zk-client thread is still alive.
- * There could be an external monitor client that ensures that alerts are triggered if the least-id
- * ephemeral node is present past a time-out.
- * <p/>
- * Note: Locking attempts will fail in case session expires!
- *
- * @author Florian Leibert
- */
-@ThreadSafe
-public class DistributedLockImpl implements DistributedLock {
-
-  private static final Logger LOG = Logger.getLogger(DistributedLockImpl.class.getName());
-
-  private final ZooKeeperClient zkClient;
-  private final String lockPath;
-  private final ImmutableList<ACL> acl;
-
-  private final AtomicBoolean aborted = new AtomicBoolean(false);
-  private CountDownLatch syncPoint;
-  private boolean holdsLock = false;
-  private String currentId;
-  private String currentNode;
-  private String watchedNode;
-  private LockWatcher watcher;
-
-  /**
-   * Equivalent to {@link #DistributedLockImpl(ZooKeeperClient, String, Iterable)} with a default
-   * wide open {@code acl} ({@link ZooDefs.Ids#OPEN_ACL_UNSAFE}).
-   */
-  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath) {
-    this(zkClient, lockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-  }
-
-  /**
-   * Creates a distributed lock using the given {@code zkClient} to coordinate locking.
-   *
-   * @param zkClient The ZooKeeper client to use.
-   * @param lockPath The path used to manage the lock under.
-   * @param acl The acl to apply to newly created lock nodes.
-   */
-  public DistributedLockImpl(ZooKeeperClient zkClient, String lockPath, Iterable<ACL> acl) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.lockPath = MorePreconditions.checkNotBlank(lockPath);
-    this.acl = ImmutableList.copyOf(acl);
-    this.syncPoint = new CountDownLatch(1);
-  }
-
-  private synchronized void prepare()
-    throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException, KeeperException {
-
-    ZooKeeperUtils.ensurePath(zkClient, acl, lockPath);
-    LOG.log(Level.FINE, "Working with locking path:" + lockPath);
-
-    // Create an EPHEMERAL_SEQUENTIAL node.
-    currentNode =
-        zkClient.get().create(lockPath + "/member_", null, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
-
-    // We only care about our actual id since we want to compare ourselves to siblings.
-    if (currentNode.contains("/")) {
-      currentId = currentNode.substring(currentNode.lastIndexOf("/") + 1);
-    }
-    LOG.log(Level.FINE, "Received ID from zk:" + currentId);
-    this.watcher = new LockWatcher();
-  }
-
-  @Override
-  public synchronized void lock() throws LockingException {
-    if (holdsLock) {
-      throw new LockingException("Error, already holding a lock. Call unlock first!");
-    }
-    try {
-      prepare();
-      watcher.checkForLock();
-      syncPoint.await();
-      if (!holdsLock) {
-        throw new LockingException("Error, couldn't acquire the lock!");
-      }
-    } catch (InterruptedException e) {
-      cancelAttempt();
-      throw new LockingException("InterruptedException while trying to acquire lock!", e);
-    } catch (KeeperException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("KeeperException while trying to acquire lock!", e);
-    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
-    }
-  }
-
-  @Override
-  public synchronized boolean tryLock(long timeout, TimeUnit unit) {
-    if (holdsLock) {
-      throw new LockingException("Error, already holding a lock. Call unlock first!");
-    }
-    try {
-      prepare();
-      watcher.checkForLock();
-      boolean success = syncPoint.await(timeout, unit);
-      if (!success) {
-        return false;
-      }
-      if (!holdsLock) {
-        throw new LockingException("Error, couldn't acquire the lock!");
-      }
-    } catch (InterruptedException e) {
-      cancelAttempt();
-      return false;
-    } catch (KeeperException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("KeeperException while trying to acquire lock!", e);
-    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-      // No need to clean up since the node wasn't created yet.
-      throw new LockingException("ZooKeeperConnectionException while trying to acquire lock", e);
-    }
-    return true;
-  }
-
-  @Override
-  public synchronized void unlock() throws LockingException {
-    if (currentId == null) {
-      throw new LockingException("Error, neither attempting to lock nor holding a lock!");
-    }
-    Preconditions.checkNotNull(currentId);
-    // Try aborting!
-    if (!holdsLock) {
-      aborted.set(true);
-      LOG.log(Level.INFO, "Not holding lock, aborting acquisition attempt!");
-    } else {
-      LOG.log(Level.INFO, "Cleaning up this locks ephemeral node.");
-      cleanup();
-    }
-  }
-
-  //TODO(Florian Leibert): Make sure this isn't a runtime exception. Put exceptions into the token?
-
-  private synchronized void cancelAttempt() {
-    LOG.log(Level.INFO, "Cancelling lock attempt!");
-    cleanup();
-    // Bubble up failure...
-    holdsLock = false;
-    syncPoint.countDown();
-  }
-
-  private void cleanup() {
-    LOG.info("Cleaning up!");
-    Preconditions.checkNotNull(currentId);
-    try {
-      Stat stat = zkClient.get().exists(currentNode, false);
-      if (stat != null) {
-        zkClient.get().delete(currentNode, ZooKeeperUtils.ANY_VERSION);
-      } else {
-        LOG.log(Level.WARNING, "Called cleanup but nothing to cleanup!");
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    holdsLock = false;
-    aborted.set(false);
-    currentId = null;
-    currentNode = null;
-    watcher = null;
-    syncPoint = new CountDownLatch(1);
-  }
-
-  class LockWatcher implements Watcher {
-
-    public synchronized void checkForLock() {
-      MorePreconditions.checkNotBlank(currentId);
-
-      try {
-        List<String> candidates = zkClient.get().getChildren(lockPath, null);
-        ImmutableList<String> sortedMembers = Ordering.natural().immutableSortedCopy(candidates);
-
-        // Unexpected behavior if there are no children!
-        if (sortedMembers.isEmpty()) {
-          throw new LockingException("Error, member list is empty!");
-        }
-
-        int memberIndex = sortedMembers.indexOf(currentId);
-
-        // If we hold the lock
-        if (memberIndex == 0) {
-          holdsLock = true;
-          syncPoint.countDown();
-        } else {
-          final String nextLowestNode = sortedMembers.get(memberIndex - 1);
-          LOG.log(Level.INFO, String.format("Current LockWatcher with ephemeral node [%s], is " +
-              "waiting for [%s] to release lock.", currentId, nextLowestNode));
-
-          watchedNode = String.format("%s/%s", lockPath, nextLowestNode);
-          Stat stat = zkClient.get().exists(watchedNode, this);
-          if (stat == null) {
-            checkForLock();
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got interrupted. Trying to cancel lock acquisition.", currentId), e);
-        cancelAttempt();
-      } catch (KeeperException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got a KeeperException. Trying to cancel lock acquisition.", currentId), e);
-        cancelAttempt();
-      } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-        LOG.log(Level.WARNING, String.format("Current LockWatcher with ephemeral node [%s] " +
-            "got a ConnectionException. Trying to cancel lock acquisition.", currentId), e);
-        cancelAttempt();
-      }
-    }
-
-    @Override
-    public synchronized void process(WatchedEvent event) {
-      // this handles the case where we have aborted a lock and deleted ourselves but still have a
-      // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub.
-      if (!event.getPath().equals(watchedNode)) {
-        LOG.log(Level.INFO, "Ignoring call for node:" + watchedNode);
-        return;
-      }
-      //TODO(Florian Leibert): Pull this into the outer class.
-      if (event.getType() == Watcher.Event.EventType.None) {
-        switch (event.getState()) {
-          case SyncConnected:
-            // TODO(Florian Leibert): maybe we should just try to "fail-fast" in this case and abort.
-            LOG.info("Reconnected...");
-            break;
-          case Expired:
-            LOG.log(Level.WARNING, String.format("Current ZK session expired![%s]", currentId));
-            cancelAttempt();
-            break;
-        }
-      } else if (event.getType() == Event.EventType.NodeDeleted) {
-        checkForLock();
-      } else {
-        LOG.log(Level.WARNING, String.format("Unexpected ZK event: %s", event.getType().name()));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
deleted file mode 100644
index 91ea345..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/Partitioner.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Ordering;
-import org.apache.aurora.common.zookeeper.Group.GroupChangeListener;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.zookeeper.Group.Membership;
-import org.apache.aurora.common.zookeeper.Group.UpdateException;
-import org.apache.aurora.common.zookeeper.Group.WatchException;
-import org.apache.zookeeper.data.ACL;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.logging.Logger;
-
-/**
- * A distributed mechanism for eventually arriving at an evenly partitioned space of long values.
- * A typical usage would have a client on each of several hosts joining a logical partition (a
- * "partition group") that represents some shared work.  Clients could then process a subset of a
- * full body of work by testing any given item of work with their partition filter.
- *
- * <p>Note that clients must be able to tolerate periods of duplicate processing by more than 1
- * partition as explained in {@link #join()}.
- *
- * @author John Sirois
- */
-public class Partitioner {
-
-  private static final Logger LOG = Logger.getLogger(Partitioner.class.getName());
-
-  private volatile int groupSize;
-  private volatile int groupIndex;
-  private final Group group;
-
-  /**
-   * Constructs a representation of a partition group but does not join it.  Note that the partition
-   * group path will be created as a persistent zookeeper path if it does not already exist.
-   *
-   * @param zkClient a client to use for joining the partition group and watching its membership
-   * @param acl the acl for this partition group
-   * @param path a zookeeper path that represents the partition group
-   */
-  public Partitioner(ZooKeeperClient zkClient, List<ACL> acl, String path) {
-    group = new Group(zkClient, acl, path);
-  }
-
-  @VisibleForTesting
-  int getGroupSize() {
-    return groupSize;
-  }
-
-  /**
-   * Represents a slice of a partition group.  The partition is dynamic and will adjust its size as
-   * members join and leave its partition group.
-   */
-  public abstract static class Partition implements Predicate<Long>, Membership {
-
-    /**
-     * Returns {@code true} if the given {@code value} is a member of this partition at this time.
-     */
-    public abstract boolean isMember(long value);
-
-    /**
-     * Gets number of members in the group at this time.
-     *
-     * @return number of members in the ZK group at this time.
-     */
-    public abstract int getNumPartitions();
-
-    /**
-     * Evaluates partition membership based on the given {@code value}'s hash code.  If the value
-     * is null it is never a member of a partition.
-     */
-    boolean isMember(Object value) {
-      return (value != null) && isMember(value.hashCode());
-    }
-
-    /**
-     * Equivalent to {@link #isMember(long)} for all non-null values; however incurs unboxing
-     * overhead.
-     */
-    @Override
-    public boolean apply(@Nullable Long input) {
-      return (input != null) && isMember(input);
-    }
-  }
-
-  /**
-   * Attempts to join the partition group and claim a slice.  When successful, a predicate is
-   * returned that can be used to test whether or not an item belongs to this partition.  The
-   * predicate is dynamic such that as the group is further partitioned or partitions merge the
-   * predicate will claim a narrower or wider swath of the partition space respectively.  Partition
-   * creation and merging is not instantaneous and clients should expect independent partitions to
-   * claim ownership of some items when partition membership is in flux.  It is only in the steady
-   * state that a client should expect independent partitions to divide the partition space evenly
-   * and without overlap.
-   *
-   * <p>TODO(John Sirois): consider adding a version with a global timeout for the join operation.
-   *
-   * @return the partition representing the slice of the partition group this member can claim
-   * @throws JoinException if there was a problem joining the partition group
-   * @throws InterruptedException if interrupted while waiting to join the partition group
-   */
-  public final Partition join() throws JoinException, InterruptedException {
-    final Membership membership = group.join();
-    try {
-      group.watch(createGroupChangeListener(membership));
-    } catch (WatchException e) {
-      membership.cancel();
-      throw new JoinException("Problem establishing watch on group after joining it", e);
-    }
-    return new Partition() {
-      @Override public boolean isMember(long value) {
-        return (value % groupSize) == groupIndex;
-      }
-
-      @Override public int getNumPartitions() {
-        return groupSize;
-      }
-
-      @Override public String getGroupPath() {
-        return membership.getGroupPath();
-      }
-
-      @Override public String getMemberId() {
-        return membership.getMemberId();
-      }
-
-      @Override public String getMemberPath() {
-        return membership.getMemberPath();
-      }
-
-      @Override public byte[] updateMemberData() throws UpdateException {
-        return membership.updateMemberData();
-      }
-
-      @Override public void cancel() throws JoinException {
-        membership.cancel();
-      }
-    };
-  }
-
-  @VisibleForTesting GroupChangeListener createGroupChangeListener(final Membership membership) {
-    return new GroupChangeListener() {
-      @Override public void onGroupChange(Iterable<String> memberIds) {
-        List<String> members = Ordering.natural().sortedCopy(memberIds);
-        int newSize = members.size();
-        int newIndex = members.indexOf(membership.getMemberId());
-
-        LOG.info(String.format("Rebuilding group %s:%s [%d:%d]->[%d:%d]",
-            membership.getGroupPath(), members, groupSize, groupIndex, newSize, newIndex));
-
-        groupSize = newSize;
-        groupIndex = newIndex;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
index 2b99268..18aff9f 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSets.java
@@ -18,14 +18,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.Set;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-import org.apache.zookeeper.data.ACL;
 
 import org.apache.aurora.common.base.Function;
 import org.apache.aurora.common.base.MorePreconditions;
@@ -33,6 +27,7 @@ import org.apache.aurora.common.io.Codec;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
+import org.apache.zookeeper.data.ACL;
 
 /**
  * Common ServerSet related functions
@@ -63,32 +58,11 @@ public class ServerSets {
    * @return A server set that registers at {@code zkPath}.
    */
   public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, String zkPath) {
-    return create(zkClient, acl, ImmutableSet.of(zkPath));
-  }
-
-  /**
-   * Creates a server set that registers at one or multiple paths applying the given ACL to all
-   * nodes created in the paths.
-   *
-   * @param zkClient ZooKeeper client to register with.
-   * @param acl The ACL to apply to the {@code zkPath} nodes the ServerSet creates.
-   * @param zkPaths Paths to register at, must be non-empty.
-   * @return A server set that registers at the given {@code zkPath}s.
-   */
-  public static ServerSet create(ZooKeeperClient zkClient, Iterable<ACL> acl, Set<String> zkPaths) {
     Preconditions.checkNotNull(zkClient);
     MorePreconditions.checkNotBlank(acl);
-    MorePreconditions.checkNotBlank(zkPaths);
+    MorePreconditions.checkNotBlank(zkPath);
 
-    if (zkPaths.size() == 1) {
-      return new ServerSetImpl(zkClient, acl, Iterables.getOnlyElement(zkPaths));
-    } else {
-      ImmutableList.Builder<ServerSet> builder = ImmutableList.builder();
-      for (String path : zkPaths) {
-        builder.add(new ServerSetImpl(zkClient, acl, path));
-      }
-      return new CompoundServerSet(builder.build());
-    }
+    return new ServerSetImpl(zkClient, acl, zkPath);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
deleted file mode 100644
index 99c290e..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/StaticServerSet.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.Commands;
-import org.apache.aurora.common.zookeeper.Group.JoinException;
-import org.apache.aurora.common.thrift.Endpoint;
-import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.thrift.Status;
-
-/**
- * A server set that represents a fixed set of hosts.
- * This may be composed under {@link CompoundServerSet} to ensure a minimum set of hosts is
- * present.
- * A static server set does not support joining, but will allow normal join calls and status update
- * calls to be made.
- */
-public class StaticServerSet implements ServerSet {
-
-  private static final Logger LOG = Logger.getLogger(StaticServerSet.class.getName());
-
-  private static final Function<Endpoint, ServiceInstance> ENDPOINT_TO_INSTANCE =
-      new Function<Endpoint, ServiceInstance>() {
-        @Override public ServiceInstance apply(Endpoint endpoint) {
-          return new ServiceInstance(endpoint, ImmutableMap.<String, Endpoint>of(), Status.ALIVE);
-        }
-      };
-
-  private final ImmutableSet<ServiceInstance> hosts;
-
-  /**
-   * Creates a static server set that will reply to monitor calls immediately and exactly once with
-   * the provided service instances.
-   *
-   * @param hosts Hosts in the static set.
-   */
-  public StaticServerSet(Set<ServiceInstance> hosts) {
-    this.hosts = ImmutableSet.copyOf(hosts);
-  }
-
-  /**
-   * Creates a static server set containing the provided endpoints (and no auxiliary ports) which
-   * will all be in the {@link Status#ALIVE} state.
-   *
-   * @param endpoints Endpoints in the static set.
-   * @return A static server set that will advertise the provided endpoints.
-   */
-  public static StaticServerSet fromEndpoints(Set<Endpoint> endpoints) {
-    return new StaticServerSet(
-        ImmutableSet.copyOf(Iterables.transform(endpoints, ENDPOINT_TO_INSTANCE)));
-  }
-
-  private EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      Optional<Integer> shardId) {
-
-    LOG.warning("Attempt to join fixed server set ignored.");
-    ServiceInstance joining = new ServiceInstance(
-        ServerSets.toEndpoint(endpoint),
-        Maps.transformValues(auxEndpoints, ServerSets.TO_ENDPOINT),
-        Status.ALIVE);
-    if (shardId.isPresent()) {
-      joining.setShard(shardId.get());
-    }
-    if (!hosts.contains(joining)) {
-      LOG.log(Level.SEVERE,
-          "Joining instance " + joining + " does not match any member of the static set.");
-    }
-
-    return new EndpointStatus() {
-      @Override public void leave() throws UpdateException {
-        LOG.warning("Attempt to adjust state of fixed server set ignored.");
-      }
-
-      @Override public void update(Status status) throws UpdateException {
-        LOG.warning("Attempt to adjust state of fixed server set ignored.");
-      }
-    };
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      Status status) {
-
-    LOG.warning("This method is deprecated. Please do not specify a status field.");
-    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints) {
-
-    LOG.warning("Joining a ServerSet without a shard ID is deprecated and will soon break.");
-    return join(endpoint, auxEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> auxEndpoints,
-      int shardId) throws JoinException, InterruptedException {
-
-    return join(endpoint, auxEndpoints, Optional.of(shardId));
-  }
-
-  @Override
-  public Command watch(HostChangeMonitor<ServiceInstance> monitor) {
-    monitor.onChange(hosts);
-    return Commands.NOOP;
-  }
-
-  @Override
-  public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
-    watch(monitor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
deleted file mode 100644
index 29db55a..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperMap.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ForwardingMap;
-import com.google.common.collect.Sets;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * A ZooKeeper backed {@link Map}.  Initialized with a node path, this map represents child nodes
- * under that path as keys, with the data in those nodes as values.  This map is readonly from
- * clients of this class, and only can be modified via direct zookeeper operations.
- *
- * Note that instances of this class maintain a zookeeper watch for each zookeeper node under the
- * parent, as well as on the parent itself.  Instances of this class should be created via the
- * {@link #create} factory method.
- *
- * As of ZooKeeper Version 3.1, the maximum allowable size of a data node is 1 MB.  A single
- * client should be able to hold up to maintain several thousand watches, but this depends on rate
- * of data change as well.
- *
- * Talk to your zookeeper cluster administrator if you expect number of map entries times number
- * of live clients to exceed a thousand, as a zookeeper cluster is limited by total number of
- * server-side watches enabled.
- *
- * For an example of a set of tools to maintain one of these maps, please see
- * src/scripts/HenAccess.py in the hen repository.
- *
- * @param <V> the type of values this map stores
- */
-public class ZooKeeperMap<V> extends ForwardingMap<String, V> {
-
-  /**
-   * An optional listener which can be supplied and triggered when entries in a ZooKeeperMap
-   * are added, changed or removed. For a ZooKeeperMap of type <V>, the listener will fire a
-   * "nodeChanged" event with the name of the ZNode that changed, and its resulting value as
-   * interpreted by the provided deserializer. Removal of child nodes triggers the "nodeRemoved"
-   * method indicating the name of the ZNode which is no longer present in the map.
-   */
-  public interface Listener<V> {
-
-    /**
-     * Fired when a node is added to the ZooKeeperMap or changed.
-     *
-     * @param nodeName indicates the name of the ZNode that was added or changed.
-     * @param value is the new value of the node after passing through your supplied deserializer.
-     */
-    void nodeChanged(String nodeName, V value);
-
-    /**
-     * Fired when a node is removed from the ZooKeeperMap.
-     *
-     * @param nodeName indicates the name of the ZNode that was removed from the ZooKeeperMap.
-     */
-    void nodeRemoved(String nodeName);
-  }
-
-  /**
-   * Default deserializer for the constructor if you want to simply store the zookeeper byte[] data
-   * in this map.
-   */
-  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUES = Functions.identity();
-
-  /**
-   * A listener that ignores all events.
-   */
-  public static <T> Listener<T> noopListener() {
-    return new Listener<T>() {
-      @Override public void nodeChanged(String nodeName, T value) { }
-      @Override public void nodeRemoved(String nodeName) { }
-    };
-  }
-
-  private static final Logger LOG = Logger.getLogger(ZooKeeperMap.class.getName());
-
-  private final ZooKeeperClient zkClient;
-  private final String nodePath;
-  private final Function<byte[], V> deserializer;
-
-  private final ConcurrentMap<String, V> localMap;
-  private final Map<String, V> unmodifiableLocalMap;
-  private final BackoffHelper backoffHelper;
-
-  private final Listener<V> mapListener;
-
-  // Whether it's safe to re-establish watches if our zookeeper session has expired.
-  private final Object safeToRewatchLock;
-  private volatile boolean safeToRewatch;
-
-  /**
-   * Returns an initialized ZooKeeperMap.  The given path must exist at the time of
-   * creation or a {@link KeeperException} will be thrown.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath path to a node whose data will be watched
-   * @param deserializer a function that converts byte[] data from a zk node to this map's
-   *     value type V
-   * @param listener is a Listener which fires when values are added, changed, or removed.
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  public static <V> ZooKeeperMap<V> create(
-      ZooKeeperClient zkClient,
-      String nodePath,
-      Function<byte[], V> deserializer,
-      Listener<V> listener)
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-
-    ZooKeeperMap<V> zkMap = new ZooKeeperMap<V>(zkClient, nodePath, deserializer, listener);
-    zkMap.init();
-    return zkMap;
-  }
-
-
-  /**
-   * Returns an initialized ZooKeeperMap.  The given path must exist at the time of
-   * creation or a {@link KeeperException} will be thrown.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath path to a node whose data will be watched
-   * @param deserializer a function that converts byte[] data from a zk node to this map's
-   *     value type V
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  public static <V> ZooKeeperMap<V> create(
-      ZooKeeperClient zkClient,
-      String nodePath,
-      Function<byte[], V> deserializer)
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-
-    return ZooKeeperMap.create(zkClient, nodePath, deserializer, ZooKeeperMap.<V>noopListener());
-  }
-
-  /**
-   * Initializes a ZooKeeperMap.  The given path must exist at the time of object creation or
-   * a {@link KeeperException} will be thrown.
-   *
-   * Please note that this object will not track any remote zookeeper data until {@link #init()}
-   * is successfully called.  After construction and before that call, this {@link Map} will
-   * be empty.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath top-level node path under which the map data lives
-   * @param deserializer a function that converts byte[] data from a zk node to this map's
-   *     value type V
-   * @param mapListener is a Listener which fires when values are added, changed, or removed.
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  @VisibleForTesting
-  ZooKeeperMap(
-      ZooKeeperClient zkClient,
-      String nodePath,
-      Function<byte[], V> deserializer,
-      Listener<V> mapListener)
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-
-    super();
-
-    this.mapListener = Preconditions.checkNotNull(mapListener);
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
-    this.deserializer = Preconditions.checkNotNull(deserializer);
-
-    localMap = new ConcurrentHashMap<String, V>();
-    unmodifiableLocalMap = Collections.unmodifiableMap(localMap);
-    backoffHelper = new BackoffHelper();
-    safeToRewatchLock = new Object();
-    safeToRewatch = false;
-
-    if (zkClient.get().exists(nodePath, null) == null) {
-      throw new KeeperException.NoNodeException();
-    }
-  }
-
-  /**
-   * Initialize zookeeper tracking for this {@link Map}.  Once this call returns, this object
-   * will be tracking data in zookeeper.
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  @VisibleForTesting
-  void init() throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
-      @Override public void execute() {
-        /*
-         * First rewatch all of our locally cached children.  Some of them may not exist anymore,
-         * which will lead to caught KeeperException.NoNode whereafter we'll remove that child
-         * from the cached map.
-         *
-         * Next, we'll establish our top level child watch and add any new nodes that might exist.
-         */
-        try {
-          synchronized (safeToRewatchLock) {
-            if (safeToRewatch) {
-              rewatchDataNodes();
-              tryWatchChildren();
-            }
-          }
-        } catch (InterruptedException e) {
-          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
-          Thread.currentThread().interrupt();
-        }
-      }
-    });
-
-    try {
-      // Synchronize to prevent the race of watchChildren completing and then the session expiring
-      // before we update safeToRewatch.
-      synchronized (safeToRewatchLock) {
-        watchChildren();
-        safeToRewatch = true;
-      }
-    } catch (InterruptedException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    } catch (KeeperException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    } catch (ZooKeeperConnectionException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    }
-  }
-
-  @Override
-  protected Map<String, V> delegate() {
-    return unmodifiableLocalMap;
-  }
-
-  private void tryWatchChildren() throws InterruptedException {
-    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
-      @Override public Boolean get() throws InterruptedException {
-        try {
-          watchChildren();
-          return true;
-        } catch (KeeperException e) {
-          return false;
-        } catch (ZooKeeperConnectionException e) {
-          return false;
-        }
-      }
-    });
-  }
-
-  private synchronized void watchChildren()
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-
-    /*
-     * Add a watch on the parent node itself, and attempt to rewatch if it
-     * gets deleted
-     */
-    zkClient.get().exists(nodePath, new Watcher() {
-      @Override public void process(WatchedEvent event) {
-        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
-          // If the parent node no longer exists
-          localMap.clear();
-          try {
-            tryWatchChildren();
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
-            Thread.currentThread().interrupt();
-          }
-        }
-      }});
-
-    final Watcher childWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
-          try {
-            tryWatchChildren();
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Interrupted while trying to watch children.", e);
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    };
-
-    List<String> children = zkClient.get().getChildren(nodePath, childWatcher);
-    updateChildren(Sets.newHashSet(children));
-  }
-
-  private void tryAddChild(final String child) throws InterruptedException {
-    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
-      @Override public Boolean get() throws InterruptedException {
-        try {
-          addChild(child);
-          return true;
-        } catch (KeeperException e) {
-          return false;
-        } catch (ZooKeeperConnectionException e) {
-          return false;
-        }
-      }
-    });
-  }
-
-  // TODO(Adam Samet) - Make this use the ZooKeeperNode class.
-  private void addChild(final String child)
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-
-    final Watcher nodeWatcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
-          try {
-            tryAddChild(child);
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Interrupted while trying to add a child.", e);
-            Thread.currentThread().interrupt();
-          }
-        } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
-          removeEntry(child);
-        }
-      }
-    };
-
-    try {
-      V value = deserializer.apply(zkClient.get().getData(makePath(child), nodeWatcher, null));
-      putEntry(child, value);
-    } catch (KeeperException.NoNodeException e) {
-      // This node doesn't exist anymore, remove it from the map and we're done.
-      removeEntry(child);
-    }
-  }
-
-  @VisibleForTesting
-  void removeEntry(String key) {
-    localMap.remove(key);
-    mapListener.nodeRemoved(key);
-  }
-
-  @VisibleForTesting
-  void putEntry(String key, V value) {
-    localMap.put(key, value);
-    mapListener.nodeChanged(key, value);
-  }
-
-  private void rewatchDataNodes() throws InterruptedException {
-    for (String child : keySet()) {
-      tryAddChild(child);
-    }
-  }
-
-  private String makePath(final String child) {
-    return nodePath + "/" + child;
-  }
-
-  private void updateChildren(Set<String> zkChildren) throws InterruptedException {
-    Set<String> addedChildren = Sets.difference(zkChildren, keySet());
-    Set<String> removedChildren = Sets.difference(keySet(), zkChildren);
-    for (String child : addedChildren) {
-      tryAddChild(child);
-    }
-    for (String child : removedChildren) {
-      removeEntry(child);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
deleted file mode 100644
index 3829ca7..0000000
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperNode.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.common.zookeeper;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.aurora.common.base.Closure;
-import org.apache.aurora.common.base.Closures;
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.base.ExceptionalSupplier;
-import org.apache.aurora.common.base.MorePreconditions;
-import org.apache.aurora.common.util.BackoffHelper;
-import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
-
-/**
- * An implementation of {@link Supplier} that offers a readonly view of a
- * zookeeper data node.  This class is thread-safe.
- *
- * Instances of this class each maintain a zookeeper watch for the remote data node.  Instances
- * of this class should be created via the {@link #create} factory method.
- *
- * Please see zookeeper documentation and talk to your cluster administrator for guidance on
- * appropriate node size and total number of nodes you should be using.
- *
- * @param <T> the type of data this node stores
- */
-public class ZooKeeperNode<T> implements Supplier<T> {
-  /**
-   * Deserializer for the constructor if you want to simply store the zookeeper byte[] data
-   * as-is.
-   */
-  public static final Function<byte[], byte[]> BYTE_ARRAY_VALUE = Functions.identity();
-
-  private static final Logger LOG = Logger.getLogger(ZooKeeperNode.class.getName());
-
-  private final ZooKeeperClient zkClient;
-  private final String nodePath;
-  private final NodeDeserializer<T> deserializer;
-
-  private final BackoffHelper backoffHelper;
-
-  // Whether it's safe to re-establish watches if our zookeeper session has expired.
-  private final Object safeToRewatchLock;
-  private volatile boolean safeToRewatch;
-
-  private final T NO_DATA = null;
-  @Nullable private volatile T nodeData;
-  private final Closure<T> dataUpdateListener;
-
-  /**
-   * When a call to ZooKeeper.getData is made, the Watcher is added to a Set before the the network
-   * request is made and if the request fails, the Watcher remains. There's a problem where Watcher
-   * can accumulate when there are failed requests, so they are set to instance fields and reused.
-   */
-  private final Watcher nodeWatcher;
-  private final Watcher existenceWatcher;
-
-  /**
-   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
-   * creation or a {@link KeeperException} will be thrown.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath path to a node whose data will be watched
-   * @param deserializer a function that converts byte[] data from a zk node to this supplier's
-   *     type T
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
-      Function<byte[], T> deserializer) throws InterruptedException, KeeperException,
-      ZooKeeperConnectionException {
-    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
-  }
-
-  /**
-   * Like the above, but optionally takes in a {@link Closure} that will get notified
-   * whenever the data is updated from the remote node.
-   *
-   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
-   */
-  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
-      Function<byte[], T> deserializer, Closure<T> dataUpdateListener) throws InterruptedException,
-      KeeperException, ZooKeeperConnectionException {
-    return create(zkClient, nodePath, new FunctionWrapper<T>(deserializer), dataUpdateListener);
-  }
-
-  /**
-   * Returns an initialized ZooKeeperNode.  The given node must exist at the time of object
-   * creation or a {@link KeeperException} will be thrown.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath path to a node whose data will be watched
-   * @param deserializer an implentation of {@link NodeDeserializer} that converts a byte[] from a
-   *     zk node to this supplier's type T. Also supplies a {@link Stat} object which is useful for
-   *     doing versioned updates.
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException.NoNodeException if the given nodePath doesn't exist
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
-      NodeDeserializer<T> deserializer) throws InterruptedException, KeeperException,
-      ZooKeeperConnectionException {
-    return create(zkClient, nodePath, deserializer, Closures.<T>noop());
-  }
-
-  /**
-   * Like the above, but optionally takes in a {@link Closure} that will get notified
-   * whenever the data is updated from the remote node.
-   *
-   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
-   */
-  public static <T> ZooKeeperNode<T> create(ZooKeeperClient zkClient, String nodePath,
-      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener)
-      throws InterruptedException, KeeperException, ZooKeeperConnectionException {
-    ZooKeeperNode<T> zkNode =
-        new ZooKeeperNode<T>(zkClient, nodePath, deserializer, dataUpdateListener);
-    zkNode.init();
-    return zkNode;
-  }
-
-  /**
-   * Initializes a ZooKeeperNode.  The given node must exist at the time of object creation or
-   * a {@link KeeperException} will be thrown.
-   *
-   * Please note that this object will not track any remote zookeeper data until {@link #init()}
-   * is successfully called.  After construction and before that call, this {@link Supplier} will
-   * return null.
-   *
-   * @param zkClient a zookeeper client
-   * @param nodePath path to a node whose data will be watched
-   * @param deserializer an implementation of {@link NodeDeserializer} that converts byte[] data
-   *     from a zk node to this supplier's type T
-   * @param dataUpdateListener a {@link Closure} to receive data update notifications.
-   */
-  @VisibleForTesting
-  ZooKeeperNode(ZooKeeperClient zkClient, String nodePath,
-      NodeDeserializer<T> deserializer, Closure<T> dataUpdateListener) {
-    this.zkClient = Preconditions.checkNotNull(zkClient);
-    this.nodePath = MorePreconditions.checkNotBlank(nodePath);
-    this.deserializer = Preconditions.checkNotNull(deserializer);
-    this.dataUpdateListener = Preconditions.checkNotNull(dataUpdateListener);
-
-    backoffHelper = new BackoffHelper();
-    safeToRewatchLock = new Object();
-    safeToRewatch = false;
-    nodeData = NO_DATA;
-
-    nodeWatcher = new Watcher() {
-      @Override public void process(WatchedEvent event) {
-        if (event.getState() == KeeperState.SyncConnected) {
-          try {
-            tryWatchDataNode();
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
-            Thread.currentThread().interrupt();
-          }
-        } else {
-          LOG.info("Ignoring watcher event " + event);
-        }
-      }
-    };
-
-    existenceWatcher = new Watcher() {
-      @Override public void process(WatchedEvent event) {
-        if (event.getType() == Watcher.Event.EventType.NodeCreated) {
-          try {
-            tryWatchDataNode();
-          } catch (InterruptedException e) {
-            LOG.log(Level.WARNING, "Interrupted while trying to watch a data node.", e);
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    };
-  }
-
-  /**
-   * Initialize zookeeper tracking for this {@link Supplier}.  Once this call returns, this object
-   * will be tracking data in zookeeper.
-   *
-   * @throws InterruptedException if the underlying zookeeper server transaction is interrupted
-   * @throws KeeperException if the server signals an error
-   * @throws ZooKeeperConnectionException if there was a problem connecting to the zookeeper
-   *     cluster
-   */
-  @VisibleForTesting
-  void init() throws InterruptedException, KeeperException,
-      ZooKeeperConnectionException {
-    Watcher watcher = zkClient.registerExpirationHandler(new Command() {
-      @Override public void execute() {
-        try {
-          synchronized (safeToRewatchLock) {
-            if (safeToRewatch) {
-              tryWatchDataNode();
-            }
-          }
-        } catch (InterruptedException e) {
-          LOG.log(Level.WARNING, "Interrupted while trying to re-establish watch.", e);
-          Thread.currentThread().interrupt();
-        }
-      }
-    });
-
-    try {
-      /*
-       * Synchronize to prevent the race of watchDataNode completing and then the session expiring
-       * before we update safeToRewatch.
-       */
-      synchronized (safeToRewatchLock) {
-        watchDataNode();
-        safeToRewatch = true;
-      }
-    } catch (InterruptedException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    } catch (KeeperException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    } catch (ZooKeeperConnectionException e) {
-      zkClient.unregister(watcher);
-      throw e;
-    }
-  }
-
-  /**
-   * Returns the data corresponding to a byte array in a remote zookeeper node.  This data is
-   * cached locally and updated in the background on watch notifications.
-   *
-   * @return the data currently cached locally or null if {@link #init()} hasn't been called
-   *   or the backing node has no data or does not exist anymore.
-   */
-  @Override
-  public @Nullable T get() {
-    return nodeData;
-  }
-
-  @VisibleForTesting
-  void updateData(@Nullable T newData) {
-    nodeData = newData;
-    dataUpdateListener.execute(newData);
-  }
-
-  private void tryWatchDataNode() throws InterruptedException {
-    backoffHelper.doUntilSuccess(new ExceptionalSupplier<Boolean, InterruptedException>() {
-      @Override public Boolean get() throws InterruptedException {
-        try {
-          watchDataNode();
-          return true;
-        } catch (KeeperException e) {
-          return false;
-        } catch (ZooKeeperConnectionException e) {
-          return false;
-        }
-      }
-    });
-  }
-
-  private void watchDataNode() throws InterruptedException, KeeperException,
-      ZooKeeperConnectionException {
-    try {
-      Stat stat = new Stat();
-      byte[] rawData = zkClient.get().getData(nodePath, nodeWatcher, stat);
-      T newData = deserializer.deserialize(rawData, stat);
-      updateData(newData);
-    } catch (KeeperException.NoNodeException e) {
-      /*
-       * This node doesn't exist right now, reflect that locally and then create a watch to wait
-       * for its recreation.
-       */
-      updateData(NO_DATA);
-      watchForExistence();
-    }
-  }
-
-  private void watchForExistence() throws InterruptedException, KeeperException,
-      ZooKeeperConnectionException {
-    /*
-     * If the node was created between the getData call and this call, just try watching it.
-     * We'll have an extra exists watch on it that goes off on its next deletion, which will
-     * be a no-op.
-     * Otherwise, just let the exists watch wait for its creation.
-     */
-    if (zkClient.get().exists(nodePath, existenceWatcher) != null) {
-      tryWatchDataNode();
-    }
-  }
-
-  /**
-   * Interface for defining zookeeper node data deserialization.
-   *
-   * @param <T> the type of data associated with this node
-   */
-  public interface NodeDeserializer<T> {
-    /**
-     * @param data the byte array returned from ZooKeeper when a watch is triggered.
-     * @param stat a ZooKeeper {@link Stat} object. Populated by
-     *             {@link org.apache.zookeeper.ZooKeeper#getData(String, boolean, Stat)}.
-     */
-    T deserialize(byte[] data, Stat stat);
-  }
-
-  // wrapper for backwards compatibility with older create() methods with Function parameter
-  private static final class FunctionWrapper<T> implements NodeDeserializer<T> {
-    private final Function<byte[], T> func;
-    private FunctionWrapper(Function<byte[], T> func) {
-      Preconditions.checkNotNull(func);
-      this.func = func;
-    }
-
-    public T deserialize(byte[] rawData, Stat stat) {
-      return func.apply(rawData);
-    }
-  }
-
-}
-


Mime
View raw message