Return-Path: X-Original-To: apmail-aurora-commits-archive@minotaur.apache.org Delivered-To: apmail-aurora-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2FAA018243 for ; Tue, 25 Aug 2015 18:19:16 +0000 (UTC) Received: (qmail 72389 invoked by uid 500); 25 Aug 2015 18:19:16 -0000 Delivered-To: apmail-aurora-commits-archive@aurora.apache.org Received: (qmail 72347 invoked by uid 500); 25 Aug 2015 18:19:16 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 72331 invoked by uid 99); 25 Aug 2015 18:19:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Aug 2015 18:19:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9A5C1E027B; Tue, 25 Aug 2015 18:19:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zmanji@apache.org To: commits@aurora.apache.org Date: Tue, 25 Aug 2015 18:19:16 -0000 Message-Id: In-Reply-To: <2d86d301903d4a1c81757199842a5e58@git.apache.org> References: <2d86d301903d4a1c81757199842a5e58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/37] aurora git commit: Import of Twitter Commons. http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java new file mode 100644 index 0000000..d66c875 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/CandidateImplTest.java @@ -0,0 +1,231 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.Ordering; + +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.zookeeper.Candidate.Leader; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class CandidateImplTest extends BaseZooKeeperTest { + private static final List ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final String SERVICE = "/twitter/services/puffin_linkhose/leader"; + private static final Amount TIMEOUT = Amount.of(1, Time.MINUTES); + + private LinkedBlockingDeque candidateBuffer; + + @Before + public void mySetUp() throws IOException { + candidateBuffer = new LinkedBlockingDeque(); + } + + private Group createGroup(ZooKeeperClient zkClient) throws IOException { + return new Group(zkClient, ACL, SERVICE); + } + + private class Reign implements Leader { + private ExceptionalCommand abdicate; + private final CandidateImpl candidate; + private final String id; + private CountDownLatch defeated = new CountDownLatch(1); + + Reign(String id, CandidateImpl candidate) { + this.id = id; + this.candidate = candidate; + } + + @Override + public void onElected(ExceptionalCommand abdicate) { + candidateBuffer.offerFirst(candidate); + this.abdicate = abdicate; + } + + @Override + public void onDefeated() { + defeated.countDown(); + } + + public void abdicate() throws JoinException { + Preconditions.checkState(abdicate != null); + abdicate.execute(); + } + + public void expectDefeated() throws InterruptedException { + defeated.await(); + } + + @Override + public String toString() { + return id; + } + } + + @Test + public void testOfferLeadership() throws Exception { + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) { + @Override public String toString() { + return "Leader1"; + } + }; + ZooKeeperClient zkClient2 = createZkClient(TIMEOUT); + final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) { + @Override public String toString() { + return "Leader2"; + } + }; + ZooKeeperClient zkClient3 = createZkClient(TIMEOUT); + final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) { + @Override public String toString() { + return "Leader3"; + } + }; + + Reign candidate1Reign = new Reign("1", candidate1); + Reign candidate2Reign = new Reign("2", candidate2); + Reign candidate3Reign = new Reign("3", candidate3); + + Supplier candidate1Leader = candidate1.offerLeadership(candidate1Reign); + Supplier candidate2Leader = candidate2.offerLeadership(candidate2Reign); + Supplier candidate3Leader = candidate3.offerLeadership(candidate3Reign); + + assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader", + candidate1Leader.get()); + + shutdownNetwork(); + restartNetwork(); + + assertTrue("A re-connect without a session expiration should leave the leader elected", + candidate1Leader.get()); + + candidate1Reign.abdicate(); + assertSame(candidate1, candidateBuffer.takeLast()); + assertFalse(candidate1Leader.get()); + // Active abdication should trigger defeat. + candidate1Reign.expectDefeated(); + + CandidateImpl secondCandidate = candidateBuffer.takeLast(); + assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " " + + candidateBuffer, + candidate2Leader.get() ^ candidate3Leader.get()); + + if (secondCandidate == candidate2) { + expireSession(zkClient2); + assertSame(candidate3, candidateBuffer.takeLast()); + assertTrue(candidate3Leader.get()); + // Passive expiration should trigger defeat. + candidate2Reign.expectDefeated(); + } else { + expireSession(zkClient3); + assertSame(candidate2, candidateBuffer.takeLast()); + assertTrue(candidate2Leader.get()); + // Passive expiration should trigger defeat. + candidate3Reign.expectDefeated(); + } + } + + @Test + public void testCustomJudge() throws Exception { + Function, String> judge = new Function, String>() { + @Override public String apply(Iterable input) { + return Ordering.natural().max(input); + } + }; + + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + Group group1 = createGroup(zkClient1); + final CandidateImpl candidate1 = + new CandidateImpl(group1, judge, Suppliers.ofInstance("Leader1".getBytes())) { + @Override public String toString() { + return "Leader1"; + } + }; + ZooKeeperClient zkClient2 = createZkClient(TIMEOUT); + Group group2 = createGroup(zkClient2); + final CandidateImpl candidate2 = + new CandidateImpl(group2, judge, Suppliers.ofInstance("Leader2".getBytes())) { + @Override public String toString() { + return "Leader2"; + } + }; + + Reign candidate1Reign = new Reign("1", candidate1); + Reign candidate2Reign = new Reign("2", candidate2); + + candidate1.offerLeadership(candidate1Reign); + assertSame(candidate1, candidateBuffer.takeLast()); + + Supplier candidate2Leader = candidate2.offerLeadership(candidate2Reign); + assertSame(candidate2, candidateBuffer.takeLast()); + candidate1Reign.expectDefeated(); + assertTrue("Since the judge picks the newest member joining a group as leader candidate 1 " + + "should be defeated and candidate 2 leader", candidate2Leader.get()); + } + + @Test + public void testCustomDataSupplier() throws Exception { + byte[] DATA = "Leader1".getBytes(); + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + Group group1 = createGroup(zkClient1); + CandidateImpl candidate1 = new CandidateImpl(group1, Suppliers.ofInstance(DATA)) { + @Override public String toString() { + return "Leader1"; + } + }; + Reign candidate1Reign = new Reign("1", candidate1); + + Supplier candidate1Leader = candidate1.offerLeadership(candidate1Reign); + assertSame(candidate1, candidateBuffer.takeLast()); + assertTrue(candidate1Leader.get()); + assertArrayEquals(DATA, candidate1.getLeaderData().get()); + } + + @Test + public void testEmptyMembership() throws Exception { + ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); + final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)); + Reign candidate1Reign = new Reign("1", candidate1); + + candidate1.offerLeadership(candidate1Reign); + assertSame(candidate1, candidateBuffer.takeLast()); + candidate1Reign.abdicate(); + assertFalse(candidate1.getLeaderData().isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java b/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java new file mode 100644 index 0000000..19a4322 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/CompoundServerSetTest.java @@ -0,0 +1,220 @@ +package com.twitter.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.Command; +import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; +import com.twitter.common.net.pool.DynamicHostSet.MonitorException; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.thrift.ServiceInstance; + +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.getCurrentArguments; + +public class CompoundServerSetTest extends EasyMockTest { + private static final Map AUX_PORTS = ImmutableMap.of(); + private static final InetSocketAddress END_POINT = + InetSocketAddress.createUnresolved("foo", 12345); + + private ServerSet.EndpointStatus mockStatus1; + private ServerSet.EndpointStatus mockStatus2; + private ServerSet.EndpointStatus mockStatus3; + private HostChangeMonitor compoundMonitor; + + private ServerSet serverSet1; + private ServerSet serverSet2; + private ServerSet serverSet3; + private Command stop1; + private Command stop2; + private Command stop3; + private CompoundServerSet compoundServerSet; + + private ServiceInstance instance1; + private ServiceInstance instance2; + private ServiceInstance instance3; + + private void triggerChange(ServiceInstance... hostChanges) { + compoundMonitor.onChange(ImmutableSet.copyOf(hostChanges)); + } + + private void triggerChange( + Capture> capture, + ServiceInstance... hostChanges) { + + capture.getValue().onChange(ImmutableSet.copyOf(hostChanges)); + } + + @Before + public void setUpMocks() throws Exception { + control = createControl(); + compoundMonitor = createMock(new Clazz>() { }); + + mockStatus1 = createMock(ServerSet.EndpointStatus.class); + mockStatus2 = createMock(ServerSet.EndpointStatus.class); + mockStatus3 = createMock(ServerSet.EndpointStatus.class); + + serverSet1 = createMock(ServerSet.class); + serverSet2 = createMock(ServerSet.class); + serverSet3 = createMock(ServerSet.class); + + stop1 = createMock(Command.class); + stop2 = createMock(Command.class); + stop3 = createMock(Command.class); + + instance1 = createMock(ServiceInstance.class); + instance2 = createMock(ServiceInstance.class); + instance3 = createMock(ServiceInstance.class); + + compoundServerSet = new CompoundServerSet(ImmutableList.of(serverSet1, serverSet2, serverSet3)); + } + + @Test + public void testJoin() throws Exception { + expect(serverSet1.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus1); + expect(serverSet2.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus2); + expect(serverSet3.join(END_POINT, AUX_PORTS, 0)).andReturn(mockStatus3); + + mockStatus1.leave(); + mockStatus2.leave(); + mockStatus3.leave(); + + control.replay(); + + compoundServerSet.join(END_POINT, AUX_PORTS, 0).leave(); + } + + @Test(expected = Group.JoinException.class) + public void testJoinFailure() throws Exception { + // Throw exception for the first serverSet join. + expect(serverSet1.join(END_POINT, AUX_PORTS)) + .andThrow(new Group.JoinException("Group join exception", null)); + + control.replay(); + compoundServerSet.join(END_POINT, AUX_PORTS); + } + + @Test(expected = ServerSet.UpdateException.class) + public void testStatusUpdateFailure() throws Exception { + expect(serverSet1.join(END_POINT, AUX_PORTS)).andReturn(mockStatus1); + expect(serverSet2.join(END_POINT, AUX_PORTS)).andReturn(mockStatus2); + expect(serverSet3.join(END_POINT, AUX_PORTS)).andReturn(mockStatus3); + + mockStatus1.leave(); + mockStatus2.leave(); + expectLastCall().andThrow(new ServerSet.UpdateException("Update exception")); + mockStatus3.leave(); + + control.replay(); + + compoundServerSet.join(END_POINT, AUX_PORTS).leave(); + } + + @Test + public void testMonitor() throws Exception { + Capture> set1Capture = createCapture(); + Capture> set2Capture = createCapture(); + Capture> set3Capture = createCapture(); + + expect(serverSet1.watch( + EasyMock.>capture(set1Capture))) + .andReturn(stop1); + expect(serverSet2.watch( + EasyMock.>capture(set2Capture))) + .andReturn(stop2); + expect(serverSet3.watch( + EasyMock.>capture(set3Capture))) + .andReturn(stop3); + + triggerChange(instance1); + triggerChange(instance1, instance2); + triggerChange(instance1, instance2, instance3); + triggerChange(instance1, instance3); + triggerChange(instance1, instance2, instance3); + triggerChange(instance3); + triggerChange(); + + control.replay(); + compoundServerSet.watch(compoundMonitor); + + // No new instances. + triggerChange(set1Capture); + triggerChange(set2Capture); + triggerChange(set3Capture); + // Add one instance from each serverset + triggerChange(set1Capture, instance1); + triggerChange(set2Capture, instance2); + triggerChange(set3Capture, instance3); + // Remove instance2 + triggerChange(set2Capture); + // instance1 in both serverset1 and serverset2 + triggerChange(set2Capture, instance1, instance2); + // Remove instances from serversets. + triggerChange(set1Capture); + triggerChange(set2Capture); + triggerChange(set3Capture); + } + + @Test(expected = MonitorException.class) + public void testMonitorFailure() throws Exception { + serverSet1.watch(EasyMock.>anyObject()); + expectLastCall().andThrow(new MonitorException("Monitor exception", null)); + + control.replay(); + compoundServerSet.watch(compoundMonitor); + } + + @Test + public void testInitialChange() throws Exception { + // Ensures that a synchronous change notification during the call to monitor() is properly + // reported. + serverSet1.watch(EasyMock.>anyObject()); + expectLastCall().andAnswer(new IAnswer() { + @Override public Command answer() { + @SuppressWarnings("unchecked") + HostChangeMonitor monitor = + (HostChangeMonitor) getCurrentArguments()[0]; + monitor.onChange(ImmutableSet.of(instance1, instance2)); + return stop1; + } + }); + compoundMonitor.onChange(ImmutableSet.of(instance1, instance2)); + expect(serverSet2.watch(EasyMock.>anyObject())) + .andReturn(stop2); + expect(serverSet3.watch(EasyMock.>anyObject())) + .andReturn(stop3); + + control.replay(); + + compoundServerSet.watch(compoundMonitor); + } + + @Test + public void testStopMonitoring() throws Exception { + expect(serverSet1.watch(EasyMock.>anyObject())) + .andReturn(stop1); + expect(serverSet2.watch(EasyMock.>anyObject())) + .andReturn(stop2); + expect(serverSet3.watch(EasyMock.>anyObject())) + .andReturn(stop3); + + stop1.execute(); + stop2.execute(); + stop3.execute(); + + control.replay(); + compoundServerSet.watch(compoundMonitor).execute(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java b/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java new file mode 100644 index 0000000..4643062 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/DistributedLockTest.java @@ -0,0 +1,177 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; + +/** + * @author Florian Leibert + */ +public class DistributedLockTest extends BaseZooKeeperTest { + private static final String LOCK_PATH = "/test/lock"; + + private ZooKeeperClient zkClient; + + @Before + public void mySetUp() throws Exception { + zkClient = createZkClient(); + + } + + @Test + public void testFailDoubleLock() { + DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH); + lock.lock(); + try { + lock.lock(); + fail("Exception expected!"); + } catch (DistributedLockImpl.LockingException e) { + // expected + } finally { + lock.unlock(); + } + } + + @Test + public void testFailUnlock() { + DistributedLock lock = new DistributedLockImpl(zkClient, LOCK_PATH); + try { + lock.unlock(); + fail("Expected exception while trying to unlock!"); + } catch (DistributedLockImpl.LockingException e) { + // success + } + } + + @Test + public void testTwoLocks() { + DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH); + DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH); + lock1.lock(); + List children = expectZkNodes(LOCK_PATH); + assertEquals("One child == lock held!", children.size(), 1); + lock1.unlock(); + // check no locks held/empty children + children = expectZkNodes(LOCK_PATH); + assertEquals("No children, no lock held!", children.size(), 0); + lock2.lock(); + children = expectZkNodes(LOCK_PATH); + assertEquals("One child == lock held!", children.size(), 1); + lock2.unlock(); + } + + @Test + public void testTwoLocksFailFast() { + DistributedLock lock1 = new DistributedLockImpl(zkClient, LOCK_PATH); + DistributedLock lock2 = new DistributedLockImpl(zkClient, LOCK_PATH); + lock1.lock(); + boolean acquired = lock2.tryLock(10, TimeUnit.MILLISECONDS); + assertFalse("Couldn't acquire lock because it's currently held", acquired); + lock1.unlock(); + lock2.unlock(); + } + + @Test + @Ignore("pending: ") + public void testMultiConcurrentLocking() throws Exception { + //TODO(Florian Leibert): this is a bit janky, so let's replace it. + for (int i = 0; i < 50; i++) { + testConcurrentLocking(); + } + mySetUp(); + } + + @Test + public void testConcurrentLocking() throws Exception { + ZooKeeperClient zk1 = createZkClient(); + ZooKeeperClient zk2 = createZkClient(); + ZooKeeperClient zk3 = createZkClient(); + + final DistributedLock lock1 = new DistributedLockImpl(zk1, LOCK_PATH); + final DistributedLock lock2 = new DistributedLockImpl(zk2, LOCK_PATH); + final DistributedLock lock3 = new DistributedLockImpl(zk3, LOCK_PATH); + Callable t1 = new Callable() { + @Override + public Object call() throws InterruptedException { + lock1.lock(); + try { + Thread.sleep(50); + } finally { + lock1.unlock(); + } + return new Object(); + } + }; + + Callable t2 = new Callable() { + @Override + public Object call() throws InterruptedException { + lock2.lock(); + try { + Thread.sleep(50); + } finally { + lock2.unlock(); + } + return new Object(); + } + }; + + Callable t3 = new Callable() { + @Override + public Object call() throws InterruptedException { + lock3.lock(); + try { + Thread.sleep(50); + } finally { + lock3.unlock(); + } + return new Object(); + } + }; + + //TODO(Florian Leibert): remove this executors stuff and use a latch instead. + ExecutorService ex = Executors.newCachedThreadPool(); + @SuppressWarnings("unchecked") List> tlist = Arrays.asList(t1, t2, t3); + ex.invokeAll(tlist); + assertTrue("No Children left!", expectZkNodes(LOCK_PATH).size() == 0); + } + + protected List expectZkNodes(String path) { + try { + List children = zkClient.get().getChildren(path, null); + return children; + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java b/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java new file mode 100644 index 0000000..a5342d9 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/GroupTest.java @@ -0,0 +1,334 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.Command; +import com.twitter.common.base.Supplier; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.zookeeper.Group.GroupChangeListener; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.Group.Membership; +import com.twitter.common.zookeeper.Group.NodeScheme; +import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; + +import static com.google.common.testing.junit4.JUnitAsserts.assertNotEqual; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class GroupTest extends BaseZooKeeperTest { + + private ZooKeeperClient zkClient; + private Group joinGroup; + private Group watchGroup; + private Command stopWatching; + private com.twitter.common.base.Command onLoseMembership; + + private RecordingListener listener; + + public GroupTest() { + super(Amount.of(1, Time.DAYS)); + } + + @Before + public void mySetUp() throws Exception { + onLoseMembership = createMock(Command.class); + + zkClient = createZkClient("group", "test"); + joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); + watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); + + listener = new RecordingListener(); + stopWatching = watchGroup.watch(listener); + } + + private static class RecordingListener implements GroupChangeListener { + private final LinkedBlockingQueue> membershipChanges = + new LinkedBlockingQueue>(); + + @Override + public void onGroupChange(Iterable memberIds) { + membershipChanges.add(memberIds); + } + + public Iterable take() throws InterruptedException { + return membershipChanges.take(); + } + + public void assertEmpty() { + assertEquals(ImmutableList.>of(), ImmutableList.copyOf(membershipChanges)); + } + + @Override + public String toString() { + return membershipChanges.toString(); + } + } + + private static class CustomScheme implements NodeScheme { + static final String NODE_NAME = "custom_name"; + + @Override + public boolean isMember(String nodeName) { + return NODE_NAME.equals(nodeName); + } + + @Override + public String createName(byte[] membershipData) { + return NODE_NAME; + } + + @Override + public boolean isSequential() { + return false; + } + } + + @Test + public void testSessionExpirationTriggersOnLoseMembership() throws Exception { + final CountDownLatch lostMembership = new CountDownLatch(1); + Command onLoseMembership = new Command() { + @Override public void execute() throws RuntimeException { + lostMembership.countDown(); + } + }; + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + assertMembershipObserved(membership.getMemberId()); + expireSession(zkClient); + + lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. + } + + @Test + public void testNodeDeleteTriggersOnLoseMembership() throws Exception { + final CountDownLatch lostMembership = new CountDownLatch(1); + Command onLoseMembership = new Command() { + @Override public void execute() throws RuntimeException { + lostMembership.countDown(); + } + }; + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + assertMembershipObserved(membership.getMemberId()); + membership.cancel(); + + lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. + } + + @Test + public void testJoinsAndWatchesSurviveDisconnect() throws Exception { + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(); + String originalMemberId = membership.getMemberId(); + assertMembershipObserved(originalMemberId); + + shutdownNetwork(); + restartNetwork(); + + // The member should still be present under existing ephemeral node since session did not + // expire. + watchGroup.watch(listener); + assertMembershipObserved(originalMemberId); + + membership.cancel(); + + assertEmptyMembershipObserved(); + assertEmptyMembershipObserved(); // and again for 2nd listener + + listener.assertEmpty(); + + verify(onLoseMembership); + reset(onLoseMembership); // Turn off expectations during ZK server shutdown. + } + + @Test + public void testJoinsAndWatchesSurviveExpiredSession() throws Exception { + onLoseMembership.execute(); + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership membership = joinGroup.join(onLoseMembership); + String originalMemberId = membership.getMemberId(); + assertMembershipObserved(originalMemberId); + + expireSession(zkClient); + + // We should have lost our group membership and then re-gained it with a new ephemeral node. + // We may or may-not see the intermediate state change but we must see the final state + Iterable members = listener.take(); + if (Iterables.isEmpty(members)) { + members = listener.take(); + } + assertEquals(1, Iterables.size(members)); + assertNotEqual(originalMemberId, Iterables.getOnlyElement(members)); + assertNotEqual(originalMemberId, membership.getMemberId()); + + listener.assertEmpty(); + + verify(onLoseMembership); + reset(onLoseMembership); // Turn off expectations during ZK server shutdown. + } + + @Test + public void testJoinCustomNamingScheme() throws Exception { + Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group", + new CustomScheme()); + + listener = new RecordingListener(); + group.watch(listener); + assertEmptyMembershipObserved(); + + Membership membership = group.join(); + String memberId = membership.getMemberId(); + + assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId); + assertMembershipObserved(memberId); + + expireSession(zkClient); + } + + @Test + public void testUpdateMembershipData() throws Exception { + Supplier dataSupplier = new EasyMockTest.Clazz>() {}.createMock(); + + byte[] initial = "start".getBytes(); + expect(dataSupplier.get()).andReturn(initial); + + byte[] second = "update".getBytes(); + expect(dataSupplier.get()).andReturn(second); + + replay(dataSupplier); + + Membership membership = joinGroup.join(dataSupplier, onLoseMembership); + assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get() + .getData(membership.getMemberPath(), false, null)); + + assertArrayEquals("Updating supplier should not change membership data", + initial, zkClient.get().getData(membership.getMemberPath(), false, null)); + + membership.updateMemberData(); + assertArrayEquals("Updating membership should change data", + second, zkClient.get().getData(membership.getMemberPath(), false, null)); + + verify(dataSupplier); + } + + @Test + public void testAcls() throws Exception { + Group securedMembership = + new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, + "/secured/group/membership"); + + String memberId = securedMembership.join().getMemberId(); + + Group unauthenticatedObserver = + new Group(createZkClient(Credentials.NONE), + Ids.READ_ACL_UNSAFE, + "/secured/group/membership"); + RecordingListener unauthenticatedListener = new RecordingListener(); + unauthenticatedObserver.watch(unauthenticatedListener); + + assertMembershipObserved(unauthenticatedListener, memberId); + + try { + unauthenticatedObserver.join(); + fail("Expected join exception for unauthenticated observer"); + } catch (JoinException e) { + // expected + } + + Group unauthorizedObserver = + new Group(createZkClient("joe", "schmoe"), + Ids.READ_ACL_UNSAFE, + "/secured/group/membership"); + RecordingListener unauthorizedListener = new RecordingListener(); + unauthorizedObserver.watch(unauthorizedListener); + + assertMembershipObserved(unauthorizedListener, memberId); + + try { + unauthorizedObserver.join(); + fail("Expected join exception for unauthorized observer"); + } catch (JoinException e) { + // expected + } + } + + @Test + public void testStopWatching() throws Exception { + replay(onLoseMembership); + + assertEmptyMembershipObserved(); + + Membership member1 = joinGroup.join(); + String memberId1 = member1.getMemberId(); + assertMembershipObserved(memberId1); + + Membership member2 = joinGroup.join(); + String memberId2 = member2.getMemberId(); + assertMembershipObserved(memberId1, memberId2); + + stopWatching.execute(); + + member1.cancel(); + Membership member3 = joinGroup.join(); + member2.cancel(); + member3.cancel(); + + listener.assertEmpty(); + } + + private void assertEmptyMembershipObserved() throws InterruptedException { + assertMembershipObserved(); + } + + private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException { + assertMembershipObserved(listener, expectedMemberIds); + } + + private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds) + throws InterruptedException { + + assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take())); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java b/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java new file mode 100644 index 0000000..2a0b774 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/PartitionerTest.java @@ -0,0 +1,164 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import com.google.common.testing.TearDown; +import com.twitter.common.zookeeper.Group.GroupChangeListener; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.Group.Membership; +import com.twitter.common.zookeeper.Partitioner.Partition; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author John Sirois + */ +public class PartitionerTest extends BaseZooKeeperTest { + private static final List ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final String PARTITION_NAMESPACE = "/twitter/puffin/hosebird"; + + @Test + public void testHeterogeneousPartitionGroup() throws Exception { + ZooKeeperClient zkClient = createZkClient(); + ZooKeeperUtils.ensurePath(zkClient, ACL, PARTITION_NAMESPACE + "/not-a-partition-node"); + Partitioner partitioner = new Partitioner(zkClient, ACL, PARTITION_NAMESPACE); + join(partitioner); + + assertEquals("Expected Partitioner to be tolerant of foreign nodes", + 1, partitioner.getGroupSize()); + } + + private static class InstrumentedPartitioner extends Partitioner { + private final AtomicInteger myViewOfGroupSize = new AtomicInteger(); + + public InstrumentedPartitioner(ZooKeeperClient zkClient) throws IOException { + super(zkClient, ACL, PARTITION_NAMESPACE); + } + + @Override GroupChangeListener createGroupChangeListener(Membership membership) { + final GroupChangeListener listener = super.createGroupChangeListener(membership); + return new GroupChangeListener() { + @Override public void onGroupChange(Iterable memberIds) { + listener.onGroupChange(memberIds); + synchronized (myViewOfGroupSize) { + myViewOfGroupSize.set(getGroupSize()); + myViewOfGroupSize.notify(); + } + } + }; + } + + public void observeGroupSize(int expectedSize) throws InterruptedException { + while (expectedSize != myViewOfGroupSize.get()) { + synchronized (myViewOfGroupSize) { + myViewOfGroupSize.wait(); + } + } + } + } + + @Test + public void testJoin() throws Exception { + // Test that the 1st member of the partition group owns the whole space. + InstrumentedPartitioner firstPartitioner = new InstrumentedPartitioner(createZkClient()); + Partition firstPartition = join(firstPartitioner); + + assertTrue(firstPartition.isMember(0L)); + assertTrue(firstPartition.isMember(1L)); + assertTrue(firstPartition.isMember(2L)); + + // Test that when additional members join partitions are added and existing partitions shrink. + InstrumentedPartitioner secondPartitioner = new InstrumentedPartitioner(createZkClient()); + Partition secondPartition = join(secondPartitioner); + + firstPartitioner.observeGroupSize(2); + + assertTrue(firstPartition.isMember(0L)); + assertFalse(secondPartition.isMember(0L)); + + assertFalse(firstPartition.isMember(1L)); + assertTrue(secondPartition.isMember(1L)); + + assertTrue(firstPartition.isMember(2L)); + assertFalse(secondPartition.isMember(2L)); + + InstrumentedPartitioner thirdPartitioner = new InstrumentedPartitioner(createZkClient()); + Partition thirdPartition = join(thirdPartitioner); + + firstPartitioner.observeGroupSize(3); + secondPartitioner.observeGroupSize(3); + + assertTrue(firstPartition.isMember(0L)); + assertFalse(secondPartition.isMember(0L)); + assertFalse(thirdPartition.isMember(0L)); + + assertFalse(firstPartition.isMember(1L)); + assertTrue(secondPartition.isMember(1L)); + assertFalse(thirdPartition.isMember(1L)); + + assertFalse(firstPartition.isMember(2L)); + assertFalse(secondPartition.isMember(2L)); + assertTrue(thirdPartition.isMember(2L)); + + assertTrue(firstPartition.isMember(3L)); + assertFalse(secondPartition.isMember(3L)); + assertFalse(thirdPartition.isMember(3L)); + + // Test that members leaving the partition group results in the partitions being merged. + firstPartition.cancel(); + + secondPartitioner.observeGroupSize(2); + thirdPartitioner.observeGroupSize(2); + + assertTrue(secondPartition.isMember(0L)); + assertFalse(thirdPartition.isMember(0L)); + + assertFalse(secondPartition.isMember(1L)); + assertTrue(thirdPartition.isMember(1L)); + + assertTrue(secondPartition.isMember(2L)); + assertFalse(thirdPartition.isMember(2L)); + + thirdPartition.cancel(); + + secondPartitioner.observeGroupSize(1); + + assertTrue(secondPartition.isMember(0L)); + assertTrue(secondPartition.isMember(1L)); + assertTrue(secondPartition.isMember(2L)); + } + + private Partition join(Partitioner partitioner) throws JoinException, InterruptedException { + final Partition partition = partitioner.join(); + addTearDown(new TearDown() { + @Override public void tearDown() throws JoinException { + partition.cancel(); + } + }); + return partition; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java new file mode 100644 index 0000000..64891a0 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetImplTest.java @@ -0,0 +1,441 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.Override; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.testing.TearDown; +import com.google.gson.GsonBuilder; + +import org.apache.thrift.protocol.TProtocol; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.twitter.common.base.Command; +import com.twitter.common.io.Codec; +import com.twitter.common.io.JsonCodec; +import com.twitter.common.net.pool.DynamicHostSet; +import com.twitter.common.thrift.TResourceExhaustedException; +import com.twitter.common.thrift.Thrift; +import com.twitter.common.thrift.ThriftFactory; +import com.twitter.common.thrift.ThriftFactory.ThriftFactoryException; +import com.twitter.common.zookeeper.Group; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.Group.WatchException; +import com.twitter.common.zookeeper.ServerSet.EndpointStatus; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; +import com.twitter.common.zookeeper.ZooKeeperClient; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * + * TODO(William Farner): Change this to remove thrift dependency. + */ +public class ServerSetImplTest extends BaseZooKeeperTest { + private static final Logger LOG = Logger.getLogger(ServerSetImpl.class.getName()); + private static final List ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final String SERVICE = "/twitter/services/puffin_hosebird"; + + private LinkedBlockingQueue> serverSetBuffer; + private DynamicHostSet.HostChangeMonitor serverSetMonitor; + + @Before + public void mySetUp() throws IOException { + serverSetBuffer = new LinkedBlockingQueue>(); + serverSetMonitor = new DynamicHostSet.HostChangeMonitor() { + @Override public void onChange(ImmutableSet serverSet) { + serverSetBuffer.offer(serverSet); + } + }; + } + + private ServerSetImpl createServerSet() throws IOException { + return new ServerSetImpl(createZkClient(), ACL, SERVICE); + } + + @Test + public void testLifecycle() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + EndpointStatus status = server.join( + InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080), 0); + + ServiceInstance serviceInstance = new ServiceInstance( + new Endpoint("foo", 1234), + ImmutableMap.of("http-admin", new Endpoint("foo", 8080)), + Status.ALIVE) + .setShard(0); + + assertChangeFired(serviceInstance); + + status.leave(); + assertChangeFiredEmpty(); + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testMembershipChanges() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + + EndpointStatus foo = join(server, "foo"); + assertChangeFired("foo"); + + expireSession(client.getZkClient()); + + EndpointStatus bar = join(server, "bar"); + + // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a + // change, just "foo", "bar" since this was an addition. + assertChangeFired("foo", "bar"); + + foo.leave(); + assertChangeFired("bar"); + + EndpointStatus baz = join(server, "baz"); + assertChangeFired("bar", "baz"); + + baz.leave(); + assertChangeFired("bar"); + + bar.leave(); + assertChangeFiredEmpty(); + + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testStopMonitoring() throws Exception { + ServerSetImpl client = createServerSet(); + Command stopMonitoring = client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + ServerSetImpl server = createServerSet(); + + EndpointStatus foo = join(server, "foo"); + assertChangeFired("foo"); + EndpointStatus bar = join(server, "bar"); + assertChangeFired("foo", "bar"); + + stopMonitoring.execute(); + + // No new updates should be received since monitoring has stopped. + foo.leave(); + assertTrue(serverSetBuffer.isEmpty()); + + // Expiration event. + assertTrue(serverSetBuffer.isEmpty()); + } + + @Test + public void testOrdering() throws Exception { + ServerSetImpl client = createServerSet(); + client.watch(serverSetMonitor); + assertChangeFiredEmpty(); + + Map server1Ports = makePortMap("http-admin1", 8080); + Map server2Ports = makePortMap("http-admin2", 8081); + Map server3Ports = makePortMap("http-admin3", 8082); + + ServerSetImpl server1 = createServerSet(); + ServerSetImpl server2 = createServerSet(); + ServerSetImpl server3 = createServerSet(); + + ServiceInstance instance1 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), + Status.ALIVE) + .setShard(0); + ServiceInstance instance2 = new ServiceInstance( + new Endpoint("foo", 1001), + ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)), + Status.ALIVE) + .setShard(1); + ServiceInstance instance3 = new ServiceInstance( + new Endpoint("foo", 1002), + ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)), + Status.ALIVE) + .setShard(2); + + server1.join( + InetSocketAddress.createUnresolved("foo", 1000), + server1Ports, + 0); + assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take())); + + EndpointStatus status2 = server2.join( + InetSocketAddress.createUnresolved("foo", 1001), + server2Ports, + 1); + assertEquals(ImmutableList.of(instance1, instance2), + ImmutableList.copyOf(serverSetBuffer.take())); + + server3.join( + InetSocketAddress.createUnresolved("foo", 1002), server3Ports, 2); + assertEquals(ImmutableList.of(instance1, instance2, instance3), + ImmutableList.copyOf(serverSetBuffer.take())); + + status2.leave(); + assertEquals(ImmutableList.of(instance1, instance3), + ImmutableList.copyOf(serverSetBuffer.take())); + } + + @Test + public void testJsonCodecRoundtrip() throws Exception { + Codec codec = ServerSetImpl.createJsonCodec(); + ServiceInstance instance1 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE) + .setShard(0); + byte[] data = ServerSets.serializeServiceInstance(instance1, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance2 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance2, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + + ServiceInstance instance3 = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of(), + Status.ALIVE); + data = ServerSets.serializeServiceInstance(instance3, codec); + assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + } + + @Ignore("TODO(zmanji): Fix to work with thrift 0.9.0") + @Test + public void testJsonCodecCompatibility() throws IOException { + ServiceInstance instance = new ServiceInstance( + new Endpoint("foo", 1000), + ImmutableMap.of("http", new Endpoint("foo", 8080)), + Status.ALIVE).setShard(42); + + ByteArrayOutputStream legacy = new ByteArrayOutputStream(); + JsonCodec.create( + ServiceInstance.class, + new GsonBuilder().setExclusionStrategies(JsonCodec.getThriftExclusionStrategy()) + .create()).serialize(instance, legacy); + + ByteArrayOutputStream results = new ByteArrayOutputStream(); + ServerSetImpl.createJsonCodec().serialize(instance, results); + + assertEquals(legacy.toString(), results.toString()); + + results = new ByteArrayOutputStream(); + ServerSetImpl.createJsonCodec().serialize(instance, results); + assertEquals( + "{\"serviceEndpoint\":{\"host\":\"foo\",\"port\":1000}," + + "\"additionalEndpoints\":{\"http\":{\"host\":\"foo\",\"port\":8080}}," + + "\"status\":\"ALIVE\"," + + "\"shard\":42}", + results.toString()); + } + + //TODO(Jake Mannix) move this test method to ServerSetConnectionPoolTest, which should be renamed + // to DynamicBackendConnectionPoolTest, and refactor assertChangeFired* methods to be used both + // here and there + @Test + public void testThriftWithServerSet() throws Exception { + final AtomicReference clientConnection = new AtomicReference(); + final CountDownLatch connected = new CountDownLatch(1); + final ServerSocket server = new ServerSocket(0); + Thread service = new Thread(new Runnable() { + @Override public void run() { + try { + clientConnection.set(server.accept()); + } catch (IOException e) { + LOG.log(Level.WARNING, "Problem accepting a connection to thrift server", e); + } finally { + connected.countDown(); + } + } + }); + service.setDaemon(true); + service.start(); + + ServerSetImpl serverSetImpl = new ServerSetImpl(createZkClient(), SERVICE); + serverSetImpl.watch(serverSetMonitor); + assertChangeFiredEmpty(); + InetSocketAddress localSocket = new InetSocketAddress(server.getLocalPort()); + serverSetImpl.join(localSocket, Maps.newHashMap()); + assertChangeFired(ImmutableMap.of(localSocket, Status.ALIVE)); + + Service.Iface svc = createThriftClient(serverSetImpl); + try { + String value = svc.getString(); + LOG.info("Got value: " + value + " from server"); + assertEquals(Service.Iface.DONE, value); + } catch (TResourceExhaustedException e) { + fail("ServerSet is not empty, should not throw exception here"); + } finally { + connected.await(); + server.close(); + } + } + + @Test + public void testUnwatchOnException() throws Exception { + IMocksControl control = createControl(); + + ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class); + Watcher onExpirationWatcher = control.createMock(Watcher.class); + + expect(zkClient.registerExpirationHandler(anyObject(Command.class))) + .andReturn(onExpirationWatcher); + + expect(zkClient.get()).andThrow(new InterruptedException()); + expect(zkClient.unregister(onExpirationWatcher)).andReturn(true); + control.replay(); + + Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla"); + ServerSetImpl serverset = new ServerSetImpl(zkClient, group); + + try { + serverset.watch(new DynamicHostSet.HostChangeMonitor() { + @Override + public void onChange(ImmutableSet hostSet) {} + }); + fail("Expected MonitorException"); + } catch (DynamicHostSet.MonitorException e) { + // expected + } + control.verify(); + } + + private Service.Iface createThriftClient(DynamicHostSet serverSet) + throws ThriftFactoryException { + + final Thrift thrift = ThriftFactory.create(Service.Iface.class).build(serverSet); + addTearDown(new TearDown() { + @Override public void tearDown() { + thrift.close(); + } + }); + return thrift.create(); + } + + private static Map makePortMap(String name, int port) { + return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port)); + } + + public static class Service { + public static interface Iface { + public static final String DONE = "done"; + public String getString() throws TResourceExhaustedException; + } + + public static class Client implements Iface { + public Client(TProtocol protocol) { + assertNotNull(protocol); + } + @Override public String getString() { + return DONE; + } + } + } + + private EndpointStatus join(ServerSet serverSet, String host) + throws JoinException, InterruptedException { + + return serverSet.join(InetSocketAddress.createUnresolved(host, 42), ImmutableMap.of()); + } + + private void assertChangeFired(Map hostsStatuses) + throws InterruptedException { + assertChangeFired( + ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(hostsStatuses.entrySet()), + new Function, ServiceInstance>() { + @Override public ServiceInstance apply(Map.Entry e) { + return new ServiceInstance(new Endpoint(e.getKey().getHostName(), e.getKey().getPort()), + ImmutableMap.of(), e.getValue()); + } + }))); + } + + private void assertChangeFired(String... serviceHosts) + throws InterruptedException { + + assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts), + new Function() { + @Override public ServiceInstance apply(String serviceHost) { + return new ServiceInstance(new Endpoint(serviceHost, 42), + ImmutableMap.of(), Status.ALIVE); + } + }))); + } + + protected void assertChangeFiredEmpty() throws InterruptedException { + assertChangeFired(ImmutableSet.of()); + } + + protected void assertChangeFired(ServiceInstance... serviceInstances) + throws InterruptedException { + assertChangeFired(ImmutableSet.copyOf(serviceInstances)); + } + + protected void assertChangeFired(ImmutableSet serviceInstances) + throws InterruptedException { + assertEquals(serviceInstances, serverSetBuffer.take()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java new file mode 100644 index 0000000..1c7d67c --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/ServerSetsTest.java @@ -0,0 +1,35 @@ +package com.twitter.common.zookeeper; + +import com.google.common.collect.ImmutableMap; + +import com.twitter.common.io.Codec; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +import java.net.InetSocketAddress; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ServerSetsTest { + @Test + public void testSimpleSerialization() throws Exception { + InetSocketAddress endpoint = new InetSocketAddress(12345); + Map additionalEndpoints = ImmutableMap.of(); + Status status = Status.ALIVE; + + Codec codec = ServerSetImpl.createDefaultCodec(); + + byte[] data = ServerSets.serializeServiceInstance( + endpoint, additionalEndpoints, status, codec); + + ServiceInstance instance = ServerSets.deserializeServiceInstance(data, codec); + + assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); + assertEquals(additionalEndpoints, instance.getAdditionalEndpoints()); + assertEquals(Status.ALIVE, instance.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java b/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java new file mode 100644 index 0000000..2d387ab --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/SingletonServiceTest.java @@ -0,0 +1,352 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.testing.TearDown; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.easymock.IExpectationSetters; +import org.easymock.IMocksControl; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.base.ExceptionalCommand; +import com.twitter.common.zookeeper.Candidate.Leader; +import com.twitter.common.zookeeper.Group.JoinException; +import com.twitter.common.zookeeper.ServerSet.EndpointStatus; +import com.twitter.common.zookeeper.SingletonService.DefeatOnDisconnectLeader; +import com.twitter.common.zookeeper.SingletonService.LeaderControl; +import com.twitter.common.zookeeper.SingletonService.LeadershipListener; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; + +import static com.twitter.common.testing.easymock.EasyMockTest.createCapture; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createControl; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.fail; + +public class SingletonServiceTest extends BaseZooKeeperTest { + private static final int PORT_A = 1234; + private static final int PORT_B = 8080; + private static final InetSocketAddress PRIMARY_ENDPOINT = + InetSocketAddress.createUnresolved("foo", PORT_A); + private static final Map AUX_ENDPOINTS = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); + + private IMocksControl control; + private SingletonService.LeadershipListener listener; + private ServerSet serverSet; + private ServerSet.EndpointStatus endpointStatus; + private Candidate candidate; + private ExceptionalCommand abdicate; + + private SingletonService service; + + @Before + @SuppressWarnings("unchecked") + public void mySetUp() throws IOException { + control = createControl(); + addTearDown(new TearDown() { + @Override public void tearDown() { + control.verify(); + } + }); + listener = control.createMock(SingletonService.LeadershipListener.class); + serverSet = control.createMock(ServerSet.class); + candidate = control.createMock(Candidate.class); + endpointStatus = control.createMock(ServerSet.EndpointStatus.class); + abdicate = control.createMock(ExceptionalCommand.class); + + service = new SingletonService(serverSet, candidate); + } + + private void newLeader( + final String hostName, + Capture leader, + LeadershipListener listener) throws Exception { + + service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), + listener); + + // This actually elects the leader. + leader.getValue().onElected(abdicate); + } + + private void newLeader(String hostName, Capture leader) throws Exception { + newLeader(hostName, leader, listener); + } + + private IExpectationSetters expectJoin() throws Exception { + return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); + } + + @Test + public void testLeadAdvertise() throws Exception { + Capture leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + } + + @Test + public void teatLeadLeaveNoAdvertise() throws Exception { + Capture leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + abdicate.execute(); + + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + } + + @Test + public void testLeadJoinFailure() throws Exception { + Capture leaderCapture = new Capture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + + try { + controlCapture.getValue().advertise(); + fail("Join should have failed."); + } catch (JoinException e) { + // Expected. + } + + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleAdvertise() throws Exception { + Capture leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().advertise(); + } + + @Test(expected = IllegalStateException.class) + public void testMultipleLeave() throws Exception { + Capture leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + expectJoin().andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().advertise(); + controlCapture.getValue().leave(); + controlCapture.getValue().leave(); + } + + @Test(expected = IllegalStateException.class) + public void testAdvertiseAfterLeave() throws Exception { + Capture leaderCapture = createCapture(); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + Capture controlCapture = createCapture(); + listener.onLeading(capture(controlCapture)); + + abdicate.execute(); + + control.replay(); + + newLeader("foo", leaderCapture); + controlCapture.getValue().leave(); + controlCapture.getValue().advertise(); + } + + @Test + public void testLeadMulti() throws Exception { + List> leaderCaptures = Lists.newArrayList(); + List> leaderControlCaptures = Lists.newArrayList(); + + for (int i = 0; i < 5; i++) { + Capture leaderCapture = new Capture(); + leaderCaptures.add(leaderCapture); + Capture controlCapture = createCapture(); + leaderControlCaptures.add(controlCapture); + + expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); + listener.onLeading(capture(controlCapture)); + InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); + Map aux = + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); + expect(serverSet.join(primary, aux)).andReturn(endpointStatus); + endpointStatus.leave(); + abdicate.execute(); + } + + control.replay(); + + for (int i = 0; i < 5; i++) { + final String leaderName = "foo" + i; + newLeader(leaderName, leaderCaptures.get(i)); + leaderControlCaptures.get(i).getValue().advertise(); + leaderControlCaptures.get(i).getValue().leave(); + } + } + + @Test + public void testLeaderLeaves() throws Exception { + control.replay(); + shutdownNetwork(); + } + + private static IAnswer countDownAnswer(final CountDownLatch latch) { + return new IAnswer() { + @Override public Void answer() { + latch.countDown(); + return null; + } + }; + } + + @Test + public void testLeaderDisconnect() throws Exception { + Capture controlCapture = createCapture(); + + CountDownLatch leading = new CountDownLatch(1); + listener.onLeading(capture(controlCapture)); + expectLastCall().andAnswer(countDownAnswer(leading)); + + CountDownLatch defeated = new CountDownLatch(1); + listener.onDefeated(null); + expectLastCall().andAnswer(countDownAnswer(defeated)); + + control.replay(); + + ZooKeeperClient zkClient = createZkClient(); + serverSet = new ServerSetImpl(zkClient, "/fake/path"); + candidate = new CandidateImpl( + new Group(zkClient, ZooKeeperUtils.OPEN_ACL_UNSAFE, "/fake/path")); + DefeatOnDisconnectLeader leader = new DefeatOnDisconnectLeader(zkClient, listener); + service = new SingletonService(serverSet, candidate); + service.lead(InetSocketAddress.createUnresolved("foo", PORT_A), + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)), + leader); + + leading.await(); + + shutdownNetwork(); + defeated.await(); + } + + @Test + public void testNonLeaderDisconnect() throws Exception { + CountDownLatch elected = new CountDownLatch(1); + listener.onLeading(EasyMock.anyObject()); + expectLastCall().andAnswer(countDownAnswer(elected)); + listener.onDefeated(null); + expectLastCall().anyTimes(); + + control.replay(); + + ZooKeeperClient zkClient = createZkClient(); + String path = "/fake/path"; + // Create a fake leading candidate node to ensure that the leader in this test is never + // elected. + ZooKeeperUtils.ensurePath(zkClient, ZooKeeperUtils.OPEN_ACL_UNSAFE, path); + String leaderNode = zkClient.get().create( + path + "/" + SingletonService.LEADER_ELECT_NODE_PREFIX, + "fake_leader".getBytes(), + ZooKeeperUtils.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + + serverSet = new ServerSetImpl(zkClient, path); + candidate = + SingletonService.createSingletonCandidate(zkClient, path, ZooKeeperUtils.OPEN_ACL_UNSAFE); + DefeatOnDisconnectLeader leader = new DefeatOnDisconnectLeader(zkClient, listener); + service = new SingletonService(serverSet, candidate); + service.lead(InetSocketAddress.createUnresolved("foo", PORT_A), + ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)), + leader); + + final CountDownLatch disconnected = new CountDownLatch(1); + zkClient.register(new Watcher() { + @Override public void process(WatchedEvent event) { + if ((event.getType() == EventType.None) + && (event.getState() == KeeperState.Disconnected)) { + disconnected.countDown(); + } + } + }); + + shutdownNetwork(); + disconnected.await(); + + restartNetwork(); + zkClient.get().delete(leaderNode, ZooKeeperUtils.ANY_VERSION); + // Upon deletion of the fake leader node, the candidate should become leader. + elected.await(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java b/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java new file mode 100644 index 0000000..0f09652 --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/StaticServerSetTest.java @@ -0,0 +1,72 @@ +package com.twitter.common.zookeeper; + +import java.net.InetSocketAddress; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.junit.Before; +import org.junit.Test; + +import com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor; +import com.twitter.common.testing.easymock.EasyMockTest; +import com.twitter.common.zookeeper.ServerSet.EndpointStatus; +import com.twitter.thrift.Endpoint; +import com.twitter.thrift.ServiceInstance; +import com.twitter.thrift.Status; + +public class StaticServerSetTest extends EasyMockTest { + + private static final ServiceInstance BACKEND_1 = new ServiceInstance( + new Endpoint("host_1", 12345), + ImmutableMap.of("http", new Endpoint("host_1", 80)), + Status.ALIVE); + private static final ServiceInstance BACKEND_2 = new ServiceInstance( + new Endpoint("host_2", 12346), + ImmutableMap.of("http", new Endpoint("host_1", 80)), + Status.ALIVE); + + private HostChangeMonitor monitor; + + @Before + public void setUp() { + monitor = createMock(new Clazz>() { }); + } + + @Test + public void testMonitor() throws Exception { + ImmutableSet hosts = ImmutableSet.of(BACKEND_1, BACKEND_2); + monitor.onChange(hosts); + + control.replay(); + + ServerSet serverSet = new StaticServerSet(hosts); + serverSet.monitor(monitor); + } + + @Test + public void testMonitorEmpty() throws Exception { + ImmutableSet hosts = ImmutableSet.of(); + monitor.onChange(hosts); + + control.replay(); + + ServerSet serverSet = new StaticServerSet(hosts); + serverSet.monitor(monitor); + } + + @Test + public void testJoin() throws Exception { + // Ensure join/update calls don't break. + ImmutableSet hosts = ImmutableSet.of(); + + control.replay(); + + ServerSet serverSet = new StaticServerSet(hosts); + EndpointStatus status = serverSet.join( + InetSocketAddress.createUnresolved("host", 1000), + ImmutableMap.of(), + Status.ALIVE); + status.update(Status.DEAD); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java new file mode 100644 index 0000000..f126b9e --- /dev/null +++ b/commons/src/test/java/com/twitter/common/zookeeper/ZooKeeperClientTest.java @@ -0,0 +1,254 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.zookeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.NoAuthException; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Test; + +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.zookeeper.ZooKeeperClient.Credentials; +import com.twitter.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; +import com.twitter.common.zookeeper.testing.BaseZooKeeperTest; + +import static com.google.common.testing.junit4.JUnitAsserts.assertNotEqual; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author John Sirois + */ +public class ZooKeeperClientTest extends BaseZooKeeperTest { + + public ZooKeeperClientTest() { + super(Amount.of(1, Time.DAYS)); + } + + @Test + public void testGet() throws Exception { + final ZooKeeperClient zkClient = createZkClient(); + shutdownNetwork(); + try { + zkClient.get(Amount.of(50L, Time.MILLISECONDS)); + fail("Expected client connection to timeout while network down"); + } catch (TimeoutException e) { + assertTrue(zkClient.isClosed()); + } + assertNull(zkClient.getZooKeeperClientForTests()); + + final CountDownLatch blockingGetComplete = new CountDownLatch(1); + final AtomicReference client = new AtomicReference(); + new Thread(new Runnable() { + @Override public void run() { + try { + client.set(zkClient.get()); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + blockingGetComplete.countDown(); + } + } + }).start(); + + restartNetwork(); + + // Hung blocking connects should succeed when server connection comes up + blockingGetComplete.await(); + assertNotNull(client.get()); + + // New connections should succeed now that network is back up + long sessionId = zkClient.get().getSessionId(); + + // While connected the same client should be reused (no new connections while healthy) + assertSame(client.get(), zkClient.get()); + + shutdownNetwork(); + // Our client doesn't know the network is down yet so we should be able to get() + ZooKeeper zooKeeper = zkClient.get(); + try { + zooKeeper.exists("/", false); + fail("Expected client operation to fail while network down"); + } catch (ConnectionLossException e) { + // expected + } + + restartNetwork(); + assertEquals("Expected connection to be re-established with existing session", + sessionId, zkClient.get().getSessionId()); + } + + /** + * Test that if a blocking get() call gets interrupted, after a connection has been created + * but before it's connected, the zk connection gets closed. + */ + @Test + public void testGetInterrupted() throws Exception { + final ZooKeeperClient zkClient = createZkClient(); + shutdownNetwork(); + + final CountDownLatch blockingGetComplete = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(); + final AtomicReference client = new AtomicReference(); + Thread getThread = new Thread(new Runnable() { + @Override public void run() { + try { + client.set(zkClient.get()); + } catch (ZooKeeperConnectionException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + interrupted.set(true); + throw new RuntimeException(e); + } finally { + blockingGetComplete.countDown(); + } + } + }); + getThread.start(); + + while (zkClient.getZooKeeperClientForTests() == null) { + Thread.sleep(100); + } + + getThread.interrupt(); + blockingGetComplete.await(); + + assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests()); + assertTrue("The waiter thread should have been interrupted", interrupted.get()); + assertTrue(zkClient.isClosed()); + } + + @Test + public void testClose() throws Exception { + ZooKeeperClient zkClient = createZkClient(); + zkClient.close(); + + // Close should be idempotent + zkClient.close(); + + long firstSessionId = zkClient.get().getSessionId(); + + // Close on an open client should force session re-establishment + zkClient.close(); + + assertNotEqual(firstSessionId, zkClient.get().getSessionId()); + } + + @Test + public void testCredentials() throws Exception { + String path = "/test"; + ZooKeeperClient authenticatedClient = createZkClient("creator", "creator"); + assertEquals(path, + authenticatedClient.get().create(path, "42".getBytes(), + ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT)); + + ZooKeeperClient unauthenticatedClient = createZkClient(Credentials.NONE); + assertEquals("42", getData(unauthenticatedClient, path)); + try { + setData(unauthenticatedClient, path, "37"); + fail("Expected unauthenticated write attempt to fail"); + } catch (NoAuthException e) { + assertEquals("42", getData(unauthenticatedClient, path)); + } + + ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner"); + assertEquals("42", getData(nonOwnerClient, path)); + try { + setData(nonOwnerClient, path, "37"); + fail("Expected non owner write attempt to fail"); + } catch (NoAuthException e) { + assertEquals("42", getData(nonOwnerClient, path)); + } + + ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator"); + setData(authenticatedClient2, path, "37"); + assertEquals("37", getData(authenticatedClient2, path)); + } + + @Test + public void testHasCredentials() { + assertFalse(createZkClient().hasCredentials()); + assertFalse(createZkClient(Credentials.NONE).hasCredentials()); + assertFalse(createZkClient(new Credentials() { + @Override + public void authenticate(ZooKeeper zooKeeper) { + // noop + } + @Override public String scheme() { + return ""; + } + @Override public byte[] authToken() { + return new byte[0]; + } + }).hasCredentials()); + + assertTrue(createZkClient("creator", "creator").hasCredentials()); + assertTrue(createZkClient(new Credentials() { + @Override public void authenticate(ZooKeeper zooKeeper) { + // noop + } + @Override public String scheme() { + return "custom"; + } + @Override public byte[] authToken() { + // a zero-length token should be ok - ZooKeeper says nothing about the validity of token + // data a scheme can accept. + return new byte[0]; + } + }).hasCredentials()); + } + + @Test + public void testChrootPath() throws Exception { + ZooKeeperClient rootClient = createZkClient(); + String rootPath = "/test"; + String subPath = "/test/subtest"; + assertEquals(rootPath, + rootClient.get().create(rootPath, "42".getBytes(), + ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + assertEquals(subPath, + rootClient.get().create(subPath, "37".getBytes(), + ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + + ZooKeeperClient chrootedClient = createZkClient(rootPath); + assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null)); + } + + private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception { + zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION); + } + + private String getData(ZooKeeperClient zkClient, String path) throws Exception { + return new String(zkClient.get().getData(path, false, null)); + } +}