Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E564D200D2B for ; Thu, 19 Oct 2017 02:34:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E2310160BEA; Thu, 19 Oct 2017 00:34:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6A21F160BEB for ; Thu, 19 Oct 2017 02:34:05 +0200 (CEST) Received: (qmail 33590 invoked by uid 500); 19 Oct 2017 00:34:04 -0000 Mailing-List: contact commits-help@aurora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@aurora.apache.org Delivered-To: mailing list commits@aurora.apache.org Received: (qmail 33575 invoked by uid 99); 19 Oct 2017 00:34:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Oct 2017 00:34:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E47CDFAB2; Thu, 19 Oct 2017 00:34:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wfarner@apache.org To: commits@aurora.apache.org Date: Thu, 19 Oct 2017 00:34:05 -0000 Message-Id: <70fc04ebac6540fa8a08f528b088f9bd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] aurora git commit: Remove legacy commons ZK code archived-at: Thu, 19 Oct 2017 00:34:08 -0000 http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java index 93ddd89..f091384 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ZooKeeperUtils.java @@ -13,29 +13,19 @@ */ package org.apache.aurora.common.zookeeper; -import java.util.List; - -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.data.ACL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Utilities for dealing with zoo keeper. + * Utilities for dealing with ZooKeeper. */ public final class ZooKeeperUtils { - private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtils.class); - /** * An appropriate default session timeout for Twitter ZooKeeper clusters. */ @@ -44,12 +34,6 @@ public final class ZooKeeperUtils { public static final Amount DEFAULT_ZK_CONNECTION_TIMEOUT = Amount.of(10, Time.SECONDS); /** - * The magic version number that allows any mutation to always succeed regardless of actual - * version number. - */ - public static final int ANY_VERSION = -1; - - /** * An ACL that gives all permissions any user authenticated or not. */ public static final ImmutableList OPEN_ACL_UNSAFE = @@ -65,99 +49,13 @@ public final class ZooKeeperUtils { .build(); /** - * Returns true if the given exception indicates an error that can be resolved by retrying the - * operation without modification. - * - * @param e the exception to check - * @return true if the causing operation is strictly retryable - */ - public static boolean isRetryable(KeeperException e) { - Preconditions.checkNotNull(e); - - switch (e.code()) { - case CONNECTIONLOSS: - case SESSIONEXPIRED: - case SESSIONMOVED: - case OPERATIONTIMEOUT: - return true; - - case RUNTIMEINCONSISTENCY: - case DATAINCONSISTENCY: - case MARSHALLINGERROR: - case BADARGUMENTS: - case NONODE: - case NOAUTH: - case BADVERSION: - case NOCHILDRENFOREPHEMERALS: - case NODEEXISTS: - case NOTEMPTY: - case INVALIDCALLBACK: - case INVALIDACL: - case AUTHFAILED: - case UNIMPLEMENTED: - - // These two should not be encountered - they are used internally by ZK to specify ranges - case SYSTEMERROR: - case APIERROR: - - case OK: // This is actually an invalid ZK exception code - - default: - return false; - } - } - - /** - * Ensures the given {@code path} exists in the ZK cluster accessed by {@code zkClient}. If the - * path already exists, nothing is done; however if any portion of the path is missing, it will be - * created with the given {@code acl} as a persistent zookeeper node. The given {@code path} must - * be a valid zookeeper absolute path. - * - * @param zkClient the client to use to access the ZK cluster - * @param acl the acl to use if creating path nodes - * @param path the path to ensure exists - * @throws ZooKeeperConnectionException if there was a problem accessing the ZK cluster - * @throws InterruptedException if we were interrupted attempting to connect to the ZK cluster - * @throws KeeperException if there was a problem in ZK - */ - public static void ensurePath(ZooKeeperClient zkClient, List acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - Preconditions.checkNotNull(zkClient); - Preconditions.checkNotNull(path); - Preconditions.checkArgument(path.startsWith("/")); - - ensurePathInternal(zkClient, acl, path); - } - - private static void ensurePathInternal(ZooKeeperClient zkClient, List acl, String path) - throws ZooKeeperConnectionException, InterruptedException, KeeperException { - if (zkClient.get().exists(path, false) == null) { - // The current path does not exist; so back up a level and ensure the parent path exists - // unless we're already a root-level path. - int lastPathIndex = path.lastIndexOf('/'); - if (lastPathIndex > 0) { - ensurePathInternal(zkClient, acl, path.substring(0, lastPathIndex)); - } - - // We've ensured our parent path (if any) exists so we can proceed to create our path. - try { - zkClient.get().create(path, null, acl, CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // This ensures we don't die if a race condition was met between checking existence and - // trying to create the node. - LOG.info("Node existed when trying to ensure path " + path + ", somebody beat us to it?"); - } - } - } - - /** * Validate and return a normalized zookeeper path which doesn't contain consecutive slashes and * never ends with a slash (except for root path). * * @param path the path to be normalized * @return normalized path string */ - public static String normalizePath(String path) { + static String normalizePath(String path) { String normalizedPath = path.replaceAll("//+", "/").replaceFirst("(.+)/$", "$1"); PathUtils.validatePath(normalizedPath); return normalizedPath; http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java deleted file mode 100644 index ba09279..0000000 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/BaseZooKeeperClientTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper.testing; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; - -/** - * A base-class for tests that interact with ZooKeeper via the commons ZooKeeperClient. - */ -public abstract class BaseZooKeeperClientTest extends BaseZooKeeperTest { - - private final Amount defaultSessionTimeout; - - /** - * Creates a test case where the test server uses its - * {@link ZooKeeperTestServer#DEFAULT_SESSION_TIMEOUT} for clients created without an explicit - * session timeout. - */ - public BaseZooKeeperClientTest() { - this(ZooKeeperTestServer.DEFAULT_SESSION_TIMEOUT); - } - - /** - * Creates a test case where the test server uses the given {@code defaultSessionTimeout} for - * clients created without an explicit session timeout. - */ - public BaseZooKeeperClientTest(Amount defaultSessionTimeout) { - this.defaultSessionTimeout = Preconditions.checkNotNull(defaultSessionTimeout); - } - - - /** - * Starts zookeeper back up on the last used port. - */ - protected final void restartNetwork() throws IOException, InterruptedException { - getServer().restartNetwork(); - } - - /** - * Shuts down the in-process zookeeper network server. - */ - protected final void shutdownNetwork() { - getServer().shutdownNetwork(); - } - - /** - * Expires the active session for the given client. The client should be one returned from - * {@link #createZkClient}. - * - * @param zkClient the client to expire - * @throws ZooKeeperClient.ZooKeeperConnectionException if a problem is encountered connecting to - * the local zk server while trying to expire the session - * @throws InterruptedException if interrupted while requesting expiration - */ - protected final void expireSession(ZooKeeperClient zkClient) - throws ZooKeeperClient.ZooKeeperConnectionException, InterruptedException { - getServer().expireClientSession(zkClient.get().getSessionId()); - } - - /** - * Returns the current port to connect to the in-process zookeeper instance. - */ - protected final int getPort() { - return getServer().getPort(); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with the default session timeout. - */ - protected final ZooKeeperClient createZkClient() { - return createZkClient(defaultSessionTimeout, Optional.absent(), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. - */ - protected final ZooKeeperClient createZkClient(Credentials credentials) { - return createZkClient(defaultSessionTimeout, Optional.of(credentials), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout. The client is authenticated in the digest authentication scheme - * with the given {@code username} and {@code password}. - */ - protected final ZooKeeperClient createZkClient(String username, String password) { - return createZkClient(Credentials.digestCredentials(username, password)); - } - - /** - * Returns a new unauthenticated zookeeper client connected to the in-process zookeeper server - * with a custom {@code sessionTimeout}. - */ - protected final ZooKeeperClient createZkClient(Amount sessionTimeout) { - return createZkClient(sessionTimeout, Optional.absent(), Optional.absent()); - } - - /** - * Returns a new authenticated zookeeper client connected to the in-process zookeeper server with - * the default session timeout and the custom chroot path. - */ - protected final ZooKeeperClient createZkClient(String chrootPath) { - return createZkClient(defaultSessionTimeout, Optional.absent(), - Optional.of(chrootPath)); - } - - private ZooKeeperClient createZkClient( - Amount sessionTimeout, - Optional credentials, - Optional chrootPath) { - - ZooKeeperClient client = new ZooKeeperClient(sessionTimeout, credentials, chrootPath, - ImmutableList.of(InetSocketAddress.createUnresolved("127.0.0.1", getPort()))); - addTearDown(client::close); - return client; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java index 29204cd..a4504b8 100644 --- a/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java +++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/testing/ZooKeeperTestServer.java @@ -19,8 +19,6 @@ import java.net.InetSocketAddress; import com.google.common.base.Preconditions; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; @@ -34,8 +32,6 @@ import org.apache.zookeeper.server.persistence.FileTxnSnapLog; */ public class ZooKeeperTestServer { - static final Amount DEFAULT_SESSION_TIMEOUT = Amount.of(100, Time.MILLISECONDS); - private final File dataDir; private final File snapDir; @@ -91,7 +87,7 @@ public class ZooKeeperTestServer { /** * Shuts down the in-process zookeeper network server. */ - final void shutdownNetwork() { + private void shutdownNetwork() { if (connectionFactory != null) { connectionFactory.shutdown(); // Also shuts down zooKeeperServer. connectionFactory = null; http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java deleted file mode 100644 index 9c0cebe..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/CandidateImplTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -public class CandidateImplTest extends BaseZooKeeperClientTest { - private static final List ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final String SERVICE = "/twitter/services/puffin_linkhose/leader"; - private static final Amount TIMEOUT = Amount.of(1, Time.MINUTES); - - private LinkedBlockingDeque candidateBuffer; - - @Before - public void mySetUp() throws IOException { - candidateBuffer = new LinkedBlockingDeque<>(); - } - - private Group createGroup(ZooKeeperClient zkClient) throws IOException { - return new Group(zkClient, ACL, SERVICE); - } - - private class Reign implements Candidate.Leader { - private ExceptionalCommand abdicate; - private final CandidateImpl candidate; - private final String id; - private CountDownLatch defeated = new CountDownLatch(1); - - Reign(String id, CandidateImpl candidate) { - this.id = id; - this.candidate = candidate; - } - - @Override - public void onElected(ExceptionalCommand abdicate) { - candidateBuffer.offerFirst(candidate); - this.abdicate = abdicate; - } - - @Override - public void onDefeated() { - defeated.countDown(); - } - - public void abdicate() throws Group.JoinException { - Preconditions.checkState(abdicate != null); - abdicate.execute(); - } - - public void expectDefeated() throws InterruptedException { - defeated.await(); - } - - @Override - public String toString() { - return id; - } - } - - @Test - public void testOfferLeadership() throws Exception { - ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); - final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)) { - @Override public String toString() { - return "Leader1"; - } - }; - ZooKeeperClient zkClient2 = createZkClient(TIMEOUT); - final CandidateImpl candidate2 = new CandidateImpl(createGroup(zkClient2)) { - @Override public String toString() { - return "Leader2"; - } - }; - ZooKeeperClient zkClient3 = createZkClient(TIMEOUT); - final CandidateImpl candidate3 = new CandidateImpl(createGroup(zkClient3)) { - @Override public String toString() { - return "Leader3"; - } - }; - - Reign candidate1Reign = new Reign("1", candidate1); - Reign candidate2Reign = new Reign("2", candidate2); - Reign candidate3Reign = new Reign("3", candidate3); - - Supplier candidate1Leader = candidate1.offerLeadership(candidate1Reign); - Supplier candidate2Leader = candidate2.offerLeadership(candidate2Reign); - Supplier candidate3Leader = candidate3.offerLeadership(candidate3Reign); - - assertTrue("Since initial group join is synchronous, candidate 1 should be the first leader", - candidate1Leader.get()); - - shutdownNetwork(); - restartNetwork(); - - assertTrue("A re-connect without a session expiration should leave the leader elected", - candidate1Leader.get()); - - candidate1Reign.abdicate(); - assertSame(candidate1, candidateBuffer.takeLast()); - assertFalse(candidate1Leader.get()); - // Active abdication should trigger defeat. - candidate1Reign.expectDefeated(); - - CandidateImpl secondCandidate = candidateBuffer.takeLast(); - assertTrue("exactly 1 remaining candidate should now be leader: " + secondCandidate + " " - + candidateBuffer, - candidate2Leader.get() ^ candidate3Leader.get()); - - if (secondCandidate == candidate2) { - expireSession(zkClient2); - assertSame(candidate3, candidateBuffer.takeLast()); - assertTrue(candidate3Leader.get()); - // Passive expiration should trigger defeat. - candidate2Reign.expectDefeated(); - } else { - expireSession(zkClient3); - assertSame(candidate2, candidateBuffer.takeLast()); - assertTrue(candidate2Leader.get()); - // Passive expiration should trigger defeat. - candidate3Reign.expectDefeated(); - } - } - - @Test - public void testEmptyMembership() throws Exception { - ZooKeeperClient zkClient1 = createZkClient(TIMEOUT); - final CandidateImpl candidate1 = new CandidateImpl(createGroup(zkClient1)); - Reign candidate1Reign = new Reign("1", candidate1); - - candidate1.offerLeadership(candidate1Reign); - assertSame(candidate1, candidateBuffer.takeLast()); - candidate1Reign.abdicate(); - assertFalse(candidate1.getLeaderData().isPresent()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java new file mode 100644 index 0000000..16c0171 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/EncodingTest.java @@ -0,0 +1,44 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.zookeeper; + +import java.net.InetSocketAddress; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; + +import org.apache.aurora.common.thrift.Endpoint; +import org.apache.aurora.common.thrift.ServiceInstance; +import org.apache.aurora.common.thrift.Status; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EncodingTest { + @Test + public void testSimpleSerialization() throws Exception { + InetSocketAddress endpoint = new InetSocketAddress(12345); + Map additionalEndpoints = ImmutableMap.of(); + Status status = Status.ALIVE; + + byte[] data = Encoding.serializeServiceInstance( + endpoint, additionalEndpoints, status, Encoding.JSON_CODEC); + + ServiceInstance instance = Encoding.deserializeServiceInstance(data, Encoding.JSON_CODEC); + + assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); + assertEquals(additionalEndpoints, instance.getAdditionalEndpoints()); + assertEquals(Status.ALIVE, instance.getStatus()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java deleted file mode 100644 index 97a42d1..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/GroupTest.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.common.zookeeper.Group.GroupChangeListener; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.Group.Membership; -import org.apache.aurora.common.zookeeper.Group.NodeScheme; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.ZooDefs.Ids; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.fail; - -public class GroupTest extends BaseZooKeeperClientTest { - - private ZooKeeperClient zkClient; - private Group joinGroup; - private Group watchGroup; - private Command stopWatching; - private Command onLoseMembership; - - private RecordingListener listener; - - public GroupTest() { - super(Amount.of(1, Time.DAYS)); - } - - @Before - public void mySetUp() throws Exception { - onLoseMembership = createMock(Command.class); - - zkClient = createZkClient("group", "test"); - joinGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); - watchGroup = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group"); - - listener = new RecordingListener(); - stopWatching = watchGroup.watch(listener); - } - - private static class RecordingListener implements GroupChangeListener { - private final LinkedBlockingQueue> membershipChanges = - new LinkedBlockingQueue>(); - - @Override - public void onGroupChange(Iterable memberIds) { - membershipChanges.add(memberIds); - } - - public Iterable take() throws InterruptedException { - return membershipChanges.take(); - } - - public void assertEmpty() { - assertEquals(ImmutableList.>of(), ImmutableList.copyOf(membershipChanges)); - } - - @Override - public String toString() { - return membershipChanges.toString(); - } - } - - private static class CustomScheme implements NodeScheme { - static final String NODE_NAME = "custom_name"; - - @Override - public boolean isMember(String nodeName) { - return NODE_NAME.equals(nodeName); - } - - @Override - public String createName(byte[] membershipData) { - return NODE_NAME; - } - - @Override - public boolean isSequential() { - return false; - } - } - - @Test - public void testSessionExpirationTriggersOnLoseMembership() throws Exception { - final CountDownLatch lostMembership = new CountDownLatch(1); - Command onLoseMembership = lostMembership::countDown; - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - assertMembershipObserved(membership.getMemberId()); - expireSession(zkClient); - - lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. - } - - @Test - public void testNodeDeleteTriggersOnLoseMembership() throws Exception { - final CountDownLatch lostMembership = new CountDownLatch(1); - Command onLoseMembership = lostMembership::countDown; - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - assertMembershipObserved(membership.getMemberId()); - membership.cancel(); - - lostMembership.await(); // Will hang this test if onLoseMembership event is not propagated. - } - - @Test - public void testJoinsAndWatchesSurviveDisconnect() throws Exception { - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(); - String originalMemberId = membership.getMemberId(); - assertMembershipObserved(originalMemberId); - - shutdownNetwork(); - restartNetwork(); - - // The member should still be present under existing ephemeral node since session did not - // expire. - watchGroup.watch(listener); - assertMembershipObserved(originalMemberId); - - membership.cancel(); - - assertEmptyMembershipObserved(); - assertEmptyMembershipObserved(); // and again for 2nd listener - - listener.assertEmpty(); - - verify(onLoseMembership); - reset(onLoseMembership); // Turn off expectations during ZK server shutdown. - } - - @Test - public void testJoinsAndWatchesSurviveExpiredSession() throws Exception { - onLoseMembership.execute(); - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership membership = joinGroup.join(onLoseMembership); - String originalMemberId = membership.getMemberId(); - assertMembershipObserved(originalMemberId); - - expireSession(zkClient); - - // We should have lost our group membership and then re-gained it with a new ephemeral node. - // We may or may-not see the intermediate state change but we must see the final state - Iterable members = listener.take(); - if (Iterables.isEmpty(members)) { - members = listener.take(); - } - assertEquals(1, Iterables.size(members)); - assertNotEquals(originalMemberId, Iterables.getOnlyElement(members)); - assertNotEquals(originalMemberId, membership.getMemberId()); - - listener.assertEmpty(); - - verify(onLoseMembership); - reset(onLoseMembership); // Turn off expectations during ZK server shutdown. - } - - @Test - public void testJoinCustomNamingScheme() throws Exception { - Group group = new Group(zkClient, ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, "/a/group", - new CustomScheme()); - - listener = new RecordingListener(); - group.watch(listener); - assertEmptyMembershipObserved(); - - Membership membership = group.join(); - String memberId = membership.getMemberId(); - - assertEquals("Wrong member ID.", CustomScheme.NODE_NAME, memberId); - assertMembershipObserved(memberId); - - expireSession(zkClient); - } - - @Test - public void testUpdateMembershipData() throws Exception { - Supplier dataSupplier = new EasyMockTest.Clazz>() {}.createMock(); - - byte[] initial = "start".getBytes(); - expect(dataSupplier.get()).andReturn(initial); - - byte[] second = "update".getBytes(); - expect(dataSupplier.get()).andReturn(second); - - replay(dataSupplier); - - Membership membership = joinGroup.join(dataSupplier, onLoseMembership); - assertArrayEquals("Initial setting is incorrect.", initial, zkClient.get() - .getData(membership.getMemberPath(), false, null)); - - assertArrayEquals("Updating supplier should not change membership data", - initial, zkClient.get().getData(membership.getMemberPath(), false, null)); - - membership.updateMemberData(); - assertArrayEquals("Updating membership should change data", - second, zkClient.get().getData(membership.getMemberPath(), false, null)); - - verify(dataSupplier); - } - - @Test - public void testAcls() throws Exception { - Group securedMembership = - new Group(createZkClient("secured", "group"), ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, - "/secured/group/membership"); - - String memberId = securedMembership.join().getMemberId(); - - Group unauthenticatedObserver = - new Group(createZkClient(), - Ids.READ_ACL_UNSAFE, - "/secured/group/membership"); - RecordingListener unauthenticatedListener = new RecordingListener(); - unauthenticatedObserver.watch(unauthenticatedListener); - - assertMembershipObserved(unauthenticatedListener, memberId); - - try { - unauthenticatedObserver.join(); - fail("Expected join exception for unauthenticated observer"); - } catch (JoinException e) { - // expected - } - - Group unauthorizedObserver = - new Group(createZkClient("joe", "schmoe"), - Ids.READ_ACL_UNSAFE, - "/secured/group/membership"); - RecordingListener unauthorizedListener = new RecordingListener(); - unauthorizedObserver.watch(unauthorizedListener); - - assertMembershipObserved(unauthorizedListener, memberId); - - try { - unauthorizedObserver.join(); - fail("Expected join exception for unauthorized observer"); - } catch (JoinException e) { - // expected - } - } - - @Test - public void testStopWatching() throws Exception { - replay(onLoseMembership); - - assertEmptyMembershipObserved(); - - Membership member1 = joinGroup.join(); - String memberId1 = member1.getMemberId(); - assertMembershipObserved(memberId1); - - Membership member2 = joinGroup.join(); - String memberId2 = member2.getMemberId(); - assertMembershipObserved(memberId1, memberId2); - - stopWatching.execute(); - - member1.cancel(); - Membership member3 = joinGroup.join(); - member2.cancel(); - member3.cancel(); - - listener.assertEmpty(); - } - - private void assertEmptyMembershipObserved() throws InterruptedException { - assertMembershipObserved(); - } - - private void assertMembershipObserved(String... expectedMemberIds) throws InterruptedException { - assertMembershipObserved(listener, expectedMemberIds); - } - - private void assertMembershipObserved(RecordingListener listener, String... expectedMemberIds) - throws InterruptedException { - - assertEquals(ImmutableSet.copyOf(expectedMemberIds), ImmutableSet.copyOf(listener.take())); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java index 2166123..6cf335d 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/JsonCodecTest.java @@ -52,25 +52,25 @@ public class JsonCodecTest { ImmutableMap.of("http", new Endpoint("foo", 8080)), Status.ALIVE) .setShard(0); - byte[] data = ServerSets.serializeServiceInstance(instance1, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + byte[] data = Encoding.serializeServiceInstance(instance1, codec); + assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertTrue(Encoding.deserializeServiceInstance(data, codec).isSetShard()); ServiceInstance instance2 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance2, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + data = Encoding.serializeServiceInstance(instance2, codec); + assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard()); ServiceInstance instance3 = new ServiceInstance( new Endpoint("foo", 1000), ImmutableMap.of(), Status.ALIVE); - data = ServerSets.serializeServiceInstance(instance3, codec); - assertTrue(ServerSets.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); - assertFalse(ServerSets.deserializeServiceInstance(data, codec).isSetShard()); + data = Encoding.serializeServiceInstance(instance3, codec); + assertTrue(Encoding.deserializeServiceInstance(data, codec).getServiceEndpoint().isSetPort()); + assertFalse(Encoding.deserializeServiceInstance(data, codec).isSetShard()); } @Test http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java deleted file mode 100644 index f0c0cb4..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.ACL; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Test; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * - * TODO(William Farner): Change this to remove thrift dependency. - */ -public class ServerSetImplTest extends BaseZooKeeperClientTest { - private static final List ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE; - private static final String SERVICE = "/twitter/services/puffin_hosebird"; - - private LinkedBlockingQueue> serverSetBuffer; - private DynamicHostSet.HostChangeMonitor serverSetMonitor; - - @Before - public void mySetUp() throws IOException { - serverSetBuffer = new LinkedBlockingQueue<>(); - serverSetMonitor = serverSetBuffer::offer; - } - - private ServerSetImpl createServerSet() throws IOException { - return new ServerSetImpl(createZkClient(), ACL, SERVICE); - } - - @Test - public void testLifecycle() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - ServerSet.EndpointStatus status = server.join( - InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080)); - - ServiceInstance serviceInstance = new ServiceInstance( - new Endpoint("foo", 1234), - ImmutableMap.of("http-admin", new Endpoint("foo", 8080)), - Status.ALIVE); - - assertChangeFired(serviceInstance); - - status.leave(); - assertChangeFiredEmpty(); - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testMembershipChanges() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - - ServerSet.EndpointStatus foo = join(server, "foo"); - assertChangeFired("foo"); - - expireSession(client.getZkClient()); - - ServerSet.EndpointStatus bar = join(server, "bar"); - - // We should've auto re-monitored membership, but not been notifed of "foo" since this was not a - // change, just "foo", "bar" since this was an addition. - assertChangeFired("foo", "bar"); - - foo.leave(); - assertChangeFired("bar"); - - ServerSet.EndpointStatus baz = join(server, "baz"); - assertChangeFired("bar", "baz"); - - baz.leave(); - assertChangeFired("bar"); - - bar.leave(); - assertChangeFiredEmpty(); - - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testStopMonitoring() throws Exception { - ServerSetImpl client = createServerSet(); - Command stopMonitoring = client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - ServerSetImpl server = createServerSet(); - - ServerSet.EndpointStatus foo = join(server, "foo"); - assertChangeFired("foo"); - ServerSet.EndpointStatus bar = join(server, "bar"); - assertChangeFired("foo", "bar"); - - stopMonitoring.execute(); - - // No new updates should be received since monitoring has stopped. - foo.leave(); - assertTrue(serverSetBuffer.isEmpty()); - - // Expiration event. - assertTrue(serverSetBuffer.isEmpty()); - } - - @Test - public void testOrdering() throws Exception { - ServerSetImpl client = createServerSet(); - client.watch(serverSetMonitor); - assertChangeFiredEmpty(); - - Map server1Ports = makePortMap("http-admin1", 8080); - Map server2Ports = makePortMap("http-admin2", 8081); - Map server3Ports = makePortMap("http-admin3", 8082); - - ServerSetImpl server1 = createServerSet(); - ServerSetImpl server2 = createServerSet(); - ServerSetImpl server3 = createServerSet(); - - ServiceInstance instance1 = new ServiceInstance( - new Endpoint("foo", 1000), - ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)), - Status.ALIVE); - ServiceInstance instance2 = new ServiceInstance( - new Endpoint("foo", 1001), - ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)), - Status.ALIVE); - ServiceInstance instance3 = new ServiceInstance( - new Endpoint("foo", 1002), - ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)), - Status.ALIVE); - - server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports); - assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take())); - - ServerSet.EndpointStatus status2 = server2.join( - InetSocketAddress.createUnresolved("foo", 1001), - server2Ports); - assertEquals(ImmutableList.of(instance1, instance2), - ImmutableList.copyOf(serverSetBuffer.take())); - - server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports); - assertEquals(ImmutableList.of(instance1, instance2, instance3), - ImmutableList.copyOf(serverSetBuffer.take())); - - status2.leave(); - assertEquals(ImmutableList.of(instance1, instance3), - ImmutableList.copyOf(serverSetBuffer.take())); - } - - @Test - public void testUnwatchOnException() throws Exception { - IMocksControl control = createControl(); - - ZooKeeperClient zkClient = control.createMock(ZooKeeperClient.class); - Watcher onExpirationWatcher = control.createMock(Watcher.class); - - expect(zkClient.registerExpirationHandler(anyObject(Command.class))) - .andReturn(onExpirationWatcher); - - expect(zkClient.get()).andThrow(new InterruptedException()); // See interrupted() note below. - expect(zkClient.unregister(onExpirationWatcher)).andReturn(true); - control.replay(); - - Group group = new Group(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, "/blabla"); - ServerSetImpl serverset = new ServerSetImpl(zkClient, group); - - try { - serverset.watch(hostSet -> {}); - fail("Expected MonitorException"); - } catch (DynamicHostSet.MonitorException e) { - // NB: The assert is not important to this test, but the call to `Thread.interrupted()` is. - // That call both returns the current interrupted status as well as clearing it. The clearing - // is crucial depending on the order tests are run in this class. If this test runs before - // one of the tests above that uses a `ZooKeeperClient` for example, those tests will fail - // executing `ZooKeeperClient.get` which internally blocks on s sync-point that takes part in - // the interruption mechanism and so immediately throws `InterruptedException` based on the - // un-cleared interrupted bit. - assertTrue(Thread.interrupted()); - } - control.verify(); - } - - private static Map makePortMap(String name, int port) { - return ImmutableMap.of(name, InetSocketAddress.createUnresolved("foo", port)); - } - - private ServerSet.EndpointStatus join(ServerSet serverSet, String host) - throws JoinException, InterruptedException { - - return serverSet.join( - InetSocketAddress.createUnresolved(host, 42), ImmutableMap.of()); - } - - private void assertChangeFired(String... serviceHosts) - throws InterruptedException { - - assertChangeFired(ImmutableSet.copyOf(Iterables.transform(ImmutableSet.copyOf(serviceHosts), - serviceHost -> new ServiceInstance(new Endpoint(serviceHost, 42), - ImmutableMap.of(), Status.ALIVE)))); - } - - protected void assertChangeFiredEmpty() throws InterruptedException { - assertChangeFired(ImmutableSet.of()); - } - - protected void assertChangeFired(ServiceInstance... serviceInstances) - throws InterruptedException { - assertChangeFired(ImmutableSet.copyOf(serviceInstances)); - } - - protected void assertChangeFired(ImmutableSet serviceInstances) - throws InterruptedException { - assertEquals(serviceInstances, serverSetBuffer.take()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java deleted file mode 100644 index 0e67191..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetsTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; - -import org.apache.aurora.common.thrift.Endpoint; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class ServerSetsTest { - @Test - public void testSimpleSerialization() throws Exception { - InetSocketAddress endpoint = new InetSocketAddress(12345); - Map additionalEndpoints = ImmutableMap.of(); - Status status = Status.ALIVE; - - byte[] data = ServerSets.serializeServiceInstance( - endpoint, additionalEndpoints, status, ServerSet.JSON_CODEC); - - ServiceInstance instance = ServerSets.deserializeServiceInstance(data, ServerSet.JSON_CODEC); - - assertEquals(endpoint.getPort(), instance.getServiceEndpoint().getPort()); - assertEquals(additionalEndpoints, instance.getAdditionalEndpoints()); - assertEquals(Status.ALIVE, instance.getStatus()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java deleted file mode 100644 index 5f6cdd8..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/SingletonServiceImplTest.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.zookeeper.Candidate.Leader; -import org.apache.aurora.common.zookeeper.SingletonService.LeaderControl; -import org.apache.aurora.common.zookeeper.SingletonService.LeadershipListener; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.easymock.Capture; -import org.easymock.IExpectationSetters; -import org.easymock.IMocksControl; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.createControl; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.fail; - -public class SingletonServiceImplTest extends BaseZooKeeperClientTest { - private static final int PORT_A = 1234; - private static final int PORT_B = 8080; - private static final InetSocketAddress PRIMARY_ENDPOINT = - InetSocketAddress.createUnresolved("foo", PORT_A); - private static final Map AUX_ENDPOINTS = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo", PORT_B)); - - private IMocksControl control; - private SingletonServiceImpl.LeadershipListener listener; - private ServerSet serverSet; - private ServerSet.EndpointStatus endpointStatus; - private Candidate candidate; - private ExceptionalCommand abdicate; - - private SingletonService service; - - @Before - @SuppressWarnings("unchecked") - public void mySetUp() throws IOException { - control = createControl(); - addTearDown(control::verify); - listener = control.createMock(SingletonServiceImpl.LeadershipListener.class); - serverSet = control.createMock(ServerSet.class); - candidate = control.createMock(Candidate.class); - endpointStatus = control.createMock(ServerSet.EndpointStatus.class); - abdicate = control.createMock(ExceptionalCommand.class); - - service = new SingletonServiceImpl(serverSet, candidate); - } - - private void newLeader( - final String hostName, - Capture leader, - LeadershipListener listener) throws Exception { - - service.lead(InetSocketAddress.createUnresolved(hostName, PORT_A), - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved(hostName, PORT_B)), - listener); - - // This actually elects the leader. - leader.getValue().onElected(abdicate); - } - - private void newLeader(String hostName, Capture leader) throws Exception { - newLeader(hostName, leader, listener); - } - - private IExpectationSetters expectJoin() throws Exception { - return expect(serverSet.join(PRIMARY_ENDPOINT, AUX_ENDPOINTS)); - } - - @Test - public void testLeadAdvertise() throws Exception { - Capture leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - } - - @Test - public void teatLeadLeaveNoAdvertise() throws Exception { - Capture leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - abdicate.execute(); - - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - } - - @Test - public void testLeadJoinFailure() throws Exception { - Capture leaderCapture = new Capture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andThrow(new Group.JoinException("Injected join failure.", new Exception())); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - - try { - controlCapture.getValue().advertise(); - fail("Join should have failed."); - } catch (SingletonService.AdvertiseException e) { - // Expected. - } - - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleAdvertise() throws Exception { - Capture leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().advertise(); - } - - @Test(expected = IllegalStateException.class) - public void testMultipleLeave() throws Exception { - Capture leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - expectJoin().andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().advertise(); - controlCapture.getValue().leave(); - controlCapture.getValue().leave(); - } - - @Test(expected = IllegalStateException.class) - public void testAdvertiseAfterLeave() throws Exception { - Capture leaderCapture = createCapture(); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - Capture controlCapture = createCapture(); - listener.onLeading(capture(controlCapture)); - - abdicate.execute(); - - control.replay(); - - newLeader("foo", leaderCapture); - controlCapture.getValue().leave(); - controlCapture.getValue().advertise(); - } - - @Test - public void testLeadMulti() throws Exception { - List> leaderCaptures = Lists.newArrayList(); - List> leaderControlCaptures = Lists.newArrayList(); - - for (int i = 0; i < 5; i++) { - Capture leaderCapture = new Capture(); - leaderCaptures.add(leaderCapture); - Capture controlCapture = createCapture(); - leaderControlCaptures.add(controlCapture); - - expect(candidate.offerLeadership(capture(leaderCapture))).andReturn(null); - listener.onLeading(capture(controlCapture)); - InetSocketAddress primary = InetSocketAddress.createUnresolved("foo" + i, PORT_A); - Map aux = - ImmutableMap.of("http-admin", InetSocketAddress.createUnresolved("foo" + i, PORT_B)); - expect(serverSet.join(primary, aux)).andReturn(endpointStatus); - endpointStatus.leave(); - abdicate.execute(); - } - - control.replay(); - - for (int i = 0; i < 5; i++) { - final String leaderName = "foo" + i; - newLeader(leaderName, leaderCaptures.get(i)); - leaderControlCaptures.get(i).getValue().advertise(); - leaderControlCaptures.get(i).getValue().leave(); - } - } - - @Test - public void testLeaderLeaves() throws Exception { - control.replay(); - shutdownNetwork(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java deleted file mode 100644 index 5eee235..0000000 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperClientTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.zookeeper.ZooKeeperClient.ZooKeeperConnectionException; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.ConnectionLossException; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.apache.zookeeper.ZooKeeper; -import org.junit.Test; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author John Sirois - */ -public class ZooKeeperClientTest extends BaseZooKeeperClientTest { - - public ZooKeeperClientTest() { - super(Amount.of(1, Time.DAYS)); - } - - @Test - public void testGet() throws Exception { - final ZooKeeperClient zkClient = createZkClient(); - shutdownNetwork(); - try { - zkClient.get(Amount.of(50L, Time.MILLISECONDS)); - fail("Expected client connection to timeout while network down"); - } catch (TimeoutException e) { - assertTrue(zkClient.isClosed()); - } - assertNull(zkClient.getZooKeeperClientForTests()); - - final CountDownLatch blockingGetComplete = new CountDownLatch(1); - final AtomicReference client = new AtomicReference(); - new Thread(() -> { - try { - client.set(zkClient.get()); - } catch (ZooKeeperConnectionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - blockingGetComplete.countDown(); - } - }).start(); - - restartNetwork(); - - // Hung blocking connects should succeed when server connection comes up - blockingGetComplete.await(); - assertNotNull(client.get()); - - // New connections should succeed now that network is back up - long sessionId = zkClient.get().getSessionId(); - - // While connected the same client should be reused (no new connections while healthy) - assertSame(client.get(), zkClient.get()); - - shutdownNetwork(); - // Our client doesn't know the network is down yet so we should be able to get() - ZooKeeper zooKeeper = zkClient.get(); - try { - zooKeeper.exists("/", false); - fail("Expected client operation to fail while network down"); - } catch (ConnectionLossException e) { - // expected - } - - restartNetwork(); - assertEquals("Expected connection to be re-established with existing session", - sessionId, zkClient.get().getSessionId()); - } - - /** - * Test that if a blocking get() call gets interrupted, after a connection has been created - * but before it's connected, the zk connection gets closed. - */ - @Test - public void testGetInterrupted() throws Exception { - final ZooKeeperClient zkClient = createZkClient(); - shutdownNetwork(); - - final CountDownLatch blockingGetComplete = new CountDownLatch(1); - final AtomicBoolean interrupted = new AtomicBoolean(); - final AtomicReference client = new AtomicReference(); - Thread getThread = new Thread(() -> { - try { - client.set(zkClient.get()); - } catch (ZooKeeperConnectionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - interrupted.set(true); - throw new RuntimeException(e); - } finally { - blockingGetComplete.countDown(); - } - }); - getThread.start(); - - while (zkClient.getZooKeeperClientForTests() == null) { - Thread.sleep(100); - } - - getThread.interrupt(); - blockingGetComplete.await(); - - assertNull("The zk connection should have been closed", zkClient.getZooKeeperClientForTests()); - assertTrue("The waiter thread should have been interrupted", interrupted.get()); - assertTrue(zkClient.isClosed()); - } - - @Test - public void testClose() throws Exception { - ZooKeeperClient zkClient = createZkClient(); - zkClient.close(); - - // Close should be idempotent - zkClient.close(); - - long firstSessionId = zkClient.get().getSessionId(); - - // Close on an open client should force session re-establishment - zkClient.close(); - - assertNotEquals(firstSessionId, zkClient.get().getSessionId()); - } - - @Test - public void testCredentials() throws Exception { - String path = "/test"; - ZooKeeperClient authenticatedClient = createZkClient("creator", "creator"); - assertEquals(path, - authenticatedClient.get().create(path, "42".getBytes(), - ZooKeeperUtils.EVERYONE_READ_CREATOR_ALL, CreateMode.PERSISTENT)); - - ZooKeeperClient unauthenticatedClient = createZkClient(); - assertEquals("42", getData(unauthenticatedClient, path)); - try { - setData(unauthenticatedClient, path, "37"); - fail("Expected unauthenticated write attempt to fail"); - } catch (NoAuthException e) { - assertEquals("42", getData(unauthenticatedClient, path)); - } - - ZooKeeperClient nonOwnerClient = createZkClient("nonowner", "nonowner"); - assertEquals("42", getData(nonOwnerClient, path)); - try { - setData(nonOwnerClient, path, "37"); - fail("Expected non owner write attempt to fail"); - } catch (NoAuthException e) { - assertEquals("42", getData(nonOwnerClient, path)); - } - - ZooKeeperClient authenticatedClient2 = createZkClient("creator", "creator"); - setData(authenticatedClient2, path, "37"); - assertEquals("37", getData(authenticatedClient2, path)); - } - - @Test - public void testChrootPath() throws Exception { - ZooKeeperClient rootClient = createZkClient(); - String rootPath = "/test"; - String subPath = "/test/subtest"; - assertEquals(rootPath, - rootClient.get().create(rootPath, "42".getBytes(), - ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - assertEquals(subPath, - rootClient.get().create(subPath, "37".getBytes(), - ZooKeeperUtils.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); - - ZooKeeperClient chrootedClient = createZkClient(rootPath); - assertArrayEquals("37".getBytes(), chrootedClient.get().getData("/subtest", false, null)); - } - - private void setData(ZooKeeperClient zkClient, String path, String data) throws Exception { - zkClient.get().setData(path, data.getBytes(), ZooKeeperUtils.ANY_VERSION); - } - - private String getData(ZooKeeperClient zkClient, String path) throws Exception { - return new String(zkClient.get().getData(path, false, null)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java index 9e482a6..5eb3c5e 100644 --- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java +++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ZooKeeperUtilsTest.java @@ -13,87 +13,16 @@ */ package org.apache.aurora.common.zookeeper; -import com.google.common.base.Charsets; - -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.BadVersionException; -import org.apache.zookeeper.KeeperException.NoAuthException; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.data.Stat; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; import org.junit.Test; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** * @author John Sirois */ -public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest { - @Test - public void testEnsurePath() throws Exception { - ZooKeeperClient zkClient = createZkClient(); - zkClient.get().addAuthInfo("digest", "client1:boo".getBytes(Charsets.UTF_8)); - - assertNull(zkClient.get().exists("/foo", false)); - ZooKeeperUtils.ensurePath(zkClient, ZooDefs.Ids.CREATOR_ALL_ACL, "/foo/bar/baz"); - - zkClient = createZkClient(); - zkClient.get().addAuthInfo("digest", "client2:bap".getBytes(Charsets.UTF_8)); - - // Anyone can check for existence in ZK - assertNotNull(zkClient.get().exists("/foo", false)); - assertNotNull(zkClient.get().exists("/foo/bar", false)); - assertNotNull(zkClient.get().exists("/foo/bar/baz", false)); - - try { - zkClient.get().delete("/foo/bar/baz", -1 /* delete no matter what */); - fail("Expected CREATOR_ALL_ACL to be applied to created path and client2 mutations to be " - + "rejected"); - } catch (NoAuthException e) { - // expected - } - } - - @Test - public void testMagicVersionNumberAllowsUnconditionalUpdate() throws Exception { - String nodePath = "/foo"; - ZooKeeperClient zkClient = createZkClient(); - - zkClient.get().create(nodePath, "init".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - - Stat initStat = new Stat(); - byte[] initialData = zkClient.get().getData(nodePath, false, initStat); - assertArrayEquals("init".getBytes(), initialData); - - // bump the version - Stat rev1Stat = zkClient.get().setData(nodePath, "rev1".getBytes(), initStat.getVersion()); - - try { - zkClient.get().setData(nodePath, "rev2".getBytes(), initStat.getVersion()); - fail("expected correct version to be required"); - } catch (BadVersionException e) { - // expected - } - - // expect using the correct version to work - Stat rev2Stat = zkClient.get().setData(nodePath, "rev2".getBytes(), rev1Stat.getVersion()); - assertNotEquals(ZooKeeperUtils.ANY_VERSION, rev2Stat.getVersion()); - - zkClient.get().setData(nodePath, "force-write".getBytes(), ZooKeeperUtils.ANY_VERSION); - Stat forceWriteStat = new Stat(); - byte[] forceWriteData = zkClient.get().getData(nodePath, false, forceWriteStat); - assertArrayEquals("force-write".getBytes(), forceWriteData); - - assertTrue(forceWriteStat.getVersion() > rev2Stat.getVersion()); - assertNotEquals(ZooKeeperUtils.ANY_VERSION, forceWriteStat.getVersion()); - } +public class ZooKeeperUtilsTest extends BaseZooKeeperTest { @Test public void testNormalizingPath() throws Exception { @@ -135,5 +64,4 @@ public class ZooKeeperUtilsTest extends BaseZooKeeperClientTest { // expected } } - } http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java deleted file mode 100644 index 339f63b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceDiscoveryModule.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.net.InetSocketAddress; -import java.util.List; - -import javax.inject.Singleton; - -import com.google.inject.Exposed; -import com.google.inject.PrivateModule; -import com.google.inject.Provides; - -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.SingletonService; -import org.apache.aurora.common.zookeeper.SingletonServiceImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; -import org.apache.aurora.common.zookeeper.ZooKeeperUtils; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; -import org.apache.zookeeper.data.ACL; - -import static java.util.Objects.requireNonNull; - -/** - * Binding module for utilities to advertise the network presence of the scheduler. - * - * Uses a fork of Twitter commons/zookeeper. - */ -class CommonsServiceDiscoveryModule extends PrivateModule { - - private final String discoveryPath; - private final ZooKeeperConfig zooKeeperConfig; - - CommonsServiceDiscoveryModule(String discoveryPath, ZooKeeperConfig zooKeeperConfig) { - this.discoveryPath = ZooKeeperUtils.normalizePath(discoveryPath); - this.zooKeeperConfig = requireNonNull(zooKeeperConfig); - } - - @Override - protected void configure() { - requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); - requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); - - bind(ServiceGroupMonitor.class).to(CommonsServiceGroupMonitor.class).in(Singleton.class); - expose(ServiceGroupMonitor.class); - } - - @Provides - @Singleton - ZooKeeperClient provideZooKeeperClient( - @ServiceDiscoveryBindings.ZooKeeper Iterable zooKeeperCluster) { - - return new ZooKeeperClient( - zooKeeperConfig.getSessionTimeout(), - zooKeeperConfig.getCredentials(), - zooKeeperConfig.getChrootPath(), - zooKeeperCluster); - } - - @Provides - @Singleton - ServerSetImpl provideServerSet( - ZooKeeperClient client, - @ServiceDiscoveryBindings.ZooKeeper List zooKeeperAcls) { - - return new ServerSetImpl(client, zooKeeperAcls, discoveryPath); - } - - @Provides - @Singleton - DynamicHostSet provideServerSet(ServerSetImpl serverSet) { - // Used for a type re-binding of the server set. - return serverSet; - } - - // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding. - @Provides - @Singleton - @Exposed - SingletonService provideSingletonService( - ZooKeeperClient client, - ServerSetImpl serverSet, - @ServiceDiscoveryBindings.ZooKeeper List zookeeperAcls) { - - return new SingletonServiceImpl( - serverSet, - SingletonServiceImpl.createSingletonCandidate(client, discoveryPath, zookeeperAcls)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java deleted file mode 100644 index 9161455..0000000 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CommonsServiceGroupMonitor.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.discovery; - -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; - -import javax.inject.Inject; - -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.net.pool.DynamicHostSet; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.scheduler.app.ServiceGroupMonitor; - -import static java.util.Objects.requireNonNull; - -class CommonsServiceGroupMonitor implements ServiceGroupMonitor { - private Optional closeCommand = Optional.empty(); - private final DynamicHostSet serverSet; - private final AtomicReference> services = - new AtomicReference<>(ImmutableSet.of()); - - @Inject - CommonsServiceGroupMonitor(DynamicHostSet serverSet) { - this.serverSet = requireNonNull(serverSet); - } - - @Override - public void start() throws MonitorException { - try { - closeCommand = Optional.of(serverSet.watch(services::set)); - } catch (DynamicHostSet.MonitorException e) { - throw new MonitorException("Unable to watch scheduler host set.", e); - } - } - - @Override - public void close() { - closeCommand.ifPresent(Command::execute); - } - - @Override - public ImmutableSet get() { - return services.get(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java index 40cda8c..77f90ee 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/CuratorServiceDiscoveryModule.java @@ -37,7 +37,7 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.thrift.ServiceInstance; import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ServerSet; +import org.apache.aurora.common.zookeeper.Encoding; import org.apache.aurora.common.zookeeper.SingletonService; import org.apache.aurora.scheduler.app.ServiceGroupMonitor; import org.apache.curator.RetryPolicy; @@ -76,7 +76,7 @@ class CuratorServiceDiscoveryModule extends PrivateModule { requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_CLUSTER_KEY); requireBinding(ServiceDiscoveryBindings.ZOO_KEEPER_ACL_KEY); - bind(new TypeLiteral>() { }).toInstance(ServerSet.JSON_CODEC); + bind(new TypeLiteral>() { }).toInstance(Encoding.JSON_CODEC); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java index 1e7b9ce..48c7bfd 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/FlaggedZooKeeperConfig.java @@ -40,13 +40,6 @@ public final class FlaggedZooKeeperConfig { @Parameters(separators = "=") public static class Options { - @Parameter(names = "-zk_use_curator", - description = - "DEPRECATED: Uses Apache Curator as the zookeeper client; otherwise a copy of Twitter " - + "commons/zookeeper (the legacy library) is used.", - arity = 1) - public boolean useCurator = true; - @Parameter(names = "-zk_in_proc", description = "Launches an embedded zookeeper server for local testing causing -zk_endpoints " @@ -87,7 +80,6 @@ public final class FlaggedZooKeeperConfig { */ public static ZooKeeperConfig create(Options opts) { return new ZooKeeperConfig( - opts.useCurator, opts.zkEndpoints, Optional.fromNullable(opts.chrootPath), opts.inProcess, http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java index 917a567..7e3b6c4 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ServiceDiscoveryModule.java @@ -28,7 +28,6 @@ import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Inject; -import com.google.inject.Module; import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.binder.LinkedBindingBuilder; @@ -85,15 +84,7 @@ public class ServiceDiscoveryModule extends AbstractModule { clusterBinder.toInstance(zooKeeperConfig.getServers()); } - install(discoveryModule()); - } - - private Module discoveryModule() { - if (zooKeeperConfig.isUseCurator()) { - return new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } else { - return new CommonsServiceDiscoveryModule(discoveryPath, zooKeeperConfig); - } + install(new CuratorServiceDiscoveryModule(discoveryPath, zooKeeperConfig)); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java index 433ed31..1a7e8cb 100644 --- a/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java +++ b/src/main/java/org/apache/aurora/scheduler/discovery/ZooKeeperConfig.java @@ -44,13 +44,11 @@ public class ZooKeeperConfig { /** * Creates a new client configuration with defaults for the session timeout and credentials. * - * @param useCurator {@code true} to use Apache Curator; otherwise commons/zookeeper is used. * @param servers ZooKeeper server addresses. * @return A new configuration. */ - public static ZooKeeperConfig create(boolean useCurator, Iterable servers) { + public static ZooKeeperConfig create(Iterable servers) { return new ZooKeeperConfig( - useCurator, servers, Optional.absent(), // chrootPath false, @@ -59,7 +57,6 @@ public class ZooKeeperConfig { Optional.absent()); // credentials } - private final boolean useCurator; private final Iterable servers; private final boolean inProcess; private final Amount sessionTimeout; @@ -77,7 +74,6 @@ public class ZooKeeperConfig { * @param credentials ZooKeeper authentication credentials. */ ZooKeeperConfig( - boolean useCurator, Iterable servers, Optional chrootPath, boolean inProcess, @@ -85,7 +81,6 @@ public class ZooKeeperConfig { Amount connectionTimeout, Optional credentials) { - this.useCurator = useCurator; this.servers = MorePreconditions.checkNotBlank(servers); this.chrootPath = requireNonNull(chrootPath); this.inProcess = inProcess; @@ -103,7 +98,6 @@ public class ZooKeeperConfig { */ public ZooKeeperConfig withCredentials(Credentials newCredentials) { return new ZooKeeperConfig( - useCurator, servers, chrootPath, inProcess, @@ -112,10 +106,6 @@ public class ZooKeeperConfig { Optional.of(newCredentials)); } - boolean isUseCurator() { - return useCurator; - } - Iterable getServers() { return servers; } http://git-wip-us.apache.org/repos/asf/aurora/blob/15cb049f/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index a363e70..8e3c1de 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -14,6 +14,7 @@ package org.apache.aurora.scheduler.app; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Set; @@ -47,9 +48,7 @@ import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.zookeeper.Credentials; -import org.apache.aurora.common.zookeeper.ServerSetImpl; -import org.apache.aurora.common.zookeeper.ZooKeeperClient; -import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperClientTest; +import org.apache.aurora.common.zookeeper.testing.BaseZooKeeperTest; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; @@ -110,10 +109,9 @@ import static org.easymock.EasyMock.createControl; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class SchedulerIT extends BaseZooKeeperClientTest { +public class SchedulerIT extends BaseZooKeeperTest { private static final Logger LOG = LoggerFactory.getLogger(SchedulerIT.class); @@ -153,7 +151,6 @@ public class SchedulerIT extends BaseZooKeeperClientTest { private Stream logStream; private StreamMatcher streamMatcher; private EntrySerializer entrySerializer; - private ZooKeeperClient zkClient; private File backupDir; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -181,11 +178,9 @@ public class SchedulerIT extends BaseZooKeeperClientTest { entrySerializer = new EntrySerializer.EntrySerializerImpl( Amount.of(512, Data.KB), Hashing.md5()); - - zkClient = createZkClient(); } - private void startScheduler() throws Exception { + private Injector startScheduler() throws Exception { // TODO(wfarner): Try to accomplish all this by subclassing SchedulerMain and actually using // AppLauncher. Module testModule = new AbstractModule() { @@ -215,8 +210,8 @@ public class SchedulerIT extends BaseZooKeeperClientTest { }; ZooKeeperConfig zkClientConfig = ZooKeeperConfig.create( - true, // useCurator - ImmutableList.of(InetSocketAddress.createUnresolved("localhost", getPort()))) + ImmutableList.of( + InetSocketAddress.createUnresolved("localhost", getServer().getPort()))) .withCredentials(Credentials.digestCredentials("mesos", "mesos")); SchedulerMain main = SchedulerMain.class.newInstance(); Injector injector = Guice.createInjector( @@ -245,21 +240,35 @@ public class SchedulerIT extends BaseZooKeeperClientTest { }); injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class)) .awaitHealthy(); + return injector; } - private void awaitSchedulerReady() throws Exception { + private void awaitSchedulerReady(Injector injector) throws Exception { executor.submit(() -> { - ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH); - final CountDownLatch schedulerReady = new CountDownLatch(1); - schedulerService.watch(hostSet -> { - if (!hostSet.isEmpty()) { - schedulerReady.countDown(); + ServiceGroupMonitor groupMonitor = injector.getInstance(ServiceGroupMonitor.class); + try { + // A timeout is used because certain types of assertion errors (mocks) will not surface + // until the main test thread exits this body of code. + long waited = 0; + while (waited < 5000) { + if (groupMonitor.get().isEmpty()) { + try { + Thread.sleep(100); + waited += 100; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + break; + } } - }); - // A timeout is used because certain types of assertion errors (mocks) will not surface - // until the main test thread exits this body of code. - assertTrue(schedulerReady.await(5L, TimeUnit.MINUTES)); - return null; + } finally { + try { + groupMonitor.close(); + } catch (IOException e) { + LOG.info("Failed to close:" + e, e); + } + } }).get(); } @@ -345,7 +354,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest { expect(driver.stop(true)).andReturn(Protos.Status.DRIVER_STOPPED).anyTimes(); control.replay(); - startScheduler(); + Injector injector = startScheduler(); driverStarted.await(); scheduler.getValue().registered( @@ -353,7 +362,7 @@ public class SchedulerIT extends BaseZooKeeperClientTest { Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), MASTER); - awaitSchedulerReady(); + awaitSchedulerReady(injector); assertEquals(0L, Stats.getVariable("task_store_PENDING").read().longValue()); assertEquals(1L, Stats.getVariable("task_store_ASSIGNED").read().longValue());