geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [44/62] [abbrv] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-837
Date Tue, 07 Jun 2016 20:54:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataUsingPoolDUnitTest.java
index 613648e,b94c7e4..53d473e
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqDataUsingPoolDUnitTest.java
@@@ -16,29 -16,7 +16,26 @@@
   */
  package com.gemstone.gemfire.cache.query.cq.dunit;
  
- import org.junit.experimental.categories.Category;
- import org.junit.Test;
- 
- import static org.junit.Assert.*;
- 
- import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
- import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
- import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 -import com.gemstone.gemfire.cache.*;
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
++import static com.gemstone.gemfire.test.dunit.Assert.*;
 +
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Properties;
 +import java.util.Set;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.EvictionAttributes;
 +import com.gemstone.gemfire.cache.MirrorType;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.Scope;
  import com.gemstone.gemfire.cache.client.Pool;
  import com.gemstone.gemfire.cache.client.PoolFactory;
  import com.gemstone.gemfire.cache.client.PoolManager;
@@@ -57,38 -25,36 +54,33 @@@ import com.gemstone.gemfire.cache.query
  import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
  import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl.TestHook;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
--import com.gemstone.gemfire.cache30.CacheTestCase;
  import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
- import com.gemstone.gemfire.distributed.internal.DistributionConfig;
  import com.gemstone.gemfire.internal.AvailablePortHelper;
  import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
  import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
  import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
 -import com.gemstone.gemfire.test.dunit.*;
 -import util.TestException;
 -
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Properties;
 -import java.util.Set;
 -import java.util.concurrent.CountDownLatch;
 -import java.util.concurrent.TimeUnit;
 -
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
++import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
++import com.gemstone.gemfire.test.junit.categories.DistributedTest;
  
  /**
-- * This class tests the ContiunousQuery mechanism in GemFire.
++ * This class tests the ContinuousQuery mechanism in GemFire.
   * This includes the test with different data activities.
-- *
   */
 -public class CqDataUsingPoolDUnitTest extends CacheTestCase {
 +@Category(DistributedTest.class)
 +public class CqDataUsingPoolDUnitTest extends JUnit4CacheTestCase {
  
-   protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest();
 -  protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest("CqDataUsingPoolDUnitTest");
--  
-   public CqDataUsingPoolDUnitTest() {
-     super();
 -  public CqDataUsingPoolDUnitTest(String name) {
 -    super(name);
--  }
++  protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest(); // TODO: don't do this!
  
    @Override
    public final void postSetUp() throws Exception {
@@@ -96,6 -62,6 +88,7 @@@
      // system before creating ConnectionPools
      getSystem();
      Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
++      @Override
        public void run() {
          getSystem();
        }
@@@ -111,10 -77,9 +104,8 @@@
     * Added wrt bug 37161.
     * In case of InterestList the events are not sent back to the client
     * if its the originator, this is not true for cq.
--   * 
--   * @throws Exception
     */
 +  @Test
    public void testClientWithFeederAndCQ() throws Exception
    {
      final Host host = Host.getHost(0);
@@@ -128,7 -93,7 +119,7 @@@
  
      String poolName = "testClientWithFeederAndCQ";
      cqDUnitTest.createPool(client, poolName, host0, port);
--    
++
      // Create client.
      cqDUnitTest.createClient(client, port, host0);
  
@@@ -149,41 -114,40 +140,38 @@@
          /* queryUpdates: */ 0,
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
--    
++
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server);
--    
    }
  
    /**
     * Test for CQ Fail over/HA with redundancy level set.
--   * @throws Exception
     */
 +  @Test
    public void testCQHAWithState() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
      VM server3 = host.getVM(2);
--    
++
      VM client = host.getVM(3);
--    
++
      cqDUnitTest.createServer(server1);
--    
++
      final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
--    
++
      final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
--    
++
      cqDUnitTest.createServer(server2, ports[0]);
      final int port2 = server2.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
--            
++
      // Create client - With 3 server endpoints and redundancy level set to 2.
--    
++
      // Create client with redundancyLevel 1
--    //cqDUnitTest.createClient(client, new int[] {port1, port2, ports[1]}, host0, "1");
--    
++
      String poolName = "testCQHAWithState";
      cqDUnitTest.createPool(client, poolName, new String[] {host0, host0, host0}, new int[] {port1, port2, ports[1]}, "1");
  
@@@ -194,95 -158,94 +182,91 @@@
        cqDUnitTest.createCQ(client, poolName, "testCQHAWithState_" + i, cqDUnitTest.cqs[i]);
        cqDUnitTest.executeCQ(client, "testCQHAWithState_" + i, false, null);
      }
--    
++
      Wait.pause(1 * 1000);
--    
++
      int size = 10;
--    
++
      // CREATE.
      cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
      cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);
--    
++
      for (int i=1; i <= size; i++) {
        cqDUnitTest.waitForCreated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + i);
      }
--    
++
      // Clients expected initial result.
      int[] resultsCnt = new int[] {10, 1, 2};
  
      for (int i=0; i < numCQs; i++) {
        cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], 0, 0);
--    }    
++    }
  
      // Close server1.
      // To maintain the redundancy; it will make connection to endpoint-3.
      cqDUnitTest.closeServer(server1);
      Wait.pause(3 * 1000);
--    
--    
++
++
      // UPDATE-1.
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
--    
++
      for (int i=1; i <= size; i++) {
        cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + size);
      }
--    
++
      for (int i=0; i < numCQs; i++) {
        cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest);
--    }    
++    }
  
      //Stop cq.
      cqDUnitTest.stopCQ(client, "testCQHAWithState_0");
--    
++
      Wait.pause(2 * 1000);
--    
++
      // UPDATE with stop.
      cqDUnitTest.createServer(server3, ports[1]);
      server3.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      Wait.pause(2 * 1000);
--    
++
      cqDUnitTest.clearCQListenerEvents(client, "testCQHAWithState_0");
--    
--    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);    
++
++    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
--    
++
      // Wait for events at client.
      try {
        cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + 1);
        fail("Events not expected since CQ is in stop state.");
--    } catch (Exception ex) {
++    } catch (Exception expected) {
        // Success.
      }
--    
++
      cqDUnitTest.executeCQ(client, "testCQHAWithState_0", false, null);
  
--    // Update - 2 
--    cqDUnitTest.createValues(server3, cqDUnitTest.regions[0], 10);    
++    // Update - 2
++    cqDUnitTest.createValues(server3, cqDUnitTest.regions[0], 10);
      cqDUnitTest.createValues(server3, cqDUnitTest.regions[1], 10);
--    
++
      for (int i=1; i <= size; i++) {
        cqDUnitTest.waitForUpdated(client, "testCQHAWithState_0", CqQueryUsingPoolDUnitTest.KEY + size);
      }
  
      for (int i=0; i < numCQs; i++) {
        cqDUnitTest.validateCQ(client, "testCQHAWithState_" + i, CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i] * 2, CqQueryUsingPoolDUnitTest.noTest);
--    }    
--    
++    }
++
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server2);
      cqDUnitTest.closeServer(server3);
    }
  
--  
--  
    /**
     * Tests propogation of invalidates and destorys to the clients. Bug 37242.
--   * 
--   * @throws Exception
     */
 +  @Test
    public void testCQWithDestroysAndInvalidates() throws Exception
    {
      final Host host = Host.getHost(0);
@@@ -298,7 -261,7 +282,7 @@@
  
      // Create client.
      //cqDUnitTest.createClient(client, port, host0);
--    
++
      // producer is not doing any thing.
      cqDUnitTest.createClient(producer, port, host0);
  
@@@ -308,9 -271,9 +292,10 @@@
  
      cqDUnitTest.createCQ(client, poolName, name, cqDUnitTest.cqs[4]);
      cqDUnitTest.executeCQ(client, name, true, null);
--    
++
      // do destroys and invalidates.
      server.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException
        {
          Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]);
@@@ -328,11 -291,11 +313,12 @@@
      for (int i = 1; i <= 5; i++) {
        cqDUnitTest.waitForCreated(client, name , CqQueryUsingPoolDUnitTest.KEY+i);
      }
--    
++
      // do more puts to push first five key-value to disk.
      cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
      // do invalidates on fisrt five keys.
      server.invoke(new CacheSerializableRunnable("Create values") {
++      @Override
        public void run2() throws CacheException
        {
          Region region1 = getRootRegion().getSubregion(cqDUnitTest.regions[0]);
@@@ -345,28 -308,27 +331,25 @@@
      for (int i = 1; i <= 5; i++) {
        cqDUnitTest.waitForInvalidated(client, name , CqQueryUsingPoolDUnitTest.KEY+i);
      }
--        
++
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server);
--  
    }
--  
++
    /**
     * Tests make sure that the second client doesnt get more
--   * events then there should be. This will test the fix for 
++   * events then there should be. This will test the fix for
     * bug 37295.
--   * 
     */
 +  @Test
    public void testCQWithMultipleClients() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client1 = host.getVM(1);
      VM client2 = host.getVM(2);
      VM client3 = host.getVM(3);
--    
++
      /* Create Server and Client */
      cqDUnitTest.createServer(server);
      final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
@@@ -377,28 -339,28 +360,23 @@@
  
      cqDUnitTest.createPool(client1, poolName1, host0, port);
      cqDUnitTest.createPool(client2, poolName2, host0, port);
--    
--    //cqDUnitTest.createClient(client1, port, host0);
--    //cqDUnitTest.createClient(client2, port, host0);
--    
++
      /* Create CQs. and initialize the region */
--    // this should statisfy every thing since id is always greater than 
++    // this should stasify every thing since id is always greater than
      // zero.
      cqDUnitTest.createCQ(client1, poolName1, "testCQWithMultipleClients_0", cqDUnitTest.cqs[0]);
      cqDUnitTest.executeCQ(client1, "testCQWithMultipleClients_0", false, null);
      // should only satisfy one key-value pair in the region.
      cqDUnitTest.createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqDUnitTest.cqs[1]);
      cqDUnitTest.executeCQ(client2, "testCQWithMultipleClients_0", false, null);
--    
++
      int size = 10;
--    
++
      // Create Values on Server.
      cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
--    
--    
++
      cqDUnitTest.waitForCreated(client1, "testCQWithMultipleClients_0", CqQueryUsingPoolDUnitTest.KEY + 10);
--    
--    
++
      /* Validate the CQs */
      cqDUnitTest.validateCQ(client1, "testCQWithMultipleClients_0",
          /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
@@@ -409,11 -371,11 +387,9 @@@
          /* queryUpdates: */ 0,
          /* queryDeletes: */ 0,
          /* totalEvents: */ size);
--    
--    
++
      cqDUnitTest.waitForCreated(client2, "testCQWithMultipleClients_0", CqQueryUsingPoolDUnitTest.KEY + 2 );
--    
--    
++
      cqDUnitTest.validateCQ(client2, "testCQWithMultipleClients_0",
          /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
          /* creates: */ 1,
@@@ -423,8 -385,8 +399,7 @@@
          /* queryUpdates: */ 0,
          /* queryDeletes: */ 0,
          /* totalEvents: */ 1);
--         
--        
++
      /* Close Server and Client */
      cqDUnitTest.closeClient(client2);
      cqDUnitTest.closeClient(client3);
@@@ -433,39 -395,38 +408,39 @@@
  
    /**
     * Test for CQ when region is populated with net load.
--   * @throws Exception
     */
 +  @Test
    public void testCQWithLoad() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
--    
++
      VM client = host.getVM(2);
--    
++
      cqDUnitTest.createServer(server1, 0, false, MirrorType.KEYS_VALUES);
      cqDUnitTest.createServer(server2, 0, false, MirrorType.KEYS);
--        
++
      final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
--    
++
      String poolName = "testCQWithLoad";
      cqDUnitTest.createPool(client, poolName, host0, port1);
  
      //cqDUnitTest.createClient(client, port1, host0);
--    
++
      // Create CQs.
--    cqDUnitTest.createCQ(client, poolName, "testCQWithLoad_0", cqDUnitTest.cqs[0]);  
--    cqDUnitTest.executeCQ(client, "testCQWithLoad_0", false, null); 
--    
++    cqDUnitTest.createCQ(client, poolName, "testCQWithLoad_0", cqDUnitTest.cqs[0]);
++    cqDUnitTest.executeCQ(client, "testCQWithLoad_0", false, null);
++
      Wait.pause(1 * 1000);
--    
++
      final int size = 10;
--    
++
      // CREATE VALUES.
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
--    
++
      server1.invoke(new CacheSerializableRunnable("Load from second server") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
          for (int i=1; i <= size; i++){
@@@ -473,13 -434,13 +448,13 @@@
          }
        }
      });
--    
++
      for (int i=1; i <= size; i++) {
        cqDUnitTest.waitForCreated(client, "testCQWithLoad_0", CqQueryUsingPoolDUnitTest.KEY + i);
      }
--        
++
      cqDUnitTest.validateCQ(client, "testCQWithLoad_0", CqQueryUsingPoolDUnitTest.noTest, size, 0, 0);
--        
++
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server1);
@@@ -488,20 -449,19 +463,18 @@@
  
    /**
     * Test for CQ when entries are evicted from region.
--   * @throws Exception
     */
 +  @Test
    public void testCQWithEviction() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
      VM server2 = host.getVM(1);
--    
++
      VM client = host.getVM(2);
--    
--    //cqDUnitTest.createServer(server1, 0, false, MirrorType.NONE);
  
      final int evictionThreshold = 5;
      server1.invoke(new CacheSerializableRunnable("Create Cache Server") {
++      @Override
        public void run2() throws CacheException {
          LogWriterUtils.getLogWriter().info("### Create Cache Server. ###");
          AttributesFactory factory = new AttributesFactory();
@@@ -510,53 -470,53 +483,53 @@@
  
          // setting the eviction attributes.
          factory.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(evictionThreshold));
--        for (int i = 0; i < cqDUnitTest.regions.length; i++) { 
++        for (int i = 0; i < cqDUnitTest.regions.length; i++) {
            Region region = createRegion(cqDUnitTest.regions[i], factory.createRegionAttributes());
            // Set CacheListener.
--          region.getAttributesMutator().setCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));  
--        } 
++          region.getAttributesMutator().setCacheListener(new CertifiableTestCacheListener(LogWriterUtils.getLogWriter()));
++        }
          Wait.pause(2000);
--        
++
          try {
            cqDUnitTest.startBridgeServer(0, true);
          } catch (Exception ex) {
            Assert.fail("While starting CacheServer", ex);
          }
          Wait.pause(2000);
--
        }
      });
  
      cqDUnitTest.createServer(server2, 0, false, MirrorType.NONE);
--        
++
      final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server1.getHost());
--      
++
      String poolName = "testCQWithEviction";
      cqDUnitTest.createPool(client, poolName, host0, port1);
  
      //cqDUnitTest.createClient(client, port1, host0);
--    
++
      // Create CQs.
--    cqDUnitTest.createCQ(client, poolName, "testCQWithEviction_0", cqDUnitTest.cqs[0]);  
++    cqDUnitTest.createCQ(client, poolName, "testCQWithEviction_0", cqDUnitTest.cqs[0]);
  
      // This should fail as Region is not replicated.
      // There is a bug37966 filed on this.
      try {
--      cqDUnitTest.executeCQ(client, "testCQWithEviction_0", false, "CqException"); 
++      cqDUnitTest.executeCQ(client, "testCQWithEviction_0", false, "CqException");
        fail("Should have thrown exception, cq not supported on Non-replicated region.");
--    } catch (Exception ex) {
++    } catch (Exception expected) {
        // Ignore  expected.
      }
--    
++
      Wait.pause(1 * 1000);
--    
++
      final int size = 10;
--    
++
      // CREATE VALUES.
      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
--    
++
      server1.invoke(new CacheSerializableRunnable("Load from second server") {
++      @Override
        public void run2() throws CacheException {
          Region region1 = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
          for (int i=1; i <= size; i++){
@@@ -564,23 -524,23 +537,20 @@@
          }
        }
      });
--    
--    //for (int i=1; i <= size; i++) {
--    //  cqDUnitTest.waitForCreated(client, "testCQWithEviction_0", cqDUnitTest.KEY + i);
--    //}
--        
++
      Wait.pause(2 * 1000);
--    
++
      server1.invoke(new CacheSerializableRunnable("validate destroy") {
++      @Override
        public void run2() throws CacheException {
          Region region = getRootRegion().getSubregion(cqDUnitTest.regions[0]);
          assertNotNull(region);
--        
++
          Set keys = region.entrySet();
          int keyCnt= size - evictionThreshold;
--        assertEquals("Mismatch, number of keys in local region is not equal to the expected size", 
++        assertEquals("Mismatch, number of keys in local region is not equal to the expected size",
            keyCnt, keys.size());
--        
++
          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
          for (int i = 1; i <= keyCnt; i++) {
            ctl.waitForDestroyed(CqQueryUsingPoolDUnitTest.KEY+i);
@@@ -589,21 -549,20 +559,16 @@@
        }
      });
  
--    // There should not be any destroy events.
--    //cqDUnitTest.validateCQ(client, "testCQWithEviction_0", cqDUnitTest.noTest, size, 0, 0);
--        
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server1);
      cqDUnitTest.closeServer(server2);
    }
--  
--  
++
    /**
     * Test for CQ with establishCallBackConnection.
--   * @throws Exception
     */
 +  @Test
    public void testCQWithEstablishCallBackConnection() throws Exception {
      final Host host = Host.getHost(0);
      VM server1 = host.getVM(0);
@@@ -614,31 -573,31 +579,29 @@@
      final int port1 = server1.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String serverHost = NetworkUtils.getServerHostName(server1.getHost());
  
--//    final String[] regions = cqDUnitTest.regions;
--//    final int[] serverPorts = new int[] {port1};
--
      final String poolName = "testCQWithEstablishCallBackConnection";
  
      client.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
++      @Override
        public void run2() throws CacheException {
          // Create Cache.
          getCache();
--        
++
          PoolFactory cpf = PoolManager.createFactory();
--        cpf.setSubscriptionEnabled(false);        
++        cpf.setSubscriptionEnabled(false);
          cpf.addServer(serverHost, port1);
          cpf.create(poolName);
        }
--    });   
++    });
  
      // Create CQs.
--    cqDUnitTest.createCQ(client, poolName, "testCQWithEstablishCallBackConnection_0", cqDUnitTest.cqs[0]);  
++    cqDUnitTest.createCQ(client, poolName, "testCQWithEstablishCallBackConnection_0", cqDUnitTest.cqs[0]);
  
      // This should fail.
      try {
        cqDUnitTest.executeCQ(client, "testCQWithEstablishCallBackConnection_0", false, "CqException");
        fail("Test should have failed with connection with establishCallBackConnection not found.");
--    } catch (Exception ex) {
++    } catch (Exception expected) {
        // Expected.
      }
  
@@@ -646,40 -605,39 +609,39 @@@
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server1);
    }
--  
++
    /**
     * Test for:
     * Region destroy, calls close on the server.
     * Region clear triggers cqEvent with query op region clear.
     * Region invalidate triggers cqEvent with query op region invalidate.
--   * @throws Exception
     */
 +  @Test
    public void testRegionEvents() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client = host.getVM(1);
--    
++
      cqDUnitTest.createServer(server);
      final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server.getHost());
--    
++
      String poolName = "testRegionEvents";
      cqDUnitTest.createPool(client, poolName, host0, port);
  
      //cqDUnitTest.createClient(client, port, host0);
--    
++
      // Create CQ on regionA
      cqDUnitTest.createCQ(client, poolName, "testRegionEvents_0", cqDUnitTest.cqs[0]);
      cqDUnitTest.executeCQ(client, "testRegionEvents_0", false, null);
--    
++
      // Create CQ on regionB
      cqDUnitTest.createCQ(client, poolName, "testRegionEvents_1", cqDUnitTest.cqs[2]);
      cqDUnitTest.executeCQ(client, "testRegionEvents_1", false, null);
  
      // Test for Event on Region Clear.
      server.invoke(new CacheSerializableRunnable("testRegionEvents"){
++      @Override
        public void run2()throws CacheException {
          LogWriterUtils.getLogWriter().info("### Clearing the region on the server ###");
          Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
@@@ -689,11 -647,11 +651,12 @@@
          region.clear();
        }
      });
--    
++
      cqDUnitTest.waitForRegionClear(client,"testRegionEvents_0");
  
      // Test for Event on Region invalidate.
      server.invoke(new CacheSerializableRunnable("testRegionEvents"){
++      @Override
        public void run2()throws CacheException {
          LogWriterUtils.getLogWriter().info("### Invalidate the region on the server ###");
          Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
@@@ -708,6 -666,6 +671,7 @@@
  
      // Test for Event on Region destroy.
      server.invoke(new CacheSerializableRunnable("testRegionEvents"){
++      @Override
        public void run2()throws CacheException {
          LogWriterUtils.getLogWriter().info("### Destroying the region on the server ###");
          Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[1]);
@@@ -723,44 -681,43 +687,41 @@@
      //cqDUnitTest.waitForClose(client,"testRegionEvents_1");
      cqDUnitTest.validateCQCount(client,1);
  
--        
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server);
--
    }
  
    /**
     * Test for events created during the CQ query execution.
--   * When CQs are executed using executeWithInitialResults 
++   * When CQs are executed using executeWithInitialResults
     * there may be possibility that the region changes during
     * that time may not be reflected in the query result set
     * thus making the query data and region data inconsistent.
--   * @throws Exception
     */
 +  @Test
    public void testEventsDuringQueryExecution() throws Exception {
--    
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      final VM client = host.getVM(1);
      final String cqName = "testEventsDuringQueryExecution_0";
      cqDUnitTest.createServer(server);
--    
++
      final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server.getHost());
-- 
++
      String poolName = "testEventsDuringQueryExecution";
      cqDUnitTest.createPool(client, poolName, host0, port);
--    
++
      // create CQ.
      cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
  
      final int numObjects = 200;
      final int totalObjects = 500;
--    
++
      // initialize Region.
      server.invoke(new CacheSerializableRunnable("Update Region"){
++      @Override
        public void run2()throws CacheException {
          Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]);
          for (int i = 1; i <= numObjects; i++) {
@@@ -769,13 -726,13 +730,14 @@@
          }
        }
      });
--    
++
      // First set testhook in executeWithInitialResults so that queued events
--    // are not drained before we verify there number.    
--    client.invoke(setTestHook());    
--    
++    // are not drained before we verify there number.
++    client.invoke(setTestHook());
++
      // Execute CQ while update is in progress.
      AsyncInvocation executeCq = client.invokeAsync(new CacheSerializableRunnable("Execute CQ AsyncInvoke") {
++      @Override
        public void run2()throws CacheException {
          QueryService cqService = getCache().getQueryService();
          // Get CqQuery object.
@@@ -798,26 -755,26 +760,26 @@@
            public boolean done() {
              return testHook.numQueuedEvents() > 0;
            }
--          
++
            @Override
            public String description() {
              return "No queued events found.";
            }
          }, 3000, 5, true);
--        
++
          getCache().getLogger().fine("Queued Events Size" + testHook.numQueuedEvents());
          // Make sure CQEvents are queued during execute with initial results.
--        
++
          CqQueryTestListener cqListener = (CqQueryTestListener)cq1.getCqAttributes().getCqListener();
          // Wait for the last key to arrive.
          cqListener.waitForCreated("" + totalObjects);
--        
++
          // Check if the events from CqListener are in order.
          int oldId = 0;
--        for (Object cqEvent : cqListener.events.toArray()) { 
++        for (Object cqEvent : cqListener.events.toArray()) {
            int newId = new Integer(cqEvent.toString()).intValue();
            if (oldId > newId){
--            fail("Queued events for CQ Listener during execution with " + 
++            fail("Queued events for CQ Listener during execution with " +
                  "Initial results is not in the order in which they are created.");
            }
            oldId = newId;
@@@ -825,7 -782,7 +787,7 @@@
  
          // Check if all the IDs are present as part of Select Results and CQ Events.
          HashSet ids = new HashSet(cqListener.events);
--    
++
          for (Object o : cqResults.asList()) {
            Struct s = (Struct)o;
            ids.add(s.get("key"));
@@@ -839,7 -796,7 +801,7 @@@
              missingIds.add(key);
            }
          }
--        
++
          if (!missingIds.isEmpty()) {
            fail("Missing Keys in either ResultSet or the Cq Event list. " +
                " Missing keys : [size : " + missingIds.size() + "]" + missingIds +
@@@ -850,9 -807,9 +812,11 @@@
  
      // Keep updating region (async invocation).
      server.invoke(new CacheSerializableRunnable("Update Region"){
++      @Override
        public void run2()throws CacheException {
          Wait.pause(200);
          client.invoke(new CacheSerializableRunnable("Releasing the latch") {
++          @Override
            public void run2()throws CacheException {
              // Now release the testhook so that CQListener can proceed.
              final TestHook testHook = CqQueryImpl.testHook;
@@@ -870,10 -827,9 +834,9 @@@
      // Close.
      cqDUnitTest.closeClient(client);
      cqDUnitTest.closeServer(server);
--
    }
  
 +  @Test
    public void testCqStatInitializationTimingIssue() {
      disconnectAllFromDS();
  
@@@ -885,35 -841,35 +848,37 @@@
      VM server = host.getVM(0);
      VM client = host.getVM(1);
      VM client2 = host.getVM(2);
-- 
++
      // Start server 1
      final int server1Port = ((Integer) server.invoke(() -> CacheServerTestUtil.createCacheServer(regionName, new Boolean(true))))
          .intValue();
--    
++
      // Start a  client
      client.invoke(() -> CacheServerTestUtil.createCacheClient(getClientPool(NetworkUtils.getServerHostName(client.getHost()), server1Port), regionName));
--    
++
      // Start a pub client
      client2.invoke(() -> CacheServerTestUtil.createCacheClient(getClientPool(NetworkUtils.getServerHostName(client2.getHost()), server1Port), regionName));
--    
++
      //client has thread that invokes new and remove cq over and over
      client.invokeAsync(new CacheSerializableRunnable("Register cq") {
++      @Override
        public void run2() throws CacheException {
          for (int i = 0; i < 10000; i++) {
            CqQuery query = createCq(regionName, cq1Name);
            if (query != null) {
--            try {              
++            try {
                query.close();
              }
              catch (Exception e) {
--              System.out.println("exception while closing cq:" + e);
++              fail("exception while closing cq:", e);
              }
            }
          }
        }
      });
--    
++
      client2.invokeAsync(new CacheSerializableRunnable("pub updates") {
++      @Override
        public void run2() throws CacheException {
          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
          while (true) {
@@@ -923,8 -879,8 +888,9 @@@
          }
        }
      });
--    
++
      server.invokeAsync(new CacheSerializableRunnable("pub updates") {
++      @Override
        public void run2() throws CacheException {
          Region region = CacheServerTestUtil.getCache().getRegion(regionName);
          while (true) {
@@@ -934,43 -890,42 +900,45 @@@
          }
        }
      });
--    
++
      //client has another thread that retrieves cq map and checks stat over and over
      client.invoke(new CacheSerializableRunnable("Check Stats") {
++      @Override
        public void run2() throws CacheException {
          for (int i = 0; i < 10000; i++) {
            checkCqStats(cq1Name);
          }
        }
      });
--    
++
      client.invoke(() -> CacheServerTestUtil.closeCache());
      client2.invoke(() -> CacheServerTestUtil.closeCache());
      server.invoke(() -> CacheServerTestUtil.closeCache());
    }
--  
++
 +  @Test
    public void testGetDurableCQsFromPoolOnly() throws Exception {
      final String regionName = "regionA";
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client1 = host.getVM(1);
      VM client2 = host.getVM(2);
--    
++
      /* Create Server and Client */
      cqDUnitTest.createServer(server);
      final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server.getHost());
--  
++
      final String poolName1 = "pool1";
      final String poolName2 = "pool2";
--  
++
      cqDUnitTest.createPool(client1, poolName1, host0, port);
      cqDUnitTest.createPool(client2, poolName2, host0, port);
--    
++
      client1.invoke(new CacheSerializableRunnable("Register cq for client 1") {
++      @Override
        public void run2() throws CacheException {
--    
++
          QueryService queryService = null;
          try {
            queryService = (PoolManager.find(poolName1)).getQueryService();
@@@ -986,26 -941,26 +954,21 @@@
            queryService.newCq("client1NoDC2", "Select * From /root/" + regionName + " where id = 3", attributes, false).execute();
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
        }
      });
--    
++
      client2.invoke(new CacheSerializableRunnable("Register cq for client 2") {
++      @Override
        public void run2() throws CacheException {
--    
++
          QueryService queryService = null;
          try {
            queryService = (PoolManager.find(poolName2)).getQueryService();
@@@ -1021,24 -976,24 +984,19 @@@
            queryService.newCq("client2DCQ4", "Select * From /root/" + regionName + " where id = 3", attributes, true).execute();
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
        }
      });
--    
++
      client2.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client2") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = null;
          try {
@@@ -1046,14 -1001,14 +1004,12 @@@
          } catch (Exception cqe) {
            Assert.fail("Failed to getCQService.", cqe);
          }
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          assertEquals(4, list.size());
          assertTrue(list.contains("client2DCQ1"));
@@@ -1062,8 -1017,8 +1018,9 @@@
          assertTrue(list.contains("client2DCQ4"));
        }
      });
--    
++
      client1.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client1") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = null;
          try {
@@@ -1071,42 -1026,41 +1028,41 @@@
          } catch (Exception cqe) {
            Assert.fail("Failed to getCQService.", cqe);
          }
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          assertEquals(2, list.size());
          assertTrue(list.contains("client1DCQ1"));
          assertTrue(list.contains("client1DCQ2"));
        }
      });
--    
++
      cqDUnitTest.closeClient(client2);
      cqDUnitTest.closeClient(client1);
      cqDUnitTest.closeServer(server);
    }
--  
++
 +  @Test
    public void testGetDurableCQsFromServerWithEmptyList() throws Exception {
      final Host host = Host.getHost(0);
      VM server = host.getVM(0);
      VM client1 = host.getVM(1);
--    
++
      /* Create Server and Client */
      cqDUnitTest.createServer(server);
      final int port = server.invoke(() -> CqQueryUsingPoolDUnitTest.getCacheServerPort());
      final String host0 = NetworkUtils.getServerHostName(server.getHost());
--  
++
      final String poolName1 = "pool1";
--  
++
      cqDUnitTest.createPool(client1, poolName1, host0, port);
--    
++
      client1.invoke(new CacheSerializableRunnable("test getDurableCQsFromServer for client1") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = null;
          try {
@@@ -1114,21 -1068,21 +1070,19 @@@
          } catch (Exception cqe) {
            Assert.fail("Failed to getCQService.", cqe);
          }
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          assertEquals(0, list.size());
          assertFalse(list.contains("client1DCQ1"));
          assertFalse(list.contains("client1DCQ2"));
        }
      });
--    
++
      cqDUnitTest.closeClient(client1);
      cqDUnitTest.closeServer(server);
    }
@@@ -1158,14 -1111,14 +1112,15 @@@
      createClient2CqsAndDurableCqs(client2, regionName);
      
      client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(4, list.size());
          assertTrue(list.contains("client2DCQ1"));
@@@ -1176,14 -1129,14 +1131,15 @@@
      });
      
      client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(2, list.size());
          assertTrue(list.contains("client1DCQ1"));
@@@ -1223,14 -1175,14 +1179,15 @@@
      cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout);
      
      client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(4, list.size());
          assertTrue(list.contains("client2DCQ1"));
@@@ -1241,14 -1193,14 +1198,15 @@@
      });
      
      client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(2, list.size());
          assertTrue(list.contains("client1DCQ1"));
@@@ -1287,6 -1238,6 +1245,7 @@@
      cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout);
      
      client1.invoke(new CacheSerializableRunnable("Register more cq for client 1") {
++      @Override
        public void run2() throws CacheException {
          //register the cq's
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
@@@ -1299,24 -1250,24 +1258,19 @@@
            queryService.newCq("client1MoreNoDC2", "Select * From /" + regionName + " where id = 3", attributes, false).execute();
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
        }
      });
      
      client2.invoke(new CacheSerializableRunnable("Register more cq for client 2") {
++      @Override
        public void run2() throws CacheException {
          //register the cq's
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
@@@ -1329,19 -1280,19 +1283,13 @@@
            queryService.newCq("client2MoreDCQ4", "Select * From /" + regionName + " where id = 3", attributes, true).execute();
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
        }
      });
@@@ -1351,14 -1302,14 +1299,15 @@@
      cycleDurableClient(client2, "client2_dc", server1Port, regionName, timeout);
      
      client2.invoke(new CacheSerializableRunnable("check durable cqs for client 2") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(8, list.size());
          assertTrue(list.contains("client2DCQ1"));
@@@ -1373,14 -1324,14 +1322,15 @@@
      });
      
      client1.invoke(new CacheSerializableRunnable("check durable cqs for client 1") {
++      @Override
        public void run2() throws CacheException {
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
--        List<String> list;
++        List<String> list = null;
          try {
            list = queryService.getAllDurableCqsFromServer();
          }
          catch (CqException e) {
--          throw new CacheException(e) {};
++          fail("failed", e);
          }
          assertEquals(4, list.size());
          assertTrue(list.contains("client1DCQ1"));
@@@ -1407,6 -1358,6 +1357,7 @@@
    //helper to create durable cqs to test out getAllDurableCqs functionality
    private void createClient1CqsAndDurableCqs(VM client, final String regionName) {
      client.invoke(new CacheSerializableRunnable("Register cq for client 1") {
++      @Override
        public void run2() throws CacheException {
          //register the cq's
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
@@@ -1419,19 -1370,19 +1370,13 @@@
            queryService.newCq("client1NoDC2", "Select * From /" + regionName + " where id = 3", attributes, false).execute();
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
        }
      });
@@@ -1439,6 -1390,6 +1384,7 @@@
    
    private void createClient2CqsAndDurableCqs(VM client, final String regionName) {
      client.invoke(new CacheSerializableRunnable("Register cq for client 2") {
++      @Override
        public void run2() throws CacheException {
          //register the cq's
          QueryService queryService = CacheServerTestUtil.getCache().getQueryService();
@@@ -1451,19 -1402,19 +1397,13 @@@
            queryService.newCq("client2DCQ4", "Select * From /" + regionName + " where id = 3", attributes, true).execute();
          }
          catch (RegionNotFoundException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
          catch (CqExistsException e) {
--          throw new CacheException(e) {
--            
--          };
++          fail("failed", e);
          }
  
        }
@@@ -1472,6 -1423,6 +1412,7 @@@
    
    private void cycleDurableClient(VM client, final String dcName, final int serverPort, final String regionName, final int durableClientTimeout) {
      client.invoke(new CacheSerializableRunnable("cycle client") {
++      @Override
        public void run2() throws CacheException {
          CacheServerTestUtil.closeCache(true);
        }
@@@ -1479,7 -1430,7 +1420,6 @@@
      
      client.invoke(() -> CacheServerTestUtil.createClientCache(getClientPool(NetworkUtils.getServerHostName(client.getHost()), serverPort), regionName, getDurableClientProperties(dcName, durableClientTimeout)));
    }
--  
  
    private CqQuery createCq(String regionName, String cqName) {
      // Create CQ Attributes.
@@@ -1487,19 -1438,19 +1427,15 @@@
      
      // Initialize and set CqListener.
      CqListener[] cqListeners = { new CqListener() {
--
        @Override
        public void close() {
        }
--
        @Override
        public void onEvent(CqEvent aCqEvent) {
        }
--
        @Override
        public void onError(CqEvent aCqEvent) {
        }
--      
      }};
      cqAf.initCqListeners(cqListeners);
      CqAttributes cqa = cqAf.create();
@@@ -1513,14 -1464,14 +1449,14 @@@
        query.execute();
      }
      catch (CqExistsException e) {
--      fail("Could not find specified region:" + regionName + ":" + e);
++      fail("Could not find specified region:" + regionName + ":", e);
      }
      catch (CqException e) {
--      fail("Could not find specified region:" + regionName + ":" + e);
++      fail("Could not find specified region:" + regionName + ":", e);
  
      }
      catch (RegionNotFoundException e) {
--      fail("Could not find specified region:" + regionName + ":" + e);
++      fail("Could not find specified region:" + regionName + ":", e);
      }
      return query;
    }
@@@ -1541,10 -1492,10 +1477,10 @@@
        .setSubscriptionAckInterval(1).setSubscriptionEnabled(true);
      return ((PoolFactoryImpl)pf).getPoolAttributes();
    }
--  
  
    public CacheSerializableRunnable setTestHook() {
      SerializableRunnable sr = new CacheSerializableRunnable("TestHook") {
++      @Override
        public void run2() {
          class CqQueryTestHook implements CqQueryImpl.TestHook {
  
@@@ -1552,27 -1503,27 +1488,30 @@@
            private int numEvents = 0;
            Cache cache = GemFireCacheImpl.getInstance();
            
++          @Override
            public void pauseUntilReady() {
              try {
                cache.getLogger().fine("CqQueryTestHook: Going to wait on latch until ready is called.");
                if (!latch.await(10, TimeUnit.SECONDS)) {
 -                throw new TestException("query was never unlatched");
 +                fail("query was never unlatched");
                }
              } catch (Exception e) {
--              e.printStackTrace();
--              Thread.currentThread().interrupt();
++              fail("interrupted", e);
              }
            }
  
++          @Override
            public void ready() {
              latch.countDown();
              cache.getLogger().fine("CqQueryTestHook: The latch has been released.");
            }
  
++          @Override
            public int numQueuedEvents() {
              return numEvents;
            }
            
++          @Override
            public void setEventCount(int count) {
              cache.getLogger().fine("CqQueryTestHook: Setting numEVents to: " + count);
              numEvents = count;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryDUnitTest.java
index 64fb388,5c993ce..25d3380
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryDUnitTest.java
@@@ -16,83 -16,39 +16,78 @@@
   */
  package com.gemstone.gemfire.cache.query.cq.dunit;
  
- import org.junit.experimental.categories.Category;
- import org.junit.Test;
- 
 -import com.gemstone.gemfire.cache.*;
 -import com.gemstone.gemfire.cache.query.*;
++import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.*;
 +import static org.junit.Assert.*;
 +
- import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
- import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
- import com.gemstone.gemfire.test.junit.categories.DistributedTest;
- 
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Enumeration;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Set;
 +
++import org.junit.Test;
++import org.junit.experimental.categories.Category;
++
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.EvictionAction;
 +import com.gemstone.gemfire.cache.EvictionAttributes;
 +import com.gemstone.gemfire.cache.MirrorType;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.RegionFactory;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.Scope;
 +import com.gemstone.gemfire.cache.query.CqAttributes;
 +import com.gemstone.gemfire.cache.query.CqAttributesFactory;
 +import com.gemstone.gemfire.cache.query.CqAttributesMutator;
 +import com.gemstone.gemfire.cache.query.CqClosedException;
 +import com.gemstone.gemfire.cache.query.CqExistsException;
 +import com.gemstone.gemfire.cache.query.CqListener;
 +import com.gemstone.gemfire.cache.query.CqQuery;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.RegionNotFoundException;
 +import com.gemstone.gemfire.cache.query.SelectResults;
  import com.gemstone.gemfire.cache.query.data.Portfolio;
  import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
  import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
  import com.gemstone.gemfire.cache.server.CacheServer;
- import com.gemstone.gemfire.cache30.ClientServerTestCase;
  import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
--import com.gemstone.gemfire.cache30.CacheTestCase;
  import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
- import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+ import com.gemstone.gemfire.cache30.ClientServerTestCase;
  import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
  import com.gemstone.gemfire.internal.AvailablePortHelper;
  import com.gemstone.gemfire.internal.cache.DistributedRegion;
  import com.gemstone.gemfire.internal.cache.DistributedTombstoneOperation;
  import com.gemstone.gemfire.internal.cache.EventID;
  import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 -import com.gemstone.gemfire.test.dunit.*;
 -
 -import java.io.IOException;
 -import java.util.*;
 -
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.LOCATORS;
 -import static com.gemstone.gemfire.distributed.DistributedSystemConfigProperties.MCAST_PORT;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.NetworkUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
++import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
++import com.gemstone.gemfire.test.junit.categories.DistributedTest;
  
  /**
   * This class tests the ContiunousQuery mechanism in GemFire.
   * It does so by creating a cache server with a cache and a pre-defined region and
   * a data loader. The client creates the same region and attaches the connection pool.
-- * 
-- *
   */
- @SuppressWarnings("serial")
 +@Category(DistributedTest.class)
+ @SuppressWarnings("serial")
 -public class CqQueryDUnitTest extends CacheTestCase {
 +public class CqQueryDUnitTest extends JUnit4CacheTestCase {
    
    /** The port on which the bridge server was started in this VM */
    private static int bridgeServerPort;
@@@ -177,10 -133,10 +172,6 @@@
    // 11 - Test for "short" number type
    "SELECT ALL * FROM /root/" + regions[0] + " p where p.shortID IN SET(1,2,3,4,5)" };
  
-   public CqQueryDUnitTest() {
-     super();
 -  public CqQueryDUnitTest(String name) {
 -    super(name);
--  }
--
    @Override
    public final void postSetUp() throws Exception {
      // avoid IllegalStateException from HandShake by connecting all vms tor

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd38e10f/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------


Mime
View raw message