geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [45/70] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Thu, 28 Jan 2016 18:13:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
new file mode 100644
index 0000000..7d47dd6
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
@@ -0,0 +1,1036 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContiunousQuery mechanism in GemFire.
+ * This includes the test with diffetent data activities.
+ *
+ * @author anil
+ */
+public class CqPerfDUnitTest extends CacheTestCase {
+
+  protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest("CqPerfDUnitTest");
+  
+  public CqPerfDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    // avoid IllegalStateException from HandShake by connecting all vms tor
+    // system before creating connection pools
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+    
+  }
+    
+
+  /**
+   * Tests the cq performance.
+   * @throws Exception
+   */
+  public void perf_testCQPerf() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    
+    final int port = server.invokeInt(CqQueryDUnitTest.class,
+    "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+    final String cqName = "testCQPerf_0";    
+    
+    client.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create CQ. ###" + cqName);
+        // Get CQ Service.
+        QueryService cqService = null;
+        try {
+          cqService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        // Create CQ Attributes.
+        CqAttributesFactory cqf = new CqAttributesFactory();
+        CqListener[] cqListeners = {new CqTimeTestListener(getLogWriter())};
+        ((CqTimeTestListener)cqListeners[0]).cqName = cqName;
+        
+        cqf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqf.create();
+        
+        // Create and Execute CQ.
+        try {
+          CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
+          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+          cq1.execute();
+        } catch (Exception ex){
+          getLogWriter().info("CqService is :" + cqService);
+          ex.printStackTrace();
+          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
+          err.initCause(ex);
+          throw err;
+        }
+      }
+    });   
+    
+    final int size = 50;
+    
+    // Create values.
+    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
+    pause(5000);
+    
+    // Update values
+    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
+    
+    client.invoke(new CacheSerializableRunnable("Validate CQs") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ. ### " + cqName);
+        // Get CQ Service.
+        QueryService cqService = null;
+        try {          
+          cqService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCqService.");
+        }
+        
+        CqQuery cQuery = cqService.getCq(cqName);
+        if (cQuery == null) {
+          fail("Failed to get CqQuery for CQ : " + cqName);
+        }
+        
+//        CqAttributes cqAttr = cQuery.getCqAttributes();
+//        CqListener cqListeners[] = cqAttr.getCqListeners();
+//        CqTimeTestListener listener = (CqTimeTestListener) cqListeners[0];
+        
+        // Wait for all the create to arrive.
+        //for (int i=1; i <= size; i++) {
+        //  listener.waitForCreated(cqDUnitTest.KEY+i);
+        //}
+        
+        // Wait for all the update to arrive.
+        //for (int i=1; i <= size; i++) {
+        //  listener.waitForUpdated(cqDUnitTest.KEY+i);
+        //}
+        //getLogWriter().info("### Time taken for Creation of " + size + " events is :" + listener.getTotalQueryCreateTime());
+        
+        //getLogWriter().info("### Time taken for Update of " + size + " events is :" + listener.getTotalQueryUpdateTime());
+      }
+    });
+    
+    pause( 10 * 60 * 1000);
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+    
+  }
+
+  /**
+   * Test for maintaining keys for update optimization.
+   * @throws Exception
+   */
+  public void testKeyMaintainance() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client, port, host0);
+    
+
+    // HashSet for caching purpose will be created for cqs.
+    final int cqSize = 2;
+    
+    // Cq1
+    cqDUnitTest.createCQ(client, "testKeyMaintainance_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_0", false, null);      
+
+    // Cq2
+    cqDUnitTest.createCQ(client, "testKeyMaintainance_1", cqDUnitTest.cqs[10]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);      
+
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 1);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+1);
+
+    // Entry is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 0
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeys1"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                1, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                0, cqQuery.getCqResultKeysSize());
+          }
+        }
+      }
+    });
+
+    // Update 1.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+10);
+    
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate1"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                10, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                5, cqQuery.getCqResultKeysSize());
+          }
+        }        
+      }
+    });
+
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+12);
+
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                6, cqQuery.getCqResultKeysSize());
+          }
+        }        
+        
+      }
+    });
+
+    // Delete.
+    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 6);
+    cqDUnitTest.waitForDestroyed(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+6);
+
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                6, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                3, cqQuery.getCqResultKeysSize());
+          }
+        }        
+      }
+    });
+
+    // Stop CQ.
+    // This should still needs to process the events so that Results are up-to-date.
+    cqDUnitTest.stopCQ(client, "testKeyMaintainance_1");      
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
+
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12,cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 
+                6, cqQuery.getCqResultKeysSize());
+          }
+        }        
+        
+      }
+    });
+
+
+    // re-start the CQ.
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);      
+
+    // This will remove the caching for this CQ.
+    cqDUnitTest.closeCQ(client, "testKeyMaintainance_1");      
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+ 
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            fail("The key maintainance should not be present for this CQ.");
+          }
+        }
+        
+      }
+    });
+
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for common CQs.
+   * To test the changes relating to, executing CQ only once for all similar CQs.
+   * @throws Exception
+   */
+  public void testMatchingCqs() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client, port, host0);
+    
+    // Create and Execute same kind of CQs.
+    for (int i =0; i < 4; i++) {
+      cqDUnitTest.createCQ(client, "testMatchingCqs_"+i, cqDUnitTest.cqs[0]);
+      cqDUnitTest.executeCQ(client, "testMatchingCqs_"+i, false, null);      
+    }
+
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 4);
+    
+    int size = 1; 
+    
+    // Create.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForCreated(client, "testMatchingCqs_0", CqQueryDUnitTest.KEY+size);
+    cqDUnitTest.waitForCreated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY+size);
+
+    // Close one of the CQ.
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_0");      
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 3);
+    
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY+size);
+    
+    // Stop one of the CQ.
+    cqDUnitTest.stopCQ(client, "testMatchingCqs_1");      
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 2);
+    
+    // Update - 2.
+    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY+size);
+
+    // stopped CQ should not receive 2nd/previous updates.
+    cqDUnitTest.validateCQ(client, "testMatchingCqs_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: once */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 2);
+
+    // Execute the stopped CQ.
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_1", false, null);      
+
+    
+    // Update - 3.
+    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryDUnitTest.KEY+size);
+    
+    cqDUnitTest.validateCQ(client, "testMatchingCqs_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: 2 */ size * 2,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size * 2,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 3);
+
+    // Create different kind of CQs.
+    cqDUnitTest.createCQ(client, "testMatchingCqs_4", cqDUnitTest.cqs[1]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_4", false, null);      
+
+    cqDUnitTest.createCQ(client, "testMatchingCqs_5", cqDUnitTest.cqs[1]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_5", false, null);      
+    
+    cqDUnitTest.createCQ(client, "testMatchingCqs_6", cqDUnitTest.cqs[2]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_6", false, null);      
+
+    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2);
+    
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_6");
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2);
+
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_5");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_4");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_3");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_2");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_1");
+
+    validateMatchingCqs(server, 0, null, 0);
+
+    // update 4
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+  
+  
+  /**
+   * Test for common CQs.
+   * To test the changes relating to, executing CQ only once for all similar CQs.
+   * @throws Exception
+   */
+  public void testMatchingCQWithMultipleClients() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+    VM client3 = host.getVM(3);
+    
+    VM clients[] = new VM[]{client1, client2, client3};
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+        
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.createClient(clients[clientIndex], port, host0);
+      // Create and Execute same kind of CQs.
+      for (int i =0; i < 4; i++) {
+        cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_"+i, cqDUnitTest.cqs[0]);
+        cqDUnitTest.executeCQ(clients[clientIndex], 
+            "testMatchingCQWithMultipleClients_"+i, false, null);      
+      }
+    }
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], clients.length * 4);
+    
+    int size = 1; 
+    
+    // Create.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForCreated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_0", CqQueryDUnitTest.KEY+size);
+      cqDUnitTest.waitForCreated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryDUnitTest.KEY+size);
+    }
+
+    // Close one of the CQ.
+    cqDUnitTest.closeCQ(client1, "testMatchingCQWithMultipleClients_0");      
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
+    
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryDUnitTest.KEY+size);
+    }
+
+    // Stop one of the CQ.
+    cqDUnitTest.stopCQ(client2, "testMatchingCQWithMultipleClients_1");      
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 2);
+    
+    // Update - 2.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.clearCQListenerEvents(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+    }
+
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryDUnitTest.KEY+size);
+    }
+    
+    // stopped CQ should not receive 2nd/previous updates.
+    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: once */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 2);
+
+    // Execute the stopped CQ.
+    cqDUnitTest.executeCQ(client2, "testMatchingCQWithMultipleClients_1", false, null);      
+
+    
+    // Update - 3.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.clearCQListenerEvents(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+    }
+
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
+    
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryDUnitTest.KEY+size);
+    }
+
+    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: 2 */ size * 2,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size * 2,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 3);
+
+    // Create different kind of CQs.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4", cqDUnitTest.cqs[1]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_4", false, null);      
+
+      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5", cqDUnitTest.cqs[1]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_5", false, null);      
+
+      cqDUnitTest.createCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6", cqDUnitTest.cqs[2]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_6", false, null);      
+    }
+    
+    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2 * clients.length);
+    
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6");
+    }
+
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * clients.length);
+
+
+    // update 4
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    // Close.
+    cqDUnitTest.closeClient(client3);
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * (clients.length - 1));
+    
+    for (int clientIndex=0; clientIndex < 2; clientIndex++){      
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_2");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_1");
+      if (clientIndex != 0) {
+        cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_0");
+      }
+    }
+
+    validateMatchingCqs(server, 0, null, 0);
+    
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeClient(client1);
+    
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for CQ Fail over.
+   * @throws Exception
+   */
+  public void testMatchingCQsWithMultipleServers() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    cqDUnitTest.createServer(server1);
+    
+    VM clients[] = new VM[]{client1, client2};
+    
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    // Create client.
+    
+    // Create client with redundancyLevel -1
+    
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
+    cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");
+    
+    int numCQs = 3;
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
+      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+      
+      cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
+      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+    }
+    
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], 1 * clients.length);
+    
+    pause(1 * 1000);
+    
+    // CREATE.
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
+    
+    for (int i=1; i <= 10; i++) {
+      cqDUnitTest.waitForCreated(client1, "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+    }
+
+    pause(1 * 1000);
+    
+    cqDUnitTest.createServer(server2, ports[0]);
+    
+    
+    final int port2 = server2.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    System.out.println("### Port on which server1 running : " + port1 + 
+        " Server2 running : " + port2);
+
+    pause(3 * 1000);
+    
+
+    // UPDATE - 1.
+    for (int k=0; k < numCQs; k++) {
+      cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
+      cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
+    }
+
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);    
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
+
+    // Wait for updates on regions[0]
+    for (int i=1; i <= 10; i++) {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+    }
+
+    // Wait for updates on regions[1] - Waiting for last key is good enough.
+    cqDUnitTest.waitForUpdated(client1, 
+        "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+4);
+    cqDUnitTest.waitForUpdated(client2, 
+        "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+4);
+    
+    
+    int[] resultsCnt = new int[] {10, 1, 2};
+    
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
+    }    
+    
+    // Close server1.
+    cqDUnitTest.closeServer(server1);
+    
+    // Fail over should happen.
+    pause(5 * 1000);
+    
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
+
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryDUnitTest.noTest);
+    }    
+    
+    // UPDATE - 2
+    for (int k=0; k < numCQs; k++) {
+      cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
+      cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
+    }
+    
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
+
+ 
+    // Wait for cq1. on region[0]
+    for (int i=1; i <= resultsCnt[0]; i++) {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+    }  
+
+    // Wait for cq3. on region[1]
+    {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+"4");
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+"4");
+    }
+
+        
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryDUnitTest.noTest, resultsCnt[i], resultsCnt[i] * 2, CqQueryDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, CqQueryDUnitTest.noTest, 
+        resultsCnt[i], resultsCnt[i] * 2, CqQueryDUnitTest.noTest);
+    }    
+    
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], 1 * (clients.length - 1));
+
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  
+  /**
+   * Test for CQ Fail over.
+   * @throws Exception
+   */
+  public void testMatchingCQsOnDataNodeWithMultipleServers() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    cqDUnitTest.createServerOnly(server1, 0);
+    cqDUnitTest.createServerOnly(server2, 0);
+    cqDUnitTest.createPartitionRegion(server1, cqDUnitTest.regions);
+    cqDUnitTest.createPartitionRegion(server2, cqDUnitTest.regions);
+    
+    VM clients[] = new VM[]{client1, client2};
+    
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    cqDUnitTest.createLocalRegion(client1, new int[] {port1, ports[0]}, host0, "-1", cqDUnitTest.regions);
+    cqDUnitTest.createLocalRegion(client2, new int[] {port1, ports[0]}, host0, "-1", cqDUnitTest.regions);
+    
+    int numCQs = cqDUnitTest.prCqs.length;
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.createCQ(client1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.prCqs[i]);
+      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+      
+      cqDUnitTest.createCQ(client2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.prCqs[i]);
+      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+    }
+    
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[0], 1 * clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.prCqs[1], 1 * clients.length);
+    
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[0], 1 * clients.length);
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.prCqs[1], 1 * clients.length);
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Performance test for Matching CQ optimization changes.
+   * @throws Exception
+   */
+  public void perf_testPerformanceForMatchingCQs() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    cqDUnitTest.createServer(server1);
+    cqDUnitTest.createServer(server2);
+    
+//    VM clients[] = new VM[]{client1, client2};
+    
+    final int port1 = server1.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final int port2 = server2.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    // Create client.
+//    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    // Client1 connects to server1.
+    cqDUnitTest.createClient(client1, new int[] {port1}, host0, "-1");
+    
+    // Client2 connects to server2.
+    cqDUnitTest.createClient(client2, new int[] {port2}, host0, "-1");
+    
+    // Client1 registers matching CQs on server1.
+    boolean uniqueQueries = false;
+    String[] matchingCqs = this.generateCqQueries(uniqueQueries); 
+    for (int i=0; i < matchingCqs.length; i++) {
+      cqDUnitTest.createCQ(client1, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
+      cqDUnitTest.executeCQ(client1, "testPerformanceForMatchingCQs_" + i, 
+          false, null);
+    }
+
+    // Client2 registers non-matching CQs on server2.
+    uniqueQueries = true;
+    matchingCqs = this.generateCqQueries(uniqueQueries); 
+    for (int i=0; i < matchingCqs.length; i++) {
+      cqDUnitTest.createCQ(client2, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
+      cqDUnitTest.executeCQ(client2, "testPerformanceForMatchingCQs_" + i, 
+          false, null);
+    }
+
+    pause(1 * 1000);
+    
+    // CREATE.
+    int size = 1000;
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size);
+   
+    // Update couple of times;
+    for (int j=0; j < 5; j++){
+      cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size-1);
+      cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size-1);      
+    }
+
+    for (int j=0; j < 4; j++){
+      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size-1);
+      cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size-1);      
+    }
+
+    // Update the last key.
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);      
+    
+    for (int k=1; k <= size; k++){
+      cqDUnitTest.waitForUpdated(client1, 
+          "testPerformanceForMatchingCQs_0", CqQueryDUnitTest.KEY+k);
+    }
+ 
+    pause(1 * 1000);
+    printCqQueryExecutionTime(server1);
+    printCqQueryExecutionTime(server2);
+    
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server2);
+    cqDUnitTest.closeServer(server1);
+
+  }
+
+  public void validateMatchingCqs(VM server, final int mapSize, final String query, final int numCqSize){
+    server.invoke(new CacheSerializableRunnable("validateMatchingCqs"){
+      public void run2()throws CacheException {
+        CqServiceImpl cqService = null;
+        try {
+          cqService = (CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        HashMap matchedCqMap = cqService.getMatchingCqMap();
+        assertEquals("The number of matched cq is not as expected.", mapSize, matchedCqMap.size());
+
+        if (query != null) {        
+          if (!matchedCqMap.containsKey(query))
+          {
+            fail("Query not found in the matched cq map. Query:" + query);
+          }
+          Collection cqs = (Collection)matchedCqMap.get(query);
+          assertEquals("Number of matched cqs are not equal to the expected matched cqs", numCqSize, cqs.size());
+        }
+      }
+    });
+  }
+
+  public void printCqQueryExecutionTime(VM server){
+    server.invoke(new CacheSerializableRunnable("printCqQueryExecutionTime"){
+      public void run2()throws CacheException {
+        CqServiceImpl cqService = null;
+        try {
+          cqService = (CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
+        getLogWriter().info("Total Time taken to Execute CQ Query :" + timeTaken);
+        //System.out.println("Total Time taken to Execute CQ Query :" + timeTaken);
+      }
+    });
+  }
+
+  public String[] generateCqQueries(boolean uniqueQueries){
+    ArrayList initQueries = new ArrayList();
+    // From Portfolio object.
+    String[] names={"aaa","bbb","ccc","ddd"};
+    int nameIndex = 0;
+    
+    // Construct few unique Queries.
+    for(int i=0; i < 3; i++){
+      for(int cnt=0; cnt < 5; cnt++){
+        String query = cqDUnitTest.cqs[i];
+        if (cnt > 0){
+          nameIndex = (cnt % names.length);
+          query += " or p.names[" + nameIndex + "] = '" + names[nameIndex] + cnt + "'"; 
+        }
+        initQueries.add(query);
+      }
+    }
+    
+    int numMatchedQueries = 10;
+    ArrayList cqQueries = new ArrayList();
+    Iterator iter = initQueries.iterator();
+    while (iter.hasNext()) {
+      String query = (String)iter.next();
+      for (int cnt=0; cnt < numMatchedQueries; cnt++){
+         if (uniqueQueries){
+           // Append blank string, so that query string is different but the 
+           // Query constraint remains same.
+           query+= " ";
+         }
+         cqQueries.add(query);
+      }
+    }
+    String[] queries = new String[cqQueries.size()];
+    cqQueries.toArray(queries);
+    return queries; 
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
new file mode 100644
index 0000000..883e6ee
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
@@ -0,0 +1,996 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContiunousQuery mechanism in GemFire.
+ * This includes the test with diffetent data activities.
+ *
+ * @author anil
+ */
+public class CqPerfUsingPoolDUnitTest extends CacheTestCase {
+
+  protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest("CqPerfUsingPoolDUnitTest");
+  
+  public CqPerfUsingPoolDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    // avoid IllegalStateException from HandShake by connecting all vms tor
+    // system before creating connection pools
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+    
+  }
+    
+
+  /**
+   * Tests the cq performance.
+   * @throws Exception
+   */
+  public void perf_testCQPerf() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+    "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+    final String cqName = "testCQPerf_0";    
+    
+    client.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create CQ. ###" + cqName);
+        // Get CQ Service.
+        QueryService cqService = null;
+        try {
+          cqService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        // Create CQ Attributes.
+        CqAttributesFactory cqf = new CqAttributesFactory();
+        CqListener[] cqListeners = {new CqTimeTestListener(getLogWriter())};
+        ((CqTimeTestListener)cqListeners[0]).cqName = cqName;
+        
+        cqf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqf.create();
+        
+        // Create and Execute CQ.
+        try {
+          CqQuery cq1 = cqService.newCq(cqName, cqDUnitTest.cqs[0], cqa);
+          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+          cq1.execute();
+        } catch (Exception ex){
+          getLogWriter().info("CqService is :" + cqService);
+          ex.printStackTrace();
+          AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . ");
+          err.initCause(ex);
+          throw err;
+        }
+      }
+    });   
+    
+    final int size = 50;
+    
+    // Create values.
+    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
+    pause(5000);
+    
+    // Update values
+    cqDUnitTest.createValuesWithTime(client, cqDUnitTest.regions[0], size);
+    
+    client.invoke(new CacheSerializableRunnable("Validate CQs") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ. ### " + cqName);
+        // Get CQ Service.
+        QueryService cqService = null;
+        try {          
+          cqService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCqService.");
+        }
+        
+        CqQuery cQuery = cqService.getCq(cqName);
+        if (cQuery == null) {
+          fail("Failed to get CqQuery for CQ : " + cqName);
+        }
+        
+//        CqAttributes cqAttr = cQuery.getCqAttributes();
+//        CqListener cqListeners[] = cqAttr.getCqListeners();
+//        CqTimeTestListener listener = (CqTimeTestListener) cqListeners[0];
+        
+        // Wait for all the create to arrive.
+        //for (int i=1; i <= size; i++) {
+        //  listener.waitForCreated(cqDUnitTest.KEY+i);
+        //}
+        
+        // Wait for all the update to arrive.
+        //for (int i=1; i <= size; i++) {
+        //  listener.waitForUpdated(cqDUnitTest.KEY+i);
+        //}
+        //getLogWriter().info("### Time taken for Creation of " + size + " events is :" + listener.getTotalQueryCreateTime());
+        
+        //getLogWriter().info("### Time taken for Update of " + size + " events is :" + listener.getTotalQueryUpdateTime());
+      }
+    });
+    
+    pause( 10 * 60 * 1000);
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+    
+  }
+
+  /**
+   * Test for maintaining keys for update optimization.
+   * @throws Exception
+   */
+  public void testKeyMaintenance() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    // cqDUnitTest.createClient(client, port, host0);
+
+    String poolName = "testKeyMaintainance";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // HashSet for caching purpose will be created for cqs.
+    final int cqSize = 2;
+    
+    // Cq1
+    cqDUnitTest.createCQ(client, poolName, "testKeyMaintainance_0", cqDUnitTest.cqs[0]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_0", false, null);      
+
+    // Cq2
+    cqDUnitTest.createCQ(client, poolName, "testKeyMaintainance_1", cqDUnitTest.cqs[10]);
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);      
+
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 1);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryUsingPoolDUnitTest.KEY+1);
+
+    // Entry is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 0
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeys1"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                1, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                0, cqQuery.getCqResultKeysSize());
+          }
+        }
+      }
+    });
+
+    // Update 1.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+10);
+    
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate1"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                10, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                5, cqQuery.getCqResultKeysSize());
+          }
+        }        
+      }
+    });
+
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
+    cqDUnitTest.waitForCreated(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+12);
+
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                6, cqQuery.getCqResultKeysSize());
+          }
+        }        
+        
+      }
+    });
+
+    // Delete.
+    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 6);
+    cqDUnitTest.waitForDestroyed(client, "testKeyMaintainance_0", CqQueryDUnitTest.KEY+6);
+
+    // Entry/check is made into the CQs cache hashset.
+    // testKeyMaintainance_0 with 1 entry and testKeyMaintainance_1 with 1
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                6, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.",
+                3, cqQuery.getCqResultKeysSize());
+          }
+        }        
+      }
+    });
+
+    // This should still needs to process the events so that Results are uptodate.
+    cqDUnitTest.stopCQ(client, "testKeyMaintainance_1");      
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 12);
+
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_1 is wrong.", 
+                6, cqQuery.getCqResultKeysSize());
+          }
+        }        
+        
+      }
+    });
+
+    // This should re-start the caching for this CQ.
+    cqDUnitTest.executeCQ(client, "testKeyMaintainance_1", false, null);      
+
+    // This will remove the caching for this CQ.
+    cqDUnitTest.closeCQ(client, "testKeyMaintainance_1");      
+    server.invoke(new CacheSerializableRunnable("LookForCachedEventKeysAfterUpdate2"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          String serverCqName = (String)cqQuery.getServerCqName();
+          if (serverCqName.startsWith("testKeyMaintainance_0")){
+            assertEquals("The number of keys cached for cq testKeyMaintainance_0 is wrong.", 
+                12, cqQuery.getCqResultKeysSize());
+          } else if(serverCqName.startsWith("testKeyMaintainance_1")){
+            fail("The key maintainance should not be present for this CQ.");
+          }
+        }
+        
+      }
+    });
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for common CQs.
+   * To test the changes relating to, executing CQ only once for all similar CQs.
+   * @throws Exception
+   */
+  public void testMatchingCqs() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    //cqDUnitTest.createClient(client, port, host0);
+    
+    String poolName = "testMatchingCqs";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create and Execute same kind of CQs.
+    for (int i =0; i < 4; i++) {
+      cqDUnitTest.createCQ(client, poolName, "testMatchingCqs_"+i, cqDUnitTest.cqs[0]);
+      cqDUnitTest.executeCQ(client, "testMatchingCqs_"+i, false, null);      
+    }
+
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 4);
+    
+    int size = 1; 
+    
+    // Create.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForCreated(client, "testMatchingCqs_0", CqQueryUsingPoolDUnitTest.KEY+size);
+    cqDUnitTest.waitForCreated(client, "testMatchingCqs_3", CqQueryUsingPoolDUnitTest.KEY+size);
+
+    // Close one of the CQ.
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_0");      
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 3);
+    
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    
+    // Stop one of the CQ.
+    cqDUnitTest.stopCQ(client, "testMatchingCqs_1");      
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], 2);
+    
+    // Update - 2.
+    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryUsingPoolDUnitTest.KEY+size);
+
+    // stopped CQ should not receive 2nd/previous updates.
+    cqDUnitTest.validateCQ(client, "testMatchingCqs_1",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: once */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 2);
+
+    // Execute the stopped CQ.
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_1", false, null);      
+
+    
+    // Update - 3.
+    cqDUnitTest.clearCQListenerEvents(client, "testMatchingCqs_3");
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    cqDUnitTest.waitForUpdated(client, "testMatchingCqs_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    
+    cqDUnitTest.validateCQ(client, "testMatchingCqs_1",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: 2 */ size * 2,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size * 2,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 3);
+
+    // Create different kind of CQs.
+    cqDUnitTest.createCQ(client, poolName, "testMatchingCqs_4", cqDUnitTest.cqs[1]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_4", false, null);      
+
+    cqDUnitTest.createCQ(client, poolName, "testMatchingCqs_5", cqDUnitTest.cqs[1]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_5", false, null);      
+    
+    cqDUnitTest.createCQ(client, poolName, "testMatchingCqs_6", cqDUnitTest.cqs[2]);
+    cqDUnitTest.executeCQ(client, "testMatchingCqs_6", false, null);      
+
+    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2);
+    
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_6");
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2);
+
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_5");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_4");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_3");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_2");
+    cqDUnitTest.closeCQ(client, "testMatchingCqs_1");
+
+    validateMatchingCqs(server, 0, null, 0);
+
+    // update 4
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+  
+  
+  /**
+   * Test for common CQs.
+   * To test the changes relating to, executing CQ only once for all similar CQs.
+   * @throws Exception
+   */
+  public void testMatchingCQWithMultipleClients() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+    VM client3 = host.getVM(3);
+    
+    VM clients[] = new VM[]{client1, client2, client3};
+    
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    String poolName = "testMatchingCQWithMultipleClients";     
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      String cPoolName = "testMatchingCQWithMultipleClients" + clientIndex;
+      cqDUnitTest.createPool(clients[clientIndex], cPoolName, host0, port);
+
+      //cqDUnitTest.createClient(clients[clientIndex], port, host0);
+      // Create and Execute same kind of CQs.
+      for (int i =0; i < 4; i++) {
+        cqDUnitTest.createCQ(clients[clientIndex], cPoolName, "testMatchingCQWithMultipleClients_"+i, cqDUnitTest.cqs[0]);
+        cqDUnitTest.executeCQ(clients[clientIndex], 
+            "testMatchingCQWithMultipleClients_"+i, false, null);      
+      }
+    }
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], clients.length * 4);
+    
+    int size = 1; 
+    
+    // Create.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForCreated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_0", CqQueryUsingPoolDUnitTest.KEY+size);
+      cqDUnitTest.waitForCreated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    }
+
+    // Close one of the CQ.
+    cqDUnitTest.closeCQ(client1, "testMatchingCQWithMultipleClients_0");      
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
+    
+    // Update.
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    }
+
+    // Stop one of the CQ.
+    cqDUnitTest.stopCQ(client2, "testMatchingCQWithMultipleClients_1");      
+    
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 2);
+    
+    // Update - 2.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.clearCQListenerEvents(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+    }
+
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    }
+    
+    // stopped CQ should not receive 2nd/previous updates.
+    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: once */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 2);
+
+    // Execute the stopped CQ.
+    cqDUnitTest.executeCQ(client2, "testMatchingCQWithMultipleClients_1", false, null);      
+
+    
+    // Update - 3.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.clearCQListenerEvents(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+    }
+
+    validateMatchingCqs(server, 1, cqDUnitTest.cqs[0], (clients.length * 4) - 1);
+    
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.waitForUpdated(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_3", CqQueryUsingPoolDUnitTest.KEY+size);
+    }
+
+    cqDUnitTest.validateCQ(client2, "testMatchingCQWithMultipleClients_1",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: 2 */ size * 2,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size * 2,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size * 3);
+
+    // Create different kind of CQs.
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      String cPoolName = poolName + clientIndex;
+      cqDUnitTest.createCQ(clients[clientIndex], cPoolName, "testMatchingCQWithMultipleClients_4", cqDUnitTest.cqs[1]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_4", false, null);      
+
+      cqDUnitTest.createCQ(clients[clientIndex], cPoolName, "testMatchingCQWithMultipleClients_5", cqDUnitTest.cqs[1]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_5", false, null);      
+
+      cqDUnitTest.createCQ(clients[clientIndex], cPoolName, "testMatchingCQWithMultipleClients_6", cqDUnitTest.cqs[2]);
+      cqDUnitTest.executeCQ(clients[clientIndex], 
+          "testMatchingCQWithMultipleClients_6", false, null);      
+    }
+    
+    validateMatchingCqs(server, 3, cqDUnitTest.cqs[1], 2 * clients.length);
+    
+    for (int clientIndex=0; clientIndex < 3; clientIndex++){
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_6");
+    }
+
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * clients.length);
+
+
+    // update 4
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    // Close.
+    cqDUnitTest.closeClient(client3);
+    validateMatchingCqs(server, 2, cqDUnitTest.cqs[1], 2 * (clients.length - 1));
+    
+    for (int clientIndex=0; clientIndex < 2; clientIndex++){      
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_5");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_4");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_3");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_2");
+      cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_1");
+      if (clientIndex != 0) {
+        cqDUnitTest.closeCQ(clients[clientIndex], "testMatchingCQWithMultipleClients_0");
+      }
+    }
+
+    validateMatchingCqs(server, 0, null, 0);
+    
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeClient(client1);
+    
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for CQ Fail over.
+   * @throws Exception
+   */
+  public void testMatchingCQsWithMultipleServers() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    cqDUnitTest.createServer(server1);
+    
+    VM clients[] = new VM[]{client1, client2};
+    
+    final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    // Create client.
+    
+    // Create client with redundancyLevel -1
+    
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    String poolName1 = "testClientWithFeederAndCQ1";
+    String poolName2 = "testClientWithFeederAndCQ2";
+    
+    cqDUnitTest.createPool(client1, poolName1, new String[] {host0, host0}, new int[] {port1, ports[0]});
+    cqDUnitTest.createPool(client2, poolName2, new String[] {host0, host0}, new int[] {port1, ports[0]});
+
+    //cqDUnitTest.createClient(client1, new int[] {port1, ports[0]}, host0, "-1");
+    //cqDUnitTest.createClient(client2, new int[] {port1, ports[0]}, host0, "-1");
+    
+    int numCQs = 3;
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.createCQ(client1, poolName1, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
+      cqDUnitTest.executeCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+      
+      cqDUnitTest.createCQ(client2, poolName2, "testMatchingCQsWithMultipleServers_" + i, cqDUnitTest.cqs[i]);
+      cqDUnitTest.executeCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+          false, null);
+    }
+    
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
+    validateMatchingCqs(server1, numCQs, cqDUnitTest.cqs[1], 1 * clients.length);
+    
+    pause(1 * 1000);
+    
+    // CREATE.
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
+
+    for (int i=1; i <= 10; i++) {
+      cqDUnitTest.waitForCreated(client1, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryUsingPoolDUnitTest.KEY+i);
+    }
+
+    pause(1 * 1000);
+    
+    cqDUnitTest.createServer(server2, ports[0]);
+    
+    
+    final int port2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    System.out.println("### Port on which server1 running : " + port1 + 
+        " Server2 running : " + port2);
+
+    pause(3 * 1000);
+    
+
+    // UPDATE - 1.
+    for (int k=0; k < numCQs; k++) {
+      cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
+      cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
+    }
+    
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], 10);    
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], 10);
+    
+    // Wait for updates on regions[0]
+    for (int i=1; i <= 10; i++) {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryDUnitTest.KEY+i);
+    }
+
+    // Wait for updates on regions[1] - Waiting for last key is good enough.
+    cqDUnitTest.waitForUpdated(client1, 
+        "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+4);
+    cqDUnitTest.waitForUpdated(client2, 
+        "testMatchingCQsWithMultipleServers_2", CqQueryDUnitTest.KEY+4);
+        
+    int[] resultsCnt = new int[] {10, 1, 2};
+    
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest);
+    }    
+    
+    // Close server1.
+    cqDUnitTest.closeServer(server1);
+    
+    // Fail over should happen.
+    pause(5 * 1000);
+    
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], 1 * clients.length);
+
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i], CqQueryUsingPoolDUnitTest.noTest);
+    }    
+    
+    // UPDATE - 2
+    for (int k=0; k < numCQs; k++) {
+      cqDUnitTest.clearCQListenerEvents(client1, "testMatchingCQsWithMultipleServers_" + k);
+      cqDUnitTest.clearCQListenerEvents(client2, "testMatchingCQsWithMultipleServers_" + k);
+    }
+    
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], 10);
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], 10);
+
+ 
+    // Wait for cq1. on region[0]
+    for (int i=1; i <= resultsCnt[0]; i++) {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryUsingPoolDUnitTest.KEY+i);
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_0", CqQueryUsingPoolDUnitTest.KEY+i);
+    }  
+
+    // Wait for cq3. on region[1]
+    {
+      cqDUnitTest.waitForUpdated(client1, 
+          "testMatchingCQsWithMultipleServers_2", CqQueryUsingPoolDUnitTest.KEY+"4");
+      cqDUnitTest.waitForUpdated(client2, 
+          "testMatchingCQsWithMultipleServers_2", CqQueryUsingPoolDUnitTest.KEY+"4");
+    }
+
+        
+    for (int i=0; i < numCQs; i++) {
+      cqDUnitTest.validateCQ(client1, "testMatchingCQsWithMultipleServers_" + i, 
+        CqQueryUsingPoolDUnitTest.noTest, resultsCnt[i], resultsCnt[i] * 2, CqQueryUsingPoolDUnitTest.noTest);
+
+      cqDUnitTest.validateCQ(client2, "testMatchingCQsWithMultipleServers_" + i, CqQueryUsingPoolDUnitTest.noTest, 
+        resultsCnt[i], resultsCnt[i] * 2, CqQueryUsingPoolDUnitTest.noTest);
+    }    
+    
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+
+    validateMatchingCqs(server2, numCQs, cqDUnitTest.cqs[0], 1 * (clients.length - 1));
+
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Performance test for Matching CQ optimization changes.
+   * @throws Exception
+   */
+  public void perf_testPerformanceForMatchingCQs() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    cqDUnitTest.createServer(server1);
+    cqDUnitTest.createServer(server2);
+    
+//    VM clients[] = new VM[]{client1, client2};
+    
+    final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final int port2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    // Create client.
+//    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    // Client1 connects to server1.
+    cqDUnitTest.createClient(client1, new int[] {port1}, host0, "-1", null);
+    
+    // Client2 connects to server2.
+    cqDUnitTest.createClient(client2, new int[] {port2}, host0, "-1", null);
+    
+    // Client1 registers matching CQs on server1.
+    boolean uniqueQueries = false;
+    String[] matchingCqs = this.generateCqQueries(uniqueQueries); 
+    for (int i=0; i < matchingCqs.length; i++) {
+      cqDUnitTest.createCQ(client1, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
+      cqDUnitTest.executeCQ(client1, "testPerformanceForMatchingCQs_" + i, 
+          false, null);
+    }
+
+    // Client2 registers non-matching CQs on server2.
+    uniqueQueries = true;
+    matchingCqs = this.generateCqQueries(uniqueQueries); 
+    for (int i=0; i < matchingCqs.length; i++) {
+      cqDUnitTest.createCQ(client2, "testPerformanceForMatchingCQs_" + i, matchingCqs[i]);
+      cqDUnitTest.executeCQ(client2, "testPerformanceForMatchingCQs_" + i, 
+          false, null);
+    }
+
+    pause(1 * 1000);
+    
+    // CREATE.
+    int size = 1000;
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size);
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size);
+   
+    // Update couple of times;
+    for (int j=0; j < 5; j++){
+      cqDUnitTest.createValues(server1, cqDUnitTest.regions[0], size-1);
+      cqDUnitTest.createValues(server2, cqDUnitTest.regions[1], size-1);      
+    }
+
+    for (int j=0; j < 4; j++){
+      cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size-1);
+      cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size-1);      
+    }
+
+    // Update the last key.
+    cqDUnitTest.createValues(server2, cqDUnitTest.regions[0], size);
+    cqDUnitTest.createValues(server1, cqDUnitTest.regions[1], size);      
+    
+    for (int k=1; k <= size; k++){
+      cqDUnitTest.waitForUpdated(client1, 
+          "testPerformanceForMatchingCQs_0", CqQueryUsingPoolDUnitTest.KEY+k);
+    }
+ 
+    pause(1 * 1000);
+    printCqQueryExecutionTime(server1);
+    printCqQueryExecutionTime(server2);
+    
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server2);
+    cqDUnitTest.closeServer(server1);
+
+  }
+
+  public void validateMatchingCqs(VM server, final int mapSize, final String query, final int numCqSize){
+    server.invoke(new CacheSerializableRunnable("validateMatchingCqs"){
+      public void run2()throws CacheException {
+        CqServiceImpl cqService = null;
+        try {
+          cqService = (CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        HashMap matchedCqMap = cqService.getMatchingCqMap();
+        assertEquals("The number of matched cq is not as expected.", mapSize, matchedCqMap.size());
+
+        if (query != null) {        
+          if (!matchedCqMap.containsKey(query))
+          {
+            fail("Query not found in the matched cq map. Query:" + query);
+          }
+          Collection cqs = (Collection)matchedCqMap.get(query);
+          assertEquals("Number of matched cqs are not equal to the expected matched cqs", numCqSize, cqs.size());
+        }
+      }
+    });
+  }
+
+  public void printCqQueryExecutionTime(VM server){
+    server.invoke(new CacheSerializableRunnable("printCqQueryExecutionTime"){
+      public void run2()throws CacheException {
+        CqServiceImpl cqService = null;
+        try {
+          cqService = (CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+
+        long timeTaken = cqService.getCqServiceVsdStats().getCqQueryExecutionTime();
+        getLogWriter().info("Total Time taken to Execute CQ Query :" + timeTaken);
+        System.out.println("Total Time taken to Execute CQ Query :" + timeTaken);
+      }
+    });
+  }
+
+  public String[] generateCqQueries(boolean uniqueQueries){
+    ArrayList initQueries = new ArrayList();
+    // From Portfolio object.
+    String[] names={"aaa","bbb","ccc","ddd"};
+    int nameIndex = 0;
+    
+    // Construct few unique Queries.
+    for(int i=0; i < 3; i++){
+      for(int cnt=0; cnt < 5; cnt++){
+        String query = cqDUnitTest.cqs[i];
+        if (cnt > 0){
+          nameIndex = (cnt % names.length);
+          query += " or p.names[" + nameIndex + "] = '" + names[nameIndex] + cnt + "'"; 
+        }
+        initQueries.add(query);
+      }
+    }
+    
+    int numMatchedQueries = 10;
+    ArrayList cqQueries = new ArrayList();
+    Iterator iter = initQueries.iterator();
+    while (iter.hasNext()) {
+      String query = (String)iter.next();
+      for (int cnt=0; cnt < numMatchedQueries; cnt++){
+         if (uniqueQueries){
+           // Append blank string, so that query string is different but the 
+           // Query constraint remains same.
+           query+= " ";
+         }
+         cqQueries.add(query);
+      }
+    }
+    String[] queries = new String[cqQueries.size()];
+    cqQueries.toArray(queries);
+    return queries; 
+  }
+}


Mime
View raw message