geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [geode] branch develop updated: GEODE-5411: Adding awaitility fixing timing issue in CqPerfDUnitTest
Date Fri, 31 Aug 2018 00:11:07 GMT
This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 46b7897  GEODE-5411: Adding awaitility fixing timing issue in CqPerfDUnitTest
46b7897 is described below

commit 46b78978d894e8851daf86d929ae5efbd9093097
Author: mhansonp <mhanson@pivotal.io>
AuthorDate: Thu Aug 30 17:10:50 2018 -0700

    GEODE-5411: Adding awaitility fixing timing issue in CqPerfDUnitTest
    
    - Fixing timing issue by waiting for cq set to reach the right size
    - Code modernization and cleanup
---
 .../cache/query/cq/dunit/CqDataDUnitTest.java      |   33 +-
 .../cache/query/cq/dunit/CqPerfDUnitTest.java      |  602 +++---
 .../cache/query/cq/dunit/CqQueryDUnitTest.java     | 2026 ++++++++------------
 3 files changed, 1021 insertions(+), 1640 deletions(-)

diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDataDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDataDUnitTest.java
index f43276e..0aac70b 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDataDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqDataDUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.fail;
 import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.logging.log4j.Logger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -27,7 +28,6 @@ import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.MirrorType;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.query.CqQuery;
@@ -40,6 +40,7 @@ import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.CertifiableTestCacheListener;
 import org.apache.geode.cache30.ClientServerTestCase;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
@@ -61,7 +62,7 @@ import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
  */
 @Category({ClientSubscriptionTest.class})
 public class CqDataDUnitTest extends JUnit4CacheTestCase {
-
+  private static final Logger logger = LogService.getLogger();
   protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
 
   public CqDataDUnitTest() {
@@ -386,8 +387,8 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
 
     VM client = host.getVM(2);
 
-    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
-    cqDUnitTest.createServer(server2, 0, false, MirrorType.KEYS);
+    cqDUnitTest.createServer(server1, 0, false, DataPolicy.REPLICATE);
+    cqDUnitTest.createServer(server2, 0, false, DataPolicy.REPLICATE);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     final String host0 = NetworkUtils.getServerHostName(server1.getHost());
@@ -439,7 +440,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     final int evictionThreshold = 1;
     server1.invoke(new CacheSerializableRunnable("Create Cache Server") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
+        logger.info("### Create Cache Server. ###");
         AttributesFactory factory = new AttributesFactory();
         factory.setScope(Scope.DISTRIBUTED_ACK);
         factory.setDataPolicy(DataPolicy.REPLICATE);
@@ -508,7 +509,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     VM server1 = host.getVM(0);
     VM client = host.getVM(1);
 
-    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+    cqDUnitTest.createServer(server1, 0, false, DataPolicy.REPLICATE);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
@@ -520,7 +521,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     SerializableRunnable createClientWithPool =
         new CacheSerializableRunnable("createClientWithPool") {
           public void run2() throws CacheException {
-            LogWriterUtils.getLogWriter().info("### Create Client. ###");
+            logger.info("### Create Client. ###");
             // Initialize CQ Service.
             try {
               getCache().getQueryService();
@@ -569,7 +570,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     VM server1 = host.getVM(0);
     VM client = host.getVM(1);
 
-    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+    cqDUnitTest.createServer(server1, 0, false, DataPolicy.REPLICATE);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
@@ -581,7 +582,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     SerializableRunnable createClientWithPool =
         new CacheSerializableRunnable("createClientWithPool") {
           public void run2() throws CacheException {
-            LogWriterUtils.getLogWriter().info("### Create Client. ###");
+            logger.info("### Create Client. ###");
             // Region region1 = null;
             // Initialize CQ Service.
             try {
@@ -627,7 +628,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     VM server1 = host.getVM(0);
     VM client = host.getVM(1);
 
-    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+    cqDUnitTest.createServer(server1, 0, false, DataPolicy.REPLICATE);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
@@ -639,7 +640,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     SerializableRunnable createClientWithConnectionPool =
         new CacheSerializableRunnable("createClientWithConnectionPool") {
           public void run2() throws CacheException {
-            LogWriterUtils.getLogWriter().info("### Create Client. ###");
+            logger.info("### Create Client. ###");
             // Region region1 = null;
             // Initialize CQ Service.
             try {
@@ -684,7 +685,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     VM server1 = host.getVM(0);
     VM client = host.getVM(1);
 
-    cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
+    cqDUnitTest.createServer(server1, 0, false, DataPolicy.REPLICATE);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
@@ -696,7 +697,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     SerializableRunnable createClientWithPool =
         new CacheSerializableRunnable("createClientWithPool") {
           public void run2() throws CacheException {
-            LogWriterUtils.getLogWriter().info("### Create Client. ###");
+            logger.info("### Create Client. ###");
             // Region region1 = null;
             // Initialize CQ Service.
             try {
@@ -769,7 +770,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     // Test for Event on Region Clear.
     server.invoke(new CacheSerializableRunnable("testRegionEvents") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Clearing the region on the server ###");
+        logger.info("### Clearing the region on the server ###");
         Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
         for (int i = 1; i <= 5; i++) {
           region.put(CqQueryDUnitTest.KEY + i, new Portfolio(i));
@@ -783,7 +784,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     // Test for Event on Region invalidate.
     server.invoke(new CacheSerializableRunnable("testRegionEvents") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Invalidate the region on the server ###");
+        logger.info("### Invalidate the region on the server ###");
         Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
         for (int i = 1; i <= 5; i++) {
           region.put(CqQueryDUnitTest.KEY + i, new Portfolio(i));
@@ -797,7 +798,7 @@ public class CqDataDUnitTest extends JUnit4CacheTestCase {
     // Test for Event on Region destroy.
     server.invoke(new CacheSerializableRunnable("testRegionEvents") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Destroying the region on the server ###");
+        logger.info("### Destroying the region on the server ###");
         Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[1]);
         for (int i = 1; i <= 5; i++) {
           region.put(CqQueryDUnitTest.KEY + i, new Portfolio(i));
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqPerfDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqPerfDUnitTest.java
index eb4868c..c880d1f 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqPerfDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqPerfDUnitTest.java
@@ -14,15 +14,17 @@
  */
 package org.apache.geode.cache.query.cq.dunit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -40,8 +42,7 @@ import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache.query.internal.cq.ServerCQImpl;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.NetworkUtils;
@@ -52,20 +53,22 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
 /**
- * This class tests the ContiunousQuery mechanism in GemFire. This includes the test with diffetent
+ * This class tests the ContinuousQuery mechanism in GemFire. This includes the test with different
  * data activities.
  */
+@SuppressWarnings("SpellCheckingInspection")
 @Category({ClientSubscriptionTest.class})
 public class CqPerfDUnitTest extends JUnit4CacheTestCase {
-
-  protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
+  private final Logger logger = LogService.getLogger();
+  @SuppressWarnings("CanBeFinal")
+  private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest();
 
   public CqPerfDUnitTest() {
     super();
   }
 
   @Override
-  public final void postSetUp() throws Exception {
+  public final void postSetUp() {
     // avoid IllegalStateException from HandShake by connecting all vms tor
     // system before creating connection pools
     getSystem();
@@ -78,63 +81,48 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Tests the cq performance.
-   *
    */
   @Ignore("perf")
   @Test
-  public void testCQPerf() throws Exception {
+  public void testCQPerf() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     cqDUnitTest.createServer(server);
 
-    final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     cqDUnitTest.createClient(client, port, host0);
     final String cqName = "testCQPerf_0";
 
-    client.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-        // Create CQ Attributes.
-        CqAttributesFactory cqf = new CqAttributesFactory();
-        CqListener[] cqListeners = {new CqTimeTestListener(LogWriterUtils.getLogWriter())};
-        ((CqTimeTestListener) cqListeners[0]).cqName = cqName;
-
-        cqf.initCqListeners(cqListeners);
-        CqAttributes cqa = cqf.create();
-
-        // Create and Execute CQ.
-        try {
-          CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
-          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
-          cq1.execute();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
-          ex.printStackTrace();
-          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
-          err.initCause(ex);
-          throw err;
-        }
-      }
+    client.invoke(() -> {
+      logger.info("### Create CQ. ###" + cqName);
+      // Get CQ Service.
+      QueryService cqService =
+          getCache().getQueryService();
+
+      // Create CQ Attributes.
+      CqAttributesFactory cqf = new CqAttributesFactory();
+      CqListener[] cqListeners = {new CqTimeTestListener(LogWriterUtils.getLogWriter())};
+      ((CqTimeTestListener) cqListeners[0]).cqName = cqName;
+
+      cqf.initCqListeners(cqListeners);
+      CqAttributes cqa = cqf.create();
+
+      // Create and Execute CQ.
+      CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
+      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
+      cq1.execute();
     });
 
     final int size = 50;
 
     // Create values.
     cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
+
     Wait.pause(5000);
 
     // Update values
@@ -142,39 +130,13 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
     client.invoke(new CacheSerializableRunnable("Validate CQs") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
+        logger.info("### Validating CQ. ### " + cqName);
         // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCqService.");
-        }
+        QueryService cqService = getCache().getQueryService();
 
         CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+        assertThat(cQuery).isNotNull();
 
-        // CqAttributes cqAttr = cQuery.getCqAttributes();
-        // CqListener cqListeners[] = cqAttr.getCqListeners();
-        // CqTimeTestListener listener = (CqTimeTestListener) cqListeners[0];
-
-        // Wait for all the create to arrive.
-        // for (int i=1; i <= size; i++) {
-        // listener.waitForCreated(cqDUnitTest.KEY+i);
-        // }
-
-        // Wait for all the update to arrive.
-        // for (int i=1; i <= size; i++) {
-        // listener.waitForUpdated(cqDUnitTest.KEY+i);
-        // }
-        // getLogWriter().info("### Time taken for Creation of " + size + " events is :" +
-        // listener.getTotalQueryCreateTime());
-
-        // getLogWriter().info("### Time taken for Update of " + size + " events is :" +
-        // listener.getTotalQueryUpdateTime());
       }
     });
 
@@ -188,219 +150,174 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for maintaining keys for update optimization.
-   *
    */
   @Test
-  public void testKeyMaintainance() throws Exception {
+  public void testKeyMaintenance() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     cqDUnitTest.createServer(server);
-    final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
     cqDUnitTest.createClient(client, port, host0);
 
-
-    // HashSet for caching purpose will be created for cqs.
-    final int cqSize = 2;
-
     // Cq1
-    cqDUnitTest.createCQ(client, "testKeyMaintainance_0", cqDUnitTest.cqs[0]);
-    cqDUnitTest.executeCQ(client, "testKeyMaintainance_0", false, null);
+    cqDUnitTest.createCQ(client, "testKeyMaintenance_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintenance_0", false, null);
 
     // Cq2
-    cqDUnitTest.createCQ(client, "testKeyMaintainance_1", cqDUnitTest.cqs[10]);
-    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);
+    cqDUnitTest.createCQ(client, "testKeyMaintenance_1", cqDUnitTest.cqs[10]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);
 
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 1);
-    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY + 1);
-
-    // Entry is made into the CQs cache hashset.
-    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 0
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeys1") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 1,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 0,
-                cqQuery.getCqResultKeysSize());
-          }
+    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 1);
+
+    // Entry is made into the CQs cache hashSet.
+    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 0
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
+        String serverCqName = cqQuery.getServerCqName();
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(1);
+        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
+              .isEqualTo(0);
         }
       }
     });
 
     // Update 1.
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
-    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY + 10);
-
-    // Entry/check is made into the CQs cache hashset.
-    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate1") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 10,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 5,
-                cqQuery.getCqResultKeysSize());
-          }
+    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 10);
+
+    // Entry/check is made into the CQs cache hashSet.
+    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
+
+        String serverCqName = cqQuery.getServerCqName();
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(10);
+        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
+              .isEqualTo(5);
         }
       }
     });
 
     // Update.
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
-    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY + 12);
-
-    // Entry/check is made into the CQs cache hashset.
-    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 12,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 6,
-                cqQuery.getCqResultKeysSize());
-          }
+    cqDUnitTest.waitForCreated(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 12);
+
+    // Entry/check is made into the CQs cache hashSet.
+    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
+        String serverCqName = cqQuery.getServerCqName();
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(12);
+        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
+              .isEqualTo(6);
         }
-
       }
     });
 
     // Delete.
     cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 6);
-    cqDUnitTest.waitForDestroyed(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY + 6);
-
-    // Entry/check is made into the CQs cache hashset.
-    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 6,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 3,
-                cqQuery.getCqResultKeysSize());
-          }
+    cqDUnitTest.waitForDestroyed(client, "testKeyMaintenance_0", CqQueryDUnitTest.KEY + 6);
+
+    // Entry/check is made into the CQs cache hashSet.
+    // testKeyMaintenance_0 with 1 entry and testKeyMaintenance_1 with 1
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
+
+        String serverCqName = cqQuery.getServerCqName();
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(6);
+        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
+              .isEqualTo(3);
         }
       }
     });
 
     // Stop CQ.
     // This should still needs to process the events so that Results are up-to-date.
-    cqDUnitTest.stopCQ(client, "testKeyMaintainance_1");
+    cqDUnitTest.stopCQ(client, "testKeyMaintenance_1");
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
 
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 12,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 6,
-                cqQuery.getCqResultKeysSize());
-          }
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
+        String serverCqName = cqQuery.getServerCqName();
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(12);
+        } else if (serverCqName.startsWith("testKeyMaintenance_1")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_1 is wrong.")
+              .isEqualTo(6);
         }
-
       }
     });
 
-
     // re-start the CQ.
-    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);
+    cqDUnitTest.executeCQ(client, "testKeyMaintenance_1", false, null);
 
     // This will remove the caching for this CQ.
-    cqDUnitTest.closeCQ(client, "testKeyMaintainance_1");
-    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2") {
-      public void run2() throws CacheException {
-        CqService cqService = null;
-        try {
-          cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
-        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
-        for (InternalCqQuery cq : cqs) {
-          ServerCQImpl cqQuery = (ServerCQImpl) cq;
-
-          String serverCqName = (String) cqQuery.getServerCqName();
-          if (serverCqName.startsWith("testKeyMaintainance_0")) {
-            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 12,
-                cqQuery.getCqResultKeysSize());
-          } else if (serverCqName.startsWith("testKeyMaintainance_1")) {
-            fail("The key maintainance should not be present for this CQ.");
-          }
-        }
+    cqDUnitTest.closeCQ(client, "testKeyMaintenance_1");
+    server.invoke(() -> {
+      CqService cqService = ((DefaultQueryService) getCache().getQueryService()).getCqService();
+
+      Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+      for (InternalCqQuery cq : cqs) {
+        ServerCQImpl cqQuery = (ServerCQImpl) cq;
 
+        String serverCqName = cqQuery.getServerCqName();
+        assertThat(serverCqName.startsWith("testKeyMaintenance_1")).isFalse();
+
+        if (serverCqName.startsWith("testKeyMaintenance_0")) {
+          assertThat(cqQuery.getCqResultKeysSize())
+              .describedAs("The number of keys cached for cq testKeyMaintenance_0 is wrong.")
+              .isEqualTo(12);
+        }
       }
     });
 
-
     // Close.
     cqDUnitTest.closeClient(client);
     cqDUnitTest.closeServer(server);
@@ -409,18 +326,16 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
   /**
    * Test for common CQs. To test the changes relating to, executing CQ only once for all similar
    * CQs.
-   *
    */
   @Test
-  public void testMatchingCqs() throws Exception {
+  public void testMatchingCqs() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     cqDUnitTest.createServer(server);
-    final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
     cqDUnitTest.createClient(client, port, host0);
 
     // Create and Execute same kind of CQs.
@@ -464,7 +379,6 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
     // Execute the stopped CQ.
     cqDUnitTest.executeCQ(client, "testMatchingCqs_1", false, null);
 
-
     // Update - 3.
     cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
@@ -509,22 +423,20 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
   /**
    * Test for common CQs. To test the changes relating to, executing CQ only once for all similar
    * CQs.
-   *
    */
   @Test
-  public void testMatchingCQWithMultipleClients() throws Exception {
+  public void testMatchingCQWithMultipleClients() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client1 = host.getVM(1);
-    VM client2 = host.getVM(2);
-    VM client3 = host.getVM(3);
+    VM server = VM.getVM(0);
+    VM client1 = VM.getVM(1);
+    VM client2 = VM.getVM(2);
+    VM client3 = VM.getVM(3);
 
     VM clients[] = new VM[] {client1, client2, client3};
 
     cqDUnitTest.createServer(server);
-    final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final int port = server.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
 
     for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
       cqDUnitTest.createClient(clients[clientIndex], port, host0);
@@ -589,7 +501,6 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
     // Execute the stopped CQ.
     cqDUnitTest.executeCQ(client2, "testMatchingCQWithMultipleClients_1", false, null);
 
-
     // Update - 3.
     for (int clientIndex = 0; clientIndex < 3; clientIndex++) {
       cqDUnitTest.clearCQListenerEvents(clients[clientIndex],
@@ -636,7 +547,6 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
     validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * clients.length);
 
-
     // update 4
     cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
 
@@ -664,19 +574,19 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void testMatchingCQsWithMultipleServers() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
+  public void testMatchingCQsWithMultipleServers() {
+
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client1 = VM.getVM(2);
+    VM client2 = VM.getVM(3);
 
     cqDUnitTest.createServer(server1);
 
     VM clients[] = new VM[] {client1, client2};
 
-    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
     // Create client.
 
     // Create client with redundancyLevel -1
@@ -696,10 +606,10 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
       cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
     }
 
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], 1 * clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
 
     // CREATE.
     cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
@@ -712,7 +622,7 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
     cqDUnitTest.createServer(server2, ports[0]);
 
-    final int port2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
+    final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
     System.out
         .println("### Port on which server1 running : " + port1 + " Server2 running : " + port2);
 
@@ -739,7 +649,6 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
     cqDUnitTest.waitForUpdated(client2, "testMatchingCQsWithMultipleServers_2",
         CqQueryDUnitTest.KEY + 4);
 
-
     int[] resultsCnt = new int[] {10, 1, 2};
 
     for (int i = 0; i < numCQs; i++) {
@@ -756,19 +665,21 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
 
   @Test
-  public void testFailOverMatchingCQsWithMultipleServers() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
+  public void testFailOverMatchingCQsWithMultipleServers() {
+
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client1 = VM.getVM(2);
+    VM client2 = VM.getVM(3);
 
+    logger.info("Ready to create server 1");
     cqDUnitTest.createServer(server1);
+    logger.info("Ready to create server 1");
 
     VM clients[] = new VM[] {client1, client2};
 
-    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
     // Create client.
 
     // Create client with redundancyLevel -1
@@ -788,9 +699,8 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
       cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
     }
 
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], 1 * clients.length);
-
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], clients.length);
 
     cqDUnitTest.createServer(server2, ports[0]);
 
@@ -809,18 +719,16 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
   }
 
 
-
   /**
    * Test for CQ Fail over.
-   *
    */
   @Test
-  public void testMatchingCQsOnDataNodeWithMultipleServers() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
+  public void testMatchingCQsOnDataNodeWithMultipleServers() {
+
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client1 = VM.getVM(2);
+    VM client2 = VM.getVM(3);
 
     cqDUnitTest.createServerOnly(server1, 0);
     cqDUnitTest.createServerOnly(server2, 0);
@@ -829,8 +737,8 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
     VM clients[] = new VM[] {client1, client2};
 
-    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
 
     cqDUnitTest.createLocalRegion(client1, new int[] {port1, ports[0]}, host0, "-1",
@@ -850,11 +758,11 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
       cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, false, null);
     }
 
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[0], 1 * clients.length);
-    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[1], 1 * clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[0], clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[1], clients.length);
 
-    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[0], 1 * clients.length);
-    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[1], 1 * clients.length);
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[0], clients.length);
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[1], clients.length);
 
     // Close.
     cqDUnitTest.closeClient(client1);
@@ -865,25 +773,24 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Performance test for Matching CQ optimization changes.
-   *
    */
   @Ignore("perf")
   @Test
-  public void testPerformanceForMatchingCQs() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client1 = host.getVM(2);
-    VM client2 = host.getVM(3);
+  public void testPerformanceForMatchingCQs() {
+
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client1 = VM.getVM(2);
+    VM client2 = VM.getVM(3);
 
     cqDUnitTest.createServer(server1);
     cqDUnitTest.createServer(server2);
 
     // VM clients[] = new VM[]{client1, client2};
 
-    final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final int port2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final int port1 = server1.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final int port2 = server2.invoke(CqQueryDUnitTest::getCacheServerPort);
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     // final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
@@ -895,22 +802,20 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
     cqDUnitTest.createClient(client2, new int[] {port2}, host0, "-1");
 
     // Client1 registers matching CQs on server1.
-    boolean uniqueQueries = false;
-    String[] matchingCqs = this.generateCqQueries(uniqueQueries);
+    String[] matchingCqs = this.generateCqQueries(false);
     for (int i = 0; i < matchingCqs.length; i++) {
       cqDUnitTest.createCQ(client1, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
       cqDUnitTest.executeCQ(client1, "testPerformanceForMatchingCQs_" + i, false, null);
     }
 
     // Client2 registers non-matching CQs on server2.
-    uniqueQueries = true;
-    matchingCqs = this.generateCqQueries(uniqueQueries);
+    matchingCqs = this.generateCqQueries(true);
     for (int i = 0; i < matchingCqs.length; i++) {
       cqDUnitTest.createCQ(client2, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
       cqDUnitTest.executeCQ(client2, "testPerformanceForMatchingCQs_" + i, false, null);
     }
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
 
     // CREATE.
     int size = 1000;
@@ -937,7 +842,7 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
           CqQueryDUnitTest.KEY + k);
     }
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
     printCqQueryExecutionTime(server1);
     printCqQueryExecutionTime(server2);
 
@@ -949,58 +854,41 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
 
   }
 
-  public void validateMatchingCqs(VM server, final int mapSize, final String query,
+  private void validateMatchingCqs(VM server, final int mapSize, final String query,
       final int numCqSize) {
-    server.invoke(new CacheSerializableRunnable("validateMatchingCqs") {
-      public void run2() throws CacheException {
-        CqServiceImpl cqService = null;
-        try {
-          cqService =
-              (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
+    server.invoke(() -> {
+      CqServiceImpl cqService =
+          (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
 
-        Map matchedCqMap = cqService.getMatchingCqMap();
-        assertEquals("The number of matched cq is not as expected.", mapSize, matchedCqMap.size());
+      Map matchedCqMap = cqService.getMatchingCqMap();
+      Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+          .until(matchedCqMap::size, equalTo(mapSize));
 
-        if (query != null) {
-          if (!matchedCqMap.containsKey(query)) {
-            fail("Query not found in the matched cq map. Query:" + query);
-          }
-          Collection cqs = (Collection) matchedCqMap.get(query);
-          assertEquals("Number of matched cqs are not equal to the expected matched cqs", numCqSize,
-              cqs.size());
-        }
+      if (query != null) {
+        assertThat(matchedCqMap.containsKey(query)).isTrue();
+
+        Collection cqs = (Collection) matchedCqMap.get(query);
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+            .until(cqs::size, equalTo(numCqSize));
       }
     });
   }
 
-  public void printCqQueryExecutionTime(VM server) {
-    server.invoke(new CacheSerializableRunnable("printCqQueryExecutionTime") {
-      public void run2() throws CacheException {
-        CqServiceImpl cqService = null;
-        try {
-          cqService =
-              (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("Failed to get the internal CqService.", ex);
-          Assert.fail("Failed to get the internal CqService.", ex);
-        }
+  private void printCqQueryExecutionTime(VM server) {
+    server.invoke(() -> {
+      CqServiceImpl cqService =
+          (CqServiceImpl) ((DefaultQueryService) getCache().getQueryService()).getCqService();
 
-        long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
-        LogWriterUtils.getLogWriter().info("Total Time taken to Execute CQ Query :" + timeTaken);
-        // System.out.println("Total Time taken to Execute CQ Query :" + timeTaken);
-      }
+      long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
+      logger.info("Total Time taken to Execute CQ Query :" + timeTaken);
     });
   }
 
-  public String[] generateCqQueries(boolean uniqueQueries) {
-    ArrayList initQueries = new ArrayList();
+  private String[] generateCqQueries(boolean uniqueQueries) {
+    List<String> initQueries = new ArrayList<>();
     // From Portfolio object.
     String[] names = {"aaa", "bbb", "ccc", "ddd"};
-    int nameIndex = 0;
+    int nameIndex;
 
     // Construct few unique Queries.
     for (int i = 0; i < 3; i++) {
@@ -1015,10 +903,8 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
     }
 
     int numMatchedQueries = 10;
-    ArrayList cqQueries = new ArrayList();
-    Iterator iter = initQueries.iterator();
-    while (iter.hasNext()) {
-      String query = (String) iter.next();
+    List<String> cqQueries = new ArrayList<>();
+    for (String query : initQueries) {
       for (int cnt = 0; cnt < numMatchedQueries; cnt++) {
         if (uniqueQueries) {
           // Append blank string, so that query string is different but the
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
index 1f72f98..7e66814 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
@@ -16,32 +16,32 @@ package org.apache.geode.cache.query.cq.dunit;
 
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
-import org.apache.geode.cache.MirrorType;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionFactory;
@@ -71,34 +71,34 @@ import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.DistributedTombstoneOperation;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
 /**
- * This class tests the ContiunousQuery mechanism in GemFire. It does so by creating a cache server
+ * This class tests the ContinuousQuery mechanism in GemFire. It does so by creating a cache server
  * with a cache and a pre-defined region and a data loader. The client creates the same region and
  * attaches the connection pool.
  */
 @Category({ClientSubscriptionTest.class})
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "Convert2MethodRef"})
 public class CqQueryDUnitTest extends JUnit4CacheTestCase {
-
-  /** The port on which the bridge server was started in this VM */
+  private static final Logger logger = LogService.getLogger();
+  /**
+   * The port on which the bridge server was started in this VM
+   */
   private static int bridgeServerPort;
 
-  protected static int port = 0;
+  private static final int port = 0;
   protected static int port2 = 0;
 
-  public static int noTest = -1;
+  public static final int noTest = -1;
 
   public final String[] regions = new String[] {"regionA", "regionB"};
 
@@ -116,7 +116,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   private static final int WAIT_DEFAULT = (20 * 1000);
 
-  public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue();
+  private static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT);
 
   public final String[] cqs = new String[] {
       // 0 - Test for ">"
@@ -159,12 +159,12 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       // 1 - Test for "=" and "and".
       "SELECT ALL * FROM /" + regions[0] + " p where p.ID = 2 and p.status='active'"};
 
-  private String[] invalidCQs = new String[] {
+  private final String[] invalidCQs = new String[] {
       // Test for ">"
       "SELECT ALL * FROM /root/invalidRegion p where p.ID > 0"};
 
 
-  private String[] shortTypeCQs = new String[] {
+  private final String[] shortTypeCQs = new String[] {
       // 11 - Test for "short" number type
       "SELECT ALL * FROM /root/" + regions[0] + " p where p.shortID IN SET(1,2,3,4,5)"};
 
@@ -198,61 +198,40 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   public void createServer(VM server, final int thePort, final boolean eviction) {
-    MirrorType mirrorType = MirrorType.KEYS_VALUES;
-    createServer(server, thePort, eviction, mirrorType);
+    createServer(server, thePort, eviction, DataPolicy.REPLICATE);
   }
 
   public void createServer(VM server, final int thePort, final boolean eviction,
-      final MirrorType mirrorType) {
-    SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
-        AttributesFactory factory = new AttributesFactory();
-        factory.setScope(Scope.DISTRIBUTED_ACK);
-        factory.setMirrorType(mirrorType);
-
-        // setting the eviction attributes.
-        if (eviction) {
-          EvictionAttributes evictAttrs =
-              EvictionAttributes.createLRUEntryAttributes(100000, EvictionAction.OVERFLOW_TO_DISK);
-          factory.setEvictionAttributes(evictAttrs);
-        }
+      final DataPolicy dataPolicy) {
+    server.invoke(() -> {
+      logger.info("### Create Cache Server. ###");
+      AttributesFactory factory = new AttributesFactory();
+      factory.setScope(Scope.DISTRIBUTED_ACK);
+      factory.setDataPolicy(dataPolicy);
 
-        for (int i = 0; i < regions.length; i++) {
-          createRegion(regions[i], factory.createRegionAttributes());
-        }
-        Wait.pause(2000);
-
-        try {
-          startBridgeServer(thePort, true);
-        }
+      // setting the eviction attributes.
+      if (eviction) {
+        EvictionAttributes evictAttrs =
+            EvictionAttributes.createLRUEntryAttributes(100000, EvictionAction.OVERFLOW_TO_DISK);
+        factory.setEvictionAttributes(evictAttrs);
+      }
 
-        catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
+      for (String region : regions) {
+        createRegion(region, factory.createRegionAttributes());
+        if (getRootRegion("root").getSubregion(region).isEmpty()) {
+          logger.info("### CreateServer: Region is empty ###");
         }
-        Wait.pause(2000);
-
       }
-    };
 
-    server.invoke(createServer);
+      startBridgeServer(thePort, true);
+    });
   }
 
   public void createServerOnly(VM server, final int thePort) {
-    SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
-        try {
-          startBridgeServer(thePort, true);
-        }
-
-        catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
-        }
-      }
-    };
-
-    server.invoke(createServer);
+    server.invoke(() -> {
+      logger.info("### Create Cache Server. ###");
+      startBridgeServer(thePort, true);
+    });
   }
 
   public void createPartitionRegion(final VM server, final String[] regionNames) {
@@ -260,8 +239,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
       public void run2() throws CacheException {
         RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION);
-        for (int i = 0; i < regionNames.length; i++) {
-          rf.create(regionNames[i]);
+        for (String regionName : regionNames) {
+          rf.create(regionName);
         }
       }
     };
@@ -269,15 +248,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     server.invoke(createRegion);
   }
 
-  public void createReplicateRegionWithLocalDestroy(final VM server, final String[] regionNames) {
+  private void createReplicateRegionWithLocalDestroy(final VM server, final String[] regionNames) {
     SerializableRunnable createRegion = new CacheSerializableRunnable("Create Region") {
 
       public void run2() throws CacheException {
         RegionFactory rf =
             getCache().createRegionFactory(RegionShortcut.REPLICATE).setEvictionAttributes(
                 EvictionAttributes.createLIFOEntryAttributes(10, EvictionAction.LOCAL_DESTROY));
-        for (int i = 0; i < regionNames.length; i++) {
-          rf.create(regionNames[i]);
+        for (String regionName : regionNames) {
+          rf.create(regionName);
         }
       }
     };
@@ -290,29 +269,29 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   public void closeServer(VM server) {
     server.invoke(new SerializableRunnable("Close CacheServer") {
       public void run() {
-        LogWriterUtils.getLogWriter().info("### Close CacheServer. ###");
+        logger.info("### Close CacheServer. ###");
         stopBridgeServer(getCache());
       }
     });
-    Wait.pause(2 * 1000);
+
   }
 
-  public void crashServer(VM server) {
+  private void crashServer(VM server) {
     server.invoke(new SerializableRunnable("Crash CacheServer") {
       public void run() {
         org.apache.geode.cache.client.internal.ConnectionImpl.setTEST_DURABLE_CLIENT_CRASH(true);
-        LogWriterUtils.getLogWriter().info("### Crashing CacheServer. ###");
+        logger.info("### Crashing CacheServer. ###");
         stopBridgeServer(getCache());
       }
     });
     Wait.pause(2 * 1000);
   }
 
-  public void closeCrashServer(VM server) {
+  private void closeCrashServer(VM server) {
     server.invoke(new SerializableRunnable("Close CacheServer") {
       public void run() {
         org.apache.geode.cache.client.internal.ConnectionImpl.setTEST_DURABLE_CLIENT_CRASH(false);
-        LogWriterUtils.getLogWriter().info("### Crashing CacheServer. ###");
+        logger.info("### Crashing CacheServer. ###");
         stopBridgeServer(getCache());
       }
     });
@@ -328,37 +307,26 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   /* Create Client */
   public void createClient(VM client, final int[] serverPorts, final String serverHost,
       final String redundancyLevel) {
-    SerializableRunnable createQService = new CacheSerializableRunnable("Create Client") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Client. ###");
-        // Region region1 = null;
-        // Initialize CQ Service.
-        try {
-          getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        AttributesFactory regionFactory = new AttributesFactory();
-        regionFactory.setScope(Scope.LOCAL);
-        if (redundancyLevel != null) {
-          ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
-              Integer.parseInt(redundancyLevel), -1, null);
-        } else {
-          ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
-              -1, -1, null);
-        }
-        for (int i = 0; i < regions.length; i++) {
-          createRegion(regions[i], regionFactory.createRegionAttributes());
-          LogWriterUtils.getLogWriter()
-              .info("### Successfully Created Region on Client :" + regions[i]);
-          // region1.getAttributesMutator().setCacheListener(new CqListener());
-        }
+    client.invoke(() -> {
+      logger.info("### Create Client. ###");
+
+      // Initialize CQ Service.
+      getCache().getQueryService();
+
+      AttributesFactory regionFactory = new AttributesFactory();
+      regionFactory.setScope(Scope.LOCAL);
+      if (redundancyLevel != null) {
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
+            Integer.parseInt(redundancyLevel), -1, null);
+      } else {
+        ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true,
+            -1, -1, null);
       }
-    };
-
-    client.invoke(createQService);
+      for (String region : regions) {
+        createRegion(region, regionFactory.createRegionAttributes());
+        logger.info("### Successfully Created Region on Client :" + region);
+      }
+    });
   }
 
   /* Create Local Region */
@@ -366,7 +334,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       final String redundancyLevel, final String[] regionNames) {
     SerializableRunnable createQService = new CacheSerializableRunnable("Create Local Region") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Local Region. ###");
+        logger.info("### Create Local Region. ###");
         AttributesFactory af = new AttributesFactory();
         af.setScope(Scope.LOCAL);
 
@@ -381,8 +349,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         RegionFactory rf = getCache().createRegionFactory(af.create());
         for (int i = 0; i < regionNames.length; i++) {
           rf.create(regionNames[i]);
-          LogWriterUtils.getLogWriter()
-              .info("### Successfully Created Region on Client :" + regions[i]);
+          logger.info("### Successfully Created Region on Client :" + regions[i]);
         }
       }
     };
@@ -392,44 +359,39 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   public void createClientWith2Pools(VM client, final int[] serverPorts1, final int[] serverPorts2,
       final String serverHost, final String redundancyLevel) {
-    SerializableRunnable createQService = new CacheSerializableRunnable("Create Client") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Client. ###");
-        // Region region1 = null;
-        // Initialize CQ Service.
-        try {
-          getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    client.invoke(() -> {
+      logger.info("### Create Client. ###");
+
+      // Initialize CQ Service.
+      getCache().getQueryService();
+
+      AttributesFactory regionFactory0 = new AttributesFactory();
+      AttributesFactory regionFactory1 = new AttributesFactory();
+      regionFactory0.setScope(Scope.LOCAL);
+      regionFactory1.setScope(Scope.LOCAL);
+      if (redundancyLevel != null) {
+        ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
+            serverPorts1, true,
+            Integer.parseInt(redundancyLevel), -1,
+            null, "testPoolA");
+        ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
+            serverPorts2, true,
+            Integer.parseInt(redundancyLevel), -1,
+            null, "testPoolB");
+      } else {
+        ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
+            serverPorts1, true, -1, -1, null,
+            "testPoolA");
+        ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
+            serverPorts2, true, -1, -1, null,
+            "testPoolB");
+      }
+      createRegion(regions[0], regionFactory0.createRegionAttributes());
+      createRegion(regions[1], regionFactory1.createRegionAttributes());
+      logger.info("### Successfully Created Region on Client :" + regions[0]);
+      logger.info("### Successfully Created Region on Client :" + regions[1]);
 
-        AttributesFactory regionFactory0 = new AttributesFactory();
-        AttributesFactory regionFactory1 = new AttributesFactory();
-        regionFactory0.setScope(Scope.LOCAL);
-        regionFactory1.setScope(Scope.LOCAL);
-        if (redundancyLevel != null) {
-          ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
-              serverPorts1, true, Integer.parseInt(redundancyLevel), -1, null, "testPoolA");
-          ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
-              serverPorts2, true, Integer.parseInt(redundancyLevel), -1, null, "testPoolB");
-        } else {
-          ClientServerTestCase.configureConnectionPoolWithName(regionFactory0, serverHost,
-              serverPorts1, true, -1, -1, null, "testPoolA");
-          ClientServerTestCase.configureConnectionPoolWithName(regionFactory1, serverHost,
-              serverPorts2, true, -1, -1, null, "testPoolB");
-        }
-        createRegion(regions[0], regionFactory0.createRegionAttributes());
-        createRegion(regions[1], regionFactory1.createRegionAttributes());
-        LogWriterUtils.getLogWriter()
-            .info("### Successfully Created Region on Client :" + regions[0]);
-        LogWriterUtils.getLogWriter()
-            .info("### Successfully Created Region on Client :" + regions[1]);
-
-      }
-    };
-
-    client.invoke(createQService);
+    });
   }
 
 
@@ -437,12 +399,11 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   public void closeClient(VM client) {
     SerializableRunnable closeCQService = new CacheSerializableRunnable("Close Client") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Close Client. ###");
+        logger.info("### Close Client. ###");
         try {
           ((DefaultQueryService) getCache().getQueryService()).closeCqService();
         } catch (Exception ex) {
-          LogWriterUtils.getLogWriter()
-              .info("### Failed to get CqService during ClientClose() ###");
+          logger.info("### Failed to get CqService during ClientClose() ###");
         }
 
       }
@@ -461,8 +422,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         for (int i = 1; i <= size; i++) {
           region1.put(KEY + i, new Portfolio(i));
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
@@ -477,13 +437,12 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
           portfolio.createTime = System.currentTimeMillis();
           region1.put(KEY + i, portfolio);
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
 
-  public void createValuesWithShort(VM vm, final String regionName, final int size) {
+  private void createValuesWithShort(VM vm, final String regionName, final int size) {
     vm.invoke(new CacheSerializableRunnable("Create values") {
       public void run2() throws CacheException {
         Region region1 = getRootRegion().getSubregion(regionName);
@@ -492,13 +451,12 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
           portfolio.shortID = new Short("" + i);
           region1.put(KEY + i, portfolio);
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
 
-  public void createValuesAsPrimitives(VM vm, final String regionName, final int size) {
+  private void createValuesAsPrimitives(VM vm, final String regionName, final int size) {
     vm.invoke(new CacheSerializableRunnable("Create values") {
       public void run2() throws CacheException {
         Region region1 = getRootRegion().getSubregion(regionName);
@@ -511,7 +469,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
               region1.put("key" + i, "seeding");
               break;
             case 2:
-              region1.put("key" + i, new Double(i));
+              region1.put("key" + i, (double) i);
               break;
             case 3:
               region1.put("key" + i, i);
@@ -525,8 +483,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
           }
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
@@ -550,7 +507,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
               region1.put("key" + i, new Portfolio(i));
               break;
             case 4:
-              region1.put("key" + i, new Double(i));
+              region1.put("key" + i, (double) i);
               break;
             default:
               region1.put("key" + i, i);
@@ -558,8 +515,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
           }
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
@@ -571,13 +527,12 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         for (int i = 1; i <= size; i++) {
           region1.put("key" + i, new Portfolio(i));
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + region1.keySet().size());
+        logger.info("### Number of Entries in Region :" + region1.keySet().size());
       }
     });
   }
 
-  public void createIndex(VM vm, final String indexName, final String indexedExpression,
+  private void createIndex(VM vm, final String indexName, final String indexedExpression,
       final String regionPath) {
     vm.invoke(new CacheSerializableRunnable("Create values") {
       public void run2() throws CacheException {
@@ -600,8 +555,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         for (int i = 1; i <= size; i++) {
           region1.destroy(KEY + i);
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries In Region after Delete :" + region1.keySet().size());
+        logger.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
       }
 
     });
@@ -617,8 +571,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         for (int i = 1; i <= size; i++) {
           region1.invalidate(KEY + i);
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries In Region after Delete :" + region1.keySet().size());
+        logger.info("### Number of Entries In Region after Delete :" + region1.keySet().size());
       }
 
     });
@@ -631,127 +584,84 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   public void createCQ(VM vm, final String cqName, final String queryStr,
       final boolean isBridgeMemberTest) {
-    vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
-      public void run2() throws CacheException {
-        // pause(60 * 1000);
-        // getLogWriter().info("### DEBUG CREATE CQ START ####");
-        // pause(20 * 1000);
+    vm.invoke(() -> {
 
-        LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-        // Create CQ Attributes.
-        CqAttributesFactory cqf = new CqAttributesFactory();
-        CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
-        // ((CqQueryTestListener)cqListeners[0]).cqName = cqName;
-        // if (isBridgeMemberTest) {
-        // testListenerForBridgeMembershipTest = (CqQueryTestListener)cqListeners[0];
-        // }
+      logger.info("### Create CQ. ###" + cqName);
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        cqf.initCqListeners(cqListeners);
-        CqAttributes cqa = cqf.create();
+      // Create CQ Attributes.
+      CqAttributesFactory cqf = new CqAttributesFactory();
+      CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
+
+      cqf.initCqListeners(cqListeners);
+      CqAttributes cqa = cqf.create();
+
+      // Create CQ.
+      CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
+      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
 
-        // Create CQ.
-        try {
-          CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
-          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
-        } catch (Exception ex) {
-          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
-          err.initCause(ex);
-          LogWriterUtils.getLogWriter().info("CqService is :" + cqService, err);
-          throw err;
-        }
-      }
     });
   }
 
   /* Register CQs with no name, execute, and close */
   public void createAndExecCQNoName(VM vm, final String queryStr) {
-    vm.invoke(new CacheSerializableRunnable("Create CQ with no name:") {
-      public void run2() throws CacheException {
-        // pause(60 * 1000);
-        LogWriterUtils.getLogWriter().info("### DEBUG CREATE CQ START ####");
-        // pause(20 * 1000);
+    vm.invoke(() -> {
+      logger.info("### DEBUG CREATE CQ START ####");
+      logger.info("### Create CQ with no name. ###");
+      String cqName = null;
+      QueryService cqService = getCache().getQueryService();
+
+      SelectResults cqResults;
+      for (int i = 0; i < 20; ++i) {
+        // Create CQ Attributes.
+        CqAttributesFactory cqf = new CqAttributesFactory();
+        CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
 
-        LogWriterUtils.getLogWriter().info("### Create CQ with no name. ###");
-        // Get CQ Service.
-        QueryService cqService = null;
-        CqQuery cq1 = null;
-        String cqName = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+        cqf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqf.create();
 
-        SelectResults cqResults = null;
-        for (int i = 0; i < 20; ++i) {
-          // Create CQ Attributes.
-          CqAttributesFactory cqf = new CqAttributesFactory();
-          CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
+        // Create CQ with no name and execute with initial results.
 
-          cqf.initCqListeners(cqListeners);
-          CqAttributes cqa = cqf.create();
+        CqQuery cq1 = cqService.newCq(queryStr, cqa);
+        ((CqQueryTestListener) cqListeners[0]).cqName = cq1.getName();
 
-          // Create CQ with no name and execute with initial results.
+        if (cq1 == null) {
+          logger.info("Failed to get CqQuery object for CQ with no name.");
+        } else {
+          cqName = cq1.getName();
+          logger.info("Created CQ with no name, generated CQ name: "
+              + cqName + " CQ state:" + cq1.getState());
+          assertThat(cq1.getState().isStopped()).describedAs("Create CQ with no name illegal state")
+              .isTrue();
+        }
+        if (i % 2 == 0) {
+          cqResults = cq1.executeWithInitialResults();
+
+          logger.info("initial result size = " + cqResults.size());
+          logger.info("CQ state after execute with initial results = " + cq1.getState());
+          assertThat(cq1.getState().isRunning())
+              .describedAs("executeWithInitialResults() state mismatch").isTrue();
+        } else {
           try {
-            cq1 = cqService.newCq(queryStr, cqa);
-            ((CqQueryTestListener) cqListeners[0]).cqName = cq1.getName();
+            cq1.execute();
           } catch (Exception ex) {
-            LogWriterUtils.getLogWriter().info("CQService is :" + cqService);
+            logger.info("CQService is :" + cqService);
             ex.printStackTrace();
-            fail("Failed to create CQ with no name" + " . " + ex.getMessage());
-          }
-
-          if (cq1 == null) {
-            LogWriterUtils.getLogWriter().info("Failed to get CqQuery object for CQ with no name.");
-          } else {
-            cqName = cq1.getName();
-            LogWriterUtils.getLogWriter().info("Created CQ with no name, generated CQ name: "
-                + cqName + " CQ state:" + cq1.getState());
-            assertTrue("Create CQ with no name illegal state", cq1.getState().isStopped());
-          }
-          if (i % 2 == 0) {
-            try {
-              cqResults = cq1.executeWithInitialResults();
-            } catch (Exception ex) {
-              LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
-              ex.printStackTrace();
-              fail("Failed to execute CQ with initial results, cq name: " + cqName + " . "
-                  + ex.getMessage());
-            }
-            LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
-            LogWriterUtils.getLogWriter()
-                .info("CQ state after execute with initial results = " + cq1.getState());
-            assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
-          } else {
-            try {
-              cq1.execute();
-            } catch (Exception ex) {
-              LogWriterUtils.getLogWriter().info("CQService is :" + cqService);
-              ex.printStackTrace();
-              fail("Failed to execute CQ " + cqName + " . " + ex.getMessage());
-            }
-            LogWriterUtils.getLogWriter().info("CQ state after execute = " + cq1.getState());
-            assertTrue("execute() state mismatch", cq1.getState().isRunning());
+            fail("Failed to execute CQ " + cqName + " . " + ex.getMessage());
           }
+          logger.info("CQ state after execute = " + cq1.getState());
+          assertThat(cq1.getState().isRunning()).describedAs("execute() state mismatch").isTrue();
+        }
 
-          // Close the CQ
-          try {
-            cq1.close();
-          } catch (Exception ex) {
-            LogWriterUtils.getLogWriter().info("CqService is :" + cqService, ex);
-            fail("Failed to close CQ " + cqName + " . " + ex.getMessage());
-          }
-          assertTrue("closeCq() state mismatch", cq1.getState().isClosed());
+        // Close the CQ
+        try {
+          cq1.close();
+        } catch (Exception ex) {
+          logger.info("CqService is :" + cqService, ex);
+          fail("Failed to close CQ " + cqName + " . " + ex.getMessage());
         }
+        assertThat(cq1.getState().isClosed()).describedAs("closeCq() state mismatch").isTrue();
       }
     });
   }
@@ -770,317 +680,150 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    */
   private void executeCQ(VM vm, final String cqName, final boolean initialResults,
       final int expectedResultsSize, final String expectedErr) {
-    vm.invoke(new CacheSerializableRunnable("Execute CQ :" + cqName) {
-
-      private void work() throws CacheException {
-        // pause(60 * 1000);
-        LogWriterUtils.getLogWriter().info("### DEBUG EXECUTE CQ START ####");
-        // pause(20 * 1000);
-
-        // Get CQ Service.
-        QueryService cqService = null;
-        CqQuery cq1 = null;
-        // try {
-        cqService = getCache().getQueryService();
-        // } catch (Exception cqe) {
-        // getLogWriter().error(cqe);
-        // AssertionError err = new AssertionError("Failed to get QueryService" + cqName);
-        // err.initCause(ex);
-        // throw err;
-        // fail("Failed to getCQService.");
-        // }
-
-        // Get CqQuery object.
-        try {
-          cq1 = cqService.getCq(cqName);
-          if (cq1 == null) {
-            LogWriterUtils.getLogWriter()
-                .info("Failed to get CqQuery object for CQ name: " + cqName);
-            fail("Failed to get CQ " + cqName);
-          } else {
-            LogWriterUtils.getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
-            assertTrue("newCq() state mismatch", cq1.getState().isStopped());
-          }
-        } catch (Exception ex) {
-          LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
-          LogWriterUtils.getLogWriter().error(ex);
-          AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
-          err.initCause(ex);
-          throw err;
-        }
+    vm.invoke(() -> {
+      if (expectedErr != null) {
+        getCache().getLogger()
+            .info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
+      }
+      try {
+        QueryService cqService = getCache().getQueryService();
+        CqQuery cq1 = cqService.getCq(cqName);
+        assertThat(cq1).isNotNull();
+        assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
 
         if (initialResults) {
-          SelectResults cqResults = null;
-
-          try {
-            cqResults = cq1.executeWithInitialResults();
-          } catch (Exception ex) {
-            LogWriterUtils.getLogWriter().info("CqService is :" + cqService);
-            ex.printStackTrace();
-            AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
-            err.initCause(ex);
-            throw err;
-          }
-          LogWriterUtils.getLogWriter().info("initial result size = " + cqResults.size());
-          assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning());
+          SelectResults cqResults;
+          cqResults = cq1.executeWithInitialResults();
+          assertThat(cq1.getState().isRunning())
+              .describedAs("executeWithInitialResults() state mismatch").isTrue();
           if (expectedResultsSize >= 0) {
-            assertEquals("unexpected results size", expectedResultsSize, cqResults.size());
+            assertThat(cqResults.size()).describedAs("unexpected results size")
+                .isEqualTo(expectedResultsSize);
           }
         } else {
-          try {
-            cq1.execute();
-          } catch (Exception ex) {
-            AssertionError err = new AssertionError("Failed to execute  CQ " + cqName);
-            err.initCause(ex);
-            if (expectedErr == null) {
-              LogWriterUtils.getLogWriter().info("CqService is :" + cqService, err);
-            }
-            throw err;
-          }
-          assertTrue("execute() state mismatch", cq1.getState().isRunning());
+          cq1.execute();
+          assertThat(cq1.getState().isRunning()).describedAs("execute() state mismatch").isTrue();
         }
-      }
-
-      public void run2() throws CacheException {
+      } finally {
         if (expectedErr != null) {
           getCache().getLogger()
-              .info("<ExpectedException action=add>" + expectedErr + "</ExpectedException>");
-        }
-        try {
-          work();
-        } finally {
-          if (expectedErr != null) {
-            getCache().getLogger()
-                .info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
-          }
+              .info("<ExpectedException action=remove>" + expectedErr + "</ExpectedException>");
         }
       }
     });
   }
 
   /* Stop/pause CQ */
-  public void stopCQ(VM vm, final String cqName) throws Exception {
-    vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Stop CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+  public void stopCQ(VM vm, final String cqName) {
+    vm.invoke(() -> {
+      logger.info("### Stop CQ. ###" + cqName);
+      QueryService cqService = getCache().getQueryService();
 
-        // Stop CQ.
-        CqQuery cq1 = null;
-        try {
-          cq1 = cqService.getCq(cqName);
-          cq1.stop();
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          fail("Failed to stop CQ " + cqName + " . " + ex.getMessage());
-        }
-        assertTrue("Stop CQ state mismatch", cq1.getState().isStopped());
-      }
+      CqQuery cq1 = cqService.getCq(cqName);
+      cq1.stop();
+
+      assertThat(cq1.getState().isStopped()).describedAs("Stop CQ state mismatch").isTrue();
     });
   }
 
   // Stop and execute CQ repeatedly
   /* Stop/pause CQ */
-  private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception {
-    vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
-      public void run2() throws CacheException {
-        CqQuery cq1 = null;
-        LogWriterUtils.getLogWriter().info("### Stop and Exec CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCqService.");
-        }
+  private void stopExecCQ(VM vm) {
+    vm.invoke(() -> {
+      logger.info("### Stop and Exec CQ. ###" + "testCQStopExecute_0");
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cq1 = cqService.getCq("testCQStopExecute_0");
 
-        // Get CQ.
-        try {
-          cq1 = cqService.getCq(cqName);
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          fail("Failed to get CQ " + cqName + " . " + ex.getMessage());
-        }
+      for (int i = 0; i < 20; ++i) {
 
-        for (int i = 0; i < count; ++i) {
-          // Stop CQ.
-          try {
-            cq1.stop();
-          } catch (Exception ex) {
-            ex.printStackTrace();
-            fail("Count = " + i + "Failed to stop CQ " + cqName + " . " + ex.getMessage());
-          }
-          assertTrue("Stop CQ state mismatch, count = " + i, cq1.getState().isStopped());
-          LogWriterUtils.getLogWriter()
-              .info("After stop in Stop and Execute loop, ran successfully, loop count: " + i);
-          LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState());
+        cq1.stop();
 
-          // Re-execute CQ
-          try {
-            cq1.execute();
-          } catch (Exception ex) {
-            ex.printStackTrace();
-            fail("Count = " + i + "Failed to execute CQ " + cqName + " . " + ex.getMessage());
-          }
-          assertTrue("Execute CQ state mismatch, count = " + i, cq1.getState().isRunning());
-          LogWriterUtils.getLogWriter()
-              .info("After execute in Stop and Execute loop, ran successfully, loop count: " + i);
-          LogWriterUtils.getLogWriter().info("CQ state: " + cq1.getState());
-        }
+        assertThat(cq1.getState().isStopped()).describedAs("Stop CQ state mismatch, count = " + i)
+            .isTrue();
+        logger.info("After stop in Stop and Execute loop, ran successfully, loop count: " + i);
+        logger.info("CQ state: " + cq1.getState());
+
+        cq1.execute();
+
+        assertThat(cq1.getState().isRunning())
+            .describedAs("Execute CQ state mismatch, count = " + i).isTrue();
+        logger.info("After execute in Stop and Execute loop, ran successfully, loop count: " + i);
+        logger.info("CQ state: " + cq1.getState());
       }
     });
   }
 
 
   /* UnRegister CQs */
-  public void closeCQ(VM vm, final String cqName) throws Exception {
-    vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Close CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCqService.");
-        }
+  public void closeCQ(VM vm, final String cqName) {
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        // Close CQ.
-        CqQuery cq1 = null;
-        try {
-          cq1 = cqService.getCq(cqName);
-          cq1.close();
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          fail("Failed to close CQ " + cqName + " . " + ex.getMessage());
-        }
-        assertTrue("Close CQ state mismatch", cq1.getState().isClosed());
-      }
+      // Close CQ.
+      CqQuery cq1 = cqService.getCq(cqName);
+      cq1.close();
+
+      assertThat(cq1.getState().isClosed()).describedAs("Close CQ state mismatch").isTrue();
     });
   }
 
   /* Register CQs */
-  private void registerInterestListCQ(VM vm, final String regionName, final int keySize) {
-    vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") {
-      public void run2() throws CacheException {
-
-        // Get CQ Service.
-        Region region = null;
-        try {
-          region = getRootRegion().getSubregion(regionName);
-          region.getAttributesMutator()
-              .addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));
-        } catch (Exception cqe) {
-          AssertionError err = new AssertionError("Failed to get Region.");
-          err.initCause(cqe);
-          throw err;
-
-        }
+  private void registerInterestListCQ(VM vm, final String regionName) {
+    vm.invoke(() -> {
+      Region region = getRootRegion().getSubregion(regionName);
+      region.getAttributesMutator()
+          .addCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));
 
-        try {
-          List list = new ArrayList();
-          for (int i = 1; i <= keySize; i++) {
-            list.add(KEY + i);
-          }
-          region.registerInterest(list);
-        } catch (Exception ex) {
-          AssertionError err = new AssertionError("Failed to Register InterestList");
-          err.initCause(ex);
-          throw err;
-        }
+      List list = new ArrayList();
+      for (int i = 1; i <= 10; i++) {
+        list.add(KEY + i);
       }
+      region.registerInterest(list);
     });
   }
 
   // helps test case where executeIR is called multiple times as well as after close
-  public void executeAndCloseAndExecuteIRMultipleTimes(VM vm, final String cqName,
+  private void executeAndCloseAndExecuteIRMultipleTimes(VM vm,
       final String queryStr) {
-    vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create CQ. ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-        // Create CQ Attributes.
-        CqAttributesFactory cqf = new CqAttributesFactory();
-        CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqAttributesFactory cqf = new CqAttributesFactory();
+      CqListener[] cqListeners = {new CqQueryTestListener(LogWriterUtils.getLogWriter())};
 
-        cqf.initCqListeners(cqListeners);
-        CqAttributes cqa = cqf.create();
+      cqf.initCqListeners(cqListeners);
+      CqAttributes cqa = cqf.create();
 
-        CqQuery cq1;
-        // Create CQ.
-        try {
-          cq1 = cqService.newCq(cqName, queryStr, cqa);
-          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
-        } catch (Exception ex) {
-          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
-          err.initCause(ex);
-          LogWriterUtils.getLogWriter().info("CqService is :" + cqService, err);
-          throw err;
-        }
+      CqQuery cq1 = cqService.newCq("testCQResultSet_0", queryStr, cqa);
+      assertThat(cq1.getState().isStopped()).describedAs("newCq() state mismatch").isTrue();
 
-        try {
-          cq1.executeWithInitialResults();
-          try {
-            cq1.executeWithInitialResults();
-          } catch (IllegalStateException e) {
-            // expected
-          }
-          cq1.close();
+      cq1.executeWithInitialResults();
+      try {
+        cq1.executeWithInitialResults();
+      } catch (IllegalStateException e) {
+        // expected
+      }
+      cq1.close();
 
-          try {
-            cq1.executeWithInitialResults();
-          } catch (CqClosedException e) {
-            // expected
-            return;
-          }
-          fail("should have received cqClosedException");
-        } catch (Exception e) {
-          fail("exception not expected here " + e);
-        }
+      try {
+        cq1.executeWithInitialResults();
+      } catch (CqClosedException e) {
+        // expected
+        return;
       }
+      fail("should have received cqClosedException");
     });
   }
 
 
-
   /* Validate CQ Count */
-  public void validateCQCount(VM vm, final int cqCnt) throws Exception {
-    vm.invoke(new CacheSerializableRunnable("validate cq count") {
-      public void run2() throws CacheException {
-        // Get CQ Service.
-
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+  public void validateCQCount(VM vm, final int cqCnt) {
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        int numCqs = 0;
-        try {
-          numCqs = cqService.getCqs().length;
-        } catch (Exception ex) {
-          fail("Failed to get the CQ Count.");
-        }
-        assertEquals("Number of cqs mismatch.", cqCnt, numCqs);
-      }
+      int numCqs = cqService.getCqs().length;
+      assertThat(numCqs).describedAs("Number of cqs mismatch.").isEqualTo(cqCnt);
     });
   }
 
@@ -1089,56 +832,32 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Throws AssertionError if the CQ can be found or if any other error occurs
    */
   private void failIfCQExists(VM vm, final String cqName) {
-    vm.invoke(new CacheSerializableRunnable("Fail if CQ exists") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Fail if CQ Exists. ### " + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery != null) {
-          fail("Unexpectedly found CqQuery for CQ : " + cqName);
-        }
-      }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Unexpectedly found CqQuery for CQ : " + cqName).isNull();
     });
   }
 
   private void validateCQError(VM vm, final String cqName, final int numError) {
-    vm.invoke(new CacheSerializableRunnable("Validate CQs") {
-      public void run2() throws CacheException {
-
-        LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
-
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener cqListener = cqAttr.getCqListener();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener;
-        listener.printInfo(false);
-
-        // Check for totalEvents count.
-        if (numError != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Total Event Count mismatch", numError, listener.getErrorEventCount());
-        }
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
+
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener cqListener = cqAttr.getCqListener();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener;
+      listener.printInfo(false);
+
+      // Check for totalEvents count.
+      if (numError != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getErrorEventCount()).describedAs("Total Event Count mismatch")
+            .isEqualTo(numError);
       }
     });
   }
@@ -1151,90 +870,73 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   public void validateCQ(VM vm, final String cqName, final int resultSize, final int creates,
       final int updates, final int deletes, final int queryInserts, final int queryUpdates,
       final int queryDeletes, final int totalEvents) {
-    vm.invoke(new CacheSerializableRunnable("Validate CQs") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Validating CQ. ### " + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
-
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener cqListeners[] = cqAttr.getCqListeners();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0];
-        listener.printInfo(false);
-
-        // Check for totalEvents count.
-        if (totalEvents != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Total Event Count mismatch", totalEvents, listener.getTotalEventCount());
-        }
-
-        if (resultSize != noTest) {
-          // SelectResults results = cQuery.getCqResults();
-          // getLogWriter().info("### CQ Result Size is :" + results.size());
-          // Result size validation.
-          // Since ResultSet is not maintained for this release.
-          // Instead of resultSize its been validated with total number of events.
-          fail("test for event counts instead of results size");
-          // assertIndexDetailsEquals("Result Size mismatch", resultSize,
-          // listener.getTotalEventCount());
-        }
-
-        // Check for create count.
-        if (creates != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Create Event mismatch", creates, listener.getCreateEventCount());
-        }
-
-        // Check for update count.
-        if (updates != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Update Event mismatch", updates, listener.getUpdateEventCount());
-        }
-
-        // Check for delete count.
-        if (deletes != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Delete Event mismatch", deletes, listener.getDeleteEventCount());
-        }
-
-        // Check for queryInsert count.
-        if (queryInserts != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Query Insert Event mismatch", queryInserts,
-              listener.getQueryInsertEventCount());
-        }
-
-        // Check for queryUpdate count.
-        if (queryUpdates != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Query Update Event mismatch", queryUpdates,
-              listener.getQueryUpdateEventCount());
-        }
-
-        // Check for queryDelete count.
-        if (queryDeletes != noTest) {
-          // Result size validation.
-          listener.printInfo(true);
-          assertEquals("Query Delete Event mismatch", queryDeletes,
-              listener.getQueryDeleteEventCount());
-        }
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
+
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener cqListeners[] = cqAttr.getCqListeners();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0];
+      listener.printInfo(false);
+
+      // Check for totalEvents count.
+      if (totalEvents != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getTotalEventCount()).describedAs("Total Event Count mismatch")
+            .isEqualTo(totalEvents);
+      }
+
+      assertThat(resultSize).describedAs("test for event counts instead of results size")
+          .isEqualTo(noTest);
+
+      // Check for create count.
+      if (creates != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getCreateEventCount()).describedAs("Create Event mismatch")
+            .isEqualTo(creates);
+      }
+
+      // Check for update count.
+      if (updates != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getUpdateEventCount()).describedAs("Update Event mismatch")
+            .isEqualTo(updates);
+      }
+
+      // Check for delete count.
+      if (deletes != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getDeleteEventCount()).describedAs("Delete Event mismatch")
+            .isEqualTo(deletes);
+      }
+
+      // Check for queryInsert count.
+      if (queryInserts != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getQueryInsertEventCount()).describedAs("Query Insert Event mismatch")
+            .isEqualTo(queryInserts);
+      }
+
+      // Check for queryUpdate count.
+      if (queryUpdates != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getQueryUpdateEventCount()).describedAs("Query Update Event mismatch")
+            .isEqualTo(queryUpdates);
+      }
+
+      // Check for queryDelete count.
+      if (queryDeletes != noTest) {
+        // Result size validation.
+        listener.printInfo(true);
+        assertThat(listener.getQueryDeleteEventCount()).describedAs("Query Delete Event mismatch")
+            .isEqualTo(queryDeletes);
       }
     });
   }
@@ -1268,128 +970,90 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   private void waitForError(VM vm, final String cqName, final String errorMessage) {
-    vm.invoke(new CacheSerializableRunnable("validate cq count") {
-      public void run2() throws CacheException {
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener[] cqListener = cqAttr.getCqListeners();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
-        listener.waitForError(errorMessage);
-      }
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener[] cqListener = cqAttr.getCqListeners();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
+      listener.waitForError(errorMessage);
     });
   }
 
   protected void waitForCqsDisconnected(VM vm, final String cqName, final int count) {
-    vm.invoke(new CacheSerializableRunnable("validate cq disconnected count") {
-      public void run2() throws CacheException {
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener[] cqListener = cqAttr.getCqListeners();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
-        listener.waitForCqsDisconnectedEvents(count);
-      }
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener[] cqListener = cqAttr.getCqListeners();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
+      listener.waitForCqsDisconnectedEvents(count);
     });
   }
 
   protected void waitForCqsConnected(VM vm, final String cqName, final int count) {
-    vm.invoke(new CacheSerializableRunnable("validate cq connected count") {
-      public void run2() throws CacheException {
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    vm.invoke(() -> {
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener[] cqListener = cqAttr.getCqListeners();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
-        listener.waitForCqsConnectedEvents(count);
-      }
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener[] cqListener = cqAttr.getCqListeners();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
+      listener.waitForCqsConnectedEvents(count);
     });
   }
 
   private void waitForEvent(VM vm, final int event, final String cqName, final String key) {
-    vm.invoke(new CacheSerializableRunnable("validate cq count") {
-      public void run2() throws CacheException {
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          Assert.fail("Failed to getCQService.", cqe);
-        }
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener[] cqListener = cqAttr.getCqListeners();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener[] cqListener = cqAttr.getCqListeners();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener[0];
 
-        switch (event) {
-          case CREATE:
-            listener.waitForCreated(key);
-            break;
+      switch (event) {
+        case CREATE:
+          listener.waitForCreated(key);
+          break;
 
-          case UPDATE:
-            listener.waitForUpdated(key);
-            break;
+        case UPDATE:
+          listener.waitForUpdated(key);
+          break;
 
-          case DESTROY:
-            listener.waitForDestroyed(key);
-            break;
+        case DESTROY:
+          listener.waitForDestroyed(key);
+          break;
 
-          case INVALIDATE:
-            listener.waitForInvalidated(key);
-            break;
+        case INVALIDATE:
+          listener.waitForInvalidated(key);
+          break;
 
-          case CLOSE:
-            listener.waitForClose();
-            break;
+        case CLOSE:
+          listener.waitForClose();
+          break;
 
-          case REGION_CLEAR:
-            listener.waitForRegionClear();
-            break;
+        case REGION_CLEAR:
+          listener.waitForRegionClear();
+          break;
 
-          case REGION_INVALIDATE:
-            listener.waitForRegionInvalidate();
-            break;
+        case REGION_INVALIDATE:
+          listener.waitForRegionInvalidate();
+          break;
 
-        }
       }
     });
   }
@@ -1399,82 +1063,45 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * same as expected throws exception.
    */
   public void waitForCqState(VM vm, final String cqName, final int state) {
-    vm.invoke(new CacheSerializableRunnable("Wait For cq State") {
-      public void run2() throws CacheException {
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
 
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        // Get CQ State.
-        final CqStateImpl cqState = (CqStateImpl) cQuery.getState();
-        // Wait max time, till the CQ state is as expected.
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return cqState.getState() == state;
-          }
-
-          public String description() {
-            return "cqState never became " + state;
-          }
-        };
-        Wait.waitForCriterion(ev, MAX_TIME, 200, true);
-      }
+      final CqStateImpl cqState = (CqStateImpl) cQuery.getState();
+      // Wait max time, till the CQ state is as expected.
+      Awaitility.await("cqState never became " + state).atMost(MAX_TIME, TimeUnit.MILLISECONDS)
+          .pollInterval(200, TimeUnit.MILLISECONDS)
+          .until(() -> cqState.getState(), Matchers.equalTo(state));
     });
   }
 
   public void clearCQListenerEvents(VM vm, final String cqName) {
-    vm.invoke(new CacheSerializableRunnable("validate cq count") {
-      public void run2() throws CacheException {
-        // Get CQ Service.
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
 
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        CqQuery cQuery = cqService.getCq(cqName);
-        if (cQuery == null) {
-          fail("Failed to get CqQuery for CQ : " + cqName);
-        }
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
 
-        CqAttributes cqAttr = cQuery.getCqAttributes();
-        CqListener cqListener = cqAttr.getCqListener();
-        CqQueryTestListener listener = (CqQueryTestListener) cqListener;
-        listener.getEventHistory();
-      }
+      CqAttributes cqAttr = cQuery.getCqAttributes();
+      CqListener cqListener = cqAttr.getCqListener();
+      CqQueryTestListener listener = (CqQueryTestListener) cqListener;
+      listener.getEventHistory();
     });
   }
 
   private void validateQuery(VM vm, final String query, final int resultSize) {
-    vm.invoke(new CacheSerializableRunnable("Validate Query") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Validating Query. ###");
-        QueryService qs = getCache().getQueryService();
+    vm.invoke(() -> {
+      QueryService cqService = getCache().getQueryService();
 
-        Query q = qs.newQuery(query);
-        try {
-          Object r = q.execute();
-          if (r instanceof Collection) {
-            int rSize = ((Collection) r).size();
-            LogWriterUtils.getLogWriter().info("### Result Size is :" + rSize);
-            assertEquals(rSize, rSize);
-          }
-        } catch (Exception e) {
-          fail("Failed to execute the query " + e.getMessage());
-        }
+      Query q = cqService.newQuery(query);
+
+      Object r = q.execute();
+      if (r instanceof Collection) {
+        int rSize = ((Collection) r).size();
+        logger.info("### Result Size is :" + rSize);
+        assertThat(rSize).isEqualTo(rSize);
       }
     });
   }
@@ -1482,23 +1109,20 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) {
 
     Properties props = new Properties();
-    String endPoints = "";
+    StringBuilder endPoints = new StringBuilder();
     String host = hosts[0];
     for (int i = 0; i < ports.length; i++) {
       if (hosts.length > 1) {
         host = hosts[i];
       }
-      endPoints = endPoints + "server" + i + "=" + host + ":" + ports[i];
+      endPoints.append("server").append(i).append("=").append(host).append(":").append(ports[i]);
       if (ports.length > (i + 1)) {
-        endPoints = endPoints + ",";
+        endPoints.append(",");
       }
     }
 
-    props.setProperty("endpoints", endPoints);
+    props.setProperty("endpoints", endPoints.toString());
     props.setProperty("retryAttempts", "1");
-    // props.setProperty("establishCallbackConnection", "true");
-    // props.setProperty("LBPolicy", "Sticky");
-    // props.setProperty("readTimeout", "120000");
 
     // Add other property elements.
     if (newProps != null) {
@@ -1513,64 +1137,51 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
 
   // Exercise CQ attributes mutator functions
-  private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function)
-      throws Exception {
-    vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) {
-      public void run2() throws CacheException {
-        CqQuery cq1 = null;
-        LogWriterUtils.getLogWriter().info("### CQ attributes mutator for ###" + cqName);
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        // Get CQ.
-        try {
-          cq1 = cqService.getCq(cqName);
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          fail("Failed to get CQ " + cqName + " . " + ex.getMessage());
-        }
-        CqAttributesMutator cqAttrMutator = cq1.getCqAttributesMutator();
-        CqAttributes cqAttr = cq1.getCqAttributes();
-        CqListener cqListeners[];
-        switch (mutator_function) {
-          case CREATE:
-            // Reinitialize with 2 CQ Listeners
-            CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
-                new CqQueryTestListener(getCache().getLogger())};
-            cqAttrMutator.initCqListeners(cqListenersArray);
-            cqListeners = cqAttr.getCqListeners();
-            assertEquals("CqListener count mismatch", cqListeners.length, 2);
-            break;
-
-          case UPDATE:
-            // Add 2 new CQ Listeners
-            CqListener newListener1 = new CqQueryTestListener(getCache().getLogger());
-            CqListener newListener2 = new CqQueryTestListener(getCache().getLogger());
-            cqAttrMutator.addCqListener(newListener1);
-            cqAttrMutator.addCqListener(newListener2);
-
-            cqListeners = cqAttr.getCqListeners();
-            assertEquals("CqListener count mismatch", cqListeners.length, 3);
-            break;
-
-          case DESTROY:
-            cqListeners = cqAttr.getCqListeners();
-            cqAttrMutator.removeCqListener(cqListeners[0]);
-            cqListeners = cqAttr.getCqListeners();
-            assertEquals("CqListener count mismatch", cqListeners.length, 2);
-
-            // Remove a listener and validate
-            cqAttrMutator.removeCqListener(cqListeners[0]);
-            cqListeners = cqAttr.getCqListeners();
-            assertEquals("CqListener count mismatch", cqListeners.length, 1);
-            break;
-        }
+  private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) {
+    vm.invoke(() -> {
+      CqQuery cq1;
+      QueryService cqService = getCache().getQueryService();
+
+      CqQuery cQuery = cqService.getCq(cqName);
+      assertThat(cQuery).describedAs("Failed to get CqQuery for CQ : " + cqName).isNotNull();
+
+      cq1 = cqService.getCq(cqName);
+
+      CqAttributesMutator cqAttrMutator = cq1.getCqAttributesMutator();
+      CqAttributes cqAttr = cq1.getCqAttributes();
+      CqListener cqListeners[];
+      switch (mutator_function) {
+        case CREATE:
+          // Reinitialize with 2 CQ Listeners
+          CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()),
+              new CqQueryTestListener(getCache().getLogger())};
+          cqAttrMutator.initCqListeners(cqListenersArray);
+          cqListeners = cqAttr.getCqListeners();
+          assertThat(2).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
+          break;
+
+        case UPDATE:
+          // Add 2 new CQ Listeners
+          CqListener newListener1 = new CqQueryTestListener(getCache().getLogger());
+          CqListener newListener2 = new CqQueryTestListener(getCache().getLogger());
+          cqAttrMutator.addCqListener(newListener1);
+          cqAttrMutator.addCqListener(newListener2);
+
+          cqListeners = cqAttr.getCqListeners();
+          assertThat(3).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
+          break;
+
+        case DESTROY:
+          cqListeners = cqAttr.getCqListeners();
+          cqAttrMutator.removeCqListener(cqListeners[0]);
+          cqListeners = cqAttr.getCqListeners();
+          assertThat(2).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
+
+          // Remove a listener and validate
+          cqAttrMutator.removeCqListener(cqListeners[0]);
+          cqListeners = cqAttr.getCqListeners();
+          assertThat(1).describedAs("CqListener count mismatch").isEqualTo(cqListeners.length);
+          break;
       }
     });
   }
@@ -1588,14 +1199,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     server.invoke(task);
   }
 
-  private void ensureCQExists(VM server, final String regionName, final String cqName) {
+  private void ensureCQExists(VM server, final String regionName) {
     SerializableRunnable task = new CacheSerializableRunnable("check CQs") {
       public void run2() throws CacheException {
         CqQuery queries[] = getCache().getQueryService().getCqs();
-        assertTrue("expected to find a CQ but found none", queries.length > 0);
+        assertThat(queries.length > 0).describedAs("expected to find a CQ but found none").isTrue();
         System.out.println("found query " + queries[0]);
-        assertTrue("Couldn't find query " + cqName, queries[0].getName().startsWith(cqName));
-        assertTrue("expected the CQ to be open: " + queries[0], !queries[0].isClosed());
+        assertThat(queries[0].getName().startsWith(
+            "testCQResultSet_0")).describedAs("Couldn't find query " + "testCQResultSet_0")
+                .isTrue();
+        assertThat(!queries[0].isClosed()).describedAs("expected the CQ to be open: " + queries[0])
+            .isTrue();
       }
     };
     server.invoke(task);
@@ -1606,18 +1220,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * bug #47494 - CQs were destroyed when a server did a tombstone GC
    */
   @Test
-  public void testCQRemainsWhenServerGCs() throws Exception {
+  public void testCQRemainsWhenServerGCs() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
-    VM server2 = host.getVM(2);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
+    VM server2 = VM.getVM(2);
 
     createServer(server);
     createServer(server2);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -1625,7 +1238,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       /* CQ Test with initial Values. */
       int size = 5;
       createValuesWithShort(server, regions[0], size);
-      Wait.pause(1 * 500);
+      Wait.pause(500);
 
       final String cqName = "testCQResultSet_0";
 
@@ -1639,7 +1252,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       performGC(server, regions[0]);
 
       // Check the CQs
-      ensureCQExists(server, regions[0], cqName);
+      ensureCQExists(server, regions[0]);
     } finally {
       closeClient(client);
       closeServer(server);
@@ -1650,18 +1263,16 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for InterestList and CQ registered from same clients.
-   *
    */
   @Test
-  public void testInterestListAndCQs() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testInterestListAndCQs() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     /* Init Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client, thePort, host0);
 
 
@@ -1673,10 +1284,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     final int size = 10;
 
     executeCQ(client, "testInterestListAndCQs_0", false, null);
-    registerInterestListCQ(client, regions[0], size);
+    registerInterestListCQ(client, regions[0]);
 
     createValues(server, regions[0], size);
-    // Wait for client to Synch.
+    // Wait for client to Sync.
 
     for (int i = 1; i <= 10; i++) {
       waitForCreated(client, "testInterestListAndCQs_0", KEY + i);
@@ -1687,51 +1298,51 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         /* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
         /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
 
-
     // Validate InterestList.
     // CREATE
     client.invoke(new CacheSerializableRunnable("validate updates") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regions[0]);
-        assertNotNull(region);
+        assertThat(region).isNotNull();
 
         Set keys = region.entrySet();
-        assertEquals(
-            "Mismatch, number of keys in local region is not equal to the interest list size", size,
-            keys.size());
+        assertThat(keys.size())
+            .describedAs(
+                "Mismatch, number of keys in local region is not equal to the interest list size")
+            .isEqualTo(size);
 
         CertifiableTestCacheListener ctl =
             (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
         for (int i = 1; i <= 10; i++) {
           ctl.waitForCreated(KEY + i);
-          assertNotNull(region.getEntry(KEY + i));
+          assertThat(region.getEntry(KEY + i)).isNotNull();
         }
       }
     });
 
     // UPDATE
     createValues(server, regions[0], size);
-    // Wait for client to Synch.
+    // Wait for client to sync.
     for (int i = 1; i <= 10; i++) {
       waitForUpdated(client, "testInterestListAndCQs_0", KEY + i);
     }
 
-
     client.invoke(new CacheSerializableRunnable("validate updates") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regions[0]);
-        assertNotNull(region);
+        assertThat(region).isNotNull();
 
         Set keys = region.entrySet();
-        assertEquals(
-            "Mismatch, number of keys in local region is not equal to the interest list size", size,
-            keys.size());
+        assertThat(keys.size())
+            .describedAs(
+                "Mismatch, number of keys in local region is not equal to the interest list size")
+            .isEqualTo(size);
 
         CertifiableTestCacheListener ctl =
             (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
         for (int i = 1; i <= 10; i++) {
           ctl.waitForUpdated(KEY + i);
-          assertNotNull(region.getEntry(KEY + i));
+          assertThat(region.getEntry(KEY + i)).isNotNull();
         }
       }
     });
@@ -1746,25 +1357,24 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10);
 
-
     client.invoke(new CacheSerializableRunnable("validate invalidates") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regions[0]);
-        assertNotNull(region);
+        assertThat(region).isNotNull();
 
         Set keys = region.entrySet();
-        assertEquals(
-            "Mismatch, number of keys in local region is not equal to the interest list size", size,
-            keys.size());
+        assertThat(keys.size())
+            .describedAs(
+                "Mismatch, number of keys in local region is not equal to the interest list size")
+            .isEqualTo(size);
 
         CertifiableTestCacheListener ctl =
             (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
         for (int i = 1; i <= 10; i++) {
           ctl.waitForInvalidated(KEY + i);
-          assertNotNull(region.getEntry(KEY + i));
+          assertThat(region.getEntry(KEY + i)).isNotNull();
         }
       }
     });
@@ -1788,7 +1398,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("validate destroys") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regions[0]);
-        assertNotNull(region);
+        assertThat(region).isNotNull();
 
         CertifiableTestCacheListener ctl =
             (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
@@ -1809,19 +1419,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ register and UnRegister.
-   *
    */
   @Test
-  public void testCQStopExecute() throws Exception {
+  public void testCQStopExecute() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     /* Init Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client, thePort, host0);
 
     /* Create CQs. */
@@ -1833,11 +1441,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* Init values at server. */
     int size = 10;
     createValues(server, regions[0], size);
-    // Wait for client to Synch.
+    // Wait for client to sync.
 
     waitForCreated(client, "testCQStopExecute_0", KEY + size);
 
-
     // Check if Client and Server in sync.
     // validateServerClientRegionEntries(server, client, regions[0]);
     validateQuery(server, cqs[0], 10);
@@ -1855,7 +1462,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     /* Init values at server. */
     createValues(server, regions[0], 20);
-    // Wait for client to Synch.
+    // Wait for client to sync.
     waitForCreated(client, "testCQStopExecute_0", KEY + 20);
     size = 30;
 
@@ -1868,9 +1475,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         /* updates: */ 10, /* deletes; */ 0, /* queryInserts: */ 20, /* queryUpdates: */ 10,
         /* queryDeletes: */ 0, /* totalEvents: */ size);
 
-
     // Stop and execute CQ 20 times
-    stopExecCQ(client, "testCQStopExecute_0", 20);
+    stopExecCQ(client);
 
     // Test CQ Close
     closeCQ(client, "testCQStopExecute_0");
@@ -1882,22 +1488,20 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ Attributes Mutator functions
-   *
    */
   @Test
-  public void testCQAttributesMutator() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testCQAttributesMutator() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     /* Init Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client, thePort, host0);
 
     /* Create CQs. */
-    String cqName = new String("testCQAttributesMutator_0");
+    String cqName = "testCQAttributesMutator_0";
     createCQ(client, cqName, cqs[0]);
     validateCQCount(client, 1);
     executeCQ(client, cqName, false, null);
@@ -1905,7 +1509,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* Init values at server. */
     int size = 10;
     createValues(server, regions[0], size);
-    // Wait for client to Synch.
+    // Wait for client to sync.
     waitForCreated(client, cqName, KEY + size);
 
     // validate CQs.
@@ -1936,7 +1540,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     /* Delete values at server. */
     deleteValues(server, regions[0], 20);
-    // Wait for client to Synch.
+    // Wait for client to sync.
     waitForDestroyed(client, cqName, KEY + (size * 2));
 
     validateCQ(client, cqName, /* resultSize: */ noTest, /* creates: */ 0, /* updates: */ 0,
@@ -1946,7 +1550,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // Close CQ
     closeCQ(client, cqName);
 
-
     // Close.
     closeClient(client);
     closeServer(server);
@@ -1954,26 +1557,18 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ register and UnRegister.
-   *
    */
   @Test
-  public void testCQCreateClose() throws Exception {
-
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testCQCreateClose() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     /* Init Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client, thePort, host0);
 
-    /* debug */
-    // getLogWriter().info("### DEBUG STOP ####");
-    // pause(60 * 1000);
-    // getLogWriter().info("### DEBUG START ####");
-
     /* Create CQs. */
     createCQ(client, "testCQCreateClose_0", cqs[0]);
     validateCQCount(client, 1);
@@ -1983,11 +1578,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* Init values at server. */
     int size = 10;
     createValues(server, regions[0], size);
-    // Wait for client to Synch.
+    // Wait for client to sync.
     waitForCreated(client, "testCQCreateClose_0", KEY + size);
 
     // Check if Client and Server in sync.
-    // validateServerClientRegionEntries(server, client, regions[0]);
     validateQuery(server, cqs[0], 10);
     // validate CQs.
     validateCQ(client, "testCQCreateClose_0", /* resultSize: */ noTest, /* creates: */ size,
@@ -2020,12 +1614,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       createCQ(client, "testCQCreateClose_0", cqs[0]);
       fail("Trying to create CQ with same name. Should have thrown CQExistsException");
     } catch (org.apache.geode.test.dunit.RMIException rmiExc) {
-      Throwable cause = rmiExc.getCause();
-      assertTrue("unexpected cause: " + cause.getClass().getName(),
-          cause instanceof AssertionError);
-      Throwable causeCause = cause.getCause(); // should be a CQExistsException
-      assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
-          causeCause instanceof CqExistsException);
+
+      Throwable cause = rmiExc.getCause(); // should be a CQExistsException
+      assertThat(cause).describedAs("Got wrong exception: " + cause.getClass().getName())
+          .isInstanceOf(CqExistsException.class);
     }
 
     // Getting values from non-existent CQ.
@@ -2036,12 +1628,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       createCQ(server, "testCQCreateClose_1", cqs[0]);
       fail("Trying to create CQ on Cache Server. Should have thrown Exception.");
     } catch (org.apache.geode.test.dunit.RMIException rmiExc) {
-      Throwable cause = rmiExc.getCause();
-      assertTrue("unexpected cause: " + cause.getClass().getName(),
-          cause instanceof AssertionError);
-      Throwable causeCause = cause.getCause(); // should be a IllegalStateException
-      assertTrue("Got wrong exception: " + causeCause.getClass().getName(),
-          causeCause instanceof IllegalStateException);
+
+      Throwable cause = rmiExc.getCause(); // should be a IllegalStateException
+      assertThat(cause).describedAs("Got wrong exception: " + cause.getClass().getName())
+          .isInstanceOf(IllegalStateException.class);
     }
 
     // Trying to execute CQ on non-existing region.
@@ -2050,16 +1640,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       executeCQ(client, "testCQCreateClose_2", false, "RegionNotFoundException");
       fail("Trying to create CQ on non-existing Region. Should have thrown Exception.");
     } catch (org.apache.geode.test.dunit.RMIException rmiExc) {
-      Throwable cause = rmiExc.getCause();
-      if (!(cause instanceof AssertionError)) {
-        LogWriterUtils.getLogWriter().severe("Expected to see an AssertionError.", cause);
-        fail("wrong error");
-      }
-      Throwable causeCause = cause.getCause(); // should be a RegionNotFoundException
-      if (!(causeCause instanceof RegionNotFoundException)) {
-        LogWriterUtils.getLogWriter().severe("Expected cause to be RegionNotFoundException", cause);
-        fail("wrong cause");
-      }
+
+      Throwable cause = rmiExc.getCause(); // should be a RegionNotFoundException
+      assertThat(cause).describedAs("Expected cause to be RegionNotFoundException")
+          .isInstanceOf(RegionNotFoundException.class);
     }
 
     /* Test CQ Count - Above failed create should not increment the CQ cnt. */
@@ -2071,28 +1655,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     /* Test for closeAllCQs() */
 
-    client.invoke(new CacheSerializableRunnable("CloseAll CQ :") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Close All CQ. ###");
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          LogWriterUtils.getLogWriter().info("Failed to getCQService.", cqe);
-          fail("Failed to getCQService.");
-        }
-
-        // Close CQ.
-        try {
-          cqService.closeCqs();
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          LogWriterUtils.getLogWriter().info("Failed to close All CQ.", ex);
-          fail("Failed to close All CQ. " + ex.getMessage());
-        }
-      }
+    client.invoke(() -> {
+      logger.info("### Close All CQ. ###");
+      QueryService cqService = getCache().getQueryService();
+      cqService.closeCqs();
     });
 
     validateCQCount(client, 0);
@@ -2107,26 +1673,11 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     executeCQ(client, "testCQCreateClose_5", false, null);
 
     // Call close all CQ.
-    client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") {
-      public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Close All CQ 2. ###");
-        // Get CQ Service.
-        QueryService cqService = null;
-        try {
-          cqService = getCache().getQueryService();
-        } catch (Exception cqe) {
-          cqe.printStackTrace();
-          fail("Failed to getCQService.");
-        }
-
-        // Close CQ.
-        try {
-          cqService.closeCqs();
-        } catch (Exception ex) {
-          ex.printStackTrace();
-          fail("Failed to close All CQ  . " + ex.getMessage());
-        }
-      }
+    client.invoke(() -> {
+      logger.info("### Close All CQ 2. ###");
+      // Get CQ Service.
+      QueryService cqService = getCache().getQueryService();
+      cqService.closeCqs();
     });
 
     // Close.
@@ -2135,20 +1686,18 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   /**
-   * This will test the events after region destory. The CQs on the destroy region needs to be
+   * This will test the events after region destroy. The CQs on the destroyed region need to be
    * closed.
-   *
    */
   @Test
-  public void testRegionDestroy() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testRegionDestroy() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     /* Init Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client, thePort, host0);
 
 
@@ -2163,37 +1712,36 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     /* Init values at server. */
     final int size = 10;
-    registerInterestListCQ(client, regions[0], size);
+    registerInterestListCQ(client, regions[0]);
     createValues(server, regions[0], size);
 
-    // Wait for client to Synch.
+    // Wait for client to sync.
 
     waitForCreated(client, "testRegionDestroy_0", KEY + 10);
 
-
     // validate CQs.
     validateCQ(client, "testRegionDestroy_0", /* resultSize: */ noTest, /* creates: */ size,
         /* updates: */ noTest, /* deletes; */ noTest, /* queryInserts: */ size,
         /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
 
-
     // Validate InterestList.
     // CREATE
     client.invoke(new CacheSerializableRunnable("validate updates") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regions[0]);
-        assertNotNull(region);
+        assertThat(region).isNotNull();
 
         Set keys = region.entrySet();
-        assertEquals(
-            "Mismatch, number of keys in local region is not equal to the interest list size", size,
-            keys.size());
+        assertThat(keys.size())
+            .describedAs(
+                "Mismatch, number of keys in local region is not equal to the interest list size")
+            .isEqualTo(size);
 
         CertifiableTestCacheListener ctl =
             (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
         for (int i = 1; i <= 10; i++) {
           ctl.waitForCreated(KEY + i);
-          assertNotNull(region.getEntry(KEY + i));
+          assertThat(region.getEntry(KEY + i)).isNotNull();
         }
       }
     });
@@ -2218,18 +1766,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Test for CQ with multiple clients.
    */
   @Test
-  public void testCQWithMultipleClients() throws Exception {
+  public void testCQWithMultipleClients() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client1 = host.getVM(1);
-    VM client2 = host.getVM(2);
-    VM client3 = host.getVM(3);
+    VM server = VM.getVM(0);
+    VM client1 = VM.getVM(1);
+    VM client2 = VM.getVM(2);
+    VM client3 = VM.getVM(3);
 
     /* Create Server and Client */
     createServer(server);
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     createClient(client1, thePort, host0);
     createClient(client2, thePort, host0);
 
@@ -2244,7 +1791,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // Create Values on Server.
     createValues(server, regions[0], size);
 
-
     waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10);
 
 
@@ -2253,10 +1799,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         /* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
         /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
 
-
     waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10);
 
-
     validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
         /* creates: */ size, /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ size,
         /* queryUpdates: */ 0, /* queryDeletes: */ 0, /* totalEvents: */ size);
@@ -2279,10 +1823,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // Update values on Server. This will be updated on new Client CQs.
     createValues(server, regions[0], size);
 
-
     waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10);
 
-
     validateCQ(client3, "testCQWithMultipleClients_0", /* resultSize: */ noTest, /* creates: */ 0,
         /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
         /* queryDeletes: */ 0, /* totalEvents: */ size);
@@ -2305,10 +1847,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // Update values on server, update again.
     createValues(server, regions[0], size);
 
-
     waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10);
 
-
     validateCQ(client2, "testCQWithMultipleClients_0", /* resultSize: */ noTest,
         /* creates: */ size, /* updates: */ size * 2, /* deletes; */ 0, /* queryInserts: */ size,
         /* queryUpdates: */ size * 2, /* queryDeletes: */ 0, /* totalEvents: */ size * 3);
@@ -2329,16 +1869,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Test for CQ ResultSet.
    */
   @Test
-  public void testCQResultSet() throws Exception {
+  public void testCQResultSet() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2346,7 +1885,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* CQ Test with initial Values. */
     int size = 10;
     createValues(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
 
     // Create CQs.
     createCQ(client, "testCQResultSet_0", cqs[0]);
@@ -2370,7 +1909,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
      * compare values... Disabled since we don't currently maintain results on the client
      *
      * validateCQ(client, "testCQResultSet_1", 2, noTest, noTest, noTest); Portfolio[] values = new
-     * Portfolio[] {new Portfolio(2), new Portfolio(4)}; Hashtable t = new Hashtable(); String[]
+     * Portfolio[] {new Portfolio(2), new Portfolio(4)}; Hashtable t = new Hashtable(); String[]
      * keys = new String[] {"key-2", "key-4"}; t.put(keys[0], values[0]); t.put(keys[1], values[1]);
      *
      * compareValues(client, "testCQResultSet_1", t);
@@ -2390,19 +1929,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ Listener events.
-   *
    */
   @Test
-  public void testCQEvents() throws Exception {
+  public void testCQEvents() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2429,7 +1966,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     waitForUpdated(client, "testCQEvents_0", KEY + size);
 
-
     // validate Update events.
     validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
         /* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
@@ -2448,15 +1984,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       public void run2() throws CacheException {
         Region region1 = getRootRegion().getSubregion(regions[0]);
         for (int i = -1; i >= -5; i--) {
-          // change : new Portfolio(i) rdubey ( for Suspect strings problem).
-          // region1.put(KEY+i, new Portfolio(i) );
+
           region1.put(KEY + i, KEY + i);
         }
       }
     });
 
-    Wait.pause(1 * 1000);
-    // cqs should not get any creates, deletes or updates. rdubey.
+    Wait.pause(1000);
+    // cqs should not get any creates, deletes or updates.
     validateCQ(client, "testCQEvents_0", /* resultSize: */ noTest, /* creates: */ size,
         /* updates: */ 15, /* deletes; */5, /* queryInserts: */ size, /* queryUpdates: */ 15,
         /* queryDeletes: */ 5, /* totalEvents: */ size + 15 + 5);
@@ -2467,11 +2002,10 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void testCQMapValues() throws Exception {
+  public void testCQMapValues() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
     String mapQuery = "select * from /root/" + regions[0] + " er where er['field1'] > 'value2'";
     int size = 10;
 
@@ -2480,7 +2014,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     createMapValues(server, regions[0], size / 2);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2529,18 +2063,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
 
-  public void createMapValues(VM vm, final String regionName, final int size) {
+  private void createMapValues(VM vm, final String regionName, final int size) {
     vm.invoke(new CacheSerializableRunnable("Create map values") {
       public void run2() throws CacheException {
         Region exampleRegion = getRootRegion().getSubregion(regionName);
         for (int i = 1; i <= size; i++) {
-          Map<String, String> value = new HashMap<String, String>();
+          Map<String, String> value = new HashMap<>();
           value.put("field1", "value" + i);
           value.put("field2", "key" + i);
           exampleRegion.put(KEY + i, value);
         }
-        LogWriterUtils.getLogWriter()
-            .info("### Number of Entries in Region :" + exampleRegion.keySet().size());
+        logger.info("### Number of Entries in Region :" + exampleRegion.keySet().size());
       }
     });
   }
@@ -2548,18 +2081,16 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for stopping and restarting CQs.
-   *
    */
   @Test
-  public void testEnableDisableCQ() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testEnableDisableCQ() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2572,7 +2103,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
       public void run2() throws CacheException {
         // Get CQ Service.
-        QueryService cqService = null;
+        QueryService cqService;
         try {
           cqService = getCache().getQueryService();
           cqService.stopCqs();
@@ -2583,11 +2114,11 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
     // Init values at server.
     int size = 10;
     createValues(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
     // There should not be any creates.
     validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
         /* updates: */ 0, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ 0,
@@ -2597,7 +2128,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
       public void run2() throws CacheException {
         // Get CQ Service.
-        QueryService cqService = null;
+        QueryService cqService;
         try {
           cqService = getCache().getQueryService();
           cqService.executeCqs();
@@ -2607,7 +2138,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         }
       }
     });
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
     createValues(server, regions[0], size);
     waitForUpdated(client, "testEnableDisable_0", KEY + size);
     // It gets created on the CQs
@@ -2619,7 +2150,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
       public void run2() throws CacheException {
         // Get CQ Service.
-        QueryService cqService = null;
+        QueryService cqService;
         try {
           cqService = getCache().getQueryService();
           cqService.stopCqs("/root/" + regions[0]);
@@ -2632,7 +2163,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     Wait.pause(2 * 1000);
     deleteValues(server, regions[0], size / 2);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
     // There should not be any deletes.
     validateCQ(client, "testEnableDisable_0", /* resultSize: */ noTest, /* creates: */ 0,
         /* updates: */ size, /* deletes; */ 0, /* queryInserts: */ 0, /* queryUpdates: */ size,
@@ -2642,7 +2173,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("Client enableCQs()") {
       public void run2() throws CacheException {
         // Get CQ Service.
-        QueryService cqService = null;
+        QueryService cqService;
         try {
           cqService = getCache().getQueryService();
           cqService.executeCqs("/root/" + regions[0]);
@@ -2652,7 +2183,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         }
       }
     });
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
     createValues(server, regions[0], size / 2);
     waitForCreated(client, "testEnableDisable_0", KEY + (size / 2));
     // Gets updated on the CQ.
@@ -2667,18 +2198,16 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for Complex queries.
-   *
    */
   @Test
-  public void testQuery() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testQuery() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2709,19 +2238,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ Fail over.
-   *
    */
   @Test
-  public void testCQFailOver() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testCQFailOver() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     // Create client.
     // Properties props = new Properties();
     // Create client with redundancyLevel -1
@@ -2736,23 +2263,21 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       createCQ(client, "testCQFailOver_" + i, cqs[i]);
       executeCQ(client, "testCQFailOver_" + i, false, null);
     }
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
 
     // CREATE.
     createValues(server1, regions[0], 10);
     createValues(server1, regions[1], 10);
     waitForCreated(client, "testCQFailOver_0", KEY + 10);
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
 
     createServer(server2, ports[0]);
     final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     System.out
         .println("### Port on which server1 running : " + port1 + " Server2 running : " + thePort2);
-    Wait.pause(3 * 1000);
 
-    // Extra pause - added after downmerging trunk r17050
-    Wait.pause(5 * 1000);
+    Wait.pause(8 * 1000);
 
     // UPDATE - 1.
     createValues(server1, regions[0], 10);
@@ -2796,21 +2321,19 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test for CQ Fail over/HA with redundancy level set.
-   *
    */
   @Test
-  public void testCQHA() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM server3 = host.getVM(2);
+  public void testCQHA() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM server3 = VM.getVM(2);
 
-    VM client = host.getVM(3);
+    VM client = VM.getVM(3);
 
     createServer(server1);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
 
@@ -2822,7 +2345,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     System.out.println("### Port on which server1 running : " + port1 + " server2 running : "
         + thePort2 + " Server3 running : " + port3);
 
-
     // Create client - With 3 server endpoints and redundancy level set to 2.
 
     // Create client with redundancyLevel 1
@@ -2836,16 +2358,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       executeCQ(client, "testCQHA_" + i, false, null);
     }
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
 
     // CREATE.
     createValues(server1, regions[0], 10);
     createValues(server1, regions[1], 10);
 
-
     waitForCreated(client, "testCQHA_0", KEY + 10);
 
-
     // Clients expected initial result.
     int[] resultsCnt = new int[] {10, 1, 2};
 
@@ -2858,10 +2378,8 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     createValues(server2, regions[0], 10);
     createValues(server2, regions[1], 10);
 
-
     waitForUpdated(client, "testCQHA_0", KEY + 10);
 
-
     // Validate CQ.
     for (int i = 0; i < numCQs; i++) {
       validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i], noTest);
@@ -2881,7 +2399,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     waitForUpdated(client, "testCQHA_0", KEY + 10);
 
-
     for (int i = 0; i < numCQs; i++) {
       validateCQ(client, "testCQHA_" + i, noTest, resultsCnt[i], resultsCnt[i] * 2, noTest);
     }
@@ -2893,21 +2410,19 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Test without CQs. This was added after an exception encountered with CQService, when there was
-   * no CQService intiated.
-   *
+   * no CQService initiated.
    */
   @Test
-  public void testWithoutCQs() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testWithoutCQs() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
     createServer(server2);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
 
@@ -2925,7 +2440,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       }
     };
 
-
     // Create client.
     client.invoke(createConnectionPool);
 
@@ -2949,7 +2463,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       }
     });
 
-
     Wait.pause(2 * 1000);
     closeServer(server1);
     closeServer(server2);
@@ -2960,15 +2473,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Test getCQs for a regions
    */
   @Test
-  public void testGetCQsForARegionName() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testGetCQsForARegionName() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -2995,28 +2507,34 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     client.invoke(new CacheSerializableRunnable("Client disableCQs()") {
       public void run2() throws CacheException {
         // Get CQ Service.
-        QueryService cqService = null;
+        QueryService cqService;
         try {
           cqService = getCache().getQueryService();
           CqQuery[] cq = cqService.getCqs("/root/" + regions[0]);
-          assertNotNull(
-              "CQservice should not return null for cqs on this region : /root/" + regions[0], cq);
+          assertThat(cq)
+              .describedAs(
+                  "CQService should not return null for cqs on this region : /root/" + regions[0])
+              .isNotNull();
           getCache().getLogger().info("cqs for region: /root/" + regions[0] + " : " + cq.length);
           // closing on of the cqs.
 
           cq[0].close();
           cq = cqService.getCqs("/root/" + regions[0]);
-          assertNotNull(
-              "CQservice should not return null for cqs on this region : /root/" + regions[0], cq);
+          assertThat(cq)
+              .describedAs(
+                  "CQService should not return null for cqs on this region : /root/" + regions[0])
+              .isNotNull();
           getCache().getLogger().info("cqs for region: /root/" + regions[0]
-              + " after closeing one of the cqs : " + cq.length);
+              + " after closing one of the cqs : " + cq.length);
 
           cq = cqService.getCqs("/root/" + regions[1]);
           getCache().getLogger().info("cqs for region: /root/" + regions[1] + " : " + cq.length);
-          assertNotNull(
-              "CQservice should not return null for cqs on this region : /root/" + regions[1], cq);
+          assertThat(cq)
+              .describedAs(
+                  "CQService should not return null for cqs on this region : /root/" + regions[1])
+              .isNotNull();
         } catch (Exception cqe) {
-          Assert.fail("Failed to getCQService", cqe);
+          fail("Failed to getCQService", cqe);
         }
       }
     });
@@ -3032,17 +2550,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Test exception message thrown when replicate region with local destroy is used
    */
   @Test
-  public void testCqExceptionForReplicateRegionWithEvictionLocalDestroy() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+  public void testCqExceptionForReplicateRegionWithEvictionLocalDestroy() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServerOnly(server, 0);
     createReplicateRegionWithLocalDestroy(server, new String[] {regions[0]});
 
-
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createLocalRegion(client, new int[] {thePort}, host0, "-1", new String[] {regions[0]});
@@ -3054,7 +2570,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     try {
       executeCQ(client, "testQuery_3", false, expectedError);
     } catch (Exception e) {
-      assertTrue(e.getCause().getCause().getMessage().contains(expectedError));
+      assertThat(e.getCause().getCause().getMessage().contains(expectedError)).isTrue();
 
     }
     // Close.
@@ -3065,19 +2581,17 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Tests execution of queries with NULL in where clause like where ID = NULL etc.
-   *
    */
   @Test
-  public void testQueryWithNULLInWhereClause() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
-    VM producer = host.getVM(2);
+  public void testQueryWithNULLInWhereClause() {
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
+    VM producer = VM.getVM(2);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3093,13 +2607,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     createValues(producer, regions[0], (2 * size));
 
     for (int i = 1; i <= size; i++) {
-      if (i % 2 == 0)
+      if (i % 2 == 0) {
         waitForUpdated(client, "testQuery_9", KEY + i);
+      }
     }
 
     for (int i = (size + 1); i <= 2 * size; i++) {
-      if (i % 2 == 0)
+      if (i % 2 == 0) {
         waitForCreated(client, "testQuery_9", KEY + i);
+      }
     }
 
     validateCQ(client, "testQuery_9", noTest, 25, 25, noTest);
@@ -3112,40 +2628,36 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   /**
    * Tests execution of queries with NULL in where clause like where ID = NULL etc.
-   *
    */
   @Test
-  public void testForSupportedRegionAttributes() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testForSupportedRegionAttributes() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     // Create server with Global scope.
     SerializableRunnable createServer = new CacheSerializableRunnable("Create Cache Server") {
       public void run2() throws CacheException {
-        LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
+        logger.info("### Create Cache Server. ###");
 
         // Create region with Global scope
         AttributesFactory factory1 = new AttributesFactory();
         factory1.setScope(Scope.GLOBAL);
-        factory1.setMirrorType(MirrorType.KEYS_VALUES);
+        factory1.setDataPolicy(DataPolicy.REPLICATE);
         createRegion(regions[0], factory1.createRegionAttributes());
 
         // Create region with non Global, distributed_ack scope
         AttributesFactory factory2 = new AttributesFactory();
         factory2.setScope(Scope.DISTRIBUTED_NO_ACK);
-        factory2.setMirrorType(MirrorType.KEYS_VALUES);
+        factory2.setDataPolicy(DataPolicy.REPLICATE);
         createRegion(regions[1], factory2.createRegionAttributes());
 
         Wait.pause(2000);
 
         try {
           startBridgeServer(port, true);
-        }
-
-        catch (Exception ex) {
-          Assert.fail("While starting CacheServer", ex);
+        } catch (Exception ex) {
+          fail("While starting CacheServer", ex);
         }
         Wait.pause(2000);
 
@@ -3157,7 +2669,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
 
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
 
@@ -3212,16 +2724,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void testCQWhereCondOnShort() throws Exception {
+  public void testCQWhereCondOnShort() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3229,7 +2740,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* CQ Test with initial Values. */
     int size = 5;
     createValuesWithShort(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
 
     // Create CQs.
     createCQ(client, "testCQResultSet_0", shortTypeCQs[0]);
@@ -3242,16 +2753,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void testCQEquals() throws Exception {
+  public void testCQEquals() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3260,7 +2770,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     int size = 10;
     // create values
     createValuesAsPrimitives(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
 
     // Create CQs.
     createCQ(client, "equalsQuery1", "select * from /root/regionA p where p.equals('seeded')");
@@ -3271,7 +2781,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // Create CQs.
     createCQ(client, "equalsQuery2", "select * from /root/regionA p where p='seeded'");
 
-
     // Check resultSet Size.
     executeCQ(client, "equalsQuery2", true, 2, null);
 
@@ -3294,16 +2803,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void testCQEqualsWithIndex() throws Exception {
+  public void testCQEqualsWithIndex() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3313,7 +2821,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     // create values
     createIndex(server, "index1", "p.status", "/root/regionA p");
     createValuesAsPrimitives(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
 
     // Create CQs.
     createCQ(client, "equalsQuery1", "select * from /root/regionA p where p.equals('seeded')");
@@ -3348,21 +2856,19 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   // Tests that cqs get an onCqDisconnect and onCqConnect
   @Test
-  public void testCQAllServersCrash() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testCQAllServersCrash() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
     createClient(client, new int[] {port1, ports[0]}, host0, "-1");
 
-    int numCQs = 1;
     // Create CQs.
     createCQ(client, "testCQAllServersLeave_" + 11, cqs[11], true);
     executeCQ(client, "testCQAllServersLeave_" + 11, false, null);
@@ -3375,7 +2881,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     waitForCreated(client, "testCQAllServersLeave_11", KEY + 10);
 
     createServer(server2, ports[0]);
-    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
+    server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     Wait.pause(8 * 1000);
 
     // Close server1.
@@ -3396,15 +2902,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
   // Tests that we receive both an onCqConnected and a onCqDisconnected message
   @Test
-  public void testCQAllServersLeave() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testCQAllServersLeave() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
 
     createClient(client, new int[] {port1, ports[0]}, host0, "-1");
@@ -3438,19 +2943,18 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     closeClient(client);
   }
 
-  // Test cqstatus listeners, onCqDisconnect should trigger when All servers leave
+  // Test CQStatus listeners, onCqDisconnect should trigger when All servers leave
   // and onCqConnect should trigger when a cq is first connected and when the pool
   // goes from no primary queue to having a primary
   @Test
-  public void testCQAllServersLeaveAndRejoin() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testCQAllServersLeaveAndRejoin() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
 
@@ -3469,7 +2973,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     // start server 2
     createServer(server2, ports[0]);
-    final int thePort2 = server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
+    server2.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
     Wait.pause(8 * 1000);
 
     // Close server1.
@@ -3503,15 +3007,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    * Tests that the cqs do not get notified if primary leaves and a new primary is elected
    */
   @Test
-  public void testCQPrimaryLeaves() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+  public void testCQPrimaryLeaves() {
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
     createClient(client, new int[] {port1, ports[0]}, host0, "-1");
@@ -3551,15 +3054,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   // when their respective servers are shutdown
   @Test
   public void testCQAllServersLeaveMultiplePool() throws Exception {
-    final Host host = Host.getHost(0);
-    VM server1 = host.getVM(0);
-    VM server2 = host.getVM(1);
-    VM client = host.getVM(2);
+    VM server1 = VM.getVM(0);
+    VM server2 = VM.getVM(1);
+    VM client = VM.getVM(2);
 
     createServer(server1);
 
     final int port1 = server1.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server1.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
     final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
 
     createServer(server2, ports[0]);
@@ -3602,16 +3104,15 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
 
   @Test
-  public void testCqCloseAndExecuteWithInitialResults() throws Exception {
+  public void testCqCloseAndExecuteWithInitialResults() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3619,29 +3120,26 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     /* CQ Test with initial Values. */
     int size = 5;
     createValuesWithShort(server, regions[0], size);
-    Wait.pause(1 * 500);
+    Wait.pause(500);
 
     // Create CQs.
-    executeAndCloseAndExecuteIRMultipleTimes(client, "testCQResultSet_0", shortTypeCQs[0]);
-
+    executeAndCloseAndExecuteIRMultipleTimes(client, shortTypeCQs[0]);
 
     closeClient(client);
 
-
     closeServer(server);
   }
 
   @Test
-  public void testCQEventsWithNotEqualsUndefined() throws Exception {
+  public void testCQEventsWithNotEqualsUndefined() {
 
-    final Host host = Host.getHost(0);
-    VM server = host.getVM(0);
-    VM client = host.getVM(1);
+    VM server = VM.getVM(0);
+    VM client = VM.getVM(1);
 
     createServer(server);
 
     final int thePort = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort());
-    final String host0 = NetworkUtils.getServerHostName(server.getHost());
+    final String host0 = NetworkUtils.getServerHostName();
 
     // Create client.
     createClient(client, thePort, host0);
@@ -3669,7 +3167,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
 
     waitForUpdated(client, "testCQEventsWithUndefined_0", KEY + size);
 
-
     // validate Update events.
     validateCQ(client, "testCQEventsWithUndefined_0", /* resultSize: */ noTest, /* creates: */ size,
         /* updates: */ 15, /* deletes; */ 0, /* queryInserts: */ size, /* queryUpdates: */ 15,
@@ -3688,14 +3185,12 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
       public void run2() throws CacheException {
         Region region1 = getRootRegion().getSubregion(regions[0]);
         for (int i = -1; i >= -5; i--) {
-          // change : new Portfolio(i) rdubey ( for Suspect strings problem).
-          // region1.put(KEY+i, new Portfolio(i) );
           region1.put(KEY + i, KEY + i);
         }
       }
     });
 
-    Wait.pause(1 * 1000);
+    Wait.pause(1000);
     // cqs should get any creates and inserts even for invalid
     // since this is a NOT EQUALS query which adds Undefined to
     // results
@@ -3708,7 +3203,6 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     closeServer(server);
   }
 
-
   // HELPER METHODS....
 
   /* For debug purpose - Compares entries in the region */
@@ -3717,14 +3211,14 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     server.invoke(new CacheSerializableRunnable("Server Region Entries") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regionName);
-        LogWriterUtils.getLogWriter().info("### Entries in Server :" + region.keySet().size());
+        logger.info("### Entries in Server :" + region.keySet().size());
       }
     });
 
     client.invoke(new CacheSerializableRunnable("Client Region Entries") {
       public void run2() throws CacheException {
         Region region = getRootRegion().getSubregion(regionName);
-        LogWriterUtils.getLogWriter().info("### Entries in Client :" + region.keySet().size());
+        logger.info("### Entries in Client :" + region.keySet().size());
       }
     });
   }
@@ -3732,7 +3226,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
   /*
    * Used only by tests that start and stop a server only to need to start the bridge server again
    */
-  public void restartBridgeServer(VM server, final int port) {
+  private void restartBridgeServer(VM server, final int port) {
     server.invoke(new CacheSerializableRunnable("Start bridge server") {
       public void run2() {
         try {
@@ -3775,27 +3269,27 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    *
    * @since GemFire 4.0
    */
-  protected void stopBridgeServer(Cache cache) {
-    CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
+  private void stopBridgeServer(Cache cache) {
+    CacheServer bridge = cache.getCacheServers().iterator().next();
     bridge.stop();
-    assertFalse(bridge.isRunning());
+    assertThat(bridge.isRunning()).isFalse();
   }
 
   private void stopBridgeServers(Cache cache) {
-    CacheServer bridge = null;
-    for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) {
-      bridge = (CacheServer) bsI.next();
+    CacheServer bridge;
+    for (CacheServer cacheServer : cache.getCacheServers()) {
+      bridge = cacheServer;
       bridge.stop();
-      assertFalse(bridge.isRunning());
+      assertThat(bridge.isRunning()).isFalse();
     }
   }
 
   private void restartBridgeServers(Cache cache) throws IOException {
-    CacheServer bridge = null;
-    for (Iterator bsI = cache.getCacheServers().iterator(); bsI.hasNext();) {
-      bridge = (CacheServer) bsI.next();
+    CacheServer bridge;
+    for (CacheServer cacheServer : cache.getCacheServers()) {
+      bridge = cacheServer;
       bridge.start();
-      assertTrue(bridge.isRunning());
+      assertThat(bridge.isRunning()).isTrue();
     }
   }
 
@@ -3805,7 +3299,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
     lonerProps.setProperty(MCAST_PORT, "0");
     lonerProps.setProperty(LOCATORS, "");
     InternalDistributedSystem ds = getSystem(lonerProps);
-    assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
+    assertThat(ds.getDistributionManager().getOtherDistributionManagerIds().size()).isEqualTo(0);
     return ds;
   }
 


Mime
View raw message