geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [40/70] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Thu, 28 Jan 2016 18:13:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
new file mode 100644
index 0000000..47a3fe7
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolDUnitTest.java
@@ -0,0 +1,444 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.CqServiceStatistics;
+import com.gemstone.gemfire.cache.query.CqStatistics;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceVsdStats;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContinuousQuery mechanism in GemFire.
+ * This includes the test with different data activities.
+ *
+ * @author Rao
+ */
+public class CqStatsUsingPoolDUnitTest extends CacheTestCase {
+
+  private CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest("CqStatsUsingPoolDUnitTest");
+  
+  public CqStatsUsingPoolDUnitTest(String name) { // name is handed unchanged to the CacheTestCase constructor
+    super(name);
+  }
+  
+  /**
+   * Runs the superclass set-up and then connects this VM and every dunit VM
+   * to the distributed system.
+   *
+   * @throws Exception if the superclass set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // Avoid IllegalStateException from HandShake by connecting all VMs to the
+    // system before any pool is created.
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+  }
+  
+  private void validateCQStats(VM vm, final String cqName,
+      final int creates,
+      final int updates,
+      final int deletes,
+      final int totalEvents,
+      final int cqListenerInvocations) {
+    vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ Stats. ### " + cqName);
+//      Get CQ Service.
+        QueryService qService = null;
+        try {          
+          qService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to get query service.");
+        }
+        
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)qService).getCqService();
+        }
+        catch (CqException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+          fail("Failed to get CqService, CQ : " + cqName);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        if (cqs.size() == 0) {
+          fail("Failed to get CqQuery for CQ : " + cqName);
+        }
+        CqQueryImpl cQuery = (CqQueryImpl) cqs.iterator().next();
+        
+        CqStatistics cqStats = cQuery.getStatistics();
+        CqQueryVsdStats cqVsdStats = ((CqQueryImpl)cQuery).getVsdStats();
+        if (cqStats == null || cqVsdStats == null) {
+          fail("Failed to get CqQuery Stats for CQ : " + cqName);
+        }
+        
+        getCache().getLogger().info("#### CQ stats for " + cQuery.getName() + ": " + 
+            " Events Total: " + cqStats.numEvents() +
+            " Events Created: " + cqStats.numInserts() +
+            " Events Updated: " + cqStats.numUpdates() +
+            " Events Deleted: " + cqStats.numDeletes() +
+            " CQ Listener invocations: " + cqVsdStats.getNumCqListenerInvocations() +
+            " Initial results time (nano sec): " + cqVsdStats.getCqInitialResultsTime());
+        
+        
+//      Check for totalEvents count.
+        if (totalEvents != CqQueryUsingPoolDUnitTest.noTest) {
+//        Result size validation.
+          assertEquals("Total Event Count mismatch", totalEvents, cqStats.numEvents());
+        }
+        
+//      Check for create count.
+        if (creates != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Create Event mismatch", creates, cqStats.numInserts());
+        }
+        
+//      Check for update count.
+        if (updates != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Update Event mismatch", updates, cqStats.numUpdates());
+        }
+        
+//      Check for delete count.
+        if (deletes != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Delete Event mismatch", deletes, cqStats.numDeletes());
+        }
+        
+//      Check for CQ listener invocations.
+        if (cqListenerInvocations != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("CQ Listener invocations mismatch", cqListenerInvocations, cqVsdStats.getNumCqListenerInvocations());
+        }
+////      Check for initial results time.
+//        if (initialResultsTime != CqQueryUsingPoolDUnitTest.noTest && cqVsdStats.getCqInitialResultsTime()
<= 0) {
+//          assertEquals("Initial results time mismatch", initialResultsTime, cqVsdStats.getCqInitialResultsTime());
+//        }
+      }
+    });
+  }
+  
+  private void validateCQServiceStats(VM vm,
+      final int created,
+      final int activated,
+      final int stopped,
+      final int closed,
+      final int cqsOnClient,
+      final int cqsOnRegion,
+      final int clientsWithCqs) {
+    vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ Service Stats. ### ");
+//      Get CQ Service.
+        QueryService qService = null;
+        try {          
+          qService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        CqServiceStatistics cqServiceStats = null;        
+        cqServiceStats = qService.getCqStatistics();
+        CqServiceVsdStats cqServiceVsdStats = null;
+        try {
+          cqServiceVsdStats = ((CqServiceImpl) ((DefaultQueryService)qService).getCqService()).stats;
+        }
+        catch (CqException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+        if (cqServiceStats == null) {
+          fail("Failed to get CQ Service Stats");
+        }
+        
+        getCache().getLogger().info("#### CQ Service stats: " + 
+            " CQs created: " + cqServiceStats.numCqsCreated() +
+            " CQs active: " + cqServiceStats.numCqsActive() +
+            " CQs stopped: " + cqServiceStats.numCqsStopped() +
+            " CQs closed: " + cqServiceStats.numCqsClosed() +
+            " CQs on Client: " + cqServiceStats.numCqsOnClient() +
+            " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA")
+
+            " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
+        
+        
+        //        Check for created count.
+        if (created != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs created mismatch", created, cqServiceStats.numCqsCreated());
+        }
+        
+        //        Check for activated count.
+        if (activated != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs activated mismatch", activated, cqServiceStats.numCqsActive());
+        }
+        
+        //        Check for stopped count.
+        if (stopped != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs stopped mismatch", stopped, cqServiceStats.numCqsStopped());
+        }
+        
+        //        Check for closed count.
+        if (closed != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs closed mismatch", closed, cqServiceStats.numCqsClosed());
+        }
+        
+        // Check for CQs on client count.
+        if (cqsOnClient != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs on client mismatch", cqsOnClient, cqServiceStats.numCqsOnClient());
+        }
+        
+        // Check for CQs on region.
+        if (cqsOnRegion != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Number of CQs on region /root/regionA mismatch", 
+              cqsOnRegion, cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+        }
+        
+        // Check for clients with CQs count.
+        if (clientsWithCqs != CqQueryUsingPoolDUnitTest.noTest) {
+          assertEquals("Clints with CQs mismatch", 
+              clientsWithCqs, cqServiceVsdStats.getNumClientsWithCqs());
+        }
+      }
+    });
+  }
+  
+  private final static int PAUSE = 8 * 1000; // 5 * 1000
+  
+  /**
+   * Test for CQ and CQ Service Statistics.
+   * <p>
+   * Starts a server in VM 0 and a pool-based client in VM 1, registers and
+   * executes one CQ, then drives creates, updates and destroys from the
+   * server and checks the CQ statistics on both sides after each step.
+   *
+   * @throws Exception if any step of the scenario fails
+   */
+  public void testCQStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    String poolName = "testCQStatistics";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    //cqDUnitTest.createClient(client, port, host0);
+
+    /* Create CQs. */
+    cqDUnitTest.createCQ(client, poolName, "testCQStatistics_0", cqDUnitTest.cqs[0]);
+
+    /* Init values at server. */
+    final int size = 100;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    cqDUnitTest.executeCQ(client, "testCQStatistics_0", true, null);
+
+    // Wait for CQ to be executed.
+    cqDUnitTest.waitForCqState(client, "testCQStatistics_0", CqStateImpl.RUNNING);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 0, 0, 0, 0, 0);
+
+    // The stat would have not updated yet.
+    // Commenting out the following check; the check for resultset initialization
+    // is anyway done in the next validation.
+    // validateCQStats(server, "testCQStatistics_0", 0, 0, 0, 0, CqQueryUsingPoolDUnitTest.noTest, 1);
+
+    /* Init values at server. */
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 200);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client, "testCQStatistics_0", CqQueryUsingPoolDUnitTest.KEY+200);
+    pause(PAUSE);
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client, "testCQStatistics_0",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ 100,
+        /* updates: */ 100,
+        /* deletes; */ 0,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ 200);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 100, 100, 0, 200, 200);
+    //We don't have serverside CQ name
+    validateCQStats(server, "testCQStatistics_0", 100, 100, 0, 200, CqQueryUsingPoolDUnitTest.noTest);
+
+    /* Delete values at server. */
+    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 100);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForDestroyed(client, "testCQStatistics_0", CqQueryUsingPoolDUnitTest.KEY+100);
+    pause(PAUSE);
+
+    cqDUnitTest.validateCQ(client, "testCQStatistics_0",
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ 100,
+        /* updates: */ 100,
+        /* deletes; */ 100,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 100,
+        /* totalEvents: */ 300);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 100, 100, 100, 300, 300);
+    //We don't have serverside CQ name
+    validateCQStats(server, "testCQStatistics_0", 100, 100, 100, 300, CqQueryUsingPoolDUnitTest.noTest);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client, "testCQStatistics_0");
+    pause(PAUSE);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+  /**
+   * Test for CQ Service Statistics.
+   * <p>
+   * Starts a server in VM 0 and two pool-based clients in VMs 1 and 2, each
+   * with one named CQ, and checks the CQ service statistics on the clients
+   * and the server after create, execute, data load, unnamed CQ
+   * create/execute/close, close and stop.
+   *
+   * @throws Exception if any step of the scenario fails
+   */
+  public void testCQServiceStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    String poolName1 = "testCQServiceStatistics1";
+    String poolName2 = "testCQServiceStatistics2";
+
+    cqDUnitTest.createPool(client1, poolName1, host0, port);
+    cqDUnitTest.createPool(client2, poolName2, host0, port);
+
+    //cqDUnitTest.createClient(client1, port, host0);
+    //cqDUnitTest.createClient(client2, port, host0);
+
+    /* Create CQs. */
+    String cqName = "testCQServiceStatistics_0";
+    String cqName10 = "testCQServiceStatistics_10";
+    cqDUnitTest.createCQ(client1, poolName1, cqName, cqDUnitTest.cqs[0]);
+    cqDUnitTest.createCQ(client2, poolName2, cqName10, cqDUnitTest.cqs[2]);
+    pause(PAUSE);
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #1");
+    validateCQServiceStats(client1, 1, 0, 1, 0, 1, 1, CqQueryUsingPoolDUnitTest.noTest);
+    validateCQServiceStats(server, 0, 0, 0, 0, CqQueryUsingPoolDUnitTest.noTest, 0, 0);
+
+    cqDUnitTest.executeCQ(client1, cqName, false, null);
+    cqDUnitTest.executeCQ(client2, cqName10, false, null);
+    pause(PAUSE);
+
+    getCache().getLogger().info("Validating CQ Service stats on clients: #2");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryUsingPoolDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryUsingPoolDUnitTest.noTest, CqQueryUsingPoolDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryUsingPoolDUnitTest.noTest, 1, 2);
+
+    /* Init values at server. */
+    final int size = 10;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client1, cqName, CqQueryUsingPoolDUnitTest.KEY+size);
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client1, cqName,
+        /* resultSize: */ CqQueryUsingPoolDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #3");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryUsingPoolDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryUsingPoolDUnitTest.noTest, CqQueryUsingPoolDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryUsingPoolDUnitTest.noTest, 1, 2);
+
+    //Create CQs with no name, execute, and close.
+    cqDUnitTest.createAndExecCQNoName(client1, poolName1, cqDUnitTest.cqs[0]);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #4");
+    validateCQServiceStats(client1, 21, 1, 0, 20, 1, 1, CqQueryUsingPoolDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #2");
+    validateCQServiceStats(server, 22, 2, 0, 20, CqQueryUsingPoolDUnitTest.noTest, 1, 2);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client1, cqName);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #5");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryUsingPoolDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #3");
+    validateCQServiceStats(server, 22, 1, 0, 21, CqQueryUsingPoolDUnitTest.noTest, 0, 1);
+
+    //Test stop CQ
+    cqDUnitTest.stopCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #6");
+    validateCQServiceStats(client2, 1, 0, 1, 0, 1, CqQueryUsingPoolDUnitTest.noTest, CqQueryUsingPoolDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #4");
+    validateCQServiceStats(server, 22, 0, 1, 21, CqQueryUsingPoolDUnitTest.noTest, CqQueryUsingPoolDUnitTest.noTest, 1);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #7");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryUsingPoolDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 0, 0, 1, 0, 0, CqQueryUsingPoolDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #5");
+    validateCQServiceStats(server, 22, 0, 0, 22, CqQueryUsingPoolDUnitTest.noTest, 0, 0);
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server);
+  }
+
+} 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolOptimizedExecuteDUnitTest.java
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..1a61233
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsUsingPoolOptimizedExecuteDUnitTest.java
@@ -0,0 +1,43 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+
+import dunit.SerializableRunnable;
+
+/**
+ * Runs the tests inherited from {@link CqStatsUsingPoolDUnitTest} with
+ * {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} set to {@code false} in
+ * every VM.
+ *
+ */
+public class CqStatsUsingPoolOptimizedExecuteDUnitTest extends CqStatsUsingPoolDUnitTest{
+
+  public CqStatsUsingPoolOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Performs the inherited set-up, then switches
+   * {@code EXECUTE_QUERY_DURING_INIT} off in every VM.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("disable EXECUTE_QUERY_DURING_INIT") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+
+  /**
+   * Sets {@code EXECUTE_QUERY_DURING_INIT} back to {@code true} in every VM
+   * (presumably its default - confirm) and then runs the inherited tear-down,
+   * even if resetting the flag fails.
+   */
+  @Override
+  public void tearDown2() throws Exception {
+    try {
+      invokeInEveryVM(new SerializableRunnable("restore EXECUTE_QUERY_DURING_INIT") {
+        public void run() {
+          CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+        }
+      });
+    } finally {
+      super.tearDown2();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqTimeTestListener.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqTimeTestListener.java
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqTimeTestListener.java
new file mode 100644
index 0000000..4948fe9
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqTimeTestListener.java
@@ -0,0 +1,258 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.query.CqEvent;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.WaitCriterion;
+
+/**
+ * CQ listener for tests: counts received events by base operation and by
+ * query operation, remembers the keys seen per base operation, and sums the
+ * delay between a {@link Portfolio}'s create time and the arrival of its
+ * query insert/update event.
+ * <p>
+ * NOTE(review): the counters are {@code volatile} but updated with
+ * {@code ++}, which is not atomic; this is only safe if events for one
+ * listener are delivered by a single thread - confirm.
+ *
+ * @author anil.
+ *
+ */
+public class CqTimeTestListener implements CqListener {
+  protected final LogWriter logger;
+  protected volatile int eventCreateCount = 0;
+  protected volatile int eventUpdateCount = 0;
+  protected volatile int eventDeleteCount = 0;
+  protected volatile int eventInvalidateCount = 0;
+  protected volatile int eventErrorCount = 0;
+
+  protected volatile int totalEventCount = 0;
+  protected volatile int eventQueryInsertCount = 0;
+  protected volatile int eventQueryUpdateCount = 0;
+  protected volatile int eventQueryDeleteCount = 0;
+  protected volatile int eventQueryInvalidateCount = 0;
+
+  /** Sum of (arrival time - Portfolio create time) over query insert events. */
+  protected volatile long eventQueryInsertTime = 0;
+  /** Sum of (arrival time - Portfolio create time) over query update events. */
+  protected volatile long eventQueryUpdateTime = 0;
+
+  protected volatile boolean eventClose = false;
+
+  /** Keys of the events received so far, grouped by base operation. */
+  public final Set destroys = Collections.synchronizedSet(new HashSet());
+  public final Set creates = Collections.synchronizedSet(new HashSet());
+  public final Set invalidates = Collections.synchronizedSet(new HashSet());
+  public final Set updates = Collections.synchronizedSet(new HashSet());
+
+  private static final String WAIT_PROPERTY = "CQueryTestListener.maxWaitTime";
+
+  private static final int WAIT_DEFAULT = (20 * 1000);
+
+  /** Timeout passed to the wait helper; overridable through {@code WAIT_PROPERTY}. */
+  public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY,
+      WAIT_DEFAULT).intValue();
+
+  public String cqName;
+
+  public CqTimeTestListener(LogWriter logger) {
+    this.logger = logger;
+  }
+
+  /**
+   * Records one CQ event: bumps the total, the counter and key set matching
+   * the base operation, and the counter matching the query operation. For
+   * query inserts and updates the new value is cast to {@link Portfolio} and
+   * its age at arrival is added to the matching time total.
+   */
+  public void onEvent(CqEvent cqEvent) {
+    this.totalEventCount++;
+
+    long currentTime = System.currentTimeMillis();
+
+    Operation baseOperation = cqEvent.getBaseOperation();
+    Operation queryOperation = cqEvent.getQueryOperation();
+    Object key = cqEvent.getKey();
+    logger.info("### Got CQ Event ###; baseOp=" + baseOperation
+                + ";queryOp=" + queryOperation);
+
+    logger.info("Number of events for the CQ: " +this.cqName  + " : "
+                       + this.totalEventCount
+                       + " Key : " + key);
+
+    if (baseOperation.isUpdate()) {
+      this.eventUpdateCount++;
+      this.updates.add(key);
+    }
+    else if (baseOperation.isCreate()) {
+      this.eventCreateCount++;
+      this.creates.add(key);
+    }
+    else if (baseOperation.isDestroy()) {
+      this.eventDeleteCount++;
+      this.destroys.add(key);
+    }
+    else if (baseOperation.isInvalidate()) {
+      // Invalidates have their own counter; they used to be added to the
+      // delete count, leaving getInvalidateEventCount() stuck at zero.
+      this.eventInvalidateCount++;
+      this.invalidates.add(key);
+    }
+
+    if (queryOperation.isUpdate()) {
+      this.eventQueryUpdateCount++;
+      long createTime = ((Portfolio)cqEvent.getNewValue()).getCreateTime();
+      this.eventQueryUpdateTime += (currentTime - createTime);
+    }
+    else if (queryOperation.isCreate()) {
+      this.eventQueryInsertCount++;
+      long createTime = ((Portfolio)cqEvent.getNewValue()).getCreateTime();
+      this.eventQueryInsertTime += (currentTime - createTime);
+    }
+    else if (queryOperation.isDestroy()) {
+      this.eventQueryDeleteCount++;
+    }
+    else if (queryOperation.isInvalidate()) {
+      this.eventQueryInvalidateCount++;
+    }
+
+  }
+
+  public void onError(CqEvent cqEvent) {
+    this.eventErrorCount++;
+  }
+  public int getErrorEventCount() {
+    return this.eventErrorCount;
+  }
+
+  public int getTotalEventCount() {
+    return this.totalEventCount;
+  }
+
+  public int getCreateEventCount() {
+    return this.eventCreateCount;
+  }
+
+  public int getUpdateEventCount() {
+    return this.eventUpdateCount;
+  }
+
+  public int getDeleteEventCount() {
+    return this.eventDeleteCount;
+  }
+
+  public int getInvalidateEventCount() {
+    return this.eventInvalidateCount;
+  }
+
+  public int getQueryInsertEventCount() {
+    return this.eventQueryInsertCount;
+  }
+
+  public int getQueryUpdateEventCount() {
+    return this.eventQueryUpdateCount;
+  }
+
+  public int getQueryDeleteEventCount() {
+    return this.eventQueryDeleteCount;
+  }
+
+  public int getQueryInvalidateEventCount() {
+    return this.eventQueryInvalidateCount;
+  }
+
+  public long getTotalQueryUpdateTime() {
+    return this.eventQueryUpdateTime;
+  }
+
+  public long getTotalQueryCreateTime() {
+    return this.eventQueryInsertTime;
+  }
+
+  public void close() {
+    this.eventClose = true;
+  }
+
+  /** Logs all event counters on a single line. */
+  public void printInfo() {
+    logger.info("####" + this.cqName + ": " +
+      " Events Total :" + this.getTotalEventCount() +
+      " Events Created :" + this.eventCreateCount +
+      " Events Updated :" + this.eventUpdateCount +
+      " Events Deleted :" + this.eventDeleteCount +
+      " Events Invalidated :" + this.eventInvalidateCount +
+      " Query Inserts :" + this.eventQueryInsertCount +
+      " Query Updates :" + this.eventQueryUpdateCount +
+      " Query Deletes :" + this.eventQueryDeleteCount +
+      " Query Invalidates :" + this.eventQueryInvalidateCount +
+      " Total Events :" + this.totalEventCount);
+  }
+
+  /**
+   * Polls every 200 ms, for at most {@link #MAX_TIME}, until {@code keys}
+   * contains {@code key}. The wait helper is called with its last argument
+   * {@code true} (presumably "throw on timeout" - confirm), so this method
+   * returns {@code true} whenever it returns.
+   */
+  private boolean waitForKey(final Set keys, final Object key, final String eventKind) {
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return keys.contains(key);
+      }
+      public String description() {
+        return "never got " + eventKind + " event for CQ " + CqTimeTestListener.this.cqName;
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, MAX_TIME, 200, true);
+    return true;
+  }
+
+  /** Waits until a create event for {@code key} has been recorded. */
+  public boolean waitForCreated(final Object key) {
+    return waitForKey(this.creates, key, "create");
+  }
+
+  /** Waits until a destroy event for {@code key} has been recorded. */
+  public boolean waitForDestroyed(final Object key) {
+    return waitForKey(this.destroys, key, "destroy");
+  }
+
+  /** Waits until an invalidate event for {@code key} has been recorded. */
+  public boolean waitForInvalidated(final Object key) {
+    return waitForKey(this.invalidates, key, "invalidate");
+  }
+
+  /** Waits until an update event for {@code key} has been recorded. */
+  public boolean waitForUpdated(final Object key) {
+    return waitForKey(this.updates, key, "update");
+  }
+
+  /** Waits until {@link #close()} has been called on this listener. */
+  public boolean waitForClose() {
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return CqTimeTestListener.this.eventClose;
+      }
+      public String description() {
+        return "never got close event for CQ " + CqTimeTestListener.this.cqName;
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, MAX_TIME, 200, true);
+    return true;
+  }
+
+
+  /**
+   * Despite its name, returns nothing: it clears the recorded key sets and
+   * resets the close flag. The event counters are left untouched.
+   */
+  public void getEventHistory() {
+    destroys.clear();
+    creates.clear();
+    invalidates.clear();
+    updates.clear();
+    this.eventClose = false;
+  }
+
+}


Mime
View raw message