geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [51/51] [abbrv] incubator-geode git commit: GEODE-804: Remaining WAN and CQ changes from pivotal
Date Fri, 22 Jan 2016 23:26:32 GMT
GEODE-804: Remaining WAN and CQ changes from pivotal

These are minor changes that were made after the Software Grant Agreement
(SGA) was signed but before the initial code drop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8fdbab40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8fdbab40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8fdbab40

Branch: refs/heads/wan_cq_donation
Commit: 8fdbab40c0fd3e5eb977f41666d6fd4550600d06
Parents: 56bbcbe
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Fri Jan 22 15:07:36 2016 -0800
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Fri Jan 22 15:11:42 2016 -0800

----------------------------------------------------------------------
 .../query/dunit/QueryMonitorDUnitTest.java      | 124 ++-
 .../internal/cache/PutAllCSDUnitTest.java       |  79 +-
 .../sockets/DurableClientSimpleDUnitTest.java   | 862 ++++++++++---------
 .../tier/sockets/DurableClientTestCase.java     | 193 +++--
 .../DurableClientCommandsDUnitTest.java         |  39 +-
 .../wan/GatewaySenderEventRemoteDispatcher.java |   4 +-
 .../wan/parallel/ParallelGatewaySenderImpl.java |   8 +-
 .../wan/serial/SerialGatewaySenderImpl.java     |   8 +-
 .../gemfire/internal/cache/wan/WANTestBase.java |  36 +-
 ...allelGatewaySenderOperation_1_DUnitTest.java |   7 +-
 ...tSerialGatewaySenderOperationsDUnitTest.java |  67 +-
 ...arallelGatewaySenderOperationsDUnitTest.java |   5 -
 .../SerialGatewaySenderOperationsDUnitTest.java |  66 +-
 13 files changed, 859 insertions(+), 639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryMonitorDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryMonitorDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryMonitorDUnitTest.java
index 92f2817..1f1758d 100644
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryMonitorDUnitTest.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/dunit/QueryMonitorDUnitTest.java
@@ -59,47 +59,48 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
   private final String exampleRegionName2 = "exampleRegion2";
   private final String poolName = "serverConnectionPool";
   
+  
   /* Some of the queries are commented out as they were taking less time */
   String[]  queryStr = {
       "SELECT ID FROM /root/exampleRegion p WHERE  p.ID > 100",
-      //"SELECT DISTINCT * FROM /root/exampleRegion x, x.positions.values WHERE  x.pk != '1000'",
-      //"SELECT DISTINCT * FROM /root/exampleRegion x, x.positions.values WHERE  x.pkid != '1'",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values WHERE  p.pk > '1'",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values WHERE  p.pkid != '53'",
-      //"SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.Id > 100",
-      //"SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
-      //"SELECT * FROM /root/exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
+      "SELECT DISTINCT * FROM /root/exampleRegion x, x.positions.values WHERE  x.pk != '1000'",
+      "SELECT DISTINCT * FROM /root/exampleRegion x, x.positions.values WHERE  x.pkid != '1'",
+      "SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values WHERE  p.pk > '1'",
+      "SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values WHERE  p.pkid != '53'",
+      "SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.Id > 100",
+      "SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.Id > 100 and pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+      "SELECT * FROM /root/exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
       "SELECT * FROM /root/exampleRegion WHERE  ID > 100 and status = 'active'",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
-      //"SELECT DISTINCT ID FROM /root/exampleRegion WHERE  status = 'active'",
-      //"SELECT DISTINCT ID FROM /root/exampleRegion p WHERE  p.status = 'active'",
-      //"SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
-      //"SELECT DISTINCT proj1:p, proj2:itrX FROM /root/exampleRegion p, (SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos"
-      //+ " WHERE  pos.secId = 'YHOO') as itrX",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, (SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos"
-      //+ " WHERE  pos.secId = 'YHOO') as itrX",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, (SELECT DISTINCT p.ID FROM /root/exampleRegion x"
-      //+ " WHERE  x.ID = p.ID) as itrX",
+      "SELECT DISTINCT * FROM /root/exampleRegion p WHERE  p.ID > 100 and p.status = 'active' and p.ID < 100000",
+      "SELECT DISTINCT ID FROM /root/exampleRegion WHERE  status = 'active'",
+      "SELECT DISTINCT ID FROM /root/exampleRegion p WHERE  p.status = 'active'",
+      "SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos WHERE  pos.secId IN SET('YHOO', 'IBM', 'AMZN')",
+      "SELECT DISTINCT proj1:p, proj2:itrX FROM /root/exampleRegion p, (SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos"
+      + " WHERE  pos.secId = 'YHOO') as itrX",
+      "SELECT DISTINCT * FROM /root/exampleRegion p, (SELECT DISTINCT pos FROM /root/exampleRegion p, p.positions.values pos"
+      + " WHERE  pos.secId = 'YHOO') as itrX",
+      "SELECT DISTINCT * FROM /root/exampleRegion p, (SELECT DISTINCT p.ID FROM /root/exampleRegion x"
+      + " WHERE  x.ID = p.ID) as itrX",
       "SELECT DISTINCT * FROM /root/exampleRegion p, (SELECT DISTINCT pos FROM /root/exampleRegion x, x.positions.values pos"
       + " WHERE  x.ID = p.ID) as itrX",
-      //"SELECT DISTINCT x.ID FROM /root/exampleRegion x, x.positions.values v WHERE  "
-      //+ "v.secId = element(SELECT DISTINCT vals.secId FROM /root/exampleRegion p, p.positions.values vals WHERE  vals.secId = 'YHOO')",
+      "SELECT DISTINCT x.ID FROM /root/exampleRegion x, x.positions.values v WHERE  "
+      + "v.secId = element(SELECT DISTINCT vals.secId FROM /root/exampleRegion p, p.positions.values vals WHERE  vals.secId = 'YHOO')",
       "SELECT DISTINCT * FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.status = 'active'",
-      //"SELECT DISTINCT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID",
+      "SELECT DISTINCT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID",
       "SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.status = 'active' and p2.status = 'active'",
-      //"SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.status = 'active' and p.status = p2.status",
+      "SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.status = 'active' and p.status = p2.status",
       "SELECT DISTINCT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.ID > 100 and p2.ID < 100000",
-      //"SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.ID > 100 and p2.ID < 100000 or p.status = p2.status",
+      "SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.ID > 100 and p2.ID < 100000 or p.status = p2.status",
       "SELECT p.ID FROM /root/exampleRegion p, /root/exampleRegion2 p2 WHERE  p.ID = p2.ID and p.ID > 100 and p2.ID < 100000 or p.status = 'active'",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')", 
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
-      //"SELECT DISTINCT structset.sos, structset.key " 
-      //+ "FROM /root/exampleRegion p, p.positions.values outerPos, " 
-      //+ "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
-      //+ "FROM /root/exampleRegion.entries pf, pf.value.positions.values pos "
-      //+ "where outerPos.secId != 'IBM' AND "
-      //+ "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
-      //+ "where structset.sos > 2000",
+      "SELECT DISTINCT * FROM /root/exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId ='IBM')", 
+      "SELECT DISTINCT * FROM /root/exampleRegion p, positions.values pos WHERE   (p.ID > 1 or p.status = 'active') or (true AND pos.secId !='IBM')",
+      "SELECT DISTINCT structset.sos, structset.key " 
+      + "FROM /root/exampleRegion p, p.positions.values outerPos, " 
+      + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
+      + "FROM /root/exampleRegion.entries pf, pf.value.positions.values pos "
+      + "where outerPos.secId != 'IBM' AND "
+      + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
+      + "where structset.sos > 2000",
       "SELECT DISTINCT * "
       + "FROM /root/exampleRegion p, p.positions.values outerPos, "
       + "(SELECT DISTINCT key: key, sos: pos.sharesOutstanding "
@@ -107,8 +108,8 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
       + "where outerPos.secId != 'IBM' AND " 
       + "pf.key IN (SELECT DISTINCT * FROM pf.value.collectionHolderMap['0'].arr)) structset "
       + "where structset.sos > 2000",
-      //"SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values position "
-      //+ "WHERE (true = null OR position.secId = 'SUN') AND true", 
+      "SELECT DISTINCT * FROM /root/exampleRegion p, p.positions.values position "
+      + "WHERE (true = null OR position.secId = 'SUN') AND true", 
   };
 
   String[]  prQueryStr = {
@@ -220,7 +221,6 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
         } catch (Exception ex) {
           fail("While starting CacheServer", ex);
         }
-        pause(1000);
         Cache cache = getCache();
         GemFireCacheImpl.getInstance().TEST_MAX_QUERY_EXECUTION_TIME = queryMonitorTime;
         cache.getLogger().fine("#### RUNNING TEST : " + testName);
@@ -307,10 +307,10 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM client2 = host.getVM(2);
     VM client3 = host.getVM(3);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server, 40, "testQueryMonitorClientServer"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server, 20, "testQueryMonitorClientServer"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server);
     
     // Initialize server regions.
@@ -383,13 +383,13 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM client1 = host.getVM(2);
     VM client2 = host.getVM(3);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 40, "testQueryMonitorMultiClientMultiServer"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server1, 20, "testQueryMonitorMultiClientMultiServer"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server1);
 
-    configServer(server2, 40, "testQueryMonitorMultiClientMultiServer"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server2, 20, "testQueryMonitorMultiClientMultiServer"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server2);
 
     // Initialize server regions.
@@ -470,13 +470,13 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM server1 = host.getVM(0);
     VM server2 = host.getVM(1);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 40, "testQueryExecutionLocally"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server1, 20, "testQueryExecutionLocally"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server1);
 
-    configServer(server2, 40, "testQueryExecutionLocally"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server2, 20, "testQueryExecutionLocally"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server2);
 
     // Initialize server regions.
@@ -548,13 +548,13 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM server1 = host.getVM(0);
     VM server2 = host.getVM(1);
 
-    final int numberOfEntries = 8000;
+    final int numberOfEntries = 1000;
 
     // Start server
-    configServer(server1, 40, "testQueryExecutionLocally"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server1, 20, "testQueryExecutionLocally"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server1);
 
-    configServer(server2, 40, "testQueryExecutionLocally"); // All the queries taking more than 40ms should be canceled by Query monitor.
+    configServer(server2, 20, "testQueryExecutionLocally"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server2);
 
     // Initialize server regions.
@@ -636,16 +636,14 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM client1 = host.getVM(2);
     VM client2 = host.getVM(3);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 200, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 200ms should be canceled by Query monitor.
+    configServer(server1, 100, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 100ms should be canceled by Query monitor.
     createPRRegion(server1);
 
-    configServer(server2, 50, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 200ms should be canceled by Query monitor.
+    configServer(server2, 100, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 100ms should be canceled by Query monitor.
     createPRRegion(server2);
-
-    pause(1000);
     
     // Initialize server regions.
     server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
@@ -719,16 +717,15 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM server1 = host.getVM(0);
     VM server2 = host.getVM(1);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 200, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 200ms should be canceled by Query monitor.
+    configServer(server1, 100, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 100ms should be canceled by Query monitor.
     createPRRegion(server1);
 
-    configServer(server2, 50, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 200ms should be canceled by Query monitor.
+    configServer(server2, 100, "testQueryMonitorMultiClientMultiServerOnPR"); // All the queries taking more than 100ms should be canceled by Query monitor.
     createPRRegion(server2);
 
-    pause(1000);
     
     // Initialize server regions.
     server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
@@ -793,16 +790,15 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM client1 = host.getVM(2);
     VM client2 = host.getVM(3);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 5000, "testQueryMonitorRegionWithEviction"); // All the queries taking more than 120ms should be canceled by Query monitor.
+    configServer(server1, 20, "testQueryMonitorRegionWithEviction"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server1, true, "server1_testQueryMonitorRegionWithEviction");
 
-    configServer(server2, 5000, "testQueryMonitorRegionWithEviction"); // All the queries taking more than 120ms should be canceled by Query monitor.
+    configServer(server2, 20, "testQueryMonitorRegionWithEviction"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server2, true, "server2_testQueryMonitorRegionWithEviction");
 
-    pause(1000);
     
     // Initialize server regions.
     server1.invoke(new CacheSerializableRunnable("Create Bridge Server") {
@@ -878,16 +874,16 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM client1 = host.getVM(2);
     VM client2 = host.getVM(3);
 
-    final int numberOfEntries = 800;
+    final int numberOfEntries = 100;
 
     // Start server
-    configServer(server1, 40, "testQueryMonitorRegionWithIndex"); // All the queries taking more than 80ms should be canceled by Query monitor.
+    configServer(server1, 20, "testQueryMonitorRegionWithIndex"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server1);
 
-    configServer(server2, 40, "testQueryMonitorRegionWithIndex"); // All the queries taking more than 80ms should be canceled by Query monitor.
+    configServer(server2, 20, "testQueryMonitorRegionWithIndex"); // All the queries taking more than 20ms should be canceled by Query monitor.
     createRegion(server2);
 
-    pause(1000);
+//    pause(1000);
     
 
     // Initialize server regions.
@@ -1089,7 +1085,7 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     VM server3 = host.getVM(2);
     VM server4 = host.getVM(3);
     
-    final int numberOfEntries = 8000;
+    final int numberOfEntries = 1000;
 
     // Start server
     configServer(server1, 5, "testQueryExecutionLocally"); 
@@ -1274,7 +1270,7 @@ public class QueryMonitorDUnitTest extends CacheTestCase {
     public void doTestHook(String description) {
       if (description.equals("6")) {
         try {
-          Thread.sleep(timeout);
+          Thread.sleep(timeout * 2);
         }
         catch (InterruptedException ie) {
           Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
index 52c454f..6a8cfa9 100644
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
@@ -32,17 +32,12 @@ import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.Declarable;
 import com.gemstone.gemfire.cache.DiskStore;
 import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.cache.InterestResultPolicy;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.Region.Entry;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 import com.gemstone.gemfire.cache.client.PoolFactory;
 import com.gemstone.gemfire.cache.client.PoolManager;
 import com.gemstone.gemfire.cache.client.ServerConnectivityException;
@@ -62,15 +57,13 @@ import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 import com.gemstone.gemfire.cache.query.SelectResults;
 import com.gemstone.gemfire.cache.query.Struct;
 import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
-import com.gemstone.gemfire.cache30.ClientServerTestCase;
 import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
@@ -87,6 +80,7 @@ import dunit.VM;
  * @author Gester Zhou
  * @since 5.0.23
  */
+@SuppressWarnings("serial")
 public class PutAllCSDUnitTest extends ClientServerTestCase {
 
   final int numberOfEntries = 100;
@@ -417,8 +411,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
   
     // set notifyBySubscription=false to test local-invalidates
@@ -487,8 +482,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set notifyBySubscription=false to test local-invalidates
@@ -1131,8 +1127,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
 
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -1279,8 +1276,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     final VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -1383,8 +1381,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -1811,8 +1810,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     final VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -2098,8 +2098,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -2277,8 +2278,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     final VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -2508,8 +2510,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     final VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
@@ -2666,9 +2669,7 @@ public void testOneServer() throws CacheException, InterruptedException {
     final VM client1 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort3 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int[] serverPorts = AvailablePortHelper.getRandomAvailableTCPPorts(3);
     final String serverHost = getServerHostName(server1.getHost());
     
     final SharedCounter sc_server1 = new SharedCounter("server1");
@@ -2677,10 +2678,10 @@ public void testOneServer() throws CacheException, InterruptedException {
     final SharedCounter sc_client2 = new SharedCounter("client2");
 
     // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
-    createBridgeServer(server1, regionName, serverPort1, true, 0, null);
-    createBridgeServer(server2, regionName, serverPort2, true, 0, null);
-    createBridgeServer(server3, regionName, serverPort3, true, 0, null);
-    createClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2, serverPort3}, -1, -1, false, true, true);
+    createBridgeServer(server1, regionName, serverPorts[0], true, 0, null);
+    createBridgeServer(server2, regionName, serverPorts[1], true, 0, null);
+    createBridgeServer(server3, regionName, serverPorts[2], true, 0, null);
+    createClient(client1, regionName, serverHost, serverPorts, -1, -1, false, true, true);
 
     {
       // Create local region
@@ -2695,7 +2696,7 @@ public void testOneServer() throws CacheException, InterruptedException {
       
       try {
         getCache();
-        ClientServerTestCase.configureConnectionPool(factory, serverHost, new int[] {serverPort1, serverPort2, serverPort3}, true, -1, -1, null);
+        ClientServerTestCase.configureConnectionPool(factory, serverHost, serverPorts, true, -1, -1, null);
         createRegion(regionName, factory.create());
         assertNotNull(getRootRegion().getSubregion(regionName));
       }
@@ -2812,8 +2813,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set notifyBySubscription=true to test register interest
@@ -2911,8 +2913,9 @@ public void testOneServer() throws CacheException, InterruptedException {
     VM client2 = host.getVM(3);
     final String regionName = getUniqueName();
     
-    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    int serverPorts[] = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    final int serverPort1 = serverPorts[0];
+    final int serverPort2 = serverPorts[1];
     final String serverHost = getServerHostName(server1.getHost());
 
     // set notifyBySubscription=true to test register interest

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
index d667b1d..03e5bcb 100644
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientSimpleDUnitTest.java
@@ -27,7 +27,10 @@ import com.gemstone.gemfire.cache.query.CqQuery;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
 import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
@@ -1095,238 +1098,271 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
   //This test method is disabled because it is failing
   //periodically and causing cruise control failures
   //See bug #47060
-  public void testReadyForEventsNotCalledImplicitlyWithCacheXML() {
-    final String cqName = "cqTest";
-    // Start a server
-    int serverPort = (Integer) this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServerFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-server-cache.xml")});
-
-    // Start a durable client that is not kept alive on the server when it
-    // stops normally
-    final String durableClientId = getName() + "_client";
-    
-    //create client cache from xml
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45, Boolean.FALSE});
-
-    // verify that readyForEvents has not yet been called on all the client's pools
-    this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
-      public void run2() throws CacheException {
-        for (Pool p: PoolManager.getAll().values()) {
-          assertEquals(false, ((PoolImpl)p).getReadyForEventsCalled());
+  public void testReadyForEventsNotCalledImplicitlyWithCacheXML() {
+    try {
+      setPeriodicACKObserver(durableClientVM);
+      final String cqName = "cqTest";
+      // Start a server
+      int serverPort = (Integer) this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServerFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-server-cache.xml")});
+  
+      // Start a durable client that is not kept alive on the server when it
+      // stops normally
+      final String durableClientId = getName() + "_client";
+      
+      //create client cache from xml
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45, Boolean.FALSE});
+  
+      // verify that readyForEvents has not yet been called on all the client's pools
+      this.durableClientVM.invoke(new CacheSerializableRunnable("check readyForEvents not called") {
+        public void run2() throws CacheException {
+          for (Pool p: PoolManager.getAll().values()) {
+            assertEquals(false, ((PoolImpl)p).getReadyForEventsCalled());
+          }
         }
-      }
-    });
-    
-    // Send clientReady message
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-    
-    //Durable client registers durable cq on server
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-        
-        // Create CQ Attributes.
-        CqAttributesFactory cqAf = new CqAttributesFactory();
-        
-        // Initialize and set CqListener.
-        CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
-        cqAf.initCqListeners(cqListeners);
-        CqAttributes cqa = cqAf.create();
-
-        // Create cq's
-        // Get the query service for the Pool
-        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
-
-        try { 
-          CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
-          query.execute();
+      });
+      
+      // Send clientReady message
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+        public void run2() throws CacheException {
+          CacheServerTestUtil.getCache().readyForEvents();
         }
-        catch (CqExistsException e) {
-          fail("Failed due to " + e);
+      });
+      
+      //Durable client registers durable cq on server
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Register Cq") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+          
+          // Create CQ Attributes.
+          CqAttributesFactory cqAf = new CqAttributesFactory();
+          
+          // Initialize and set CqListener.
+          CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
+          cqAf.initCqListeners(cqListeners);
+          CqAttributes cqa = cqAf.create();
+  
+          // Create cq's
+          // Get the query service for the Pool
+          QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+  
+          try { 
+            CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
+            query.execute();
+          }
+          catch (CqExistsException e) {
+            fail("Failed due to " + e);
+          }
+          catch (CqException e) {
+            fail("Failed due to " + e);
+          }
+          catch (RegionNotFoundException e) {
+            fail("Could not find specified region:" + regionName + ":" + e);
+          }
         }
-        catch (CqException e) {
-          fail("Failed due to " + e);
+      });
+  
+      // Verify durable client on server1
+      this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+        public void run2() throws CacheException {
+          // Find the proxy
+          checkNumberOfClientProxies(1);
+          CacheClientProxy proxy = getClientProxy();
+          assertNotNull(proxy);
+  
+          // Verify that it is durable
+          assertTrue(proxy.isDurable());
+          assertEquals(durableClientId, proxy.getDurableId());
         }
-        catch (RegionNotFoundException e) {
-          fail("Could not find specified region:" + regionName + ":" + e);
+      });
+      
+      // Start normal publisher client
+      this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
+          new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+  
+      // Publish some entries
+      final int numberOfEntries = 10;
+      this.publisherClientVM.invoke(new CacheSerializableRunnable("publish updates") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+  
+          // Publish some entries
+          for (int i=0; i<numberOfEntries; i++) {
+            String keyAndValue = String.valueOf(i);
+            region.put(keyAndValue, keyAndValue);
+          }
         }
-      }
-    });
-
-    // Verify durable client on server1
-    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
-      public void run2() throws CacheException {
-        // Find the proxy
-        checkNumberOfClientProxies(1);
-        CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
-
-        // Verify that it is durable
-        assertTrue(proxy.isDurable());
-        assertEquals(durableClientId, proxy.getDurableId());
-      }
-    });
-    
-    // Start normal publisher client
-    this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
-        new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
-
-    // Publish some entries
-    final int numberOfEntries = 10;
-    this.publisherClientVM.invoke(new CacheSerializableRunnable("publish updates") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-
-        // Publish some entries
-        for (int i=0; i<numberOfEntries; i++) {
-          String keyAndValue = String.valueOf(i);
-          region.put(keyAndValue, keyAndValue);
+      });
+      
+      // Verify the durable client received the updates
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+          
+          // Get the listener and wait for the appropriate number of events
+          QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+          CqQuery cqQuery = queryService.getCq(cqName);
+          CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
+          cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+          assertEquals(numberOfEntries, cqlistener.events.size());
         }
+      });
+      
+       try {
+        Thread.sleep(10000);
       }
-    });
-    
-    // Verify the durable client received the updates
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-        
-        // Get the listener and wait for the appropriate number of events
-        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
-        CqQuery cqQuery = queryService.getCq(cqName);
-        CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
-        cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
-        assertEquals(numberOfEntries, cqlistener.events.size());
-      }
-    });
-    
-     try {
-      Thread.sleep(10000);
-    }
-    catch (InterruptedException e) {
-      fail("interrupted" + e);
-    }
-    
-    // Stop the durable client
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {new Boolean(true)});
-    
-    // Verify the durable client still exists on the server
-    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
-      public void run2() throws CacheException {
-        // Find the proxy
-        CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
+      catch (InterruptedException e) {
+        fail("interrupted" + e);
       }
-    });
-
-    // Publish some more entries
-    this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish additional updates") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-
-        // Publish some entries
-        for (int i=0; i<numberOfEntries; i++) {
-          String keyAndValue = String.valueOf(i);
-          region.put(keyAndValue, keyAndValue + "lkj");
+      
+      // Stop the durable client
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache", new Object[] {new Boolean(true)});
+      
+      // Verify the durable client still exists on the server
+      this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+        public void run2() throws CacheException {
+          // Find the proxy
+          CacheClientProxy proxy = getClientProxy();
+          assertNotNull(proxy);
         }
-      }
-    });
-    
-    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-    
-    // Re-start the durable client
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45,  Boolean.FALSE});
-
-    
-    //Durable client registers durable cq on server
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Register cq") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-
-        // Create CQ Attributes.
-        CqAttributesFactory cqAf = new CqAttributesFactory();
-        
-        // Initialize and set CqListener.
-        CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
-        cqAf.initCqListeners(cqListeners);
-        CqAttributes cqa = cqAf.create();
-
-        // Create cq's
-        // Get the query service for the Pool
-        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
-
-        try { 
-          CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
-          query.execute();
+      });
+  
+      // Publish some more entries
+      this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish additional updates") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+  
+          // Publish some entries
+          for (int i=0; i<numberOfEntries; i++) {
+            String keyAndValue = String.valueOf(i);
+            region.put(keyAndValue, keyAndValue + "lkj");
+          }
         }
-        catch (CqExistsException e) {
-          fail("Failed due to " + e);
+      });
+      
+      this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+      
+      // Re-start the durable client
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClientFromXml", new Object[]{ DurableClientTestCase.class.getResource("durablecq-client-cache.xml"), "client", durableClientId, 45,  Boolean.FALSE});
+  
+      
+      //Durable client registers durable cq on server
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Register cq") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+  
+          // Create CQ Attributes.
+          CqAttributesFactory cqAf = new CqAttributesFactory();
+          
+          // Initialize and set CqListener.
+          CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
+          cqAf.initCqListeners(cqListeners);
+          CqAttributes cqa = cqAf.create();
+  
+          // Create cq's
+          // Get the query service for the Pool
+          QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+  
+          try { 
+            CqQuery query = queryService.newCq(cqName , "Select * from /" + regionName, cqa, true);
+            query.execute();
+          }
+          catch (CqExistsException e) {
+            fail("Failed due to " + e);
+          }
+          catch (CqException e) {
+            fail("Failed due to " + e);
+          }
+          catch (RegionNotFoundException e) {
+            fail("Could not find specified region:" + regionName + ":" + e);
+          }
+         
         }
-        catch (CqException e) {
-          fail("Failed due to " + e);
+      });
+      
+      // Send clientReady message
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+        public void run2() throws CacheException {
+          CacheServerTestUtil.getCache().readyForEvents();
         }
-        catch (RegionNotFoundException e) {
-          fail("Could not find specified region:" + regionName + ":" + e);
+      });
+  
+      // Verify durable client on server
+      this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+        public void run2() throws CacheException {
+          // Find the proxy
+          checkNumberOfClientProxies(1);
+          CacheClientProxy proxy = getClientProxy();
+          assertNotNull(proxy);
+          
+          // Verify that it is durable and its properties are correct
+          assertTrue(proxy.isDurable());
+          assertEquals(durableClientId, proxy.getDurableId());
         }
-       
-      }
-    });
-    
-    // Send clientReady message
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
-
-    // Verify durable client on server
-    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
-      public void run2() throws CacheException {
-        // Find the proxy
-        checkNumberOfClientProxies(1);
-        CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
-        
-        // Verify that it is durable and its properties are correct
-        assertTrue(proxy.isDurable());
-        assertEquals(durableClientId, proxy.getDurableId());
-      }
-    });
-        
-    // Verify the durable client received the updates held for it on the server
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
-      public void run2() throws CacheException {
-        // Get the region
-        Region region = CacheServerTestUtil.getCache().getRegion(regionName);
-        assertNotNull(region);
-
-        QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
-
-        CqQuery cqQuery = queryService.getCq(cqName);
-        
-        CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
-        cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
-        assertEquals(numberOfEntries, cqlistener.events.size());
-      }
-    });
-    
-    // Stop the durable client
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-
-    // Stop the server
-    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+      });
+          
+      // Verify the durable client received the updates held for it on the server
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+        public void run2() throws CacheException {
+          // Get the region
+          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+          assertNotNull(region);
+  
+          QueryService queryService = CacheServerTestUtil.getPool().getQueryService();
+  
+          CqQuery cqQuery = queryService.getCq(cqName);
+          
+          CacheServerTestUtil.ControlCqListener cqlistener = (CacheServerTestUtil.ControlCqListener) cqQuery.getCqAttributes().getCqListener();
+          cqlistener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+          assertEquals(numberOfEntries, cqlistener.events.size());
+        }
+      });
+      
+      // Stop the durable client
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+  
+      // Stop the server
+      this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+    }finally{
+      unsetPeriodicACKObserver(durableClientVM);
+    }
+  }
+  
+  private void setPeriodicACKObserver(VM vm){
+    CacheSerializableRunnable cacheSerializableRunnable = new CacheSerializableRunnable("Set ClientServerObserver"){
+      @Override
+      public void run2() throws CacheException {
+        PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = true;
+        ClientServerObserver origObserver = ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+          public void beforeSendingClientAck()
+          {
+            getLogWriter().info("beforeSendingClientAck invoked");
+           
+          }
+        });
+        
+      }
+    };
+    vm.invoke(cacheSerializableRunnable);
+  }
+  
+  private void unsetPeriodicACKObserver(VM vm){
+    CacheSerializableRunnable cacheSerializableRunnable = new CacheSerializableRunnable("Unset ClientServerObserver"){
+      @Override
+      public void run2() throws CacheException {
+        PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = false;        
+      }
+    };
+    vm.invoke(cacheSerializableRunnable);
   }
   
   public void testReadyForEventsNotCalledImplicitlyForRegisterInterestWithCacheXML() {
@@ -2831,115 +2867,117 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
    * The client should be rejected until no cqs are currently being drained
    * @throws Exception
    */
-  public void testRejectClientWhenDrainingCq() throws Exception {
-    DistributedTestCase.addExpectedException(LocalizedStrings.CacheClientNotifier_COULD_NOT_CONNECT_DUE_TO_CQ_BEING_DRAINED.toLocalizedString());
-    DistributedTestCase.addExpectedException("Could not initialize a primary queue on startup. No queue servers available.");
-    
-    String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
-    String allQuery = "select * from /" + regionName + " p where p.ID > -1";
-    String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
-        
-    // Start server 1
-    Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
-        "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
-    final int serverPort = ports[0].intValue();
-
-    final String durableClientId = getName() + "_client";
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
-  
-    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+  public void testRejectClientWhenDrainingCq() throws Exception {
+    try {
+      DistributedTestCase.addExpectedException(LocalizedStrings.CacheClientNotifier_COULD_NOT_CONNECT_DUE_TO_CQ_BEING_DRAINED.toLocalizedString());
+      DistributedTestCase.addExpectedException("Could not initialize a primary queue on startup. No queue servers available.");
+      
+      String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+      String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+      String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+          
+      // Start server 1
+      Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+          "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+      final int serverPort = ports[0].intValue();
   
-    //register durable cqs
-    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
-    createCq(durableClientVM, "All", allQuery, true);
-    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
-    //send client ready
-    sendClientReady(durableClientVM);
-
-    verifyDurableClientOnServer(server1VM, durableClientId);
-
-    // Stop the durable client
-    this.disconnectDurableClient(true);
+      final String durableClientId = getName() + "_client";
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
     
-    // Start normal publisher client
-    startClient(publisherClientVM, serverPort, regionName);
+      startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+    
+      //register durable cqs
+      createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+      createCq(durableClientVM, "All", allQuery, true);
+      createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+      //send client ready
+      sendClientReady(durableClientVM);
   
-    // Publish some entries
-    publishEntries(publisherClientVM, regionName, 10);
-            
-    this.server1VM.invokeAsync(new CacheSerializableRunnable(
-        "Close cq for durable client") {
-      public void run2() throws CacheException {
-
-        //Set the Test Hook!
-        //This test hook will pause during the drain process
-        CacheClientProxy.testHook = new RejectClientReconnectTestHook();
-
-        final CacheClientNotifier ccnInstance = CacheClientNotifier
-            .getInstance();
-        final CacheClientProxy clientProxy = ccnInstance
-            .getClientProxy(durableClientId);
-        ClientProxyMembershipID proxyId = clientProxy.getProxyID();
-
-        try {
-          ccnInstance.closeClientCq(durableClientId, "All");
-        }
-        catch (CqException e) {
-          throw new CacheException(e){};
-        }
-      }
-    });
-
-    // Restart the durable client
-    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
-
-    this.server1VM.invoke(new CacheSerializableRunnable(
-        "verify was rejected at least once") {
-      public void run2() throws CacheException {
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return (((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+      verifyDurableClientOnServer(server1VM, durableClientId);
+  
+      // Stop the durable client
+      this.disconnectDurableClient(true);
+      
+      // Start normal publisher client
+      startClient(publisherClientVM, serverPort, regionName);
+    
+      // Publish some entries
+      publishEntries(publisherClientVM, regionName, 10);
+              
+      this.server1VM.invokeAsync(new CacheSerializableRunnable(
+          "Close cq for durable client") {
+        public void run2() throws CacheException {
+  
+          //Set the Test Hook!
+          //This test hook will pause during the drain process
+          CacheClientProxy.testHook = new RejectClientReconnectTestHook();
+  
+          final CacheClientNotifier ccnInstance = CacheClientNotifier
+              .getInstance();
+          final CacheClientProxy clientProxy = ccnInstance
+              .getClientProxy(durableClientId);
+          ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+  
+          try {
+            ccnInstance.closeClientCq(durableClientId, "All");
           }
-          public String description() {
-            return null;
+          catch (CqException e) {
+            throw new CacheException(e){};
           }
-        };
-        DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
-        assertTrue(((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
-      }
-    });
-    
-    checkPrimaryUpdater(durableClientVM);
-    
-    // After rejection, the client will retry and eventually connect
-    // Verify durable client on server2
-    verifyDurableClientOnServer(server1VM, durableClientId);
-  
-    createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true);
-    createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true);
-    createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true);
-    //send client ready
-    sendClientReady(durableClientVM);
-
-    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
-    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
-    checkCqListenerEvents(durableClientVM, "All", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/);
-    
-    this.server1VM.invoke(new CacheSerializableRunnable(
-        "unset test hook") {
-      public void run2() throws CacheException {
-        CacheClientProxy.testHook = null;
-      }
-    });
-
-    // Stop the durable client
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-
-    // Stop the publisher client
-    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-
-    // Stop the server
-    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+        }
+      });
+  
+      // Restart the durable client
+      startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+  
+      this.server1VM.invoke(new CacheSerializableRunnable(
+          "verify was rejected at least once") {
+        public void run2() throws CacheException {
+          WaitCriterion ev = new WaitCriterion() {
+            public boolean done() {
+              return  CacheClientProxy.testHook != null && (((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+            }
+            public String description() {
+              return null;
+            }
+          };
+          DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+          assertTrue(((RejectClientReconnectTestHook) CacheClientProxy.testHook).wasClientRejected());
+        }
+      });
+      
+      checkPrimaryUpdater(durableClientVM);
+      
+      // After rejection, the client will retry and eventually connect
+      // Verify durable client on server2
+      verifyDurableClientOnServer(server1VM, durableClientId);
+    
+      createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true);
+      createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true);
+      createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true);
+      //send client ready
+      sendClientReady(durableClientVM);
+  
+      checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+      checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+      checkCqListenerEvents(durableClientVM, "All", 0 /*numEventsExpected*/, 1/*numEventsToWaitFor*/, 5/*secondsToWait*/);      
+  
+      // Stop the durable client
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+  
+      // Stop the publisher client
+      this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+  
+      // Stop the server
+      this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+    }finally{
+      this.server1VM.invoke(new CacheSerializableRunnable(
+          "unset test hook") {
+        public void run2() throws CacheException {
+          CacheClientProxy.testHook = null;
+        }
+      });
+    }
   }
   
   
@@ -2948,99 +2986,101 @@ public class DurableClientSimpleDUnitTest extends DurableClientTestCase {
    * being reactivated
    * @throws Exception
    */
-  public void testCqCloseExceptionDueToActivatingClient() throws Exception {
-    String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
-    String allQuery = "select * from /" + regionName + " p where p.ID > -1";
-    String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+  public void testCqCloseExceptionDueToActivatingClient() throws Exception {
+    try {
+      String greaterThan5Query = "select * from /" + regionName + " p where p.ID > 5";
+      String allQuery = "select * from /" + regionName + " p where p.ID > -1";
+      String lessThan5Query = "select * from /" + regionName + " p where p.ID < 5";
+          
+      // Start server 1
+      Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+          "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+      final int serverPort = ports[0].intValue();
+      
+      final String durableClientId = getName() + "_client";
+    
+      startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+      //register durable cqs
+      createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
+      createCq(durableClientVM, "All", allQuery, true);
+      createCq(durableClientVM, "LessThan5", lessThan5Query, true);
+      //send client ready
+      sendClientReady(durableClientVM);
         
-    // Start server 1
-    Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
-        "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
-    final int serverPort = ports[0].intValue();
-    
-    final String durableClientId = getName() + "_client";
+      // Verify durable client on server
+      verifyDurableClientOnServer(server1VM, durableClientId);
+            
+      // Stop the durable client
+      this.disconnectDurableClient(true);
+      
+      // Start normal publisher client
+      startClient(publisherClientVM, serverPort, regionName);
   
-    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
-    //register durable cqs
-    createCq(durableClientVM, "GreaterThan5", greaterThan5Query, true);
-    createCq(durableClientVM, "All", allQuery, true);
-    createCq(durableClientVM, "LessThan5", lessThan5Query, true);
-    //send client ready
-    sendClientReady(durableClientVM);
+      // Publish some entries
+      publishEntries(publisherClientVM, regionName, 10);
+          
       
-    // Verify durable client on server
-    verifyDurableClientOnServer(server1VM, durableClientId);
+      AsyncInvocation async = this.server1VM.invokeAsync(new CacheSerializableRunnable(
+          "Close cq for durable client") {
+        public void run2() throws CacheException {
+  
+          //Set the Test Hook!
+          //This test hook will pause during the drain process
+          CacheClientProxy.testHook = new CqExceptionDueToActivatingClientTestHook();
           
-    // Stop the durable client
-    this.disconnectDurableClient(true);
-    
-    // Start normal publisher client
-    startClient(publisherClientVM, serverPort, regionName);
-
-    // Publish some entries
-    publishEntries(publisherClientVM, regionName, 10);
-        
-    
-    AsyncInvocation async = this.server1VM.invokeAsync(new CacheSerializableRunnable(
-        "Close cq for durable client") {
-      public void run2() throws CacheException {
-
-        //Set the Test Hook!
-        //This test hook will pause during the drain process
-        CacheClientProxy.testHook = new CqExceptionDueToActivatingClientTestHook();
-        
-        final CacheClientNotifier ccnInstance = CacheClientNotifier
-            .getInstance();
-        final CacheClientProxy clientProxy = ccnInstance
-            .getClientProxy(durableClientId);
-        ClientProxyMembershipID proxyId = clientProxy.getProxyID();
-
-        try {
-          ccnInstance.closeClientCq(durableClientId, "All");
-          fail("Should have thrown an exception due to activating client");
-        }
-        catch (CqException e) {
-          String expected = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString("All", proxyId.getDurableId());
-          if (!e.getMessage().equals(expected)) {            
-           fail("Not the expected exception, was expecting " + (LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString("All", proxyId.getDurableId()) + " instead of exception: " + e.getMessage()));
+          final CacheClientNotifier ccnInstance = CacheClientNotifier
+              .getInstance();
+          final CacheClientProxy clientProxy = ccnInstance
+              .getClientProxy(durableClientId);
+          ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+  
+          try {
+            ccnInstance.closeClientCq(durableClientId, "All");
+            fail("Should have thrown an exception due to activating client");
+          }
+          catch (CqException e) {
+            String expected = LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString("All", proxyId.getDurableId());
+            if (!e.getMessage().equals(expected)) {            
+             fail("Not the expected exception, was expecting " + (LocalizedStrings.CacheClientProxy_COULD_NOT_DRAIN_CQ_DUE_TO_RESTARTING_DURABLE_CLIENT.toLocalizedString("All", proxyId.getDurableId()) + " instead of exception: " + e.getMessage()));
+            }
           }
         }
-      }
-    });
-    
-    //Restart the durable client
-    startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
-  
-    //Reregister durable cqs
-    createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true);
-    createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true);
-    createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true);
-    //send client ready
-    sendClientReady(durableClientVM);
-    
-    async.join();
-    assertEquals(async.getException() != null ? async.getException().toString(): "No error" ,false,  async.exceptionOccurred());
-    
-    //verify cq listener events
-    checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
-    checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
-    checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);
-    
-    this.server1VM.invoke(new CacheSerializableRunnable(
-        "unset test hook") {
-      public void run2() throws CacheException {
-        CacheClientProxy.testHook = null;
-      }
-    });
-
-    // Stop the durable client
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-
-    // Stop the publisher client
-    this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
-
-    // Stop the server
-    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+      });
+      
+      //Restart the durable client
+      startDurableClient(durableClientVM, durableClientId, serverPort, regionName);
+    
+      //Reregister durable cqs
+      createCq(durableClientVM, "GreaterThan5", "select * from /" + regionName + " p where p.ID > 5", true);
+      createCq(durableClientVM, "All", "select * from /" + regionName + " p where p.ID > -1", true);
+      createCq(durableClientVM, "LessThan5", "select * from /" + regionName + " p where p.ID < 5", true);
+      //send client ready
+      sendClientReady(durableClientVM);
+      
+      async.join();
+      assertEquals(async.getException() != null ? async.getException().toString(): "No error" ,false,  async.exceptionOccurred());
+      
+      //verify cq listener events
+      checkCqListenerEvents(durableClientVM, "GreaterThan5", 4 /*numEventsExpected*/, 4/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+      checkCqListenerEvents(durableClientVM, "LessThan5", 5 /*numEventsExpected*/, 5/*numEventsToWaitFor*/, 15/*secondsToWait*/);
+      checkCqListenerEvents(durableClientVM, "All", 10 /*numEventsExpected*/, 10/*numEventsToWaitFor*/, 15/*secondsToWait*/);           
+  
+      // Stop the durable client
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+  
+      // Stop the publisher client
+      this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+  
+      // Stop the server
+      this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+    }finally {
+      this.server1VM.invoke(new CacheSerializableRunnable(
+          "unset test hook") {
+        public void run2() throws CacheException {
+          CacheClientProxy.testHook = null;
+        }
+      });
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
index 583ef81..ec621de 100755
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -7,8 +7,14 @@
  */
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -83,6 +89,8 @@ public class DurableClientTestCase extends DistributedTestCase {
     this.regionName = getName() + "_region";
     //Clients see this when the servers disconnect
     addExpectedException("Could not find any server");
+    testName = getName();
+    System.out.println("\n\n[setup] START TEST " + getClass().getSimpleName()+"."+ testName+"\n\n");
   }
   
   public void tearDown2() throws Exception {
@@ -148,70 +156,67 @@ public class DurableClientTestCase extends DistributedTestCase {
    * In this test we will set gemfire.SPECIAL_DURABLE property to true and will
    * see durableID appended by poolname or not
    */
-  public void testSimpleDurableClient2() {    
-    // Start a server
-    int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
-        "createCacheServer", new Object[] {regionName, new Boolean(true)}))
-        .intValue();
-
-    // Start a durable client that is not kept alive on the server when it
-    // stops normally
-    final String durableClientId = getName() + "_client";
+  public void testSimpleDurableClient2() {
     final Properties jp = new Properties();
     jp.setProperty("gemfire.SPECIAL_DURABLE", "true");
-    
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", 
-        new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true)
-    									, regionName
-    									, getClientDistributedSystemProperties(durableClientId)
-    									, new Boolean(false)
-    									, jp});
 
-    // Send clientReady message
-    this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
-      public void run2() throws CacheException {
-        CacheServerTestUtil.getCache().readyForEvents();
-      }
-    });
+    try {
+      // Start a server
+      int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServer", new Object[] { regionName, new Boolean(true) })).intValue();
 
-    // Verify durable client on server
-    this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
-      public void run2() throws CacheException {
-        // Find the proxy
-        checkNumberOfClientProxies(1);
-        CacheClientProxy proxy = getClientProxy();
-        assertNotNull(proxy);
-        
-        // Verify that it is durable and its properties are correct
-        assertTrue(proxy.isDurable());
-        assertNotSame(durableClientId, proxy.getDurableId());
-        
-        /* new durable id will be like this
-         * durableClientId
-         * _gem_ //separartor
-         * client pool name
-         */
-        String dId = durableClientId + "_gem_" + "CacheServerTestUtil";
-        
-        assertEquals(dId, proxy.getDurableId());
-        assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
-        //assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_KEEP_ALIVE, proxy.getDurableKeepAlive());
-      }
-    });
-    
-    // Stop the durable client
-    this.disconnectDurableClient();
-    
-    // Verify the durable client is present on the server for closeCache=false case.
-    this.verifySimpleDurableClient();
-    
-    // Stop the server
-    this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+      // Start a durable client that is not kept alive on the server when it
+      // stops normally
+      final String durableClientId = getName() + "_client";
 
-    this.closeDurableClient();
-    
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "unsetJavaSystemProperties", 
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+          new Object[] { getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), new Boolean(false), jp });
+
+      // Send clientReady message
+      this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+        public void run2() throws CacheException {
+          CacheServerTestUtil.getCache().readyForEvents();
+        }
+      });
+
+      // Verify durable client on server
+      this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+        public void run2() throws CacheException {
+          // Find the proxy
+          checkNumberOfClientProxies(1);
+          CacheClientProxy proxy = getClientProxy();
+          assertNotNull(proxy);
+
+          // Verify that it is durable and its properties are correct
+          assertTrue(proxy.isDurable());
+          assertNotSame(durableClientId, proxy.getDurableId());
+
+          /* new durable id will be like this
+           * durableClientId
+           * _gem_ //separator
+           * client pool name */
+          String dId = durableClientId + "_gem_" + "CacheServerTestUtil";
+
+          assertEquals(dId, proxy.getDurableId());
+          assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+          // assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_KEEP_ALIVE, proxy.getDurableKeepAlive());
+        }
+      });
+
+      // Stop the durable client
+      this.disconnectDurableClient();
+
+      // Verify the durable client is present on the server for closeCache=false case.
+      this.verifySimpleDurableClient();
+
+      // Stop the server
+      this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+      this.closeDurableClient();
+    } finally {
+ 
+      this.durableClientVM.invoke(CacheServerTestUtil.class, "unsetJavaSystemProperties", 
         new Object[] {jp});
+    }
     
   }
   
@@ -226,9 +231,49 @@ public class DurableClientTestCase extends DistributedTestCase {
 
   public void disconnectDurableClient(boolean keepAlive)
   {
-    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache",new Object[] {new Boolean(keepAlive)});    
+    printClientProxyState("Before");
+    this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache",new Object[] {new Boolean(keepAlive)});
+    pause(1000);
+    printClientProxyState("after");
   }
 
+  public void printClientProxyState(String st) {
+    CacheSerializableRunnable s = new CacheSerializableRunnable("Loggiog CCCP and ServerConnetcion state"){
+      @Override
+      public void run2() throws CacheException {
+        // Log the client proxy (CCP) states and the ClientHealthMonitor (CHM) connected clients
+        CacheServerTestUtil.getCache().getLogger().info(st + " CCP states: " + getAllClientProxyState());
+        CacheServerTestUtil.getCache().getLogger().info(st + " CHM states: " + printMap(ClientHealthMonitor._instance.getConnectedClients(null)));
+      }
+    };
+    server1VM.invoke(s);
+  }
+  
+  private String printMap(Map m) {
+    Iterator<Map.Entry> itr = m.entrySet().iterator();
+    StringBuffer sb = new StringBuffer();
+    while(itr.hasNext()){
+      sb.append("{");
+      Map.Entry entry = itr.next();
+      sb.append(entry.getKey());
+      sb.append(", ");
+      printMapValue(entry.getValue(), sb);
+      sb.append("}");
+    }
+    return sb.toString();    
+  }
+  
+  private void printMapValue(Object value, StringBuffer sb) {
+    if(value.getClass().isArray()) {
+      
+      sb.append("{");
+      sb.append(Arrays.toString((Object[])value));
+      sb.append("}");
+    } else {
+      sb.append(value);
+    }
+  }
+  
   public void verifySimpleDurableClient()
   {
     this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
@@ -1431,7 +1476,7 @@ public class DurableClientTestCase extends DistributedTestCase {
   public class RejectClientReconnectTestHook implements CacheClientProxy.TestHook {
       CountDownLatch reconnectLatch = new CountDownLatch(1);
       CountDownLatch continueDrain = new CountDownLatch(1);
-      boolean clientWasRejected = false;
+      volatile boolean clientWasRejected = false;
       CountDownLatch clientConnected = new CountDownLatch(1);
       
       public void doTestHook(String spot) {
@@ -1579,13 +1624,39 @@ public class DurableClientTestCase extends DistributedTestCase {
     return proxy;
   }
   
+  protected static String getAllClientProxyState() {
+    // Get the CacheClientNotifier
+    CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+        .getCacheClientNotifier();
+    
+    // Collect the state of every CacheClientProxy (the result is empty if there are none)
+    CacheClientProxy proxy = null;
+    Iterator<CacheClientProxy> i = notifier.getClientProxies().iterator();
+    StringBuffer sb = new StringBuffer();
+    while(i.hasNext()) {
+      sb.append(" [");
+      sb.append(i.next().getState());
+      sb.append(" ]");
+    }
+    return sb.toString();
+  }
+  
   protected static void checkNumberOfClientProxies(final int expected) {
     WaitCriterion ev = new WaitCriterion() {
       public boolean done() {
         return expected == getNumberOfClientProxies();
       }
       public String description() {
-        return null;
+        /*String result =  "";
+        if(ClientHealthMonitor._instance != null && ClientHealthMonitor._instance.getCleanupProxyIdTable() != null)
+          result = ClientHealthMonitor._instance.getCleanupProxyIdTable().toString(); 
+        
+        
+        CacheClientProxy ccp = getClientProxy();
+        
+        if(ccp != null)
+          result += " ccp: " + ccp.toString();*/
+        return getAllClientProxyState();
       }
     };
     DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-cq/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/DurableClientCommandsDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
index 353d7fa..95b5301 100644
--- a/gemfire-cq/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/management/internal/cli/commands/DurableClientCommandsDUnitTest.java
@@ -95,9 +95,15 @@ public class DurableClientCommandsDUnitTest extends CliCommandTestBase {
     CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CLOSE_DURABLE_CLIENTS);
     csb.addOption(CliStrings.CLOSE_DURABLE_CLIENTS__CLIENT__ID, clientName);
     String commandString = csb.toString();
-    writeToLog("Command String : ", commandString);
-    CommandResult commandResult = executeCommand(commandString);
-    String resultAsString = commandResultToString(commandResult);
+    long giveUpTime = System.currentTimeMillis() + 20000;
+    CommandResult commandResult = null;
+    String resultAsString = null;
+    do {
+      writeToLog("Command String : ", commandString);
+      commandResult = executeCommand(commandString);
+      resultAsString = commandResultToString(commandResult);
+    } while (resultAsString.contains("Cannot close a running durable client")
+        && giveUpTime > System.currentTimeMillis());
     writeToLog("Command Result :\n", resultAsString);
     assertTrue(Status.OK.equals(commandResult.getStatus()));
     
@@ -140,6 +146,16 @@ public class DurableClientCommandsDUnitTest extends CliCommandTestBase {
     
   }
     
+//  public void testRepeat() throws Exception {
+//    long endTime = System.currentTimeMillis() + (75 * 60000);
+//    while (endTime > System.currentTimeMillis()) {
+//      testCountSubscriptionQueueSize();
+//      tearDown();
+//      setUp();
+//    }
+//    testCountSubscriptionQueueSize();
+//  }
+//  
   public void testCountSubscriptionQueueSize() throws Exception {
     setupSystem();
     setupCqs();
@@ -199,11 +215,17 @@ public class DurableClientCommandsDUnitTest extends CliCommandTestBase {
     csb = new CommandStringBuilder(CliStrings.CLOSE_DURABLE_CLIENTS);
     csb.addOption(CliStrings.CLOSE_DURABLE_CLIENTS__CLIENT__ID, clientName);
     commandString = csb.toString();
-    writeToLog("Command String : ", commandString);
-    commandResult = executeCommand(commandString);
-    resultAsString = commandResultToString(commandResult);
+    // since it can take the server a bit to know that the client has disconnected
+    // we loop here
+    long giveUpTime = System.currentTimeMillis() + 20000;
+    do {
+      writeToLog("Command String : ", commandString);
+      commandResult = executeCommand(commandString);
+      resultAsString = commandResultToString(commandResult);
+    } while (resultAsString.contains("Cannot close a running durable client")
+        && giveUpTime > System.currentTimeMillis());
     writeToLog("Command Result :\n", resultAsString);
-    assertTrue(Status.OK.equals(commandResult.getStatus()));
+    assertTrue("failed executing" + commandString + "; result = "+resultAsString, Status.OK.equals(commandResult.getStatus()));
     
     csb = new CommandStringBuilder(CliStrings.COUNT_DURABLE_CQ_EVENTS);
     csb.addOption(CliStrings.COUNT_DURABLE_CQ_EVENTS__DURABLE__CLIENT__ID, clientName);
@@ -217,8 +239,7 @@ public class DurableClientCommandsDUnitTest extends CliCommandTestBase {
   }
   
   private void writeToLog(String text, String resultAsString) {
-    getLogWriter().info(testName + "\n");
-    getLogWriter().info(resultAsString);
+    getLogWriter().info(getUniqueName() + ": " + text + "\n" + resultAsString);
   }
   
   private void setupSystem() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 4a4446c..a41a246 100644
--- a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -630,12 +630,12 @@ public class GatewaySenderEventRemoteDispatcher implements
           logger.fatal(LocalizedMessage.create(
               LocalizedStrings.GatewayEventRemoteDispatcher_STOPPING_THE_PROCESSOR_BECAUSE_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_PROCESSING_A_BATCH) ,e);
         }
-        sender.lifeCycleLock.writeLock().lock();
+        sender.getLifeCycleLock().writeLock().lock();
         try {
           processor.stopProcessing();
           sender.clearTempEventsAfterSenderStopped();
         } finally {
-          sender.lifeCycleLock.writeLock().unlock();
+          sender.getLifeCycleLock().writeLock().unlock();
         }
         // destroyConnection();
       } finally {



Mime
View raw message