geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [02/50] [abbrv] geode git commit: GEODE-1862: Invoke getAllDurableCqsFromServer on the primary queue server
Date Wed, 14 Dec 2016 17:57:17 GMT
GEODE-1862: Invoke getAllDurableCqsFromServer on the primary queue server

Invoking this method on a server that does not have the queue will
return the wrong results.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/05c2388f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/05c2388f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/05c2388f

Branch: refs/heads/feature/GEODE-1930
Commit: 05c2388f5add287aa07a43ca08a85776f8b9f43b
Parents: 9dd4205
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Thu Sep 1 17:40:03 2016 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Tue Dec 6 16:53:20 2016 -0800

----------------------------------------------------------------------
 .../cache/client/internal/ConnectionStats.java  |   4 +
 .../cache/client/internal/GetDurableCQsOp.java  |   2 +-
 .../sockets/DurableClientSimpleDUnitTest.java   | 248 ++++++++++++-------
 3 files changed, 159 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/05c2388f/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionStats.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionStats.java
index c2229c1..d91719d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionStats.java
@@ -3344,6 +3344,10 @@ public class ConnectionStats implements MessageStats {
     return this.stats.getLong(executeFunctionDurationId);
   }
 
+  public int getGetDurableCqs() {
+    return this.stats.getInt(getDurableCQsId);
+  }
+
   /**
    * Records that the specified GetClientPRMetadata operation is starting
    * <p>

http://git-wip-us.apache.org/repos/asf/geode/blob/05c2388f/geode-cq/src/main/java/org/apache/geode/cache/client/internal/GetDurableCQsOp.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/client/internal/GetDurableCQsOp.java
b/geode-cq/src/main/java/org/apache/geode/cache/client/internal/GetDurableCQsOp.java
index b520c5d..814931c 100755
--- a/geode-cq/src/main/java/org/apache/geode/cache/client/internal/GetDurableCQsOp.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/client/internal/GetDurableCQsOp.java
@@ -43,7 +43,7 @@ public class GetDurableCQsOp {
    */
   public static List<String> execute(ExecutablePool pool) {
     AbstractOp op = new GetDurableCQsOpImpl();
-    return (List<String>) pool.execute(op);
+    return (List<String>) pool.executeOnPrimary(op);
   }
 
   private GetDurableCQsOp() {

http://git-wip-us.apache.org/repos/asf/geode/blob/05c2388f/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
index 68c397d..0ea7050 100644
--- a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -16,12 +16,15 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil.TYPE_CREATE;
 import static org.apache.geode.test.dunit.Assert.fail;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.CacheException;
@@ -31,6 +34,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.Pool;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
+import org.apache.geode.cache.client.internal.ConnectionStats;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
@@ -42,6 +46,7 @@ import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.ClientServerObserver;
 import org.apache.geode.internal.cache.ClientServerObserverAdapter;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
@@ -83,8 +88,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // normally
     final String durableClientId = getName() + "_client";
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -109,8 +114,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Start normal publisher client
     this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), serverPort,
-            false),
+        getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false),
         regionName));
 
     // Publish some entries
@@ -147,8 +151,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final String regionName1 = regionName + "1";
     final String regionName2 = regionName + "2";
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClients(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName1, regionName2, getClientDistributedSystemProperties(durableClientId)));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName1,
+        regionName2, getClientDistributedSystemProperties(durableClientId)));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -219,8 +223,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // stops normally
     final String durableClientId = getName() + "_client";
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -249,8 +253,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
       public void run2() throws CacheException {
         getSystem(getClientDistributedSystemProperties(durableClientId));
         PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
-        pf.init(getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()),
-            serverPort, true));
+        pf.init(getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort,
true));
         try {
           pf.create("uncreatablePool");
           fail("Should not have been able to create the pool");
@@ -281,8 +284,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Start normal publisher client
     this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), serverPort,
-            false),
+        getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false),
         regionName));
 
     // Publish some entries
@@ -316,8 +318,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // stops normally
     final String durableClientId = getName() + "_client";
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId)));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId)));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -332,8 +334,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     VM durableClient2VM = this.publisherClientVM;
     final String durableClientId2 = getName() + "_client2";
     durableClient2VM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClient2VM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId2)));
+        getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId2)));
 
     // Send clientReady message
     durableClient2VM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -404,8 +406,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final int durableClientTimeout = 60; // keep the client alive for 60 seconds
     // final boolean durableClientKeepAlive = true; // keep the client alive when it stops
normally
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         Boolean.TRUE));
 
@@ -447,10 +448,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     });
 
     // Start normal publisher client
-    this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), server1Port,
-            server2Port, false),
-        regionName));
+    this.publisherClientVM.invoke(() -> CacheServerTestUtil
+        .createCacheClient(getClientPool(getServerHostName(publisherClientVM.getHost()),
+            server1Port, server2Port, false), regionName));
 
     // Publish some entries
     final int numberOfEntries = 10;
@@ -496,8 +496,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Re-start the durable client
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
 
     // Send clientReady message
@@ -565,9 +564,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final int durableClientTimeout = 60; // keep the client alive for 60 seconds
     // final boolean durableClientKeepAlive = true; // keep the client alive when it stops
normally
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
-        Boolean.TRUE));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -595,8 +593,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     VM durableClient2VM = this.server2VM;
     final String durableClientId2 = getName() + "_client2";
     durableClient2VM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClient2VM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId2, durableClientTimeout),
+        getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId2, durableClientTimeout),
         Boolean.TRUE));
 
     // Send clientReady message
@@ -648,8 +646,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Start normal publisher client
     this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), serverPort,
-            false),
+        getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false),
         regionName));
 
     // Publish some entries
@@ -726,8 +723,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Re-start durable client 1
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
 
     // Send clientReady message
     this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -739,8 +736,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Re-start durable client 2
     durableClient2VM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClient2VM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId2), Boolean.TRUE));
+        getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId2), Boolean.TRUE));
 
     // Send clientReady message
     durableClient2VM.invoke(new CacheSerializableRunnable("Send clientReady") {
@@ -798,8 +795,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final int durableClientTimeout = 60; // keep the client alive for 60 seconds
     // final boolean durableClientKeepAlive = true; // keep the client alive when it stops
normally
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         Boolean.TRUE));
 
@@ -863,10 +859,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
         new Integer(server2Port)));
 
     // Start normal publisher client
-    this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), server1Port,
-            server2Port, false),
-        regionName));
+    this.publisherClientVM.invoke(() -> CacheServerTestUtil
+        .createCacheClient(getClientPool(getServerHostName(publisherClientVM.getHost()),
+            server1Port, server2Port, false), regionName));
 
     // Publish some entries
     final int numberOfEntries = 10;
@@ -894,8 +889,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // Re-start the durable client that is kept alive on the server when it stops
     // normally
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         Boolean.TRUE));
 
@@ -978,8 +972,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final String durableClientId = getName() + "_client";
     // make the client use ClientCacheFactory so it will have a default pool
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createClientCache(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), serverPort,
true),
-        regionName, getClientDistributedSystemProperties(durableClientId)));
+        getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+        getClientDistributedSystemProperties(durableClientId)));
 
     // verify that readyForEvents has not yet been called on the client's default pool
     this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called")
{
@@ -1069,39 +1063,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
           CacheServerTestUtil.getCache().readyForEvents();
         }
       });
-
-      // Durable client registers durable cq on server
-      this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
-        @Override
-        public void run2() throws CacheException {
-          // Get the region
-          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-          assertNotNull(region);
-
-          // Create CQ Attributes.
-          CqAttributesFactory cqAf = new CqAttributesFactory();
-
-          // Initialize and set CqListener.
-          CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()};
-          cqAf.initCqListeners(cqListeners);
-          CqAttributes cqa = cqAf.create();
-
-          // Create cq's
-          // Get the query service for the Pool
-          QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
-
-          try {
-            CqQuery query = queryService.newCq(cqName, "Select * from /" + regionName, cqa,
true);
-            query.execute();
-          } catch (CqExistsException e) {
-            fail("Failed due to ", e);
-          } catch (CqException e) {
-            fail("Failed due to ", e);
-          } catch (RegionNotFoundException e) {
-            fail("Could not find specified region:" + regionName + ":", e);
-          }
-        }
-      });
+      registerDurableCq(cqName);
 
       // Verify durable client on server1
       this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
@@ -1120,8 +1082,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
       // Start normal publisher client
       this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-          getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), serverPort,
-              false),
+          getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false),
           regionName));
 
       // Publish some entries
@@ -1262,6 +1223,41 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     }
   }
 
+  protected void registerDurableCq(final String cqName) {
+    // Durable client registers durable cq on server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
+      @Override
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Create CQ Attributes.
+        CqAttributesFactory cqAf = new CqAttributesFactory();
+
+        // Initialize and set CqListener.
+        CqListener[] cqListeners = {new CacheServerTestUtil.ControlCqListener()};
+        cqAf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqAf.create();
+
+        // Create cq's
+        // Get the query service for the Pool
+        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+
+        try {
+          CqQuery query = queryService.newCq(cqName, "Select * from /" + regionName, cqa,
true);
+          query.execute();
+        } catch (CqExistsException e) {
+          fail("Failed due to ", e);
+        } catch (CqException e) {
+          fail("Failed due to ", e);
+        } catch (RegionNotFoundException e) {
+          fail("Could not find specified region:" + regionName + ":", e);
+        }
+      }
+    });
+  }
+
   private void setPeriodicACKObserver(VM vm) {
     CacheSerializableRunnable cacheSerializableRunnable =
         new CacheSerializableRunnable("Set ClientServerObserver") {
@@ -1359,8 +1355,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Start normal publisher client
     this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(publisherClientVM.getHost()), serverPort,
-            false),
+        getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false),
         regionName));
 
     // Publish some entries
@@ -3282,8 +3277,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // final boolean durableClientKeepAlive = true; // keep the client alive when it stops
normally
     final String durableClientId = getName() + "_client";
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         Boolean.TRUE));
 
@@ -3355,8 +3349,7 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     // Start up the client again. This time initialize it so that it is not kept
     // alive on the servers when it stops normally.
     this.durableClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
true),
         regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE));
 
     // Send clientReady message
@@ -3424,10 +3417,9 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
         .invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true)));
 
     // Start normal publisher client
-    this.publisherClientVM.invoke(() -> CacheServerTestUtil.createCacheClient(
-        getClientPool(NetworkUtils.getServerHostName(this.publisherClientVM.getHost()), server1Port,
-            server2Port, false),
-        this.regionName));
+    this.publisherClientVM.invoke(() -> CacheServerTestUtil
+        .createCacheClient(getClientPool(getServerHostName(this.publisherClientVM.getHost()),
+            server1Port, server2Port, false), this.regionName));
 
     // Create an entry
     publishEntries(1);
@@ -3436,8 +3428,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     final String durableClientId = getName() + "_client";
     final int durableClientTimeout = 60; // keep the client alive for 60 seconds
     restartDurableClient(new Object[] {
-        getClientPool(NetworkUtils.getServerHostName(this.durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(this.durableClientVM.getHost()), server1Port, server2Port,
+            true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         true});
 
@@ -3469,8 +3461,8 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
 
     // Restart durable client
     restartDurableClient(new Object[] {
-        getClientPool(NetworkUtils.getServerHostName(this.durableClientVM.getHost()), server1Port,
-            server2Port, true),
+        getClientPool(getServerHostName(this.durableClientVM.getHost()), server1Port, server2Port,
+            true),
         regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout),
         true});
 
@@ -3499,6 +3491,74 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase
{
     return registered;
   }
 
+  @Test
+  public void testGetAllDurableCqsFromServer() {
+    // Start server 1
+    final int server1Port = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, new Boolean(true)})).intValue();
+
+    // Start server 2
+    final int server2Port = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, new Boolean(true)})).intValue();
+
+    // Start a durable client
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {
+            getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port,
+                true, 0),
+            regionName, getClientDistributedSystemProperties(durableClientId, 60), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Register durable CQ
+    String cqName = getName() + "_cq";
+    registerDurableCq(cqName);
+
+    // Execute getAllDurableCqsFromServer on the client
+    List<String> durableCqNames =
+        (List<String>) this.durableClientVM.invoke(() -> getAllDurableCqsFromServer());
+
+    this.durableClientVM.invoke(() -> verifyDurableCqs(durableCqNames, cqName));
+
+    // Stop the durable client
+    this.durableClientVM.invoke(() -> CacheServerTestUtil.closeCache());
+
+    // Stop the servers
+    this.server1VM.invoke(() -> CacheServerTestUtil.closeCache());
+    this.server2VM.invoke(() -> CacheServerTestUtil.closeCache());
+  }
+
+  public static List<String> getAllDurableCqsFromServer() throws CqException {
+    QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+    return queryService.getAllDurableCqsFromServer();
+  }
+
+  public static void verifyDurableCqs(final List<String> durableCqNames,
+      final String registeredCqName) {
+    // Verify the number of durable CQ names is one, and it matches the registered name
+    assertEquals(1, durableCqNames.size());
+    String returnedCqName = durableCqNames.get(0);
+    assertEquals(registeredCqName, returnedCqName);
+
+    // Get client's primary server
+    PoolImpl pool = CacheServerTestUtil.getPool();
+    ServerLocation primaryServerLocation = pool.getPrimary();
+
+    // Verify the primary server was used and no other server was used
+    Map<ServerLocation, ConnectionStats> statistics = pool.getEndpointManager().getAllStats();
+    for (Map.Entry<ServerLocation, ConnectionStats> entry : statistics.entrySet())
{
+      int expectedGetDurableCqInvocations = entry.getKey().equals(primaryServerLocation)
? 1 : 0;
+      assertEquals(expectedGetDurableCqInvocations, entry.getValue().getGetDurableCqs());
+    }
+  }
+
+
   private void waitForEventsRemovedByQueueRemovalMessage(VM secondaryServerVM,
       final String durableClientId, final int numEvents) {
     secondaryServerVM.invoke(() -> DurableClientSimpleDUnitTest


Mime
View raw message