aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zma...@apache.org
Subject [02/37] aurora git commit: Import of Twitter Commons.
Date Tue, 25 Aug 2015 18:19:16 GMT
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java
new file mode 100644
index 0000000..d66c875
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java
@@ -0,0 +1,231 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Ordering;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.zookeeper.Candidate.Leader;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class CandidateImplTest extends BaseZooKeeperTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_linkhose/leader";
+  private static final Amount<Integer, Time> TIMEOUT = Amount.of(1, Time.MINUTES);
+
+  private LinkedBlockingDeque<CandidateImpl> candidateBuffer;
+
+  @Before
+  public void mySetUp() throws IOException {
+    candidateBuffer = new LinkedBlockingDeque<CandidateImpl>();
+  }
+
+  private Group createGroup(ZooKeeperClient zkClient) throws IOException {
+    return new Group(zkClient, ACL, SERVICE);
+  }
+
+  private class Reign implements Leader { // Test Leader: records election and defeat events.
+    private ExceptionalCommand<JoinException> abdicate;
+    private final CandidateImpl candidate;
+    private final String id;
+    private final CountDownLatch defeated = new CountDownLatch(1);
+
+    Reign(String id, CandidateImpl candidate) {
+      this.id = id;
+      this.candidate = candidate;
+    }
+
+    @Override
+    public void onElected(ExceptionalCommand<JoinException> abdicate) {
+      this.abdicate = abdicate; // Store before publishing so a taker never sees a null command.
+      candidateBuffer.offerFirst(candidate);
+    }
+
+    @Override
+    public void onDefeated() {
+      defeated.countDown();
+    }
+
+    public void abdicate() throws JoinException {
+      Preconditions.checkState(abdicate != null); // Only valid once onElected has been called.
+      abdicate.execute();
+    }
+
+    public void expectDefeated() throws InterruptedException {
+      defeated.await(); // Blocks until onDefeated has been called.
+    }
+
+    @Override
+    public String toString() {
+      return id;
+    }
+  }
+
+  @Test
+  public void testOfferLeadership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) {
+      @Override public String toString() {
+        return "Leader1";
+      }
+    };
+    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) {
+      @Override public String toString() {
+        return "Leader2";
+      }
+    };
+    ZooKeeperClient zkClient3 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) {
+      @Override public String toString() {
+        return "Leader3";
+      }
+    };
+
+    Reign candidate1Reign = new Reign("1", candidate1);
+    Reign candidate2Reign = new Reign("2", candidate2);
+    Reign candidate3Reign = new Reign("3", candidate3);
+
+    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+    Supplier<Boolean> candidate3Leader = candidate3.offerLeadership(candidate3Reign);
+
+    assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader",
+        candidate1Leader.get());
+
+    shutdownNetwork();
+    restartNetwork();
+
+    assertTrue("A re-connect without a session expiration should leave the leader elected",
+        candidate1Leader.get());
+
+    candidate1Reign.abdicate();
+    assertSame(candidate1, candidateBuffer.takeLast());
+    assertFalse(candidate1Leader.get());
+    // Active abdication should trigger defeat.
+    candidate1Reign.expectDefeated();
+
+    CandidateImpl secondCandidate = candidateBuffer.takeLast();
+    assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " "
+               + candidateBuffer,
+        candidate2Leader.get() ^ candidate3Leader.get());
+
+    if (secondCandidate == candidate2) {
+      expireSession(zkClient2);
+      assertSame(candidate3, candidateBuffer.takeLast());
+      assertTrue(candidate3Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate2Reign.expectDefeated();
+    } else {
+      expireSession(zkClient3);
+      assertSame(candidate2, candidateBuffer.takeLast());
+      assertTrue(candidate2Leader.get());
+      // Passive expiration should trigger defeat.
+      candidate3Reign.expectDefeated();
+    }
+  }
+
+  @Test
+  public void testCustomJudge() throws Exception {
+    Function<Iterable<String>, String> judge = new Function<Iterable<String>, String>() {
+      @Override public String apply(Iterable<String> input) {
+        return Ordering.natural().max(input);
+      }
+    };
+
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    Group group1 = createGroup(zkClient1);
+    final CandidateImpl candidate1 =
+        new CandidateImpl(group1, judge, Suppliers.ofInstance("Leader1".getBytes())) {
+          @Override public String toString() {
+            return "Leader1";
+          }
+        };
+    ZooKeeperClient zkClient2 = createZkClient(TIMEOUT);
+    Group group2 = createGroup(zkClient2);
+    final CandidateImpl candidate2 =
+        new CandidateImpl(group2, judge, Suppliers.ofInstance("Leader2".getBytes())) {
+          @Override public String toString() {
+            return "Leader2";
+          }
+        };
+
+    Reign candidate1Reign = new Reign("1", candidate1);
+    Reign candidate2Reign = new Reign("2", candidate2);
+
+    candidate1.offerLeadership(candidate1Reign);
+    assertSame(candidate1, candidateBuffer.takeLast());
+
+    Supplier<Boolean> candidate2Leader = candidate2.offerLeadership(candidate2Reign);
+    assertSame(candidate2, candidateBuffer.takeLast());
+    candidate1Reign.expectDefeated();
+    assertTrue("Since the judge picks the newest member joining a group as leader candidate 1 "
+               + "should be defeated and candidate 2 leader", candidate2Leader.get());
+  }
+
+  @Test
+  public void testCustomDataSupplier() throws Exception {
+    byte[] data = "Leader1".getBytes(); // Payload expected back from getLeaderData().
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    Group group1 = createGroup(zkClient1);
+    CandidateImpl candidate1 = new CandidateImpl(group1, Suppliers.ofInstance(data)) {
+      @Override public String toString() {
+        return "Leader1";
+      }
+    };
+    Reign candidate1Reign = new Reign("1", candidate1);
+
+    Supplier<Boolean> candidate1Leader = candidate1.offerLeadership(candidate1Reign);
+    assertSame(candidate1, candidateBuffer.takeLast());
+    assertTrue(candidate1Leader.get());
+    assertArrayEquals(data, candidate1.getLeaderData().get());
+  }
+
+  @Test
+  public void testEmptyMembership() throws Exception {
+    ZooKeeperClient zkClient1 = createZkClient(TIMEOUT);
+    final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1));
+    Reign candidate1Reign = new Reign("1", candidate1);
+
+    candidate1.offerLeadership(candidate1Reign);
+    assertSame(candidate1, candidateBuffer.takeLast());
+    candidate1Reign.abdicate();
+    assertFalse(candidate1.getLeaderData().isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java b/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java
new file mode 100644
index 0000000..19a4322
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java
@@ -0,0 +1,220 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import com.twitter.common.net.pool.DynamicHostSet.MonitorException;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.thrift.ServiceInstance;
+
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.getCurrentArguments;
+
+public class CompoundServerSetTest extends EasyMockTest {
+  private static final Map<String, InetSocketAddress> AUX_PORTS = ImmutableMap.of();
+  private static final InetSocketAddress END_POINT =
+      InetSocketAddress.createUnresolved("foo", 12345);
+
+  private ServerSet.EndpointStatus mockStatus1;
+  private ServerSet.EndpointStatus mockStatus2;
+  private ServerSet.EndpointStatus mockStatus3;
+  private HostChangeMonitor<ServiceInstance> compoundMonitor;
+
+  private ServerSet serverSet1;
+  private ServerSet serverSet2;
+  private ServerSet serverSet3;
+  private Command stop1;
+  private Command stop2;
+  private Command stop3;
+  private CompoundServerSet compoundServerSet;
+
+  private ServiceInstance instance1;
+  private ServiceInstance instance2;
+  private ServiceInstance instance3;
+
+  private void triggerChange(ServiceInstance... hostChanges) {
+    compoundMonitor.onChange(ImmutableSet.copyOf(hostChanges));
+  }
+
+  private void triggerChange(
+      Capture<HostChangeMonitor<ServiceInstance>> capture,
+      ServiceInstance... hostChanges) {
+
+    capture.getValue().onChange(ImmutableSet.copyOf(hostChanges));
+  }
+
+  @Before
+  public void setUpMocks() throws Exception {
+    control = createControl();
+    compoundMonitor = createMock(new Clazz<HostChangeMonitor<ServiceInstance>>() { });
+
+    mockStatus1 = createMock(ServerSet.EndpointStatus.class);
+    mockStatus2 = createMock(ServerSet.EndpointStatus.class);
+    mockStatus3 = createMock(ServerSet.EndpointStatus.class);
+
+    serverSet1 = createMock(ServerSet.class);
+    serverSet2 = createMock(ServerSet.class);
+    serverSet3 = createMock(ServerSet.class);
+
+    stop1 = createMock(Command.class);
+    stop2 = createMock(Command.class);
+    stop3 = createMock(Command.class);
+
+    instance1 = createMock(ServiceInstance.class);
+    instance2 = createMock(ServiceInstance.class);
+    instance3 = createMock(ServiceInstance.class);
+
+    compoundServerSet = new CompoundServerSet(ImmutableList.of(serverSet1, serverSet2, serverSet3));
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    expect(serverSet1.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus1);
+    expect(serverSet2.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus2);
+    expect(serverSet3.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus3);
+
+    mockStatus1.leave();
+    mockStatus2.leave();
+    mockStatus3.leave();
+
+    control.replay();
+
+    compoundServerSet.join(END_POINT, AUX_PORTS, 0).leave();
+  }
+
+  @Test(expected = Group.JoinException.class)
+  public void testJoinFailure() throws Exception {
+    // Throw exception for the first serverSet join.
+    expect(serverSet1.join(END_POINT, AUX_PORTS))
+        .andThrow(new Group.JoinException("Group join exception", null));
+
+    control.replay();
+    compoundServerSet.join(END_POINT, AUX_PORTS);
+  }
+
+  @Test(expected = ServerSet.UpdateException.class)
+  public void testStatusUpdateFailure() throws Exception {
+    expect(serverSet1.join(END_POINT, AUX_PORTS)).andReturn(mockStatus1);
+    expect(serverSet2.join(END_POINT, AUX_PORTS)).andReturn(mockStatus2);
+    expect(serverSet3.join(END_POINT, AUX_PORTS)).andReturn(mockStatus3);
+
+    mockStatus1.leave();
+    mockStatus2.leave();
+    expectLastCall().andThrow(new ServerSet.UpdateException("Update exception"));
+    mockStatus3.leave();
+
+    control.replay();
+
+    compoundServerSet.join(END_POINT, AUX_PORTS).leave();
+  }
+
+  @Test
+  public void testMonitor() throws Exception {
+    Capture<HostChangeMonitor<ServiceInstance>> set1Capture = createCapture();
+    Capture<HostChangeMonitor<ServiceInstance>> set2Capture = createCapture();
+    Capture<HostChangeMonitor<ServiceInstance>> set3Capture = createCapture();
+
+    expect(serverSet1.watch(
+        EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set1Capture)))
+        .andReturn(stop1);
+    expect(serverSet2.watch(
+        EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set2Capture)))
+        .andReturn(stop2);
+    expect(serverSet3.watch(
+        EasyMock.<HostChangeMonitor<ServiceInstance>>capture(set3Capture)))
+        .andReturn(stop3);
+
+    triggerChange(instance1);
+    triggerChange(instance1, instance2);
+    triggerChange(instance1, instance2, instance3);
+    triggerChange(instance1, instance3);
+    triggerChange(instance1, instance2, instance3);
+    triggerChange(instance3);
+    triggerChange();
+
+    control.replay();
+    compoundServerSet.watch(compoundMonitor);
+
+    // No new instances.
+    triggerChange(set1Capture);
+    triggerChange(set2Capture);
+    triggerChange(set3Capture);
+    // Add one instance from each serverset
+    triggerChange(set1Capture, instance1);
+    triggerChange(set2Capture, instance2);
+    triggerChange(set3Capture, instance3);
+    // Remove instance2
+    triggerChange(set2Capture);
+    // instance1 in both serverset1 and serverset2
+    triggerChange(set2Capture, instance1, instance2);
+    // Remove instances from serversets.
+    triggerChange(set1Capture);
+    triggerChange(set2Capture);
+    triggerChange(set3Capture);
+  }
+
+  @Test(expected = MonitorException.class)
+  public void testMonitorFailure() throws Exception {
+    serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject());
+    expectLastCall().andThrow(new MonitorException("Monitor exception", null));
+
+    control.replay();
+    compoundServerSet.watch(compoundMonitor);
+  }
+
+  @Test
+  public void testInitialChange() throws Exception {
+    // Ensures that a synchronous change notification during the call to monitor() is properly
+    // reported.
+    serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject());
+    expectLastCall().andAnswer(new IAnswer<Command>() {
+      @Override public Command answer() {
+        @SuppressWarnings("unchecked")
+        HostChangeMonitor<ServiceInstance> monitor =
+            (HostChangeMonitor<ServiceInstance>) getCurrentArguments()[0];
+        monitor.onChange(ImmutableSet.of(instance1, instance2));
+        return stop1;
+      }
+    });
+    compoundMonitor.onChange(ImmutableSet.of(instance1, instance2));
+    expect(serverSet2.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()))
+        .andReturn(stop2);
+    expect(serverSet3.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()))
+        .andReturn(stop3);
+
+    control.replay();
+
+    compoundServerSet.watch(compoundMonitor);
+  }
+
+  @Test
+  public void testStopMonitoring() throws Exception {
+    expect(serverSet1.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()))
+        .andReturn(stop1);
+    expect(serverSet2.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()))
+        .andReturn(stop2);
+    expect(serverSet3.watch(EasyMock.<HostChangeMonitor<ServiceInstance>>anyObject()))
+        .andReturn(stop3);
+
+    stop1.execute();
+    stop2.execute();
+    stop3.execute();
+
+    control.replay();
+    compoundServerSet.watch(compoundMonitor).execute();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java b/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java
new file mode 100644
index 0000000..4643062
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java
@@ -0,0 +1,177 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+
+/**
+ * @author Florian Leibert
+ */
+public class DistributedLockTest extends BaseZooKeeperTest {
+  private static final String LOCK_PATH = "/test/lock";
+
+  private ZooKeeperClient zkClient;
+
+  @Before
+  public void mySetUp() throws Exception {
+    zkClient = createZkClient();
+
+  }
+
+  @Test
+  public void testFailDoubleLock() {
+    DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH);
+    lock.lock();
+    try {
+      lock.lock();
+      fail("Exception expected!");
+    } catch (DistributedLockImpl.LockingException e) {
+      // expected
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Test
+  public void testFailUnlock() {
+    DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH);
+    try {
+      lock.unlock();
+      fail("Expected exception while trying to unlock!");
+    } catch (DistributedLockImpl.LockingException e) {
+      // success
+    }
+  }
+
+  @Test
+  public void testTwoLocks() {
+    DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH);
+    DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH);
+    lock1.lock();
+    List<String> children = expectZkNodes(LOCK_PATH);
+    assertEquals("One child == lock held!", 1, children.size()); // JUnit order: expected, actual.
+    lock1.unlock();
+    // check no locks held/empty children
+    children = expectZkNodes(LOCK_PATH);
+    assertEquals("No children, no lock held!", 0, children.size());
+    lock2.lock();
+    children = expectZkNodes(LOCK_PATH);
+    assertEquals("One child == lock held!", 1, children.size());
+    lock2.unlock();
+  }
+
+  @Test
+  public void testTwoLocksFailFast() {
+    DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH);
+    DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH);
+    lock1.lock();
+    boolean acquired = lock2.tryLock(10, TimeUnit.MILLISECONDS);
+    assertFalse("Couldn't acquire lock because it's currently held", acquired);
+    lock1.unlock();
+    lock2.unlock();
+  }
+
+  @Test
+  @Ignore("pending: <http://jira.local.twitter.com/browse/RESEARCH-49>")
+  public void testMultiConcurrentLocking() throws Exception {
+    //TODO(Florian Leibert): this is a bit janky, so let's replace it.
+    for (int i = 0; i < 50; i++) {
+      testConcurrentLocking();
+    }
+    mySetUp();
+  }
+
+  @Test
+  public void testConcurrentLocking() throws Exception {
+    ZooKeeperClient zk1 = createZkClient();
+    ZooKeeperClient zk2 = createZkClient();
+    ZooKeeperClient zk3 = createZkClient();
+
+    final DistributedLock lock1 = new DistributedLockImpl(zk1, LOCK_PATH);
+    final DistributedLock lock2 = new DistributedLockImpl(zk2, LOCK_PATH);
+    final DistributedLock lock3 = new DistributedLockImpl(zk3, LOCK_PATH);
+    Callable<Object> t1 = new Callable<Object>() {
+      @Override
+      public Object call() throws InterruptedException {
+        lock1.lock();
+        try {
+          Thread.sleep(50);
+        } finally {
+          lock1.unlock();
+        }
+        return new Object();
+      }
+    };
+
+    Callable<Object> t2 = new Callable<Object>() {
+      @Override
+      public Object call() throws InterruptedException {
+        lock2.lock();
+        try {
+          Thread.sleep(50);
+        } finally {
+          lock2.unlock();
+        }
+        return new Object();
+      }
+    };
+
+    Callable<Object> t3 = new Callable<Object>() {
+      @Override
+      public Object call() throws InterruptedException {
+        lock3.lock();
+        try {
+          Thread.sleep(50);
+        } finally {
+          lock3.unlock();
+        }
+        return new Object();
+      }
+    };
+
+    //TODO(Florian Leibert): remove this executors stuff and use a latch instead.
+    ExecutorService ex = Executors.newCachedThreadPool();
+    @SuppressWarnings("unchecked") List<Callable<Object>> tlist = Arrays.asList(t1, t2, t3);
+    ex.invokeAll(tlist);
+    assertTrue("No Children left!", expectZkNodes(LOCK_PATH).size() == 0);
+  }
+
+  protected List<String> expectZkNodes(String path) {
+    try {
+      List<String> children = zkClient.get().getChildren(path, null);
+      return children;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java b/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java
new file mode 100644
index 0000000..a5342d9
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java
@@ -0,0 +1,334 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.base.Supplier;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Group.NodeScheme;
+import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+
+import static com.google.common.testing.junit4.JUnitAsserts.assertNotEqual;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class GroupTest extends BaseZooKeeperTest {
+
+  private ZooKeeperClient zkClient;
+  private Group joinGroup;
+  private Group watchGroup;
+  private Command stopWatching;
+  private com.twitter.common.base.Command onLoseMembership;
+
+  private RecordingListener listener;
+
+  public GroupTest() {
+    super(Amount.of(1, Time.DAYS));
+  }
+
+  @Before
+  public void mySetUp() throws Exception {
+    onLoseMembership = createMock(Command.class);
+
+    zkClient = createZkClient("group", "test");
+    joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+    watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group");
+
+    listener = new RecordingListener();
+    stopWatching = watchGroup.watch(listener);
+  }
+
+  private static class RecordingListener implements GroupChangeListener {
+    private final LinkedBlockingQueue<Iterable<String>> membershipChanges =
+        new LinkedBlockingQueue<Iterable<String>>();
+
+    @Override
+    public void onGroupChange(Iterable<String> memberIds) {
+      membershipChanges.add(memberIds);
+    }
+
+    public Iterable<String> take() throws InterruptedException {
+      return membershipChanges.take();
+    }
+
+    public void assertEmpty() {
+      assertEquals(ImmutableList.<Iterable<String>>of(), ImmutableList.copyOf(membershipChanges));
+    }
+
+    @Override
+    public String toString() {
+      return membershipChanges.toString();
+    }
+  }
+
+  private static class CustomScheme implements NodeScheme {
+    static final String NODE_NAME = "custom_name";
+
+    @Override
+    public boolean isMember(String nodeName) {
+      return NODE_NAME.equals(nodeName);
+    }
+
+    @Override
+    public String createName(byte[] membershipData) {
+      return NODE_NAME;
+    }
+
+    @Override
+    public boolean isSequential() {
+      return false;
+    }
+  }
+
+  @Test
+  public void testSessionExpirationTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    Command onLoseMembership = new Command() {
+      @Override public void execute() throws RuntimeException {
+        lostMembership.countDown();
+      }
+    };
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    expireSession(zkClient);
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  @Test
+  public void testNodeDeleteTriggersOnLoseMembership() throws Exception {
+    final CountDownLatch lostMembership = new CountDownLatch(1);
+    Command onLoseMembership = new Command() {
+      @Override public void execute() throws RuntimeException {
+        lostMembership.countDown();
+      }
+    };
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    assertMembershipObserved(membership.getMemberId());
+    membership.cancel();
+
+    lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated.
+  }
+
+  @Test
+  public void testJoinsAndWatchesSurviveDisconnect() throws Exception {
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join();
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    shutdownNetwork();
+    restartNetwork();
+
+    // The member should still be present under existing ephemeral node since session did not
+    // expire.
+    watchGroup.watch(listener);
+    assertMembershipObserved(originalMemberId);
+
+    membership.cancel();
+
+    assertEmptyMembershipObserved();
+    assertEmptyMembershipObserved(); // and again for 2nd listener
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  @Test
+  public void testJoinsAndWatchesSurviveExpiredSession() throws Exception {
+    onLoseMembership.execute(); // Record the expected loss-of-membership callback.
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership membership = joinGroup.join(onLoseMembership);
+    String originalMemberId = membership.getMemberId();
+    assertMembershipObserved(originalMemberId);
+
+    expireSession(zkClient);
+
+    // We should have lost our group membership and then re-gained it with a new ephemeral node.
+    // We may or may not see the intermediate state change, but we must see the final state.
+    Iterable<String> members = listener.take();
+    if (Iterables.isEmpty(members)) {
+      members = listener.take();
+    }
+    assertEquals(1, Iterables.size(members));
+    assertNotEqual(originalMemberId, Iterables.getOnlyElement(members));
+    assertNotEqual(originalMemberId, membership.getMemberId());
+
+    listener.assertEmpty();
+
+    verify(onLoseMembership);
+    reset(onLoseMembership); // Turn off expectations during ZK server shutdown.
+  }
+
+  @Test
+  public void testJoinCustomNamingScheme() throws Exception {
+    Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group",
+        new CustomScheme());
+
+    listener = new RecordingListener();
+    group.watch(listener);
+    assertEmptyMembershipObserved();
+
+    Membership membership = group.join();
+    String memberId = membership.getMemberId();
+
+    assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId);
+    assertMembershipObserved(memberId);
+
+    expireSession(zkClient);
+  }
+
+  @Test
+  public void testUpdateMembershipData() throws Exception {
+    Supplier<byte[]> dataSupplier = new EasyMockTest.Clazz<Supplier<byte[]>>() {}.createMock();
+
+    byte[] initial = "start".getBytes();
+    expect(dataSupplier.get()).andReturn(initial);
+
+    byte[] second = "update".getBytes();
+    expect(dataSupplier.get()).andReturn(second);
+
+    replay(dataSupplier);
+
+    Membership membership = joinGroup.join(dataSupplier, onLoseMembership);
+    assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get()
+        .getData(membership.getMemberPath(), false, null));
+
+    assertArrayEquals("Updating supplier should not change membership data",
+        initial, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    membership.updateMemberData();
+    assertArrayEquals("Updating membership should change data",
+        second, zkClient.get().getData(membership.getMemberPath(), false, null));
+
+    verify(dataSupplier);
+  }
+
+  @Test
+  public void testAcls() throws Exception {
+    Group securedMembership =
+        new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL,
+            "/secured/group/membership");
+
+    String memberId = securedMembership.join().getMemberId();
+
+    Group unauthenticatedObserver =
+        new Group(createZkClient(Credentials.NONE),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthenticatedListener = new RecordingListener();
+    unauthenticatedObserver.watch(unauthenticatedListener);
+
+    assertMembershipObserved(unauthenticatedListener, memberId);
+
+    try {
+      unauthenticatedObserver.join();
+      fail("Expected join exception for unauthenticated observer");
+    } catch (JoinException e) {
+      // expected
+    }
+
+    Group unauthorizedObserver =
+        new Group(createZkClient("joe", "schmoe"),
+            Ids.READ_ACL_UNSAFE,
+            "/secured/group/membership");
+    RecordingListener unauthorizedListener = new RecordingListener();
+    unauthorizedObserver.watch(unauthorizedListener);
+
+    assertMembershipObserved(unauthorizedListener, memberId);
+
+    try {
+      unauthorizedObserver.join();
+      fail("Expected join exception for unauthorized observer");
+    } catch (JoinException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testStopWatching() throws Exception {
+    replay(onLoseMembership);
+
+    assertEmptyMembershipObserved();
+
+    Membership member1 = joinGroup.join();
+    String memberId1 = member1.getMemberId();
+    assertMembershipObserved(memberId1);
+
+    Membership member2 = joinGroup.join();
+    String memberId2 = member2.getMemberId();
+    assertMembershipObserved(memberId1, memberId2);
+
+    stopWatching.execute();
+
+    member1.cancel();
+    Membership member3 = joinGroup.join();
+    member2.cancel();
+    member3.cancel();
+
+    listener.assertEmpty();
+  }
+
+  private void assertEmptyMembershipObserved() throws InterruptedException {
+    assertMembershipObserved();
+  }
+
+  private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException {
+    assertMembershipObserved(listener, expectedMemberIds);
+  }
+
+  private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds)
+      throws InterruptedException {
+    ImmutableSet<String> expected = ImmutableSet.copyOf(expectedMemberIds);
+    assertEquals(expected, ImmutableSet.copyOf(listener.take()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java b/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java
new file mode 100644
index 0000000..2a0b774
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java
@@ -0,0 +1,164 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import com.google.common.testing.TearDown;
+import com.twitter.common.zookeeper.Group.GroupChangeListener;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.Membership;
+import com.twitter.common.zookeeper.Partitioner.Partition;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author John Sirois
+ */
+public class PartitionerTest extends BaseZooKeeperTest {
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String PARTITION_NAMESPACE = "/twitter/puffin/hosebird";
+
+  @Test
+  public void testHeterogeneousPartitionGroup() throws Exception {
+    ZooKeeperClient zkClient = createZkClient();
+    ZooKeeperUtils.ensurePath(zkClient, ACL, PARTITION_NAMESPACE + "/not-a-partition-node");
+    Partitioner partitioner = new Partitioner(zkClient, ACL, PARTITION_NAMESPACE);
+    join(partitioner);
+
+    assertEquals("Expected Partitioner to be tolerant of foreign nodes",
+        1, partitioner.getGroupSize());
+  }
+
+  private static class InstrumentedPartitioner extends Partitioner {
+    private final AtomicInteger myViewOfGroupSize = new AtomicInteger();
+
+    public InstrumentedPartitioner(ZooKeeperClient zkClient) throws IOException {
+      super(zkClient, ACL, PARTITION_NAMESPACE);
+    }
+    // Wraps the stock listener so every group change also publishes the new size to waiters.
+    @Override GroupChangeListener createGroupChangeListener(Membership membership) {
+      final GroupChangeListener listener = super.createGroupChangeListener(membership);
+      return new GroupChangeListener() {
+        @Override public void onGroupChange(Iterable<String> memberIds) {
+          listener.onGroupChange(memberIds);
+          synchronized (myViewOfGroupSize) {
+            myViewOfGroupSize.set(getGroupSize());
+            myViewOfGroupSize.notifyAll();
+          }
+        }
+      };
+    }
+    // Waits for expectedSize; checking under the monitor means a racing notify is never missed.
+    public void observeGroupSize(int expectedSize) throws InterruptedException {
+      synchronized (myViewOfGroupSize) {
+        while (expectedSize != myViewOfGroupSize.get()) {
+          myViewOfGroupSize.wait();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    // Test that the 1st member of the partition group owns the whole space.
+    InstrumentedPartitioner firstPartitioner = new InstrumentedPartitioner(createZkClient());
+    Partition firstPartition = join(firstPartitioner);
+
+    assertTrue(firstPartition.isMember(0L));
+    assertTrue(firstPartition.isMember(1L));
+    assertTrue(firstPartition.isMember(2L));
+
+    // Test that when additional members join partitions are added and existing partitions shrink.
+    InstrumentedPartitioner secondPartitioner = new InstrumentedPartitioner(createZkClient());
+    Partition secondPartition = join(secondPartitioner);
+
+    firstPartitioner.observeGroupSize(2);
+
+    assertTrue(firstPartition.isMember(0L));
+    assertFalse(secondPartition.isMember(0L));
+
+    assertFalse(firstPartition.isMember(1L));
+    assertTrue(secondPartition.isMember(1L));
+
+    assertTrue(firstPartition.isMember(2L));
+    assertFalse(secondPartition.isMember(2L));
+
+    InstrumentedPartitioner thirdPartitioner = new InstrumentedPartitioner(createZkClient());
+    Partition thirdPartition = join(thirdPartitioner);
+
+    firstPartitioner.observeGroupSize(3);
+    secondPartitioner.observeGroupSize(3);
+
+    assertTrue(firstPartition.isMember(0L));
+    assertFalse(secondPartition.isMember(0L));
+    assertFalse(thirdPartition.isMember(0L));
+
+    assertFalse(firstPartition.isMember(1L));
+    assertTrue(secondPartition.isMember(1L));
+    assertFalse(thirdPartition.isMember(1L));
+
+    assertFalse(firstPartition.isMember(2L));
+    assertFalse(secondPartition.isMember(2L));
+    assertTrue(thirdPartition.isMember(2L));
+
+    assertTrue(firstPartition.isMember(3L));
+    assertFalse(secondPartition.isMember(3L));
+    assertFalse(thirdPartition.isMember(3L));
+
+    // Test that members leaving the partition group results in the partitions being merged.
+    firstPartition.cancel();
+
+    secondPartitioner.observeGroupSize(2);
+    thirdPartitioner.observeGroupSize(2);
+
+    assertTrue(secondPartition.isMember(0L));
+    assertFalse(thirdPartition.isMember(0L));
+
+    assertFalse(secondPartition.isMember(1L));
+    assertTrue(thirdPartition.isMember(1L));
+
+    assertTrue(secondPartition.isMember(2L));
+    assertFalse(thirdPartition.isMember(2L));
+
+    thirdPartition.cancel();
+
+    secondPartitioner.observeGroupSize(1);
+
+    assertTrue(secondPartition.isMember(0L));
+    assertTrue(secondPartition.isMember(1L));
+    assertTrue(secondPartition.isMember(2L));
+  }
+
+  private Partition join(Partitioner partitioner) throws JoinException, InterruptedException {
+    final Partition partition = partitioner.join();
+    addTearDown(new TearDown() {
+      @Override public void tearDown() throws JoinException {
+        partition.cancel();
+      }
+    });
+    return partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java
new file mode 100644
index 0000000..64891a0
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java
@@ -0,0 +1,441 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.Override;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.testing.TearDown;
+import com.google.gson.GsonBuilder;
+
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.twitter.common.base.Command;
+import com.twitter.common.io.Codec;
+import com.twitter.common.io.JsonCodec;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.common.thrift.TResourceExhaustedException;
+import com.twitter.common.thrift.Thrift;
+import com.twitter.common.thrift.ThriftFactory;
+import com.twitter.common.thrift.ThriftFactory.ThriftFactoryException;
+import com.twitter.common.zookeeper.Group;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.Group.WatchException;
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+import com.twitter.common.zookeeper.ZooKeeperClient;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ * TODO(William Farner): Change this to remove thrift dependency.
+ */
+public class ServerSetImplTest extends BaseZooKeeperTest {
+  private static final Logger LOG = Logger.getLogger(ServerSetImplTest.class.getName());
+  private static final List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+  private static final String SERVICE = "/twitter/services/puffin_hosebird";
+  // Host-set snapshots pushed by serverSetMonitor (see mySetUp) and consumed by the assertions.
+  private LinkedBlockingQueue<ImmutableSet<ServiceInstance>> serverSetBuffer;
+  private DynamicHostSet.HostChangeMonitor<ServiceInstance> serverSetMonitor;
+
+  @Before
+  public void mySetUp() throws IOException {
+    serverSetBuffer = new LinkedBlockingQueue<ImmutableSet<ServiceInstance>>();
+    serverSetMonitor = new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+      @Override public void onChange(ImmutableSet<ServiceInstance> serverSet) {
+        serverSetBuffer.offer(serverSet);
+      }
+    };
+  }
+
+  private ServerSetImpl createServerSet() throws IOException {
+    return new ServerSetImpl(createZkClient(), ACL, SERVICE);
+  }
+
+  @Test
+  public void testLifecycle() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+    EndpointStatus status = server.join(
+        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080), 0);
+
+    ServiceInstance serviceInstance = new ServiceInstance(
+        new Endpoint("foo", 1234),
+        ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+
+    assertChangeFired(serviceInstance);
+
+    status.leave();
+    assertChangeFiredEmpty();
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testMembershipChanges() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+
+    expireSession(client.getZkClient());
+
+    EndpointStatus bar = join(server, "bar");
+
+    // We should've auto re-monitored membership, but not been notified of "foo" since this was not
+    // a change, just "foo", "bar" since this was an addition.
+    assertChangeFired("foo", "bar");
+
+    foo.leave();
+    assertChangeFired("bar");
+
+    EndpointStatus baz = join(server, "baz");
+    assertChangeFired("bar", "baz");
+
+    baz.leave();
+    assertChangeFired("bar");
+
+    bar.leave();
+    assertChangeFiredEmpty();
+
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testStopMonitoring() throws Exception {
+    ServerSetImpl client = createServerSet();
+    Command stopMonitoring = client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    ServerSetImpl server = createServerSet();
+
+    EndpointStatus foo = join(server, "foo");
+    assertChangeFired("foo");
+    EndpointStatus bar = join(server, "bar");
+    assertChangeFired("foo", "bar");
+
+    stopMonitoring.execute();
+
+    // No new updates should be received since monitoring has stopped.
+    foo.leave();
+    assertTrue(serverSetBuffer.isEmpty());
+
+    // Expiration event. NOTE(review): no session is expired here; confirm intended coverage.
+    assertTrue(serverSetBuffer.isEmpty());
+  }
+
+  @Test
+  public void testOrdering() throws Exception {
+    ServerSetImpl client = createServerSet();
+    client.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+
+    Map<String, InetSocketAddress> server1Ports = makePortMap("http-admin1", 8080);
+    Map<String, InetSocketAddress> server2Ports = makePortMap("http-admin2", 8081);
+    Map<String, InetSocketAddress> server3Ports = makePortMap("http-admin3", 8082);
+
+    ServerSetImpl server1 = createServerSet();
+    ServerSetImpl server2 = createServerSet();
+    ServerSetImpl server3 = createServerSet();
+
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1001),
+        ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
+        Status.ALIVE)
+        .setShard(1);
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1002),
+        ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
+        Status.ALIVE)
+        .setShard(2);
+
+    server1.join(
+        InetSocketAddress.createUnresolved("foo", 1000),
+        server1Ports,
+        0);
+    assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
+
+    EndpointStatus status2 = server2.join(
+        InetSocketAddress.createUnresolved("foo", 1001),
+        server2Ports,
+        1);
+    assertEquals(ImmutableList.of(instance1, instance2),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    server3.join(
+        InetSocketAddress.createUnresolved("foo", 1002), server3Ports, 2);
+    assertEquals(ImmutableList.of(instance1, instance2, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+
+    status2.leave();
+    assertEquals(ImmutableList.of(instance1, instance3),
+        ImmutableList.copyOf(serverSetBuffer.take()));
+  }
+
+  @Test
+  public void testJsonCodecRoundtrip() throws Exception {
+    Codec<ServiceInstance> codec = ServerSetImpl.createJsonCodec();
+    ServiceInstance instance1 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE)
+        .setShard(0);
+    byte[] data = ServerSets.serializeServiceInstance(instance1, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance2 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance2, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+
+    ServiceInstance instance3 = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.<String, Endpoint>of(),
+        Status.ALIVE);
+    data = ServerSets.serializeServiceInstance(instance3, codec);
+    assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort());
+    assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard());
+  }
+
+  @Ignore("TODO(zmanji): Fix to work with thrift 0.9.0")
+  @Test
+  public void testJsonCodecCompatibility() throws IOException {
+    ServiceInstance instance = new ServiceInstance(
+        new Endpoint("foo", 1000),
+        ImmutableMap.of("http", new Endpoint("foo", 8080)),
+        Status.ALIVE).setShard(42);
+
+    ByteArrayOutputStream legacy = new ByteArrayOutputStream();
+    JsonCodec.create(
+        ServiceInstance.class,
+        new GsonBuilder().setExclusionStrategies(JsonCodec.getThriftExclusionStrategy())
+            .create()).serialize(instance, legacy);
+
+    ByteArrayOutputStream results = new ByteArrayOutputStream();
+    ServerSetImpl.createJsonCodec().serialize(instance, results);
+
+    assertEquals(legacy.toString(), results.toString());
+
+    results = new ByteArrayOutputStream();
+    ServerSetImpl.createJsonCodec().serialize(instance, results);
+    assertEquals(
+        "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000},"
+            + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}},"
+            + "\"status\":\"ALIVE\","
+            + "\"shard\":42}",
+        results.toString());
+  }
+
+  // TODO(Jake Mannix): Move this test method to ServerSetConnectionPoolTest, which should be
+  // renamed to DynamicBackendConnectionPoolTest, and refactor the assertChangeFired* methods so
+  // they can be used both here and there.
+  @Test
+  public void testThriftWithServerSet() throws Exception {
+    final AtomicReference<Socket> clientConnection = new AtomicReference<Socket>();
+    final CountDownLatch connected = new CountDownLatch(1);
+    final ServerSocket server = new ServerSocket(0);
+    Thread service = new Thread(new Runnable() {
+      @Override public void run() {
+        try {
+          clientConnection.set(server.accept());
+        } catch (IOException e) {
+          LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e);
+        } finally {
+          connected.countDown();
+        }
+      }
+    });
+    service.setDaemon(true);
+    service.start();
+
+    ServerSetImpl serverSetImpl = new ServerSetImpl(createZkClient(), SERVICE);
+    serverSetImpl.watch(serverSetMonitor);
+    assertChangeFiredEmpty();
+    InetSocketAddress localSocket = new InetSocketAddress(server.getLocalPort());
+    serverSetImpl.join(localSocket, Maps.<String, InetSocketAddress>newHashMap());
+    assertChangeFired(ImmutableMap.<InetSocketAddress, Status>of(localSocket, Status.ALIVE));
+
+    Service.Iface svc = createThriftClient(serverSetImpl);
+    try {
+      String value = svc.getString();
+      LOG.info("Got value: " + value + " from server");
+      assertEquals(Service.Iface.DONE, value);
+    } catch (TResourceExhaustedException e) {
+      fail("ServerSet is not empty, should not throw exception here");
+    } finally {
+      connected.await();
+      server.close();
+    }
+  }
+
+  @Test
+  public void testUnwatchOnException() throws Exception {
+    IMocksControl control = createControl();
+
+    ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class);
+    Watcher onExpirationWatcher = control.createMock(Watcher.class);
+
+    expect(zkClient.registerExpirationHandler(anyObject(Command.class)))
+        .andReturn(onExpirationWatcher);
+
+    expect(zkClient.get()).andThrow(new InterruptedException());
+    expect(zkClient.unregister(onExpirationWatcher)).andReturn(true);
+    control.replay();
+
+    Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla");
+    ServerSetImpl serverset = new ServerSetImpl(zkClient, group);
+
+    try {
+      serverset.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+        @Override
+        public void onChange(ImmutableSet<ServiceInstance> hostSet) {}
+      });
+      fail("Expected MonitorException");
+    } catch (DynamicHostSet.MonitorException e) {
+      // expected
+    }
+    control.verify();
+  }
+
+  private Service.Iface createThriftClient(DynamicHostSet<ServiceInstance> serverSet)
+      throws ThriftFactoryException {
+
+    final Thrift<Service.Iface> thrift = ThriftFactory.create(Service.Iface.class).build(serverSet);
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        thrift.close();
+      }
+    });
+    return thrift.create();
+  }
+
+  private static Map<String, InetSocketAddress> makePortMap(String name, int port) {
+    return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port));
+  }
+
+  public static class Service {
+    public static interface Iface {
+      public static final String DONE = "done";
+      public String getString() throws TResourceExhaustedException;
+    }
+
+    public static class Client implements Iface {
+      public Client(TProtocol protocol) {
+        assertNotNull(protocol);
+      }
+      @Override public String getString() {
+        return DONE;
+      }
+    }
+  }
+
+  private EndpointStatus join(ServerSet serverSet, String host)
+      throws JoinException, InterruptedException {
+    return serverSet.join(
+        InetSocketAddress.createUnresolved(host, 42), ImmutableMap.<String, InetSocketAddress>of());
+  }
+
+  private void assertChangeFired(Map<InetSocketAddress, Status> hostsStatuses)
+      throws InterruptedException {
+    // Builds the expected instance for each host (no additional endpoints) before comparing.
+    ImmutableSet.Builder<ServiceInstance> expected = ImmutableSet.builder();
+    for (Map.Entry<InetSocketAddress, Status> entry : hostsStatuses.entrySet()) {
+      InetSocketAddress address = entry.getKey();
+      expected.add(new ServiceInstance(new Endpoint(address.getHostName(), address.getPort()),
+          ImmutableMap.<String, Endpoint>of(), entry.getValue()));
+    }
+    assertChangeFired(expected.build());
+  }
+
+  private void assertChangeFired(String... serviceHosts)
+      throws InterruptedException {
+    // Each host is expected as an ALIVE instance on port 42 with no additional endpoints.
+    ImmutableSet<String> hosts = ImmutableSet.copyOf(serviceHosts);
+    ImmutableSet.Builder<ServiceInstance> expected = ImmutableSet.builder();
+    for (String serviceHost : hosts) {
+      expected.add(new ServiceInstance(new Endpoint(serviceHost, 42),
+          ImmutableMap.<String, Endpoint>of(), Status.ALIVE));
+    }
+    assertChangeFired(expected.build());
+  }
+
+  protected void assertChangeFiredEmpty() throws InterruptedException {
+    assertChangeFired(ImmutableSet.<ServiceInstance>of());
+  }
+
+  protected void assertChangeFired(ServiceInstance... serviceInstances)
+      throws InterruptedException {
+    assertChangeFired(ImmutableSet.copyOf(serviceInstances));
+  }
+
+  protected void assertChangeFired(ImmutableSet<ServiceInstance> serviceInstances)
+      throws InterruptedException {
+    assertEquals(serviceInstances, serverSetBuffer.take());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java
new file mode 100644
index 0000000..1c7d67c
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java
@@ -0,0 +1,35 @@
+package com.twitter.common.zookeeper;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.twitter.common.io.Codec;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ServerSetsTest {
+  @Test
+  public void testSimpleSerialization() throws Exception {
+    // Round-trips a minimal instance through the default codec and compares each field.
+    Codec<ServiceInstance> codec = ServerSetImpl.createDefaultCodec();
+
+    InetSocketAddress address = new InetSocketAddress(12345);
+    Map<String, Endpoint> extraEndpoints = ImmutableMap.of();
+    Status expectedStatus = Status.ALIVE;
+
+    byte[] serialized = ServerSets.serializeServiceInstance(
+        address, extraEndpoints, expectedStatus, codec);
+    ServiceInstance decoded = ServerSets.deserializeServiceInstance(serialized, codec);
+
+    assertEquals(address.getPort(), decoded.getServiceEndpoint().getPort());
+    assertEquals(extraEndpoints, decoded.getAdditionalEndpoints());
+    assertEquals(expectedStatus, decoded.getStatus());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java b/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java
new file mode 100644
index 0000000..2d387ab
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java
@@ -0,0 +1,352 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.testing.TearDown;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.base.ExceptionalCommand;
+import com.twitter.common.zookeeper.Candidate.Leader;
+import com.twitter.common.zookeeper.Group.JoinException;
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
+import com.twitter.common.zookeeper.SingletonService.DefeatOnDisconnectLeader;
+import com.twitter.common.zookeeper.SingletonService.LeaderControl;
+import com.twitter.common.zookeeper.SingletonService.LeadershipListener;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+
+import static com.twitter.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createControl;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.fail;
+
+public class SingletonServiceTest extends BaseZooKeeperTest {
+  private static final int PORT_A = 1234;
+  private static final int PORT_B = 8080;
+  private static final InetSocketAddress PRIMARY_ENDPOINT =
+      InetSocketAddress.createUnresolved("foo", PORT_A);
+  private static final Map<String, InetSocketAddress> AUX_ENDPOINTS =
+      ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B));
+  // Collaborators, assigned in mySetUp(); two tests later swap in real serverSet/candidate.
+  private IMocksControl control;
+  private SingletonService.LeadershipListener listener;
+  private ServerSet serverSet;
+  private ServerSet.EndpointStatus endpointStatus;
+  private Candidate candidate;
+  private ExceptionalCommand<Group.JoinException> abdicate;
+  // The service under test.
+  private SingletonService service;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void mySetUp() throws IOException {
+    control = createControl();
+    addTearDown(new TearDown() {
+      @Override public void tearDown() {
+        control.verify();
+      }
+    });
+    listener = control.createMock(SingletonService.LeadershipListener.class);
+    serverSet = control.createMock(ServerSet.class);
+    candidate = control.createMock(Candidate.class);
+    endpointStatus = control.createMock(ServerSet.EndpointStatus.class);
+    abdicate = control.createMock(ExceptionalCommand.class);
+
+    service = new SingletonService(serverSet, candidate);
+  }
+
+  private void newLeader(
+      final String hostName,
+      Capture<Leader> leader,
+      LeadershipListener listener) throws Exception {
+
+    service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)),
+        listener);
+
+    // This actually elects the leader.
+    leader.getValue().onElected(abdicate);
+  }
+
+  private void newLeader(String hostName, Capture<Leader> leader) throws Exception {
+    newLeader(hostName, leader, listener);
+  }
+
+  private IExpectationSetters<EndpointStatus> expectJoin() throws Exception {
+    return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS));
+  }
+
+  @Test
+  public void testLeadAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadLeaveNoAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+    Capture<LeaderControl> controlCapture = createCapture();
+
+    // Leaving without ever advertising should abdicate leadership only: no serverSet.join()
+    // or endpointStatus.leave() is recorded, so either call would be an unexpected call.
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    listener.onLeading(capture(controlCapture));
+    abdicate.execute();
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+  }
+
+  @Test
+  public void testLeadJoinFailure() throws Exception {
+    Capture<Leader> leaderCapture = new Capture<Leader>();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception()));
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+
+    try {
+      controlCapture.getValue().advertise();
+      fail("Join should have failed.");
+    } catch (JoinException e) {
+      // Expected.
+    }
+
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleAdvertise() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testMultipleLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    expectJoin().andReturn(endpointStatus);
+    endpointStatus.leave();
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().advertise();
+    controlCapture.getValue().leave();
+    controlCapture.getValue().leave();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testAdvertiseAfterLeave() throws Exception {
+    Capture<Leader> leaderCapture = createCapture();
+
+    expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+    Capture<LeaderControl> controlCapture = createCapture();
+    listener.onLeading(capture(controlCapture));
+
+    abdicate.execute();
+
+    control.replay();
+
+    newLeader("foo", leaderCapture);
+    controlCapture.getValue().leave();
+    controlCapture.getValue().advertise();
+  }
+
+  @Test
+  public void testLeadMulti() throws Exception {
+    List<Capture<Leader>> leaderCaptures = Lists.newArrayList();
+    List<Capture<LeaderControl>> leaderControlCaptures = Lists.newArrayList();
+
+    for (int i = 0; i < 5; i++) {
+      Capture<Leader> leaderCapture = new Capture<Leader>();
+      leaderCaptures.add(leaderCapture);
+      Capture<LeaderControl> controlCapture = createCapture();
+      leaderControlCaptures.add(controlCapture);
+
+      expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null);
+      listener.onLeading(capture(controlCapture));
+      InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A);
+      Map<String, InetSocketAddress> aux =
+          ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B));
+      expect(serverSet.join(primary, aux)).andReturn(endpointStatus);
+      endpointStatus.leave();
+      abdicate.execute();
+    }
+
+    control.replay();
+
+    for (int i = 0; i < 5; i++) {
+      final String leaderName = "foo" + i;
+      newLeader(leaderName, leaderCaptures.get(i));
+      leaderControlCaptures.get(i).getValue().advertise();
+      leaderControlCaptures.get(i).getValue().leave();
+    }
+  }
+
+  @Test
+  public void testLeaderLeaves() throws Exception {
+    control.replay();  // Nothing was recorded, so any call on a mock would fail the test.
+    shutdownNetwork();  // NOTE(review): nothing leads or leaves here -- confirm the intent.
+  }
+  /** Returns an answer that counts down {@code latch} and returns null when invoked. */
+  private static IAnswer<?> countDownAnswer(final CountDownLatch latch) {
+    return new IAnswer<Void>() {
+      @Override public Void answer() {
+        latch.countDown();
+        return null;
+      }
+    };
+  }
+
+  /** With a real ZooKeeper-backed candidate, losing the network must defeat the leader. */
+  @Test
+  public void testLeaderDisconnect() throws Exception {
+    CountDownLatch leading = new CountDownLatch(1);
+    CountDownLatch defeated = new CountDownLatch(1);
+    Capture<LeaderControl> controlCapture = createCapture();
+
+    // The mocked listener only signals the latches when it is notified.
+    listener.onLeading(capture(controlCapture));
+    expectLastCall().andAnswer(countDownAnswer(leading));
+    listener.onDefeated(null);
+    expectLastCall().andAnswer(countDownAnswer(defeated));
+    control.replay();
+
+    // Replace the mocked collaborators with real ones built on createZkClient().
+    String path = "/fake/path";
+    ZooKeeperClient zkClient = createZkClient();
+    serverSet = new ServerSetImpl(zkClient, path);
+    candidate = new CandidateImpl(new Group(zkClient, ZooKeeperUtils.OPEN_ACL_UNSAFE, path));
+    DefeatOnDisconnectLeader leader = new DefeatOnDisconnectLeader(zkClient, listener);
+    service = new SingletonService(serverSet, candidate);
+    service.lead(PRIMARY_ENDPOINT, AUX_ENDPOINTS, leader);
+
+    leading.await();
+
+    // Cutting the network while leading should be reported to the listener as a defeat.
+    shutdownNetwork();
+    defeated.await();
+  }
+
+  @Test
+  public void testNonLeaderDisconnect() throws Exception {
+    CountDownLatch elected = new CountDownLatch(1);
+    listener.onLeading(EasyMock.<LeaderControl>anyObject());
+    expectLastCall().andAnswer(countDownAnswer(elected));
+    listener.onDefeated(null);
+    expectLastCall().anyTimes();
+
+    control.replay();
+
+    ZooKeeperClient zkClient = createZkClient();
+    String path = "/fake/path";
+    // Create a fake leading candidate node to ensure that the leader in this test is never
+    // elected.
+    ZooKeeperUtils.ensurePath(zkClient, ZooKeeperUtils.OPEN_ACL_UNSAFE, path);
+    String leaderNode = zkClient.get().create(
+        path + "/" + SingletonService.LEADER_ELECT_NODE_PREFIX,
+        "fake_leader".getBytes(),
+        ZooKeeperUtils.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT_SEQUENTIAL);
+
+    serverSet = new ServerSetImpl(zkClient, path);
+    candidate =
+        SingletonService.createSingletonCandidate(zkClient, path, ZooKeeperUtils.OPEN_ACL_UNSAFE);
+    DefeatOnDisconnectLeader leader = new DefeatOnDisconnectLeader(zkClient, listener);
+    service = new SingletonService(serverSet, candidate);
+    service.lead(InetSocketAddress.createUnresolved("foo", PORT_A),
+        ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)),
+        leader);
+
+    final CountDownLatch disconnected = new CountDownLatch(1);
+    zkClient.register(new Watcher() {
+      @Override public void process(WatchedEvent event) {
+        if ((event.getType() == EventType.None)
+            && (event.getState() == KeeperState.Disconnected)) {
+          disconnected.countDown();
+        }
+      }
+    });
+
+    shutdownNetwork();
+    disconnected.await();
+
+    restartNetwork();
+    zkClient.get().delete(leaderNode, ZooKeeperUtils.ANY_VERSION);
+    // Upon deletion of the fake leader node, the candidate should become leader.
+    elected.await();
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java b/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java
new file mode 100644
index 0000000..0f09652
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java
@@ -0,0 +1,72 @@
+package com.twitter.common.zookeeper;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor;
+import com.twitter.common.testing.easymock.EasyMockTest;
+import com.twitter.common.zookeeper.ServerSet.EndpointStatus;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import com.twitter.thrift.Status;
+
+/** Tests {@link StaticServerSet} monitoring and its join/update calls. */
+public class StaticServerSetTest extends EasyMockTest {
+
+  // Two fixtures that differ in host name and service port.
+  private static final ServiceInstance BACKEND_1 = backend("host_1", 12345);
+  private static final ServiceInstance BACKEND_2 = backend("host_2", 12346);
+
+  private HostChangeMonitor<ServiceInstance> monitor;
+
+  /** Builds an ALIVE instance whose "http" endpoint is port 80 on the same {@code host}. */
+  private static ServiceInstance backend(String host, int port) {
+    return new ServiceInstance(
+        new Endpoint(host, port),
+        ImmutableMap.of("http", new Endpoint(host, 80)),
+        Status.ALIVE);
+  }
+
+  @Before
+  public void setUp() {
+    monitor = createMock(new Clazz<HostChangeMonitor<ServiceInstance>>() { });
+  }
+
+  /** Records one onChange(hosts) expectation, then monitors a static set of those hosts. */
+  private void checkMonitorNotified(ImmutableSet<ServiceInstance> hosts) throws Exception {
+    monitor.onChange(hosts);
+    control.replay();
+
+    ServerSet serverSet = new StaticServerSet(hosts);
+    serverSet.monitor(monitor);
+  }
+
+  @Test
+  public void testMonitor() throws Exception {
+    checkMonitorNotified(ImmutableSet.of(BACKEND_1, BACKEND_2));
+  }
+
+  @Test
+  public void testMonitorEmpty() throws Exception {
+    checkMonitorNotified(ImmutableSet.<ServiceInstance>of());
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    // Ensure join/update calls don't break; no mock expectations are recorded here.
+    ImmutableSet<ServiceInstance> hosts = ImmutableSet.of();
+    control.replay();
+
+    ServerSet serverSet = new StaticServerSet(hosts);
+    EndpointStatus status = serverSet.join(
+        InetSocketAddress.createUnresolved("host", 1000),
+        ImmutableMap.<String, InetSocketAddress>of(),
+        Status.ALIVE);
+    status.update(Status.DEAD);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java
new file mode 100644
index 0000000..f126b9e
--- /dev/null
+++ b/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java
@@ -0,0 +1,254 @@
+// =================================================================================================
+// Copyright 2011 Twitter, Inc.
+// -------------------------------------------------------------------------------------------------
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this work except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file, or at:
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// =================================================================================================
+
+package com.twitter.common.zookeeper;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.zookeeper.ZooKeeperClient.Credentials;
+import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException;
+import com.twitter.common.zookeeper.testing.BaseZooKeeperTest;
+
+import static com.google.common.testing.junit4.JUnitAsserts.assertNotEqual;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author John Sirois
+ */
+public class ZooKeeperClientTest extends BaseZooKeeperTest {
+
+  public ZooKeeperClientTest() {
+    super(Amount.of(1, Time.DAYS));  // NOTE(review): presumably the session timeout -- confirm.
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+    try {
+      zkClient.get(Amount.of(50L, Time.MILLISECONDS));
+      fail("Expected client connection to timeout while network down");
+    } catch (TimeoutException e) {
+      assertTrue(zkClient.isClosed());
+    }
+    assertNull(zkClient.getZooKeeperClientForTests());
+    // Next, start an untimed get() on another thread while the network is still down.
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    new Thread(new Runnable() {
+      @Override public void run() {
+        try {
+          client.set(zkClient.get());
+        } catch (ZooKeeperConnectionException e) {
+          throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        } finally {
+          blockingGetComplete.countDown();
+        }
+      }
+    }).start();
+
+    restartNetwork();
+
+    // Hung blocking connects should succeed when server connection comes up
+    blockingGetComplete.await();
+    assertNotNull(client.get());
+
+    // New connections should succeed now that network is back up
+    long sessionId = zkClient.get().getSessionId();
+
+    // While connected the same client should be reused (no new connections while healthy)
+    assertSame(client.get(), zkClient.get());
+
+    shutdownNetwork();
+    // Our client doesn't know the network is down yet so we should be able to get()
+    ZooKeeper zooKeeper = zkClient.get();
+    try {
+      zooKeeper.exists("/", false);
+      fail("Expected client operation to fail while network down");
+    } catch (ConnectionLossException e) {
+      // expected
+    }
+    // Once the network returns, the session id recorded above should still be in use.
+    restartNetwork();
+    assertEquals("Expected connection to be re-established with existing session",
+        sessionId, zkClient.get().getSessionId());
+  }
+
+  /**
+   * Test that if a blocking get() call gets interrupted, after a connection has been created
+   * but before it's connected, the zk connection gets closed.
+   */
+  @Test
+  public void testGetInterrupted() throws Exception {
+    final ZooKeeperClient zkClient = createZkClient();
+    shutdownNetwork();
+    // With the network down, the get() issued on the thread below cannot complete.
+    final CountDownLatch blockingGetComplete = new CountDownLatch(1);
+    final AtomicBoolean interrupted = new AtomicBoolean();
+    final AtomicReference<ZooKeeper> client = new AtomicReference<ZooKeeper>();
+    Thread getThread = new Thread(new Runnable() {
+      @Override public void run() {
+        try {
+          client.set(zkClient.get());
+        } catch (ZooKeeperConnectionException e) {
+          throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+          interrupted.set(true);
+          throw new RuntimeException(e);
+        } finally {
+          blockingGetComplete.countDown();
+        }
+      }
+    });
+    getThread.start();
+    // Poll until get() has created the underlying ZooKeeper handle, then interrupt it.
+    while (zkClient.getZooKeeperClientForTests() == null) {
+      Thread.sleep(100);
+    }
+
+    getThread.interrupt();
+    blockingGetComplete.await();
+
+    assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests());
+    assertTrue("The waiter thread should have been interrupted", interrupted.get());
+    assertTrue(zkClient.isClosed());
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    ZooKeeperClient client = createZkClient();
+
+    // Closing before any get() call, even twice, must be harmless (close is idempotent).
+    client.close();
+    client.close();
+
+    // get() after close() still yields a usable connection; remember its session.
+    long sessionBeforeClose = client.get().getSessionId();
+    // Closing a connected client must force the next get() onto a new session.
+    client.close();
+    long sessionAfterClose = client.get().getSessionId();
+    assertNotEqual(sessionBeforeClose, sessionAfterClose);
+  }
+
+  @Test
+  public void testCredentials() throws Exception {
+    String path = "/test";
+    // The node is created with the EVERYONE_READ_CREATOR_ALL ACL by the "creator" identity.
+    ZooKeeperClient creator = createZkClient("creator", "creator");
+    String created = creator.get().create(
+        path, "42".getBytes(), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT);
+    assertEquals(path, created);
+
+    assertReadOnly(createZkClient(Credentials.NONE), path,
+        "Expected unauthenticated write attempt to fail");
+    assertReadOnly(createZkClient("nonowner", "nonowner"), path,
+        "Expected non owner write attempt to fail");
+    // A second client holding the creator's credentials is allowed to write.
+    ZooKeeperClient creatorAgain = createZkClient("creator", "creator");
+    setData(creatorAgain, path, "37");
+    assertEquals("37", getData(creatorAgain, path));
+  }
+
+  /** Asserts that {@code client} reads "42" at {@code path} and is refused when writing. */
+  private void assertReadOnly(ZooKeeperClient client, String path, String failureMessage)
+      throws Exception {
+    assertEquals("42", getData(client, path));
+    try {
+      setData(client, path, "37");
+      fail(failureMessage);
+    } catch (NoAuthException e) {
+      assertEquals("42", getData(client, path));
+    }
+  }
+
+  @Test
+  public void testHasCredentials() {
+    // No credentials, Credentials.NONE and an empty scheme are all expected to report false.
+    assertFalse(createZkClient().hasCredentials());
+    assertFalse(createZkClient(Credentials.NONE).hasCredentials());
+    assertFalse(createZkClient(emptyTokenCredentials("")).hasCredentials());
+
+    assertTrue(createZkClient("creator", "creator").hasCredentials());
+    // a zero-length token should be ok - ZooKeeper says nothing about the validity of token
+    // data a scheme can accept.
+    assertTrue(createZkClient(emptyTokenCredentials("custom")).hasCredentials());
+  }
+
+  /**
+   * Creates credentials whose {@code authenticate} is a no-op, with the given scheme and a
+   * zero-length auth token.
+   */
+  private static Credentials emptyTokenCredentials(final String scheme) {
+    return new Credentials() {
+      @Override public void authenticate(ZooKeeper zooKeeper) {
+        // noop
+      }
+
+      @Override public String scheme() {
+        return scheme;
+      }
+
+      @Override public byte[] authToken() {
+        return new byte[0];
+      }
+    };
+  }
+
+  @Test
+  public void testChrootPath() throws Exception {
+    ZooKeeperClient rootClient = createZkClient();
+    String chroot = "/test";
+    String child = "/test/subtest";
+    // Create the parent and child nodes through the client created without a path.
+    assertEquals(chroot, rootClient.get().create(
+        chroot, "42".getBytes(), ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+    assertEquals(child, rootClient.get().create(
+        child, "37".getBytes(), ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
+
+    // A client created with the "/test" path should reach the child as plain "/subtest".
+    ZooKeeperClient chrootedClient = createZkClient(chroot);
+    assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null));
+  }
+
+  private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception {
+    zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION);
+  }
+  // Both helpers convert between String and bytes using the platform default charset.
+  private String getData(ZooKeeperClient zkClient, String path) throws Exception {
+    return new String(zkClient.get().getData(path, false, null));
+  }
+}


Mime
View raw message