Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BCBF718125 for ; Fri, 9 Oct 2015 00:37:50 +0000 (UTC) Received: (qmail 14437 invoked by uid 500); 9 Oct 2015 00:37:50 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 14409 invoked by uid 500); 9 Oct 2015 00:37:50 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 14400 invoked by uid 99); 9 Oct 2015 00:37:50 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 00:37:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 0C222180DDC for ; Fri, 9 Oct 2015 00:37:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.791 X-Spam-Level: X-Spam-Status: No, score=0.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 3uWgbqFtlobw for ; Fri, 9 Oct 2015 00:37:32 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 1DE5F26B61 for ; Fri, 9 Oct 2015 00:37:21 +0000 (UTC) Received: (qmail 5010 invoked by uid 99); 9 Oct 2015 00:37:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 00:37:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9840AE0D59; Fri, 9 Oct 2015 00:37:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Fri, 09 Oct 2015 00:37:53 -0000 Message-Id: <49f54cd83d0c4c58959497fe4902fce7@git.apache.org> In-Reply-To: <2a4b6dced2564102876049863b2f143b@git.apache.org> References: <2a4b6dced2564102876049863b2f143b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/50] [abbrv] incubator-geode git commit: GEODE-243: remove deprecated Bridge feature http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipDUnitTest.java new file mode 100644 index 0000000..5dd3cfb --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipDUnitTest.java @@ -0,0 +1,1642 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache30; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import com.gemstone.gemfire.InternalGemFireException; +import com.gemstone.gemfire.LogWriter; +import com.gemstone.gemfire.Statistics; +import com.gemstone.gemfire.StatisticsType; +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.Pool; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.DurableClientAttributes; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.cache.tier.InternalClientMembership; +import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection; +import com.gemstone.gemfire.internal.logging.LocalLogWriter; +import com.gemstone.gemfire.internal.logging.InternalLogWriter; +import com.gemstone.gemfire.management.membership.ClientMembership; +import com.gemstone.gemfire.management.membership.ClientMembershipEvent; +import com.gemstone.gemfire.management.membership.ClientMembershipListener; + +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.SerializableRunnable; +import dunit.VM; +import dunit.DistributedTestCase.WaitCriterion; + +/** + * Tests the ClientMembership API including ClientMembershipListener. + * + * @author Kirk Lund + * @since 4.2.1 + */ +public class ClientMembershipDUnitTest extends ClientServerTestCase { + + protected static final boolean CLIENT = true; + protected static final boolean SERVER = false; + + protected static final int JOINED = 0; + protected static final int LEFT = 1; + protected static final int CRASHED = 2; + + public ClientMembershipDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + getSystem(); + } + + public void tearDown2() throws Exception { + super.tearDown2(); + InternalClientMembership.unregisterAllListeners(); + } + + private void waitForAcceptsInProgressToBe(final int target) + throws Exception { + WaitCriterion ev = new WaitCriterion() { + String excuse; + public boolean done() { + int actual = getAcceptsInProgress(); + if (actual == getAcceptsInProgress()) { + return true; + } + excuse = "accepts in progress (" + actual + ") never became " + target; + return false; + } + public String description() { + return excuse; + } + }; + DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true); + } + + protected int getAcceptsInProgress() { + StatisticsType st = InternalDistributedSystem.getAnyInstance().findType("CacheServerStats"); + Statistics[] s = InternalDistributedSystem.getAnyInstance().findStatisticsByType(st); + return s[0].getInt("acceptsInProgress"); + } + + protected static Socket meanSocket; + + /** test that a server times out waiting for a handshake that + never arrives. + */ + public void testConnectionTimeout() throws Exception { + addExpectedException("failed accepting client connection"); + final Host host = Host.getHost(0); + final String hostName = getServerHostName(host); + final VM vm0 = host.getVM(0); + System.setProperty(AcceptorImpl.ACCEPT_TIMEOUT_PROPERTY_NAME, "1000"); + try { + final int port = startBridgeServer(0); +// AsyncInvocation ai = null; + try { + assertTrue(port != 0); + SerializableRunnable createMeanSocket = new CacheSerializableRunnable("Connect to server with socket") { + public void run2() throws CacheException { + getCache(); // create a cache so we have stats + getLogWriter().info("connecting to cache server with socket"); + try { + InetAddress addr = InetAddress.getByName(hostName); + meanSocket = new Socket(addr, port); + } + catch (Exception e) { + throw new RuntimeException("Test failed to connect or was interrupted", e); + } + } + }; + SerializableRunnable closeMeanSocket = new CacheSerializableRunnable("close mean socket") { + public void run2() throws CacheException { + getLogWriter().info("closing mean socket"); + try { + meanSocket.close(); + } + catch (IOException ignore) { + } + } + }; + + assertEquals(0, getAcceptsInProgress()); + + getLogWriter().info("creating mean socket"); + vm0.invoke(createMeanSocket); + try { + getLogWriter().info("waiting to see it connect on server"); + waitForAcceptsInProgressToBe(1); + } finally { + getLogWriter().info("closing mean socket"); + vm0.invoke(closeMeanSocket); + } + getLogWriter().info("waiting to see accept to go away on server"); + waitForAcceptsInProgressToBe(0); + + // now try it without a close. Server should timeout the mean connect + getLogWriter().info("creating mean socket 2"); + vm0.invoke(createMeanSocket); + try { + getLogWriter().info("waiting to see it connect on server 2"); + waitForAcceptsInProgressToBe(1); + getLogWriter().info("waiting to see accept to go away on server without us closing"); + waitForAcceptsInProgressToBe(0); + } finally { + getLogWriter().info("closing mean socket 2"); + vm0.invoke(closeMeanSocket); + } + +// SerializableRunnable denialOfService = new CacheSerializableRunnable("Do lots of connects") { +// public void run2() throws CacheException { +// int connectionCount = 0; +// ArrayList al = new ArrayList(60000); +// try { +// InetAddress addr = InetAddress.getLocalHost(); +// for (;;) { +// Socket s = new Socket(addr, port); +// al.add(s); +// connectionCount++; +// getLogWriter().info("connected # " + connectionCount + " s=" + s); +// // try { +// // s.close(); +// // } catch (IOException ignore) {} +// } +// } +// catch (Exception e) { +// getLogWriter().info("connected # " + connectionCount +// + " stopped because of exception " + e); +// Iterator it = al.iterator(); +// while (it.hasNext()) { +// Socket s = (Socket)it.next(); +// try { +// s.close(); +// } catch (IOException ignore) {} +// } +// } +// } +// }; +// // now pretend to do a denial of service attack by doing a bunch of connects +// // really fast and see what that does to the server's fds. +// getLogWriter().info("doing denial of service attach"); +// vm0.invoke(denialOfService); +// // @todo darrel: check fd limit? + } + finally { + stopBridgeServers(getCache()); + } + } + finally { + System.getProperties().remove(AcceptorImpl.ACCEPT_TIMEOUT_PROPERTY_NAME); + } + } + + public void testSynchronousEvents() throws Exception { + InternalClientMembership.setForceSynchronous(true); + try { + doTestBasicEvents(); + } + finally { + InternalClientMembership.setForceSynchronous(false); + } + } + + /** + * Tests event notification methods on ClientMembership. + */ + public void testBasicEvents() throws Exception { + doTestBasicEvents(); + } + + public void doTestBasicEvents() throws Exception { + final boolean[] fired = new boolean[3]; + final DistributedMember[] member = new DistributedMember[3]; + final String[] memberId = new String[3]; + final boolean[] isClient = new boolean[3]; + + ClientMembershipListener listener = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + fired[JOINED] = true; + member[JOINED] = event.getMember(); + memberId[JOINED] = event.getMemberId(); + isClient[JOINED] = event.isClient(); + notify(); + } + public synchronized void memberLeft(ClientMembershipEvent event) { + fired[LEFT] = true; + member[LEFT] = event.getMember(); + memberId[LEFT] = event.getMemberId(); + isClient[LEFT] = event.isClient(); + notify(); + } + public synchronized void memberCrashed(ClientMembershipEvent event) { + fired[CRASHED] = true; + member[CRASHED] = event.getMember(); + memberId[CRASHED] = event.getMemberId(); + isClient[CRASHED] = event.isClient(); + notify(); + } + }; + ClientMembership.registerClientMembershipListener(listener); + + // test JOIN for server + DistributedMember serverJoined = new TestDistributedMember("serverJoined"); + InternalClientMembership.notifyJoined(serverJoined, SERVER); + synchronized(listener) { + if (!fired[JOINED]) { + listener.wait(2000); + } + } + assertTrue(fired[JOINED]); + assertEquals(serverJoined, member[JOINED]); + assertEquals(serverJoined.getId(), memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // test JOIN for client + DistributedMember clientJoined = new TestDistributedMember("clientJoined"); + InternalClientMembership.notifyJoined(clientJoined, CLIENT); + synchronized(listener) { + if (!fired[JOINED]) { + listener.wait(2000); + } + } + assertTrue(fired[JOINED]); + assertEquals(clientJoined, member[JOINED]); + assertEquals(clientJoined.getId(), memberId[JOINED]); + assertTrue(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // test LEFT for server + DistributedMember serverLeft = new TestDistributedMember("serverLeft"); + InternalClientMembership.notifyLeft(serverLeft, SERVER); + synchronized(listener) { + if (!fired[LEFT]) { + listener.wait(2000); + } + } + assertFalse(fired[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertTrue(fired[LEFT]); + assertEquals(serverLeft, member[LEFT]); + assertEquals(serverLeft.getId(), memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // test LEFT for client + DistributedMember clientLeft = new TestDistributedMember("clientLeft"); + InternalClientMembership.notifyLeft(clientLeft, CLIENT); + synchronized(listener) { + if (!fired[LEFT]) { + listener.wait(2000); + } + } + assertFalse(fired[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertTrue(fired[LEFT]); + assertEquals(clientLeft, member[LEFT]); + assertEquals(clientLeft.getId(), memberId[LEFT]); + assertTrue(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // test CRASHED for server + DistributedMember serverCrashed = new TestDistributedMember("serverCrashed"); + InternalClientMembership.notifyCrashed(serverCrashed, SERVER); + synchronized(listener) { + if (!fired[CRASHED]) { + listener.wait(2000); + } + } + assertFalse(fired[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertTrue(fired[CRASHED]); + assertEquals(serverCrashed, member[CRASHED]); + assertEquals(serverCrashed.getId(), memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // test CRASHED for client + DistributedMember clientCrashed = new TestDistributedMember("clientCrashed"); + InternalClientMembership.notifyCrashed(clientCrashed, CLIENT); + synchronized(listener) { + if (!fired[CRASHED]) { + listener.wait(2000); + } + } + assertFalse(fired[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertTrue(fired[CRASHED]); + assertEquals(clientCrashed, member[CRASHED]); + assertEquals(clientCrashed.getId(), memberId[CRASHED]); + assertTrue(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + } + + /** + * Resets all elements of arrays used for listener testing. Boolean values + * are reset to false. String values are reset to null. + */ + private void resetArraysForTesting(boolean[] fired, + DistributedMember[] member, + String[] memberId, + boolean[] isClient) { + for (int i = 0; i < fired.length; i++) { + fired[i] = false; + member[i] = null; + memberId[i] = null; + isClient[i] = false; + } + } + + /** + * Tests unregisterClientMembershipListener to ensure that no further events + * are delivered to unregistered listeners. + */ + public void testUnregisterClientMembershipListener() throws Exception { + final boolean[] fired = new boolean[1]; + final DistributedMember[] member = new DistributedMember[1]; + final String[] memberId = new String[1]; + final boolean[] isClient = new boolean[1]; + + ClientMembershipListener listener = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + fired[0] = true; + member[0] = event.getMember(); + memberId[0] = event.getMemberId(); + isClient[0] = event.isClient(); + notify(); + } + public void memberLeft(ClientMembershipEvent event) { + } + public void memberCrashed(ClientMembershipEvent event) { + } + }; + ClientMembership.registerClientMembershipListener(listener); + + // fire event to make sure listener is registered + DistributedMember clientJoined = new TestDistributedMember("clientJoined"); + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listener) { + if (!fired[0]) { + listener.wait(2000); + } + } + assertTrue(fired[0]); + assertEquals(clientJoined, member[0]); + assertEquals(clientJoined.getId(), memberId[0]); + assertTrue(isClient[0]); + + resetArraysForTesting(fired, member, memberId, isClient); + assertFalse(fired[0]); + assertNull(memberId[0]); + assertFalse(isClient[0]); + + // unregister and verify listener is not notified + ClientMembership.unregisterClientMembershipListener(listener); + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listener) { + listener.wait(20); + } + assertFalse(fired[0]); + assertNull(member[0]); + assertNull(memberId[0]); + assertFalse(isClient[0]); + } + + public void testMultipleListeners() throws Exception { + final int NUM_LISTENERS = 4; + final boolean[] fired = new boolean[NUM_LISTENERS]; + final DistributedMember[] member = new DistributedMember[NUM_LISTENERS]; + final String[] memberId = new String[NUM_LISTENERS]; + final boolean[] isClient = new boolean[NUM_LISTENERS]; + + final ClientMembershipListener[] listeners = new ClientMembershipListener[NUM_LISTENERS]; + for (int i = 0; i < NUM_LISTENERS; i++) { + final int whichListener = i; + listeners[i] = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + assertFalse(fired[whichListener]); + assertNull(member[whichListener]); + assertNull(memberId[whichListener]); + assertFalse(isClient[whichListener]); + fired[whichListener] = true; + member[whichListener] = event.getMember(); + memberId[whichListener] = event.getMemberId(); + isClient[whichListener] = event.isClient(); + notify(); + } + public void memberLeft(ClientMembershipEvent event) { + } + public void memberCrashed(ClientMembershipEvent event) { + } + }; + } + + final DistributedMember clientJoined = new TestDistributedMember("clientJoined"); + InternalClientMembership.notifyJoined(clientJoined, true); + for (int i = 0; i < NUM_LISTENERS; i++) { + synchronized(listeners[i]) { + listeners[i].wait(20); + } + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + + // attempt to register same listener twice... 2nd reg should be ignored + // failure would cause an assertion failure in memberJoined impl + ClientMembership.registerClientMembershipListener(listeners[0]); + ClientMembership.registerClientMembershipListener(listeners[0]); + + ClientMembershipListener[] registeredListeners = + ClientMembership.getClientMembershipListeners(); + assertEquals(1, registeredListeners.length); + assertEquals(listeners[0], registeredListeners[0]); + + ClientMembership.registerClientMembershipListener(listeners[1]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(2, registeredListeners.length); + assertEquals(listeners[0], registeredListeners[0]); + assertEquals(listeners[1], registeredListeners[1]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[1]) { + if (!fired[1]) { + listeners[1].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i < 2) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.unregisterClientMembershipListener(listeners[0]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(1, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[1]) { + if (!fired[1]) { + listeners[1].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i == 1) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.registerClientMembershipListener(listeners[2]); + ClientMembership.registerClientMembershipListener(listeners[3]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(3, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + assertEquals(listeners[2], registeredListeners[1]); + assertEquals(listeners[3], registeredListeners[2]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[3]) { + if (!fired[3]) { + listeners[3].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i != 0) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.registerClientMembershipListener(listeners[0]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(4, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + assertEquals(listeners[2], registeredListeners[1]); + assertEquals(listeners[3], registeredListeners[2]); + assertEquals(listeners[0], registeredListeners[3]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[0]) { + if (!fired[0]) { + listeners[0].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.unregisterClientMembershipListener(listeners[3]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(3, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + assertEquals(listeners[2], registeredListeners[1]); + assertEquals(listeners[0], registeredListeners[2]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[0]) { + if (!fired[0]) { + listeners[0].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i < 3) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.unregisterClientMembershipListener(listeners[2]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(2, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + assertEquals(listeners[0], registeredListeners[1]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[0]) { + if (!fired[0]) { + listeners[0].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i < 2) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.unregisterClientMembershipListener(listeners[1]); + ClientMembership.unregisterClientMembershipListener(listeners[0]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(0, registeredListeners.length); + + InternalClientMembership.notifyJoined(clientJoined, true); + for (int i = 0; i < NUM_LISTENERS; i++) { + synchronized(listeners[i]) { + listeners[i].wait(20); + } + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + resetArraysForTesting(fired, member, memberId, isClient); + + ClientMembership.registerClientMembershipListener(listeners[1]); + registeredListeners = ClientMembership.getClientMembershipListeners(); + assertEquals(1, registeredListeners.length); + assertEquals(listeners[1], registeredListeners[0]); + + InternalClientMembership.notifyJoined(clientJoined, true); + synchronized(listeners[1]) { + if (!fired[1]) { + listeners[1].wait(2000); + } + } + for (int i = 0; i < NUM_LISTENERS; i++) { + if (i == 1) { + assertTrue(fired[i]); + assertEquals(clientJoined, member[i]); + assertEquals(clientJoined.getId(), memberId[i]); + assertTrue(isClient[i]); + } else { + assertFalse(fired[i]); + assertNull(member[i]); + assertNull(memberId[i]); + assertFalse(isClient[i]); + } + } + } + + protected static int testClientMembershipEventsInClient_port; + private static int getTestClientMembershipEventsInClient_port() { + return testClientMembershipEventsInClient_port; + } + /** + * Tests notification of events in client process. Bridge clients detect + * server joins when the client connects to the server. If the server + * crashes or departs gracefully, the client will detect this as a crash. + */ + public void testClientMembershipEventsInClient() throws Exception { + addExpectedException("IOException"); + final boolean[] fired = new boolean[3]; + final DistributedMember[] member = new DistributedMember[3]; + final String[] memberId = new String[3]; + final boolean[] isClient = new boolean[3]; + + // create and register ClientMembershipListener in controller vm... + ClientMembershipListener listener = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInClient] memberJoined: " + event); + fired[JOINED] = true; + member[JOINED] = event.getMember(); + memberId[JOINED] = event.getMemberId(); + isClient[JOINED] = event.isClient(); + notifyAll(); + } + public synchronized void memberLeft(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInClient] memberLeft: " + event); +// fail("Please update testClientMembershipEventsInClient to handle memberLeft for BridgeServer."); + } + public synchronized void memberCrashed(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInClient] memberCrashed: " + event); + fired[CRASHED] = true; + member[CRASHED] = event.getMember(); + memberId[CRASHED] = event.getMemberId(); + isClient[CRASHED] = event.isClient(); + notifyAll(); + } + }; + ClientMembership.registerClientMembershipListener(listener); + + final VM vm0 = Host.getHost(0).getVM(0); + final String name = this.getUniqueName(); + final int[] ports = new int[1]; + + // create BridgeServer in vm0... + vm0.invoke(new CacheSerializableRunnable("Create BridgeServer") { + public void run2() throws CacheException { + try { + getLogWriter().info("[testClientMembershipEventsInClient] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region = createRegion(name, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name)); + testClientMembershipEventsInClient_port = startBridgeServer(0); + } + catch(IOException e) { + getSystem().getLogWriter().fine(new Exception(e)); + fail("Failed to start CacheServer on VM1: " + e.getMessage()); + } + } + }); + + // gather details for later creation of ConnectionPool... + ports[0] = vm0.invokeInt(ClientMembershipDUnitTest.class, + "getTestClientMembershipEventsInClient_port"); + assertTrue(ports[0] != 0); + + DistributedMember serverMember = (DistributedMember) vm0.invoke(ClientMembershipDUnitTest.class, + "getDistributedMember"); + + String serverMemberId = (String) vm0.invoke(ClientMembershipDUnitTest.class, + "getMemberId"); + + getLogWriter().info("[testClientMembershipEventsInClient] ports[0]=" + ports[0]); + getLogWriter().info("[testClientMembershipEventsInClient] serverMember=" + serverMember); + getLogWriter().info("[testClientMembershipEventsInClient] serverMemberId=" + serverMemberId); + + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + + // sanity check... + getLogWriter().info("[testClientMembershipEventsInClient] sanity check"); + DistributedMember test = new TestDistributedMember("test"); + InternalClientMembership.notifyJoined(test, SERVER); + synchronized(listener) { + if (!fired[JOINED] && !fired[CRASHED]) { + listener.wait(2000); + } + } + + assertTrue(fired[JOINED]); + assertEquals(test, member[JOINED]); + assertEquals(test.getId(), memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // create bridge client in controller vm... + getLogWriter().info("[testClientMembershipEventsInClient] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + + try { + getCache(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, getServerHostName(Host.getHost(0)), ports, true, -1, -1, null); + createRegion(name, factory.create()); + assertNotNull(getRootRegion().getSubregion(name)); + } + catch (CacheException ex) { + fail("While creating Region on Edge", ex); + } + synchronized(listener) { + if (!fired[JOINED] && !fired[CRASHED]) { + listener.wait(60 * 1000); + } + } + + getLogWriter().info("[testClientMembershipEventsInClient] assert client detected server join"); + + // first check the getCurrentServers() result + ClientCache clientCache = (ClientCache)getCache(); + Set servers = clientCache.getCurrentServers(); + assertTrue(!servers.isEmpty()); + InetSocketAddress serverAddr = servers.iterator().next(); + InetSocketAddress expectedAddr = new InetSocketAddress(serverMember.getHost(), ports[0]); + assertEquals(expectedAddr, serverAddr); + + // now check listener results + assertTrue(fired[JOINED]); + assertNotNull(member[JOINED]); + assertNotNull(memberId[JOINED]); + assertEquals(serverMember, member[JOINED]); + assertEquals(serverMemberId, memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + vm0.invoke(new SerializableRunnable("Stop BridgeServer") { + public void run() { + getLogWriter().info("[testClientMembershipEventsInClient] Stop BridgeServer"); + stopBridgeServers(getCache()); + } + }); + synchronized(listener) { + if (!fired[JOINED] && !fired[CRASHED]) { + listener.wait(60 * 1000); + } + } + + getLogWriter().info("[testClientMembershipEventsInClient] assert client detected server departure"); + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertTrue(fired[CRASHED]); + assertNotNull(member[CRASHED]); + assertNotNull(memberId[CRASHED]); + assertEquals(serverMember, member[CRASHED]); + assertEquals(serverMemberId, memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + //now test that we redisover the bridge server + vm0.invoke(new CacheSerializableRunnable("Recreate BridgeServer") { + public void run2() throws CacheException { + try { + getLogWriter().info("[testClientMembershipEventsInClient] restarting BridgeServer"); + startBridgeServer(ports[0]); + } + catch(IOException e) { + getSystem().getLogWriter().fine(new Exception(e)); + fail("Failed to start CacheServer on VM1: " + e.getMessage()); + } + } + }); + synchronized(listener) { + if (!fired[JOINED] && !fired[CRASHED]) { + listener.wait(60 * 1000); + } + } + + getLogWriter().info("[testClientMembershipEventsInClient] assert client detected server recovery"); + assertTrue(fired[JOINED]); + assertNotNull(member[JOINED]); + assertNotNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertEquals(serverMember, member[JOINED]); + assertEquals(serverMemberId, memberId[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + } + + /** + * Tests notification of events in server process. Bridge servers detect + * client joins when the client connects to the server. + */ + public void testClientMembershipEventsInServer() throws Exception { + final boolean[] fired = new boolean[3]; + final DistributedMember[] member = new DistributedMember[3]; + final String[] memberId = new String[3]; + final boolean[] isClient = new boolean[3]; + + // create and register ClientMembershipListener in controller vm... + ClientMembershipListener listener = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInServer] memberJoined: " + event); + fired[JOINED] = true; + member[JOINED] = event.getMember(); + memberId[JOINED] = event.getMemberId(); + isClient[JOINED] = event.isClient(); + notifyAll(); + assertFalse(fired[LEFT] || fired[CRASHED]); + } + public synchronized void memberLeft(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInServer] memberLeft: " + event); + fired[LEFT] = true; + member[LEFT] = event.getMember(); + memberId[LEFT] = event.getMemberId(); + isClient[LEFT] = event.isClient(); + notifyAll(); + assertFalse(fired[JOINED] || fired[CRASHED]); + } + public synchronized void memberCrashed(ClientMembershipEvent event) { + getLogWriter().info("[testClientMembershipEventsInServer] memberCrashed: " + event); + fired[CRASHED] = true; + member[CRASHED] = event.getMember(); + memberId[CRASHED] = event.getMemberId(); + isClient[CRASHED] = event.isClient(); + notifyAll(); + assertFalse(fired[JOINED] || fired[LEFT]); + } + }; + ClientMembership.registerClientMembershipListener(listener); + + final VM vm0 = Host.getHost(0).getVM(0); + final String name = this.getUniqueName(); + final int[] ports = new int[1]; + + // create BridgeServer in controller vm... + getLogWriter().info("[testClientMembershipEventsInServer] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region = createRegion(name, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name)); + + ports[0] = startBridgeServer(0); + assertTrue(ports[0] != 0); + String serverMemberId = getMemberId(); + DistributedMember serverMember = getDistributedMember(); + + getLogWriter().info("[testClientMembershipEventsInServer] ports[0]=" + ports[0]); + getLogWriter().info("[testClientMembershipEventsInServer] serverMemberId=" + serverMemberId); + getLogWriter().info("[testClientMembershipEventsInServer] serverMember=" + serverMember); + + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + + // sanity check... + getLogWriter().info("[testClientMembershipEventsInServer] sanity check"); + DistributedMember test = new TestDistributedMember("test"); + InternalClientMembership.notifyJoined(test, CLIENT); + synchronized(listener) { + if (!fired[JOINED] && !fired[LEFT] && !fired[CRASHED]) { + listener.wait(2000); + } + } + assertTrue(fired[JOINED]); + assertEquals(test, member[JOINED]); + assertEquals(test.getId(), memberId[JOINED]); + assertTrue(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + final Host host = Host.getHost(0); + SerializableRunnable createConnectionPool = + new CacheSerializableRunnable("Create connectionPool") { + public void run2() throws CacheException { + getLogWriter().info("[testClientMembershipEventsInServer] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, getServerHostName(host), ports, true, -1, 2, null); + createRegion(name, factory.create()); + assertNotNull(getRootRegion().getSubregion(name)); + } + }; + + // create bridge client in vm0... + vm0.invoke(createConnectionPool); + String clientMemberId = (String) vm0.invoke(ClientMembershipDUnitTest.class, + "getMemberId"); + DistributedMember clientMember = (DistributedMember) vm0.invoke(ClientMembershipDUnitTest.class, + "getDistributedMember"); + + synchronized(listener) { + if (!fired[JOINED] && !fired[LEFT] && !fired[CRASHED]) { + listener.wait(60000); + } + } + + getLogWriter().info("[testClientMembershipEventsInServer] assert server detected client join"); + assertTrue(fired[JOINED]); + assertEquals(member[JOINED] + " should equal " + clientMember, + clientMember, member[JOINED]); + assertEquals(memberId[JOINED] + " should equal " + clientMemberId, + clientMemberId, memberId[JOINED]); + assertTrue(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + pauseForClientToJoin(); + + vm0.invoke(new SerializableRunnable("Stop bridge client") { + public void run() { + getLogWriter().info("[testClientMembershipEventsInServer] Stop bridge client"); + getRootRegion().getSubregion(name).close(); + Map m = PoolManager.getAll(); + Iterator mit = m.values().iterator(); + while(mit.hasNext()) { + Pool p = (Pool)mit.next(); + p.destroy(); + } + } + }); + + synchronized(listener) { + if (!fired[JOINED] && !fired[LEFT] && !fired[CRASHED]) { + listener.wait(60000); + } + } + + getLogWriter().info("[testClientMembershipEventsInServer] assert server detected client left"); + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertTrue(fired[LEFT]); + assertEquals(clientMember, member[LEFT]); + assertEquals(clientMemberId, memberId[LEFT]); + assertTrue(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // reconnect bridge client to test for crashed event + vm0.invoke(createConnectionPool); + clientMemberId = (String) vm0.invoke(ClientMembershipDUnitTest.class, + "getMemberId"); + + synchronized(listener) { + if (!fired[JOINED] && !fired[LEFT] && !fired[CRASHED]) { + listener.wait(60000); + } + } + + getLogWriter().info("[testClientMembershipEventsInServer] assert server detected client re-join"); + assertTrue(fired[JOINED]); + assertEquals(clientMember, member[JOINED]); + assertEquals(clientMemberId, memberId[JOINED]); + assertTrue(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertFalse(fired[CRASHED]); + assertNull(member[CRASHED]); + assertNull(memberId[CRASHED]); + assertFalse(isClient[CRASHED]); + resetArraysForTesting(fired, member, memberId, isClient); + + pauseForClientToJoin(); + + ServerConnection.setForceClientCrashEvent(true); + try { + vm0.invoke(new SerializableRunnable("Stop bridge client") { + public void run() { + getLogWriter().info("[testClientMembershipEventsInServer] Stop bridge client"); + getRootRegion().getSubregion(name).close(); + Map m = PoolManager.getAll(); + Iterator mit = m.values().iterator(); + while(mit.hasNext()) { + Pool p = (Pool)mit.next(); + p.destroy(); + } + } + }); + + synchronized(listener) { + if (!fired[JOINED] && !fired[LEFT] && !fired[CRASHED]) { + listener.wait(60000); + } + } + + getLogWriter().info("[testClientMembershipEventsInServer] assert server detected client crashed"); + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + assertFalse(fired[LEFT]); + assertNull(member[LEFT]); + assertNull(memberId[LEFT]); + assertFalse(isClient[LEFT]); + assertTrue(fired[CRASHED]); + assertEquals(clientMember, member[CRASHED]); + assertEquals(clientMemberId, memberId[CRASHED]); + assertTrue(isClient[CRASHED]); + } + finally { + ServerConnection.setForceClientCrashEvent(false); + } + } + + /** + * The joined event fires when the first client handshake is processed. + * This pauses long enough to allow the rest of the client sockets to + * complete handshaking before making the client leave. Without doing this + * subsequent socket handshakes that are processed could fire join events + * after departure events and then a departure event again. If you see + * failures in testClientMembershipEventsInServer, try increasing this + * timeout. + */ + private void pauseForClientToJoin() { + pause(2000); + } + + /** + * Tests registration and event notification in conjunction with + * disconnecting and reconnecting to DistributedSystem. + */ + public void testLifecycle() throws Exception { + final boolean[] fired = new boolean[3]; + final DistributedMember[] member = new DistributedMember[3]; + final String[] memberId = new String[3]; + final boolean[] isClient = new boolean[3]; + + // create and register ClientMembershipListener in controller vm... + ClientMembershipListener listener = new ClientMembershipListener() { + public synchronized void memberJoined(ClientMembershipEvent event) { + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + fired[JOINED] = true; + member[JOINED] = event.getMember(); + memberId[JOINED] = event.getMemberId(); + isClient[JOINED] = event.isClient(); + notifyAll(); + } + public synchronized void memberLeft(ClientMembershipEvent event) { + } + public synchronized void memberCrashed(ClientMembershipEvent event) { + } + }; + ClientMembership.registerClientMembershipListener(listener); + + // create loner in controller vm... + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + + // assert that event is fired while connected + DistributedMember serverJoined = new TestDistributedMember("serverJoined"); + InternalClientMembership.notifyJoined(serverJoined, SERVER); + synchronized(listener) { + if (!fired[JOINED]) { + listener.wait(2000); + } + } + assertTrue(fired[JOINED]); + assertEquals(serverJoined, member[JOINED]); + assertEquals(serverJoined.getId(), memberId[JOINED]); + assertFalse(isClient[JOINED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // assert that event is NOT fired while disconnected + disconnectFromDS(); + + + InternalClientMembership.notifyJoined(serverJoined, SERVER); + synchronized(listener) { + listener.wait(20); + } + assertFalse(fired[JOINED]); + assertNull(member[JOINED]); + assertNull(memberId[JOINED]); + assertFalse(isClient[JOINED]); + resetArraysForTesting(fired, member, memberId, isClient); + + // assert that event is fired again after reconnecting + InternalDistributedSystem sys = getSystem(config); + assertTrue(sys.isConnected()); + + InternalClientMembership.notifyJoined(serverJoined, SERVER); + synchronized(listener) { + if (!fired[JOINED]) { + listener.wait(2000); + } + } + assertTrue(fired[JOINED]); + assertEquals(serverJoined, member[JOINED]); + assertEquals(serverJoined.getId(), memberId[JOINED]); + assertFalse(isClient[JOINED]); + } + + /** + * Starts up server in controller vm and 4 clients, then calls and tests + * ClientMembership.getConnectedClients(). + */ + public void testGetConnectedClients() throws Exception { + final String name = this.getUniqueName(); + final int[] ports = new int[1]; + + // create BridgeServer in controller vm... + getLogWriter().info("[testGetConnectedClients] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region = createRegion(name, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name)); + + ports[0] = startBridgeServer(0); + assertTrue(ports[0] != 0); + String serverMemberId = getMemberId(); + + getLogWriter().info("[testGetConnectedClients] ports[0]=" + ports[0]); + getLogWriter().info("[testGetConnectedClients] serverMemberId=" + serverMemberId); + + final Host host = Host.getHost(0); + SerializableRunnable createPool = + new CacheSerializableRunnable("Create connection pool") { + public void run2() throws CacheException { + getLogWriter().info("[testGetConnectedClients] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + ClientServerTestCase.configureConnectionPool(factory, getServerHostName(host), ports, true, -1, -1, null); + createRegion(name, factory.create()); + assertNotNull(getRootRegion().getSubregion(name)); + } + }; + + // create bridge client in vm0... + final String[] clientMemberIdArray = new String[host.getVMCount()]; + + for (int i = 0; i < host.getVMCount(); i++) { + final VM vm = Host.getHost(0).getVM(i); + vm.invoke(createPool); + clientMemberIdArray[i] = String.valueOf(vm.invoke( + ClientMembershipDUnitTest.class, "getMemberId")); + } + Collection clientMemberIds = Arrays.asList(clientMemberIdArray); + + { + final int expectedClientCount = clientMemberIds.size(); + WaitCriterion wc = new WaitCriterion() { + public String description() { + return "wait for clients"; + } + public boolean done() { + Map connectedClients = InternalClientMembership.getConnectedClients(false); + if (connectedClients == null) { + return false; + } + if (connectedClients.size() != expectedClientCount) { + return false; + } + return true; + } + }; + waitForCriterion(wc, 10000, 100, false); + } + + Map connectedClients = InternalClientMembership.getConnectedClients(false); + assertNotNull(connectedClients); + assertEquals(clientMemberIds.size(), connectedClients.size()); + for (Iterator iter = connectedClients.keySet().iterator(); iter.hasNext();) { + String connectedClient = (String)iter.next(); + getLogWriter().info("[testGetConnectedClients] checking for client " + connectedClient); + assertTrue(clientMemberIds.contains(connectedClient)); + getLogWriter().info("[testGetConnectedClients] count for connectedClient: " + + connectedClients.get(connectedClient)); + } + } + + /** + * Starts up 4 server and the controller vm as a client, then calls and tests + * ClientMembership.getConnectedServers(). + */ + public void testGetConnectedServers() throws Exception { + final Host host = Host.getHost(0); + final String name = this.getUniqueName(); + final int[] ports = new int[host.getVMCount()]; + + for (int i = 0; i < host.getVMCount(); i++) { + final int whichVM = i; + final VM vm = Host.getHost(0).getVM(i); + vm.invoke(new CacheSerializableRunnable("Create bridge server") { + public void run2() throws CacheException { + // create BridgeServer in controller vm... + getLogWriter().info("[testGetConnectedServers] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region = createRegion(name+"_"+whichVM, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name+"_"+whichVM)); + region.put("KEY-1", "VAL-1"); + + try { + testGetConnectedServers_port = startBridgeServer(0); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + + assertTrue(testGetConnectedServers_port != 0); + + getLogWriter().info("[testGetConnectedServers] port=" + + ports[whichVM]); + getLogWriter().info("[testGetConnectedServers] serverMemberId=" + + getDistributedMember()); + } + }); + ports[whichVM] = vm.invokeInt(ClientMembershipDUnitTest.class, + "getTestGetConnectedServers_port"); + assertTrue(ports[whichVM] != 0); + } + + getLogWriter().info("[testGetConnectedServers] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + getCache(); + + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + for (int i = 0; i < ports.length; i++) { + getLogWriter().info("[testGetConnectedServers] creating connectionpool for " + + getServerHostName(host) + " " + ports[i]); + int[] thisServerPorts = new int[] { ports[i] }; + ClientServerTestCase.configureConnectionPoolWithName(factory, getServerHostName(host), thisServerPorts, false, -1, -1, null,"pooly"+i); + Region region = createRegion(name+"_"+i, factory.create()); + assertNotNull(getRootRegion().getSubregion(name+"_"+i)); + region.get("KEY-1"); + } + + { + final int expectedVMCount = host.getVMCount(); + WaitCriterion wc = new WaitCriterion() { + public String description() { + return "wait for pools and servers"; + } + public boolean done() { + if (PoolManager.getAll().size() != expectedVMCount) { + return false; + } + Map connectedServers = InternalClientMembership.getConnectedServers(); + if (connectedServers == null) { + return false; + } + if (connectedServers.size() != expectedVMCount) { + return false; + } + return true; + } + }; + waitForCriterion(wc, 10000, 100, false); + } + + { + assertEquals(host.getVMCount(), PoolManager.getAll().size()); + + } + + Map connectedServers = InternalClientMembership.getConnectedServers(); + assertNotNull(connectedServers); + assertEquals(host.getVMCount(), connectedServers.size()); + for (Iterator iter = connectedServers.keySet().iterator(); iter.hasNext();) { + String connectedServer = (String) iter.next(); + getLogWriter().info("[testGetConnectedServers] value for connectedServer: " + + connectedServers.get(connectedServer)); + } + } + + protected static int testGetConnectedServers_port; + private static int getTestGetConnectedServers_port() { + return testGetConnectedServers_port; + } + + /** + * Tests getConnectedClients(boolean onlyClientsNotifiedByThisServer) where + * onlyClientsNotifiedByThisServer is true. + */ + public void testGetNotifiedClients() throws Exception { + final Host host = Host.getHost(0); + final String name = this.getUniqueName(); + final int[] ports = new int[host.getVMCount()]; + + for (int i = 0; i < host.getVMCount(); i++) { + final int whichVM = i; + final VM vm = Host.getHost(0).getVM(i); + vm.invoke(new CacheSerializableRunnable("Create bridge server") { + public void run2() throws CacheException { + // create BridgeServer in controller vm... + getLogWriter().info("[testGetNotifiedClients] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + Region region = createRegion(name, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name)); + region.put("KEY-1", "VAL-1"); + + try { + testGetNotifiedClients_port = startBridgeServer(0); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + + assertTrue(testGetNotifiedClients_port != 0); + + getLogWriter().info("[testGetNotifiedClients] port=" + + ports[whichVM]); + getLogWriter().info("[testGetNotifiedClients] serverMemberId=" + + getMemberId()); + } + }); + ports[whichVM] = vm.invokeInt(ClientMembershipDUnitTest.class, + "getTestGetNotifiedClients_port"); + assertTrue(ports[whichVM] != 0); + } + + getLogWriter().info("[testGetNotifiedClients] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + getCache(); + + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + getLogWriter().info("[testGetNotifiedClients] creating connection pool"); + ClientServerTestCase.configureConnectionPool(factory, getServerHostName(host), ports, true, -1, -1, null); + Region region = createRegion(name, factory.create()); + assertNotNull(getRootRegion().getSubregion(name)); + region.registerInterest("KEY-1"); + region.get("KEY-1"); + + final String clientMemberId = getMemberId(); + + pauseForClientToJoin(); + + // assertions go here + int[] clientCounts = new int[host.getVMCount()]; + + // only one server vm will have that client for updating + for (int i = 0; i < host.getVMCount(); i++) { + final int whichVM = i; + final VM vm = Host.getHost(0).getVM(i); + vm.invoke(new CacheSerializableRunnable("Create bridge server") { + public void run2() throws CacheException { + Map clients = InternalClientMembership.getConnectedClients(true); + assertNotNull(clients); + testGetNotifiedClients_clientCount = clients.size(); + if (testGetNotifiedClients_clientCount > 0) { + // assert that the clientMemberId matches + assertEquals(clientMemberId, clients.keySet().iterator().next()); + } + } + }); + clientCounts[whichVM] = vm.invokeInt(ClientMembershipDUnitTest.class, + "getTestGetNotifiedClients_clientCount"); + } + + // only one server should have a notifier for this client... + int totalClientCounts = 0; + for (int i = 0; i < clientCounts.length; i++) { + totalClientCounts += clientCounts[i]; + } + // this assertion fails because the count is 4 + //assertEquals(1, totalClientCounts); + } + protected static int testGetNotifiedClients_port; + private static int getTestGetNotifiedClients_port() { + return testGetNotifiedClients_port; + } + protected static int testGetNotifiedClients_clientCount; + private static int getTestGetNotifiedClients_clientCount() { + return testGetNotifiedClients_clientCount; + } + + // Simple DistributedMember implementation + static final class TestDistributedMember implements DistributedMember { + + private String host; + + public TestDistributedMember(String host) { + this.host = host; + } + + public String getName() { + return ""; + } + + public String getHost() { + return this.host; + } + + public Set getRoles() { + return new HashSet(); + } + + public int getProcessId() { + return 0; + } + + public String getId() { + return this.host; + } + + public int compareTo(DistributedMember o) { + if ((o == null) || !(o instanceof TestDistributedMember)) { + throw new InternalGemFireException("Invalidly comparing TestDistributedMember to " + o); + } + + TestDistributedMember tds = (TestDistributedMember) o; + return getHost().compareTo(tds.getHost()); + } + + public boolean equals(Object obj) { + if ((obj == null) || !(obj instanceof TestDistributedMember)) { + return false; + } + return compareTo((TestDistributedMember)obj) == 0; + } + + public int hashCode() { + return getHost().hashCode(); + } + + public DurableClientAttributes getDurableClientAttributes() { + + return null; + } + + public List getGroups() { + return Collections.emptyList(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipSelectorDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipSelectorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipSelectorDUnitTest.java new file mode 100644 index 0000000..ebf894e --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientMembershipSelectorDUnitTest.java @@ -0,0 +1,16 @@ +package com.gemstone.gemfire.cache30; + +/** + * Same as parent but uses selector in server + * + * @author darrel + * @since 5.1 + */ +public class ClientMembershipSelectorDUnitTest extends ClientMembershipDUnitTest { + public ClientMembershipSelectorDUnitTest(String name) { + super(name); + } + protected int getMaxThreads() { + return 2; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestDUnitTest.java new file mode 100644 index 0000000..b49ec56 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestDUnitTest.java @@ -0,0 +1,418 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache30; + +import java.io.IOException; +import java.util.Properties; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException; + +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.VM; + +/** + * Tests the client register interest + * + * @author Kirk Lund + * @since 4.2.3 + */ +public class ClientRegisterInterestDUnitTest extends ClientServerTestCase { + + public ClientRegisterInterestDUnitTest(String name) { + super(name); + } + + public void tearDown2() throws Exception { + super.tearDown2(); + disconnectAllFromDS(); // cleans up bridge server and client and lonerDS + } + + /** + * Tests for Bug 35381 Calling register interest if + * establishCallbackConnection is not set causes bridge server NPE. + */ + public void testBug35381() throws Exception { + final Host host = Host.getHost(0); + final String name = this.getUniqueName(); + final int[] ports = new int[1]; // 1 server in this test + + final int whichVM = 0; + final VM vm = Host.getHost(0).getVM(whichVM); + vm.invoke(new CacheSerializableRunnable("Create bridge server") { + public void run2() throws CacheException { + getLogWriter().info("[testBug35381] Create BridgeServer"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region = createRegion(name, factory.create()); + assertNotNull(region); + assertNotNull(getRootRegion().getSubregion(name)); + region.put("KEY-1", "VAL-1"); + + try { + bridgeServerPort = startBridgeServer(0); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + + assertTrue(bridgeServerPort != 0); + + getLogWriter().info("[testBug35381] port=" + bridgeServerPort); + getLogWriter().info("[testBug35381] serverMemberId=" + getMemberId()); + } + }); + ports[whichVM] = vm.invokeInt(ClientRegisterInterestDUnitTest.class, + "getBridgeServerPort"); + assertTrue(ports[whichVM] != 0); + + getLogWriter().info("[testBug35381] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + getCache(); + + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + getLogWriter().info("[testBug35381] creating connection pool"); + boolean establishCallbackConnection = false; // SOURCE OF BUG 35381 + ClientServerTestCase.configureConnectionPool(factory, getServerHostName(host), ports, establishCallbackConnection, -1, -1, null); + Region region = createRegion(name, factory.create()); + assertNotNull(getRootRegion().getSubregion(name)); + try { + region.registerInterest("KEY-1"); + fail("registerInterest failed to throw SubscriptionNotEnabledException with establishCallbackConnection set to false"); + } + catch (SubscriptionNotEnabledException expected) { + } + } + protected static int bridgeServerPort; + private static int getBridgeServerPort() { + return bridgeServerPort; + } + + /** + * Tests failover of register interest from client point of view. Related + * bugs include: + * + *

Bug 35654 "failed re-registration may never be detected and thus + * may never re-re-register" + * + *

Bug 35639 "registerInterest re-registration happens everytime a healthy + * server is detected" + * + *

Bug 35655 "a single failed re-registration causes all other pending + * re-registrations to be cancelled" + */ + public void _testRegisterInterestFailover() throws Exception { + // controller is bridge client + + final Host host = Host.getHost(0); + final String name = this.getUniqueName(); + final String regionName1 = name+"-1"; + final String regionName2 = name+"-2"; + final String regionName3 = name+"-3"; + final String key1 = "KEY-"+regionName1+"-1"; + final String key2 = "KEY-"+regionName1+"-2"; + final String key3 = "KEY-"+regionName1+"-3"; + final int[] ports = new int[3]; // 3 servers in this test + + // create first bridge server with region for client... + final int firstServerIdx = 0; + final VM firstServerVM = Host.getHost(0).getVM(firstServerIdx); + firstServerVM.invoke(new CacheSerializableRunnable("Create first bridge server") { + public void run2() throws CacheException { + getLogWriter().info("[testRegisterInterestFailover] Create first bridge server"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region1 = createRootRegion(regionName1, factory.create()); + Region region2 = createRootRegion(regionName2, factory.create()); + Region region3 = createRootRegion(regionName3, factory.create()); + region1.put(key1, "VAL-1"); + region2.put(key2, "VAL-1"); + region3.put(key3, "VAL-1"); + + try { + bridgeServerPort = startBridgeServer(0); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + + assertTrue(bridgeServerPort != 0); + + getLogWriter().info("[testRegisterInterestFailover] " + + "firstServer port=" + bridgeServerPort); + getLogWriter().info("[testRegisterInterestFailover] " + + "firstServer memberId=" + getMemberId()); + } + }); + + // create second bridge server missing region for client... + final int secondServerIdx = 1; + final VM secondServerVM = Host.getHost(0).getVM(secondServerIdx); + secondServerVM.invoke(new CacheSerializableRunnable("Create second bridge server") { + public void run2() throws CacheException { + getLogWriter().info("[testRegisterInterestFailover] Create second bridge server"); + getSystem(); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + Region region1 = createRootRegion(regionName1, factory.create()); + Region region3 = createRootRegion(regionName3, factory.create()); + region1.put(key1, "VAL-2"); + region3.put(key3, "VAL-2"); + + try { + bridgeServerPort = startBridgeServer(0); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + + assertTrue(bridgeServerPort != 0); + + getLogWriter().info("[testRegisterInterestFailover] " + + "secondServer port=" + bridgeServerPort); + getLogWriter().info("[testRegisterInterestFailover] " + + "secondServer memberId=" + getMemberId()); + } + }); + + // get the bridge server ports... + ports[firstServerIdx] = firstServerVM.invokeInt( + ClientRegisterInterestDUnitTest.class, "getBridgeServerPort"); + assertTrue(ports[firstServerIdx] != 0); + ports[secondServerIdx] = secondServerVM.invokeInt( + ClientRegisterInterestDUnitTest.class, "getBridgeServerPort"); + assertTrue(ports[secondServerIdx] != 0); + assertTrue(ports[firstServerIdx] != ports[secondServerIdx]); + + // stop second and third servers + secondServerVM.invoke(new CacheSerializableRunnable("Stop second bridge server") { + public void run2() throws CacheException { + stopBridgeServers(getCache()); + } + }); + + // create the bridge client + getLogWriter().info("[testBug35654] create bridge client"); + Properties config = new Properties(); + config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + config.setProperty(DistributionConfig.LOCATORS_NAME, ""); + getSystem(config); + getCache(); + + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + getLogWriter().info("[testRegisterInterestFailover] creating connection pool"); + boolean establishCallbackConnection = true; + final PoolImpl p = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, getServerHostName(host), ports, establishCallbackConnection, -1, -1, null); + + final Region region1 = createRootRegion(regionName1, factory.create()); + final Region region2 = createRootRegion(regionName2, factory.create()); + final Region region3 = createRootRegion(regionName3, factory.create()); + + assertTrue(region1.getInterestList().isEmpty()); + assertTrue(region2.getInterestList().isEmpty()); + assertTrue(region3.getInterestList().isEmpty()); + + region1.registerInterest(key1); + region2.registerInterest(key2); + region3.registerInterest(key3); + + assertTrue(region1.getInterestList().contains(key1)); + assertTrue(region2.getInterestList().contains(key2)); + assertTrue(region3.getInterestList().contains(key3)); + + assertTrue(region1.getInterestListRegex().isEmpty()); + assertTrue(region2.getInterestListRegex().isEmpty()); + assertTrue(region3.getInterestListRegex().isEmpty()); + + // get ConnectionProxy and wait until connected to first server + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return p.getPrimaryPort() != -1; + } + public String description() { + return "primary port remained invalid"; + } + }; + DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true); + assertEquals(ports[firstServerIdx], p.getPrimaryPort()); + + // assert intial values + assertEquals("VAL-1", region1.get(key1)); + assertEquals("VAL-1", region2.get(key2)); + assertEquals("VAL-1", region3.get(key3)); + + // do puts on server1 and make sure values come thru for all 3 registrations + firstServerVM.invoke(new CacheSerializableRunnable("Puts from first bridge server") { + public void run2() throws CacheException { + Region region1 = getCache().getRegion(regionName1); + region1.put(key1, "VAL-1-1"); + Region region2 = getCache().getRegion(regionName2); + region2.put(key2, "VAL-1-1"); + Region region3 = getCache().getRegion(regionName3); + region3.put(key3, "VAL-1-1"); + } + }); + + ev = new WaitCriterion() { + public boolean done() { + if (!"VAL-1-1".equals(region1.get(key1)) || + !"VAL-1-1".equals(region2.get(key2)) || + !"VAL-1-1".equals(region3.get(key3)) + ) return false; + return true; + } + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true); + assertEquals("VAL-1-1", region1.get(key1)); + assertEquals("VAL-1-1", region2.get(key2)); + assertEquals("VAL-1-1", region3.get(key3)); + + // force failover to server 2 + secondServerVM.invoke(new CacheSerializableRunnable("Start second bridge server") { + public void run2() throws CacheException { + try { + startBridgeServer(ports[secondServerIdx]); + } + catch (IOException e) { + getLogWriter().error("startBridgeServer threw IOException", e); + fail("startBridgeServer threw IOException " + e.getMessage()); + } + } + }); + + firstServerVM.invoke(new CacheSerializableRunnable("Stop first bridge server") { + public void run2() throws CacheException { + stopBridgeServers(getCache()); + } + }); + + // wait for failover to second server + ev = new WaitCriterion() { + public boolean done() { + return ports[secondServerIdx] == p.getPrimaryPort(); + } + public String description() { + return "primary port never became " + ports[secondServerIdx]; + } + }; + DistributedTestCase.waitForCriterion(ev, 100 * 1000, 200, true); + + try { + assertEquals(null, region2.get(key2)); + fail("CacheLoaderException expected"); + } + catch (com.gemstone.gemfire.cache.CacheLoaderException e) { + } + + // region2 registration should be gone now + // do puts on server2 and make sure values come thru for only 2 registrations + secondServerVM.invoke(new CacheSerializableRunnable("Puts from second bridge server") { + public void run2() throws CacheException { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + createRootRegion(regionName2, factory.create()); + } + }); + + // assert that there is no actively registered interest on region2 + assertTrue(region2.getInterestList().isEmpty()); + assertTrue(region2.getInterestListRegex().isEmpty()); + + region2.put(key2, "VAL-0"); + + secondServerVM.invoke(new CacheSerializableRunnable("Put from second bridge server") { + public void run2() throws CacheException { + Region region1 = getCache().getRegion(regionName1); + region1.put(key1, "VAL-2-2"); + Region region2 = getCache().getRegion(regionName2); + region2.put(key2, "VAL-2-1"); + Region region3 = getCache().getRegion(regionName3); + region3.put(key3, "VAL-2-2"); + } + }); + + // wait for updates to come thru + ev = new WaitCriterion() { + public boolean done() { + if (!"VAL-2-2".equals(region1.get(key1)) || + !"VAL-2-2".equals(region3.get(key3))) + return false; + return true; + } + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(ev, 100 * 1000, 200, true); + assertEquals("VAL-2-2", region1.get(key1)); + assertEquals("VAL-0", region2.get(key2)); + assertEquals("VAL-2-2", region3.get(key3)); + + // assert again that there is no actively registered interest on region2 + assertTrue(region2.getInterestList().isEmpty()); + + // register interest again on region2 and make + region2.registerInterest(key2); + assertEquals("VAL-2-1", region2.get(key2)); + + secondServerVM.invoke(new CacheSerializableRunnable("Put from second bridge server") { + public void run2() throws CacheException { + Region region1 = getCache().getRegion(regionName1); + region1.put(key1, "VAL-2-3"); + Region region2 = getCache().getRegion(regionName2); + region2.put(key2, "VAL-2-2"); + Region region3 = getCache().getRegion(regionName3); + region3.put(key3, "VAL-2-3"); + } + }); + + // wait for updates to come thru + ev = new WaitCriterion() { + public boolean done() { + if (!"VAL-2-3".equals(region1.get(key1)) || + !"VAL-2-2".equals(region2.get(key2)) || + !"VAL-2-3".equals(region3.get(key3))) + return false; + return true; + } + public String description() { + return null; + } + }; + DistributedTestCase.waitForCriterion(ev, 100 * 1000, 200, true); + assertEquals("VAL-2-3", region1.get(key1)); + assertEquals("VAL-2-2", region2.get(key2)); + assertEquals("VAL-2-3", region3.get(key3)); + + // assert public methods report actively registered interest on region2 + assertTrue(region2.getInterestList().contains(key2)); + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestSelectorDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestSelectorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestSelectorDUnitTest.java new file mode 100644 index 0000000..86c12f1 --- /dev/null +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ClientRegisterInterestSelectorDUnitTest.java @@ -0,0 +1,16 @@ +package com.gemstone.gemfire.cache30; + +/** + * Same as parent but uses selector in server + * + * @author darrel + * @since 5.1 + */ +public class ClientRegisterInterestSelectorDUnitTest extends ClientRegisterInterestDUnitTest { + public ClientRegisterInterestSelectorDUnitTest(String name) { + super(name); + } + protected int getMaxThreads() { + return 2; + } +}