geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject [geode] branch feature/GEODE-4624 updated: GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals
Date Fri, 06 Apr 2018 01:07:20 GMT
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-4624 by this push:
     new e54c54c  GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the
processing of queueRemovals
e54c54c is described below

commit e54c54c5890d4ef36029857aea61dad8c8543e21
Author: zhouxh <gzhou@pivotal.io>
AuthorDate: Thu Apr 5 18:05:24 2018 -0700

    GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of
queueRemovals
---
 .../internal/cache/wan/AbstractGatewaySender.java  | 10 +++++--
 .../wan/AbstractGatewaySenderEventProcessor.java   | 35 ++++++++++++++++++----
 .../ConcurrentParallelGatewaySenderQueue.java      |  9 ++++++
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 ++++++++++-
 .../geode/internal/cache/wan/WANTestBase.java      | 26 +++++++++++-----
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 24 +++++++--------
 6 files changed, 94 insertions(+), 28 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 4e68bfd..76c1e24 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
Distributi
     // If this gateway is not running, return
     if (!isRunning()) {
       if (isDebugEnabled) {
-        logger.debug("Returning back without putting into the gateway sender queue");
+        logger.debug("Returning back without putting into the gateway sender queue" + event);
+      }
+      if (this.eventProcessor != null) {
+        this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
       }
       return;
     }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
Distributi
         // The sender may have stopped, after we have checked the status in the beginning.
         if (!isRunning()) {
           if (isDebugEnabled) {
-            logger.debug("Returning back without putting into the gateway sender queue");
+            logger.debug("Returning back without putting into the gateway sender queue" +
event);
+          }
+          if (this.eventProcessor != null) {
+            this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
           }
           return;
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index fce2b79..badafb0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -276,11 +276,6 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread
{
     }
 
     // if parallel, get both primary and secondary queues' size, then substract primary queue's
size
-    if (this.queue instanceof ParallelGatewaySenderQueue) {
-      int size = ((ParallelGatewaySenderQueue) queue).localSize(true)
-          - ((ParallelGatewaySenderQueue) queue).localSize(false);
-      return size;
-    }
     if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
       int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
           - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
@@ -289,6 +284,36 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread
{
     return this.queue.size();
   }
 
+  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
+    if (queue == null) {
+      return;
+    }
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue)
queue;
+      PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
+      if (prQ == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
+              + " is not created yet.");
+        }
+        return;
+      }
+      int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
+      long shadowKey = event.getTailKey();
+
+      ParallelGatewaySenderQueue pgsq =
+          (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+      boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+      if (isPrimary) {
+        pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+        if (logger.isDebugEnabled()) {
+          logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+              + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+        }
+      }
+    }
+  }
+
   /**
    * @return the sender
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 4fc940c..e556910 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue
{
     return this.processors[0].getQueue().size();
   }
 
+  public String displayContent() {
+    ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue());
+    return pgsq.displayContent();
+  }
+
   public int localSize() {
     return localSize(false);
   }
@@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue
{
     return processors[index];
   }
 
+  public RegionQueue getQueueByBucket(int bucketId) {
+    return getPGSProcessor(bucketId).getQueue();
+  }
+
   public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
     return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 3aa8534..907a265 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   // This method may need synchronization in case it is used by
   // ConcurrentParallelGatewaySender
-  protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+  public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
     StoppableReentrantLock lock = buckToDispatchLock;
     if (lock != null) {
       lock.lock();
@@ -1401,6 +1401,22 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
   }
 
+  public String displayContent() {
+    int size = 0;
+    StringBuffer sb = new StringBuffer();
+    for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+      if (prQ != null && prQ.getDataStore() != null) {
+        Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
+        for (BucketRegion br : allLocalBuckets) {
+          if (br.size() > 0) {
+            sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
+          }
+        }
+      }
+    }
+    return sb.toString();
+  }
+
   public int localSize() {
     return localSize(false);
   }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 9b577d5..a3e5aeb 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase {
     }
     props.setProperty(MCAST_PORT, "0");
     props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+    String logLevel = System.getProperty(LOG_LEVEL, "info");
+    props.setProperty(LOG_LEVEL, logLevel);
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
@@ -2746,7 +2748,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
         .until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
     assertEquals(queueSize, sender.getEventQueueSize());
   }
@@ -3053,6 +3055,17 @@ public class WANTestBase extends DistributedTestCase {
     });
   }
 
+  public static String displayQueueContent(final RegionQueue queue) {
+    if (queue instanceof ParallelGatewaySenderQueue) {
+      ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
+      return pgsq.displayContent();
+    } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue)
queue;
+      return pgsq.displayContent();
+    }
+    return null;
+  }
+
   public static Integer getQueueContentSize(final String senderId) {
     return getQueueContentSize(senderId, false);
   }
@@ -3147,13 +3160,10 @@ public class WANTestBase extends DistributedTestCase {
       assertEquals("Except events in all primary queues after drain is 0", 0,
           abstractSender.getEventQueueSize());
 
-      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
-        System.out
-            .println("GGG:secondary queue size is " + abstractSender.getEventSecondaryQueueSize());
-        assertEquals(
-            "Expected events in all secondary queues are drained but actual is "
-                + abstractSender.getEventSecondaryQueueSize(),
-            0, abstractSender.getEventSecondaryQueueSize());
+      Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+        assertEquals("Expected events in all secondary queues are drained but actual is "
+            + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
+            + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
       });
       assertEquals("Except events in all secondary queues after drain is 0", 0,
           abstractSender.getEventSecondaryQueueSize());
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index e66c433..780f3a9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -53,7 +53,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
     addIgnoredException("Broken pipe||Unexpected IOException");
   }
 
-//  @Test(timeout = 300_000)
+  // @Test(timeout = 300_000)
   public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception {
     Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
     Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
@@ -90,7 +90,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
     vm5.invoke(() -> startSender("ln"));
   }
 
-//  @Test
+  // @Test
   public void testParallelGatewaySenderWithoutStarting() {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -114,7 +114,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
    * <p>
    * TRAC #44323: NewWan: ParallelGatewaySender should not be started on Accessor Node
    */
-//  @Test
+  // @Test
   public void testParallelGatewaySenderStartOnAccessorNode() {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -136,7 +136,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which the sender is paused in between.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderPause() throws Exception {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -167,7 +167,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which a paused sender is resumed.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderResume() throws Exception {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -204,7 +204,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
    * resume is only valid for pause. If a sender which is stopped is resumed, it will not
be started
    * again.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderResumeNegativeScenario() throws Exception {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -259,7 +259,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which a sender is stopped.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderStop() throws Exception {
     addIgnoredException("Broken pipe");
     Integer[] locatorPorts = createLNAndNYLocators();
@@ -288,7 +288,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which a sender is stopped and then started again.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderStartAfterStop() throws Exception {
     addIgnoredException("Broken pipe");
     Integer[] locatorPorts = createLNAndNYLocators();
@@ -425,7 +425,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which a sender is stopped and then started again on accessor node.
    */
-//  @Test
+  // @Test
   public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception
{
     addIgnoredException("Broken pipe");
     addIgnoredException("Connection reset");
@@ -473,7 +473,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
   /**
    * Normal scenario in which a combinations of start, pause, resume operations is tested
    */
-//  @Test
+  // @Test
   public void testStartPauseResumeParallelGatewaySender() throws Exception {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -527,7 +527,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
    * Since the sender is attached to a region and in use, it can not be destroyed. Hence,
exception
    * is thrown by the sender API.
    */
-//  @Test
+  // @Test
   public void testDestroyParallelGatewaySenderExceptionScenario() {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
@@ -556,7 +556,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase
{
     vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 1000));
   }
 
-//  @Test
+  // @Test
   public void testDestroyParallelGatewaySender() {
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.

Mime
View raw message