Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CEB7918985 for ; Wed, 20 Jan 2016 02:22:49 +0000 (UTC) Received: (qmail 59133 invoked by uid 500); 20 Jan 2016 02:22:49 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 59097 invoked by uid 500); 20 Jan 2016 02:22:49 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 59088 invoked by uid 99); 20 Jan 2016 02:22:49 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jan 2016 02:22:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 0D313180501 for ; Wed, 20 Jan 2016 02:22:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.247 X-Spam-Level: * X-Spam-Status: No, score=1.247 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id g0Y4wL1dVsLt for ; Wed, 20 Jan 2016 02:22:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 48B8943F82 for ; Wed, 20 Jan 2016 02:22:10 +0000 (UTC) Received: (qmail 53703 invoked by uid 99); 20 Jan 2016 02:22:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jan 2016 02:22:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C6569E0498; Wed, 20 Jan 2016 02:22:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rvs@apache.org To: commits@geode.incubator.apache.org Date: Wed, 20 Jan 2016 02:22:36 -0000 Message-Id: In-Reply-To: <38f5b46407394391bacbec02d10d52c0@git.apache.org> References: <38f5b46407394391bacbec02d10d52c0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [30/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java new file mode 100644 index 0000000..d667b1d --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java @@ -0,0 +1,3356 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.tier.sockets; + +import java.util.Iterator; + +import org.junit.Ignore; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.InterestResultPolicy; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.client.Pool; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException; +import com.gemstone.gemfire.cache.client.internal.PoolImpl; +import com.gemstone.gemfire.cache.query.CqAttributes; +import com.gemstone.gemfire.cache.query.CqAttributesFactory; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqExistsException; +import com.gemstone.gemfire.cache.query.CqListener; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.RegionNotFoundException; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + +import dunit.AsyncInvocation; +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.VM; + +public class DurableClientSimpleDUnitTest extends DurableClientTestCase { + + public DurableClientSimpleDUnitTest(String name) { + super(name); + } + /** + * Test that a durable client correctly receives updates. + */ + public void testSimpleDurableClientUpdate() { + // Start a server + int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServer", new Object[] {regionName, new Boolean(true)})) + .intValue(); + + // Start a durable client that is not kept alive on the server when it stops + // normally + final String durableClientId = getName() + "_client"; + this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", + new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE}); + + // Send clientReady message + this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") { + public void run2() throws CacheException { + CacheServerTestUtil.getCache().readyForEvents(); + } + }); + + // Have the durable client register interest in all keys + this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") { + public void run2() throws CacheException { + // Get the region + Region region = CacheServerTestUtil.getCache().getRegion(regionName); + assertNotNull(region); + + // Register interest in all keys + region.registerInterestRegex(".*", InterestResultPolicy.NONE, true); + } + }); + + // Start normal publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", + new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName}); + + // Publish some entries + final int numberOfEntries = 10; + this.publisherClientVM.invoke(new CacheSerializableRunnable("Register interest") { + public void run2() throws CacheException { + // Get the region + Region region = CacheServerTestUtil.getCache().getRegion(regionName); + assertNotNull(region); + + // Publish some entries + for (int i=0; i 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + + final String durableClientId = getName() + "_client"; + + startDurableClient(durableClientVM, durableClientId, serverPort, regionName); + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durableClientVM, "All", allQuery, true); + createCq(durableClientVM, "LessThan5", lessThan5Query, true); + //send client ready + sendClientReady(durableClientVM); + + // Verify durable client on server + verifyDurableClientOnServer(server1VM, durableClientId); + + // Stop the durable client + this.disconnectDurableClient(true); + + // Start normal publisher client + startClient(publisherClientVM, serverPort, regionName); + + // Publish some entries + publishEntries(publisherClientVM, regionName, 10); + + //verify cq stats are correct + checkNumDurableCqs(server1VM, durableClientId, 3); + checkHAQueueSize(server1VM, durableClientId, 10, 11); + + //Restart the durable client + startDurableClient(durableClientVM, durableClientId, serverPort, regionName); + + //Reregister durable cqs + createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true); + createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true); + createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true); + //send client ready + sendClientReady(durableClientVM); + + checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/); + + //Due to the implementation of DurableHARegionQueue where remove is called after dispatch. + //This can cause events to linger in the queue due to a "later" ack and only cleared on + //the next dispatch. We need to send one more message to dispatch, that calls remove one more + //time and any remaining acks (with or without this final published events ack) + flushEntries(server1VM, durableClientVM, regionName); + checkHAQueueSize(server1VM, durableClientId, 0, 1); + + // Stop the durable client + this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the server + this.server1VM.invoke(CacheServerTestUtil.class, "closeCache"); + } + + + /** + * Tests the ha queued events stat + * Connects a durable client, registers durable cqs and then shuts down the durable client + * Publisher then does puts onto the server + * Events are queued up and the stats are checked + * Test sleeps until durable client times out + * Stats should now be 0 + * Durable client is then reconnected, no events should exist and stats are rechecked + */ + public void testHAQueueSizeStatExpired() throws Exception { + int timeoutInSeconds = 20; + String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + final int mcastPort = ports[1].intValue(); + + final String durableClientId = getName() + "_client"; + + startDurableClient(durableClientVM, durableClientId, serverPort, regionName, timeoutInSeconds); + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durableClientVM, "All", allQuery, true); + createCq(durableClientVM, "LessThan5", lessThan5Query, true); + //send client ready + sendClientReady(durableClientVM); + + // Verify durable client on server + verifyDurableClientOnServer(server1VM, durableClientId); + + // Stop the durable client + this.disconnectDurableClient(true); + + // Start normal publisher client + startClient(publisherClientVM, serverPort, regionName); + + // Publish some entries + publishEntries(publisherClientVM, regionName, 10); + + //verify cq stats are correct + checkNumDurableCqs(server1VM, durableClientId, 3); + checkHAQueueSize(server1VM, durableClientId, 10, 11); + + //pause until timeout + try { + Thread.sleep((timeoutInSeconds + 2) * 1000); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + //Restart the durable client + startDurableClient(durableClientVM, durableClientId, serverPort, regionName); + + //Reregister durable cqs + createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true); + createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true); + createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true); + //send client ready + sendClientReady(durableClientVM); + + checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "LessThan5", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "All", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/); + + //Due to the implementation of DurableHARegionQueue where remove is called after dispatch. + //This can cause events to linger in the queue due to a "later" ack and only cleared on + //the next dispatch. We need to send one more message to dispatch, that calls remove one more + //time and any remaining acks (with or without this final published events ack) + flushEntries(server1VM, durableClientVM, regionName); + checkHAQueueSize(server1VM, durableClientId, 0, 1); + + // Stop the durable client + this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the server + this.server1VM.invoke(CacheServerTestUtil.class, "closeCache"); + } + + /** + * Tests the ha queued events stat + * Starts up two servers, shuts one down + * Connects a durable client, registers durable cqs and then shuts down the durable client + * Publisher then does puts onto the server + * Events are queued up + * Durable client is then reconnected but does not send ready for events + * Secondary server is brought back up + * Stats are checked + * Durable client then reregisters cqs and sends ready for events + */ + public void testHAQueueSizeStatForGII() throws Exception { + String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + + // Start server 2 using the same mcast port as server 1 + final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class, + "createCacheServer", new Object[] {regionName, new Boolean(true)})) + .intValue(); + + //shut down server 2 + closeCache(server2VM); + + final String durableClientId = getName() + "_client"; + this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints"); + + startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName); + + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durableClientVM, "All", allQuery, true); + createCq(durableClientVM, "LessThan5", lessThan5Query, true); + //send client ready + sendClientReady(durableClientVM); + + verifyDurableClientOnServer(server1VM, durableClientId); + checkNumDurableCqs(server1VM, durableClientId, 3); + + // Stop the durable client + this.disconnectDurableClient(true); + + // Start normal publisher client + startClient(publisherClientVM, serverPort, regionName); + + // Publish some entries + publishEntries(publisherClientVM, regionName, 10); + + // Restart the durable client + startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName); + + // Re-start server2, at this point it will be the first time server2 has connected to client + this.server2VM.invoke(CacheServerTestUtil.class, "createCacheServer", + new Object[] { regionName, new Boolean(true), + new Integer(serverPort2)}); + + // Verify durable client on server2 + verifyDurableClientOnServer(server2VM, durableClientId); + + //verify cqs and stats on server 2. These events are through gii, stats should be correct + checkNumDurableCqs(server2VM, durableClientId, 3); + checkHAQueueSize(server2VM, durableClientId, 10, 11); + + closeCache(server1VM); + + //Reregister durable cqs + createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true); + createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true); + createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true); + //send client ready + sendClientReady(durableClientVM); + + //verify cq listeners received events + checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/); + + //Verify stats are 0 for server2 (we failed over) + flushEntries(server2VM, durableClientVM, regionName); + checkHAQueueSize(server2VM, durableClientId, 0, 1); + + checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0); + checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0); + checkCqStatOnServer(server2VM, durableClientId, "All", 0); + + // Stop the durable client + this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the servers + this.server1VM.invoke(CacheServerTestUtil.class, "closeCache"); + this.server2VM.invoke(CacheServerTestUtil.class, "closeCache"); + } + + /** + * Tests the ha queued cq stat + */ + public void testHAQueuedCqStat() throws Exception { + String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + final int mcastPort = ports[1].intValue(); + + final String durableClientId = getName() + "_client"; + + startDurableClient(durableClientVM, durableClientId, serverPort, regionName); + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durableClientVM, "All", allQuery, true); + createCq(durableClientVM, "LessThan5", lessThan5Query, true); + //send client ready + sendClientReady(durableClientVM); + + // Verify durable client on server + verifyDurableClientOnServer(server1VM, durableClientId); + + // Stop the durable client + this.disconnectDurableClient(true); + + // Start normal publisher client + startClient(publisherClientVM, serverPort, regionName); + + // Publish some entries + publishEntries(publisherClientVM, regionName, 10); + + //verify cq stats are correct + checkNumDurableCqs(server1VM, durableClientId, 3); + checkCqStatOnServer(server1VM, durableClientId, "All", 10); + checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4); + checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5); + + //Restart the durable client + startDurableClient(durableClientVM, durableClientId, serverPort, regionName); + + //Reregister durable cqs + createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true); + createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true); + createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true); + //send client ready + sendClientReady(durableClientVM); + + checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/); + + + //Due to the implementation of DurableHARegionQueue where remove is called after dispatch. + //This can cause events to linger in the queue due to a "later" ack and only cleared on + //the next dispatch. We need to send one more message to dispatch, that calls remove one more + //time and any remaining acks (with or without this final published events ack) + flushEntries(server1VM, durableClientVM, regionName); + + checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0); + checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0); + checkCqStatOnServer(server1VM, durableClientId, "All", 0); + + // Stop the durable client + this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the server + this.server1VM.invoke(CacheServerTestUtil.class, "closeCache"); + } + /** + * @throws Exception + */ + public void testHAQueuedCqStatOnSecondary() throws Exception { + String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + + // Start server 2 using the same mcast port as server 1 + final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class, + "createCacheServer", new Object[] {regionName, new Boolean(true)})) + .intValue(); + + final String durableClientId = getName() + "_client"; + this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints"); + + startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName); + + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durableClientVM, "All", allQuery, true); + createCq(durableClientVM, "LessThan5", lessThan5Query, true); + //send client ready + sendClientReady(durableClientVM); + + //Verify durable client on server 2 + verifyDurableClientOnServer(server2VM, durableClientId); + + //Verify durable client on server + verifyDurableClientOnServer(server1VM, durableClientId); + + //Stop the durable client + this.disconnectDurableClient(true); + + // Start normal publisher client + startClient(publisherClientVM, serverPort, regionName); + + // Publish some entries + publishEntries(publisherClientVM, regionName, 10); + + //verify cq stats are correct on both servers + checkNumDurableCqs(server1VM, durableClientId, 3); + checkCqStatOnServer(server1VM, durableClientId, "All", 10); + checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4); + checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5); + + //verify cq stats are correct + checkNumDurableCqs(server2VM, durableClientId, 3); + checkCqStatOnServer(server2VM, durableClientId, "All", 10); + checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4); + checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5); + + //Restart the durable client + startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName); + + //Reregister durable cqs + createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true); + createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true); + createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true); + //send client ready + sendClientReady(durableClientVM); + + checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/); + checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/); + + //Verify stats are 0 for both servers + flushEntries(server1VM, durableClientVM, regionName); + + checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0); + checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0); + checkCqStatOnServer(server1VM, durableClientId, "All", 0); + checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0); + checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0); + checkCqStatOnServer(server2VM, durableClientId, "All", 0); + + // Stop the durable client + this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the publisher client + this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache"); + + // Stop the server + this.server1VM.invoke(CacheServerTestUtil.class, "closeCache"); + } + + + /** + * Server 2 comes up, client connects and registers cqs, server 2 then disconnects + * events are put into region + * client goes away + * server 2 comes back up and should get a gii + * check stats + * server 1 goes away + * client comes back and receives all events + * stats should still be correct + * + * @throws Exception + */ + public void testHAQueuedCqStatForGII() throws Exception { + String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5"; + String allQuery = "select * from /" + regionName + " p where p.ID > -1"; + String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5"; + + // Start server 1 + Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class, + "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)})); + final int serverPort = ports[0].intValue(); + + // Start server 2 using the same mcast port as server 1 + final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class, + "createCacheServer", new Object[] {regionName, new Boolean(true)})) + .intValue(); + + // Start a durable client that is kept alive on the server when it stops + // normally + final String durableClientId = getName() + "_client"; + this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints"); + + startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName); + + //register durable cqs + createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true); + createCq(durable