geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [30/70] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Thu, 28 Jan 2016 18:13:32 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
new file mode 100644
index 0000000..d667b1d
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -0,0 +1,3356 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import java.util.Iterator;
+
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
+
+  public DurableClientSimpleDUnitTest(String name) { // forwards name to the DurableClientTestCase constructor
+    super(name);
+  }
+  /**
+   * Test that a durable client receives every entry put by a separate publisher client.
+   */
+  public void testSimpleDurableClientUpdate() {
+    // Start a server
+    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start a durable client that is not kept alive on the server when it stops
+    // normally
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      }
+    });
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Put numberOfEntries entries, each using its index as both key and value
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+
+  /**
+   * Test that a durable client VM with multiple BridgeClients correctly
+   * registers on the server.
+   */
+  public void testMultipleBridgeClientsInSingleDurableVM() {
+    // Start a server
+    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start a durable client with 2 regions (and 2 BridgeClients) that is not
+    // kept alive on the server when it stops normally
+    final String durableClientId = getName() + "_client";
+    final String regionName1 = regionName + "1";
+    final String regionName2 = regionName + "2";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClients",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName1, regionName2, getClientDistributedSystemProperties(durableClientId)});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        assertEquals(2, PoolManager.getAll().size());
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable clients on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+
+        // Expect two proxies, both durable and both carrying the same durable id
+        checkNumberOfClientProxies(2);
+        String firstProxyRegionName = null;
+        for (Iterator i = notifier.getClientProxies().iterator(); i.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) i.next();
+          assertTrue(proxy.isDurable());
+          assertEquals(durableClientId, proxy.getDurableId());
+          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+
+          // Verify the two HA region names aren't the same
+          if (firstProxyRegionName == null) {
+            firstProxyRegionName = proxy.getHARegionName();
+          } else {
+            assertTrue(!firstProxyRegionName.equals(proxy.getHARegionName()));
+          }
+        }
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Verify the durable client is no longer on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // No proxy may remain after the client closed without keep-alive
+        checkNumberOfClientProxies(0);
+      }
+    });
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Test that a second VM with the same durable id cannot connect to the server
+   * while the first VM is connected, and that the first client is unaffected.
+   * NOTE(review): the X name prefix presumably disables this test - confirm.
+   */
+  public void XtestMultipleVMsWithSameDurableId() {
+    // Start a server
+    final int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE);
+      }
+    });
+
+    // Attempt to start another durable client VM with the same id.
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Create another durable client") {
+      public void run2() throws CacheException {
+        getSystem(getClientDistributedSystemProperties(durableClientId));
+        PoolFactoryImpl pf = (PoolFactoryImpl)PoolManager.createFactory();
+        pf.init(getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, true));
+        try {
+          pf.create("uncreatablePool");
+          fail("Should not have been able to create the pool");
+        } catch (ServerRefusedConnectionException e) {
+          // expected exception
+          disconnectFromDS();
+        } catch (Exception e) {
+          fail("Should not have gotten here", e);
+        }
+      }
+    });
+
+    // Verify durable client on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Only the first client's proxy should be present
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+        assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+      }
+    });
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Put numberOfEntries entries, each using its index as both key and value
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+
+  /**
+   * Test that the server correctly processes starting two durable clients.
+   */
+  public void testSimpleTwoDurableClients() {
+    // Start a server
+    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId)});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Start another durable client that is not kept alive on the server when
+    // it stops normally. Use the 'publisherClientVM' as a durable client.
+    VM durableClient2VM = this.publisherClientVM;
+    final String durableClientId2 = getName() + "_client2";
+    durableClient2VM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId2)});
+
+    // Send clientReady message
+    durableClient2VM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable clients on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+
+        // Expect two durable proxies, one for each durable id, with the default timeout
+        checkNumberOfClientProxies(2);
+        boolean durableClient1Found=false, durableClient2Found=false;
+        for (Iterator i = notifier.getClientProxies().iterator(); i.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) i.next();
+          assertTrue(proxy.isDurable());
+          if (proxy.getDurableId().equals(durableClientId)) {
+            durableClient1Found = true;
+          }
+          if (proxy.getDurableId().equals(durableClientId2)) {
+            durableClient2Found = true;
+          }
+          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+        }
+        assertTrue(durableClient1Found);
+        assertTrue(durableClient2Found);
+      }
+    });
+
+    // Stop the durable clients
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+    durableClient2VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Test that starting a durable client on multiple servers (one live and one
+   * not live) is processed correctly.
+   */
+  @Ignore("Disabled for bug 52043")
+  public void DISABLED_testDurableClientMultipleServersOneLive() {
+    // Start server 1
+    final int server1Port = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start server 2
+    final int server2Port = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Stop server 2 so the client pool lists one live and one dead endpoint
+    this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Start a durable client that is kept alive on the server when it stops
+    // normally
+    final String durableClientId = getName() + "_client";
+    final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+    //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      }
+    });
+
+    // Verify durable client on server1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+        assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+      }
+    });
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), server1Port, server2Port, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Put numberOfEntries entries, each using its index as both key and value
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals("Events were " + listener.events, numberOfEntries, listener.events.size());
+      }
+    });
+
+    try {
+      Thread.sleep(10000); // NOTE(review): fixed pause, purpose not stated here - presumably lets acks reach the server
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag before failing
+      fail("interrupted");
+    }
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {Boolean.TRUE});
+
+    // Verify the durable client still exists on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // The proxy must still be present although the client has closed
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+      }
+    });
+
+    // Publish some more entries
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish more entries") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Put the same numberOfEntries keys again while the durable client is down
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Re-start the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable client on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+      }
+    });
+
+    // Verify the durable client received the updates held for it on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals("Events were " + listener.events, numberOfEntries, listener.events.size());
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop server 1
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+    
+  /**
+   * Test that updates to two durable clients are processed correctly.
+   */
+  public void testTwoDurableClientsStartStopUpdate() {
+    // Start a server
+    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+        .intValue();
+
+    // Start a durable client that is kept alive on the server when it stops
+    // normally
+    final String durableClientId = getName() + "_client";
+    final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+    //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      }
+    });
+    
+    // Start another durable client that is also kept alive on the server when
+    // it stops normally. Use the 'server2VM' as the second durable client.
+    VM durableClient2VM = this.server2VM;
+    final String durableClientId2 = getName() + "_client2";
+    durableClient2VM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId2, durableClientTimeout), Boolean.TRUE});
+    
+    // Send clientReady message
+    durableClient2VM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    durableClient2VM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      }
+    });
+    
+    // Verify durable clients on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+        
+        // Iterate the CacheClientProxies and verify they are correct
+        checkNumberOfClientProxies(2);
+        boolean durableClient1Found=false, durableClient2Found=false;
+        for (Iterator i = notifier.getClientProxies().iterator(); i.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) i.next();
+          assertTrue(proxy.isDurable());
+          if (proxy.getDurableId().equals(durableClientId)) {
+            durableClient1Found = true;
+          }
+          if (proxy.getDurableId().equals(durableClientId2)) {
+            durableClient2Found = true;
+          }
+          assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+        }
+        assertTrue(durableClient1Found);
+        assertTrue(durableClient2Found);
+      }
+    });
+    
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish some entries
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+    
+    // Verify durable client 1 received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+    
+    // Verify durable client 2 received the updates
+    durableClient2VM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+    
+    // ARB: Wait for queue ack to arrive at server.
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      fail("interrupted");
+    }
+    
+    // Stop the durable clients
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {new Boolean(true)});
+    durableClient2VM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {new Boolean(true)});
+    
+    // Verify the durable clients still exist on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+        
+        // Iterate the CacheClientProxies and verify they are correct
+        checkNumberOfClientProxies(2);
+        boolean durableClient1Found=false, durableClient2Found=false;
+        for (Iterator i = notifier.getClientProxies().iterator(); i.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) i.next();
+          assertTrue(proxy.isDurable());
+          if (proxy.getDurableId().equals(durableClientId)) {
+            durableClient1Found = true;
+          }
+          if (proxy.getDurableId().equals(durableClientId2)) {
+            durableClient2Found = true;
+          }
+          assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+        }
+        assertTrue(durableClient1Found);
+        assertTrue(durableClient2Found);
+      }
+    });
+
+    // Publish some more entries
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish some entries
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+    
+    try {
+      java.lang.Thread.sleep(1000);
+    }
+    catch (java.lang.InterruptedException ex) {
+      fail("interrupted");
+    }
+
+    // Verify the durable clients' queues contain the entries
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+        
+        // Iterate the CacheClientProxies and verify the queue sizes
+        checkNumberOfClientProxies(2);
+//        boolean durableClient1Found=false, durableClient2Found=false;
+        for (Iterator i = notifier.getClientProxies().iterator(); i.hasNext();) {
+          CacheClientProxy proxy = (CacheClientProxy) i.next();
+          assertEquals(numberOfEntries, proxy.getQueueSize());
+        }
+      }
+    });
+
+    // Re-start durable client 1
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Re-start durable client 2
+    durableClient2VM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClient2VM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId2), Boolean.TRUE});
+
+    // Send clientReady message
+    durableClient2VM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable client 1 received the updates held for it on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+    
+    // Verify durable client 2 received the updates held for it on the server
+    durableClient2VM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+    
+    // Stop durable client 1
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop durable client 2
+    durableClient2VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+    
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Tests whether a durable client reconnects properly to two servers.
+   * <p>
+   * Sequence exercised by this test:
+   * <ol>
+   * <li>Start server 1 and server 2, then stop server 2.</li>
+   * <li>Connect a durable client whose pool lists both server ports, send
+   * readyForEvents and register interest in all keys.</li>
+   * <li>Close the durable client's cache (passing <code>true</code> to
+   * closeCache) and check that server 1 still holds its proxy.</li>
+   * <li>Restart server 2 on its original port, publish entries from a normal
+   * client and check that they are queued on server 1.</li>
+   * <li>Restart the durable client and check that both servers hold a durable
+   * proxy for it, that both report the same HA region queue name, and that
+   * the client receives the queued entries.</li>
+   * </ol>
+   */
+  public void testDurableClientReconnectTwoServers() {
+    // Start server 1
+    Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE}));
+
+    // Turn on the test flag for periodic ack
+    this.server1VM.invoke(DurableClientTestCase.class, "setTestFlagToVerifyActForMarker",
+        new Object[] { Boolean.TRUE });
+
+    final int server1Port = ports[0].intValue();
+
+    // Start server 2 using the same mcast port as server 1
+    final int server2Port = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Stop server 2
+    this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Start a durable client that is kept alive on the server when it stops
+    // normally
+    final String durableClientId = getName() + "_client";
+    final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Have the durable client register interest in all keys
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.NONE,true);
+      }
+    });
+
+    // Verify durable client on server 1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+        assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+        verifyReceivedMarkerAck(proxy);
+      }
+    });
+
+    // VJR: wait for ack to go out
+    pause(5000);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {Boolean.TRUE});
+
+    // Verify durable client on server 1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+      }
+    });
+
+    // Re-start server2 on the port it used before it was stopped
+    this.server2VM.invoke(CacheServerTestUtil.class, "createCacheServer",
+        new Object[] { regionName, Boolean.TRUE,
+            Integer.valueOf(server2Port)});
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), server1Port, server2Port, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish some entries
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Give the published entries time to reach the durable client's queue
+    try {
+      Thread.sleep(1000);
+    }
+    catch (InterruptedException ex) {
+      // Restore the interrupt status so the test framework can still observe it
+      Thread.currentThread().interrupt();
+      fail("interrupted");
+    }
+
+    // Verify the durable client's queue contains the entries
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify the queue size
+        assertEquals(numberOfEntries, proxy.getQueueSize());
+      }
+    });
+
+    // Re-start the durable client that is kept alive on the server when it stops
+    // normally
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), server1Port, server2Port, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable client on server 1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+        assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+      }
+    });
+
+    // Verify durable client on server 2
+    this.server2VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+        assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+      }
+    });
+
+    // Verify the HA region names are the same on both servers
+    String server1HARegionQueueName= (String) this.server1VM.invoke(DurableClientTestCase.class, "getHARegionQueueName");
+    String server2HARegionQueueName= (String) this.server2VM.invoke(DurableClientTestCase.class, "getHARegionQueueName");
+    assertEquals(server1HARegionQueueName, server2HARegionQueueName);
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Turn off the test flag for periodic ack
+    this.server1VM.invoke(DurableClientTestCase.class, "setTestFlagToVerifyActForMarker",
+        new Object[] { Boolean.FALSE });
+
+    // Stop server 1
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop server 2
+    this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Verifies that creating a durable client cache does not implicitly call
+   * readyForEvents on any of the client's pools.
+   * <p>
+   * After the client cache is created, every pool known to PoolManager must
+   * report that readyForEvents has not been called. The client then calls
+   * readyForEvents explicitly and the server is checked for a single durable
+   * proxy carrying the client's durable id and the default durable timeout.
+   */
+  public void testReadyForEventsNotCalledImplicitly() {
+    // Start a server
+    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
+    // make the client use ClientCacheFactory so it will have a default pool
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createClientCache", 
+        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId)});
+
+    // verify that readyForEvents has not yet been called on the client's default pool
+    this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+      public void run2() throws CacheException {
+        for (Pool p: PoolManager.getAll().values()) {
+          assertFalse(((PoolImpl)p).getReadyForEventsCalled());
+        }
+      }
+    });
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify the durable client on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Get the CacheClientNotifier
+        CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+            .getCacheClientNotifier();
+
+        // Iterate the CacheClientProxies and verify they are correct
+        checkNumberOfClientProxies(1);
+        boolean durableClientFound = false;
+        for (Object candidate : notifier.getClientProxies()) {
+          CacheClientProxy proxy = (CacheClientProxy) candidate;
+          assertTrue(proxy.isDurable());
+          if (proxy.getDurableId().equals(durableClientId)) {
+            durableClientFound = true;
+          }
+          // No timeout was passed when the client was created, so the default applies
+          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+        }
+        assertTrue(durableClientFound);
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Verifies that readyForEvents is not called implicitly on any pool when a
+   * durable client is created from cache XML, and that a durable CQ keeps
+   * delivering events across a client restart.
+   * <p>
+   * The client registers a durable CQ, receives a first batch of published
+   * entries, closes its cache (passing <code>true</code> to closeCache), and
+   * after restarting and re-registering the CQ must receive the entries that
+   * were published while it was away.
+   * <p>
+   * NOTE(review): an earlier comment on this method said it was disabled
+   * because it failed periodically (see bug #47060). Nothing in this method
+   * disables it and its name still starts with "test", so it presumably
+   * still runs - confirm whether it should be disabled or whether the bug is
+   * fixed.
+   */
+  public void testReadyForEventsNotCalledImplicitlyWithCacheXML() {
+    final String cqName = "cqTest";
+    // Start a server
+    int serverPort = (Integer) this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServerFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-server-cache.xml")});
+
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
+
+    //create client cache from xml
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45, Boolean.FALSE});
+
+    // verify that readyForEvents has not yet been called on all the client's pools
+    this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+      public void run2() throws CacheException {
+        for (Pool p: PoolManager.getAll().values()) {
+          assertFalse(((PoolImpl)p).getReadyForEventsCalled());
+        }
+      }
+    });
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    //Durable client registers durable cq on server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Create CQ Attributes.
+        CqAttributesFactory cqAf = new CqAttributesFactory();
+
+        // Initialize and set CqListener.
+        CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
+        cqAf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqAf.create();
+
+        // Create cq's
+        // Get the query service for the Pool
+        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+
+        try { 
+          // The final 'true' argument requests a durable CQ
+          CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
+          query.execute();
+        }
+        catch (CqExistsException e) {
+          fail("Failed due to " + e);
+        }
+        catch (CqException e) {
+          fail("Failed due to " + e);
+        }
+        catch (RegionNotFoundException e) {
+          fail("Could not find specified region:" + regionName + ":" + e);
+        }
+      }
+    });
+
+    // Verify durable client on server1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+      }
+    });
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("publish updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish some entries
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the CQ listener and wait for the appropriate number of events
+        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+        CqQuery cqQuery = queryService.getCq(cqName);
+        CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
+        cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, cqlistener.events.size());
+      }
+    });
+
+    // NOTE(review): presumably gives the client's acks time to reach the
+    // server before the client is stopped - confirm
+    try {
+      Thread.sleep(10000);
+    }
+    catch (InterruptedException e) {
+      // Restore the interrupt status so the test framework can still observe it
+      Thread.currentThread().interrupt();
+      fail("interrupted" + e);
+    }
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {Boolean.TRUE});
+
+    // Verify the durable client still exists on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+      }
+    });
+
+    // Publish some more entries
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish additional updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish new values for the same keys
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue + "lkj");
+        }
+      }
+    });
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Re-start the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45,  Boolean.FALSE});
+
+
+    //Durable client registers durable cq on server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register cq") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Create CQ Attributes.
+        CqAttributesFactory cqAf = new CqAttributesFactory();
+
+        // Initialize and set CqListener.
+        CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
+        cqAf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqAf.create();
+
+        // Create cq's
+        // Get the query service for the Pool
+        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+
+        try { 
+          CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
+          query.execute();
+        }
+        catch (CqExistsException e) {
+          fail("Failed due to " + e);
+        }
+        catch (CqException e) {
+          fail("Failed due to " + e);
+        }
+        catch (RegionNotFoundException e) {
+          fail("Could not find specified region:" + regionName + ":" + e);
+        }
+
+      }
+    });
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable client on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+      }
+    });
+
+    // Verify the durable client received the updates held for it on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+
+        CqQuery cqQuery = queryService.getCq(cqName);
+
+        CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
+        cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, cqlistener.events.size());
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Verifies that readyForEvents is not called implicitly on any pool when a
+   * durable client is created from cache XML, and that durable interest
+   * registration keeps delivering events across a client restart.
+   * <p>
+   * The client registers durable interest in all keys, receives a first
+   * batch of published entries, closes its cache (passing <code>true</code>
+   * to closeCache), and after restarting and re-registering interest must
+   * receive the entries that were published while it was away.
+   * <p>
+   * Note that this method reassigns the <code>regionName</code> field before
+   * doing anything else.
+   */
+  public void testReadyForEventsNotCalledImplicitlyForRegisterInterestWithCacheXML() {
+    regionName = "testReadyForEventsNotCalledImplicitlyWithCacheXML_region";
+    // Start a server
+    int serverPort = (Integer) this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServerFromXmlN", new Object[]{ DurableClientTestCase.class.getResource("durablecq-server-cache.xml")});
+
+    // Start a durable client that is not kept alive on the server when it
+    // stops normally
+    final String durableClientId = getName() + "_client";
+
+    //create client cache from xml
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXmlN", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45, Boolean.TRUE});
+
+    // verify that readyForEvents has not yet been called on all the client's pools
+    this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+      public void run2() throws CacheException {
+        for (Pool p: PoolManager.getAll().values()) {
+          assertFalse(((PoolImpl)p).getReadyForEventsCalled());
+        }
+      }
+    });
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Durable client registers interest in all keys on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register Interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, true);
+      }
+    });
+
+    // Verify durable client on server1
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+      }
+    });
+
+    // Start normal publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+    // Publish some entries
+    final int numberOfEntries = 10;
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("publish updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish some entries
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue);
+        }
+      }
+    });
+
+    // Verify the durable client received the updates
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+
+    // NOTE(review): presumably gives the client's acks time to reach the
+    // server before the client is stopped - confirm
+    try {
+      Thread.sleep(10000);
+    }
+    catch (InterruptedException e) {
+      // Restore the interrupt status so the test framework can still observe it
+      Thread.currentThread().interrupt();
+      fail("interrupted" + e);
+    }
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {Boolean.TRUE});
+
+    // Verify the durable client still exists on the server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+      }
+    });
+
+    // Publish some more entries
+    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish additional updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Publish new values for the same keys
+        for (int i=0; i<numberOfEntries; i++) {
+          String keyAndValue = String.valueOf(i);
+          region.put(keyAndValue, keyAndValue + "lkj");
+        }
+      }
+    });
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Re-start the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXmlN", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45,  Boolean.TRUE});
+
+
+    // Durable client re-registers interest in all keys on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Register interest in all keys
+        region.registerInterestRegex(".*", InterestResultPolicy.KEYS_VALUES, true);
+      }
+    });
+
+    // Send clientReady message
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+      public void run2() throws CacheException {
+        CacheServerTestUtil.getCache().readyForEvents();
+      }
+    });
+
+    // Verify durable client on server
+    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+      public void run2() throws CacheException {
+        // Find the proxy
+        checkNumberOfClientProxies(1);
+        CacheClientProxy proxy = getClientProxy();
+        assertNotNull(proxy);
+
+        // Verify that it is durable and its properties are correct
+        assertTrue(proxy.isDurable());
+        assertEquals(durableClientId, proxy.getDurableId());
+      }
+    });
+
+    // Verify the durable client received the updates held for it on the server
+    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+      public void run2() throws CacheException {
+        // Get the region
+        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+        assertNotNull(region);
+
+        // Get the listener and wait for the appropriate number of events
+        CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+                .getAttributes().getCacheListeners()[0];
+        listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+        assertEquals(numberOfEntries, listener.events.size());
+      }
+    });
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Tests the HA queue size stat.
+   * <p>
+   * Connects a durable client, registers three durable CQs and then shuts down
+   * the durable client. A publisher then does puts onto the server, the events
+   * are queued up and the stats are checked. The durable client is then
+   * reconnected, the same CQs are re-registered, the events are dispatched and
+   * the stats are rechecked.
+   *
+   * @throws Exception propagated from the remote invocations and helper calls
+   */
+  public void testHAQueueSizeStat() throws Exception {
+    // The same three queries are registered before the disconnect and again
+    // after the reconnect, so they are defined exactly once.
+    final String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    final String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    final String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+
+    // Start server 1
+    final Integer[] ports = (Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE});
+    final int serverPort = ports[0].intValue();
+
+    final String durableClientId = getName() + "_client";
+
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    // Verify durable client on server
+    verifyDurableClientOnServer(server1VM, durableClientId);
+
+    // Stop the durable client
+    this.disconnectDurableClient(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, serverPort, regionName);
+
+    // Publish some entries
+    publishEntries(publisherClientVM, regionName, 10);
+
+    //verify cq stats are correct
+    checkNumDurableCqs(server1VM, durableClientId, 3);
+    checkHAQueueSize(server1VM, durableClientId, 10, 11);
+
+    //Restart the durable client
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+
+    //Reregister durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+
+    //Due to the implementation of DurableHARegionQueue where remove is called after dispatch.
+    //This can cause events to linger in the queue due to a "later" ack and only cleared on
+    //the next dispatch.  We need to send one more message to dispatch, that calls remove one more
+    //time and any remaining acks (with or without this final published events ack)
+    flushEntries(server1VM, durableClientVM, regionName);
+    checkHAQueueSize(server1VM, durableClientId, 0, 1);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+
+  /**
+   * Tests the HA queue size stat when the durable client expires.
+   * <p>
+   * Connects a durable client with a 20 second timeout, registers three durable
+   * CQs and then shuts down the durable client. A publisher then does puts onto
+   * the server, the events are queued up and the stats are checked. The test
+   * then sleeps until the durable client has timed out. The durable client is
+   * reconnected, no events should be delivered to its CQ listeners and the
+   * stats are rechecked.
+   *
+   * @throws Exception propagated from the remote invocations and helper calls
+   */
+  public void testHAQueueSizeStatExpired() throws Exception {
+    final int timeoutInSeconds = 20;
+    // The same three queries are registered before the disconnect and again
+    // after the reconnect, so they are defined exactly once.
+    final String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    final String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    final String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+
+    // Start server 1
+    final Integer[] ports = (Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE});
+    final int serverPort = ports[0].intValue();
+
+    final String durableClientId = getName() + "_client";
+
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName, timeoutInSeconds);
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    // Verify durable client on server
+    verifyDurableClientOnServer(server1VM, durableClientId);
+
+    // Stop the durable client
+    this.disconnectDurableClient(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, serverPort, regionName);
+
+    // Publish some entries
+    publishEntries(publisherClientVM, regionName, 10);
+
+    //verify cq stats are correct
+    checkNumDurableCqs(server1VM, durableClientId, 3);
+    checkHAQueueSize(server1VM, durableClientId, 10, 11);
+
+    //pause until timeout (two seconds of slack on top of the configured timeout)
+    try {
+      Thread.sleep((timeoutInSeconds + 2) * 1000);
+    }
+    catch (InterruptedException ie) {
+      // Restore the interrupt flag and stop: continuing would run the
+      // "expired" assertions without having waited out the timeout.
+      Thread.currentThread().interrupt();
+      fail("interrupted while waiting for the durable client to time out: " + ie);
+    }
+    //Restart the durable client
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+
+    //Reregister durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    // No events are expected; each check waits up to 5 seconds for a first event
+    checkCqListenerEvents(durableClientVM, "GreaterThan5", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "LessThan5", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "All", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/);
+
+    //Due to the implementation of DurableHARegionQueue where remove is called after dispatch.
+    //This can cause events to linger in the queue due to a "later" ack and only cleared on
+    //the next dispatch.  We need to send one more message to dispatch, that calls remove one more
+    //time and any remaining acks (with or without this final published events ack)
+    flushEntries(server1VM, durableClientVM, regionName);
+    checkHAQueueSize(server1VM, durableClientId, 0, 1);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Tests the HA queue size stat on a server that received its queue through GII.
+   * <p>
+   * Starts up two servers and shuts server 2 down. Connects a durable client,
+   * registers three durable CQs and then shuts down the durable client. A
+   * publisher then does puts onto server 1 and the events are queued up. The
+   * durable client is then reconnected but does not send ready for events.
+   * Server 2 is brought back up on its original port and its stats are checked.
+   * Server 1 is closed, the durable client re-registers its CQs and sends ready
+   * for events, and the stats on server 2 are checked again.
+   *
+   * @throws Exception propagated from the remote invocations and helper calls
+   */
+  public void testHAQueueSizeStatForGII() throws Exception {
+    // The same three queries are registered before the disconnect and again
+    // after the reconnect, so they are defined exactly once.
+    final String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    final String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    final String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+
+    // Start server 1
+    final Integer[] ports = (Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE});
+    final int serverPort = ports[0].intValue();
+
+    // Start server 2 using the same mcast port as server 1
+    final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    //shut down server 2
+    closeCache(server2VM);
+
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
+
+    startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName);
+
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    verifyDurableClientOnServer(server1VM, durableClientId);
+    checkNumDurableCqs(server1VM, durableClientId, 3);
+
+    // Stop the durable client
+    this.disconnectDurableClient(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, serverPort, regionName);
+
+    // Publish some entries
+    publishEntries(publisherClientVM, regionName, 10);
+
+    // Restart the durable client
+    startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName);
+
+    // Re-start server2, at this point it will be the first time server2 has connected to client
+    this.server2VM.invoke(CacheServerTestUtil.class, "createCacheServer",
+        new Object[] { regionName, Boolean.TRUE,
+            Integer.valueOf(serverPort2)});
+
+    // Verify durable client on server2
+    verifyDurableClientOnServer(server2VM, durableClientId);
+
+    //verify cqs and stats on server 2.  These events are through gii, stats should be correct
+    checkNumDurableCqs(server2VM, durableClientId, 3);
+    checkHAQueueSize(server2VM, durableClientId, 10, 11);
+
+    // Close server 1 so that only server 2 remains for the client
+    closeCache(server1VM);
+
+    //Reregister durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    //verify cq listeners received events
+    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+
+    //Verify stats are 0 for server2 (we failed over)
+    flushEntries(server2VM, durableClientVM, regionName);
+    checkHAQueueSize(server2VM, durableClientId, 0, 1);
+
+    checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
+    checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
+    checkCqStatOnServer(server2VM, durableClientId, "All", 0);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the servers
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+    this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  /**
+   * Tests the HA queued CQ stat.
+   * <p>
+   * Connects a durable client, registers three durable CQs and then shuts down
+   * the durable client. A publisher then does puts onto the server and the
+   * per-CQ stats on the server are checked (10 for "All", 4 for "GreaterThan5",
+   * 5 for "LessThan5"). The durable client is then reconnected, the same CQs
+   * are re-registered, the events are dispatched and the per-CQ stats are
+   * expected to be back at 0.
+   *
+   * @throws Exception propagated from the remote invocations and helper calls
+   */
+  public void testHAQueuedCqStat() throws Exception {
+    // The same three queries are registered before the disconnect and again
+    // after the reconnect, so they are defined exactly once.
+    final String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    final String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    final String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+
+    // Start server 1
+    final Integer[] ports = (Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE});
+    final int serverPort = ports[0].intValue();
+
+    final String durableClientId = getName() + "_client";
+
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    // Verify durable client on server
+    verifyDurableClientOnServer(server1VM, durableClientId);
+
+    // Stop the durable client
+    this.disconnectDurableClient(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, serverPort, regionName);
+
+    // Publish some entries
+    publishEntries(publisherClientVM, regionName, 10);
+
+    //verify cq stats are correct
+    checkNumDurableCqs(server1VM, durableClientId, 3);
+    checkCqStatOnServer(server1VM, durableClientId, "All", 10);
+    checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4);
+    checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);
+
+    //Restart the durable client
+    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+
+    //Reregister durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+
+
+    //Due to the implementation of DurableHARegionQueue where remove is called after dispatch.
+    //This can cause events to linger in the queue due to a "later" ack and only cleared on
+    //the next dispatch.  We need to send one more message to dispatch, that calls remove one more
+    //time and any remaining acks (with or without this final published events ack)
+    flushEntries(server1VM, durableClientVM, regionName);
+
+    checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
+    checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
+    checkCqStatOnServer(server1VM, durableClientId, "All", 0);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the server
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  /**
+   * Tests the HA queued CQ stat on both servers of a two-server setup.
+   * <p>
+   * Starts two servers, connects a durable client to both, registers three
+   * durable CQs and then shuts down the durable client. A publisher then does
+   * puts onto server 1 and the per-CQ stats are checked on server 1 and on
+   * server 2. The durable client is then reconnected, the same CQs are
+   * re-registered, the events are dispatched and the per-CQ stats are expected
+   * to be back at 0 on both servers.
+   *
+   * @throws Exception propagated from the remote invocations and helper calls
+   */
+  public void testHAQueuedCqStatOnSecondary() throws Exception {
+    // The same three queries are registered before the disconnect and again
+    // after the reconnect, so they are defined exactly once.
+    final String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    final String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    final String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+
+    // Start server 1
+    final Integer[] ports = (Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, Boolean.TRUE});
+    final int serverPort = ports[0].intValue();
+
+    // Start server 2 using the same mcast port as server 1
+    final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, Boolean.TRUE}))
+        .intValue();
+
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
+
+    startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName);
+
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    //Verify durable client on server 2
+    verifyDurableClientOnServer(server2VM, durableClientId);
+
+    //Verify durable client on server
+    verifyDurableClientOnServer(server1VM, durableClientId);
+
+    //Stop the durable client
+    this.disconnectDurableClient(true);
+
+    // Start normal publisher client
+    startClient(publisherClientVM, serverPort, regionName);
+
+    // Publish some entries
+    publishEntries(publisherClientVM, regionName, 10);
+
+    //verify cq stats are correct on server 1
+    checkNumDurableCqs(server1VM, durableClientId, 3);
+    checkCqStatOnServer(server1VM, durableClientId, "All", 10);
+    checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 4);
+    checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 5);
+
+    //verify cq stats are correct on server 2
+    checkNumDurableCqs(server2VM, durableClientId, 3);
+    checkCqStatOnServer(server2VM, durableClientId, "All", 10);
+    checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 4);
+    checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 5);
+
+    //Restart the durable client
+    startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName);
+
+    //Reregister durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durableClientVM, "All", allQuery, true);
+    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+    //send client ready
+    sendClientReady(durableClientVM);
+
+    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+    checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+
+    //Verify stats are 0 for both servers
+    flushEntries(server1VM, durableClientVM, regionName);
+
+    checkCqStatOnServer(server1VM, durableClientId, "LessThan5", 0);
+    checkCqStatOnServer(server1VM, durableClientId, "GreaterThan5", 0);
+    checkCqStatOnServer(server1VM, durableClientId, "All", 0);
+    checkCqStatOnServer(server2VM, durableClientId, "LessThan5", 0);
+    checkCqStatOnServer(server2VM, durableClientId, "GreaterThan5", 0);
+    checkCqStatOnServer(server2VM, durableClientId, "All", 0);
+
+    // Stop the durable client
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the publisher client
+    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+    // Stop the servers; server 2 was started by this test as well, so it is
+    // closed here too rather than being left running after the test returns
+    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+    this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+  }
+  
+  
+  /**
+   * Server 2 comes up, client connects and registers cqs, server 2 then disconnects
+   * events are put into region
+   * client goes away
+   * server 2 comes back up and should get a gii
+   * check stats
+   * server 1 goes away
+   * client comes back and receives all events
+   * stats should still be correct
+   * 
+   * @throws Exception
+   */
+  public void testHAQueuedCqStatForGII() throws Exception {
+    String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+    String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+    String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+        
+    // Start server 1
+    Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+        "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+    final int serverPort = ports[0].intValue();
+    
+    // Start server 2 using the same mcast port as server 1
+    final int serverPort2 = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+        "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+        .intValue();
+  
+    // Start a durable client that is kept alive on the server when it stops
+    // normally
+    final String durableClientId = getName() + "_client";
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
+  
+    startDurableClient(durableClientVM, durableClientId, serverPort, serverPort2, regionName);
+  
+    //register durable cqs
+    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+    createCq(durable

<TRUNCATED>

Mime
View raw message