geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [35/45] geode git commit: GEODE-2279: Events will no longer linger in durable queue after being ack'd
Date Wed, 11 Jan 2017 00:00:35 GMT
GEODE-2279: Events will no longer linger in durable queue after being ack'd

  * Durable queues no longer block waiting for a message to dispatch


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f03925e2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f03925e2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f03925e2

Branch: refs/heads/feature/GEODE-1930-2
Commit: f03925e213c6efd7fb4f1fdfe58e0e293a8b288d
Parents: 870a80d
Author: Jason Huynh <huynhja@gmail.com>
Authored: Fri Dec 30 09:07:25 2016 -0800
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Tue Jan 10 10:33:05 2017 -0800

----------------------------------------------------------------------
 .../geode/internal/cache/ha/HARegionQueue.java  |  22 +++-
 .../cache/tier/sockets/CacheClientProxy.java    |  22 ++--
 .../DurableClientQueueSizeDUnitTest.java        | 127 ++++++++++++++++++-
 3 files changed, 152 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/f03925e2/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 50a2d68..8d255c7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -2177,7 +2177,7 @@ public class HARegionQueue implements RegionQueue {
     /**
      * Condition object on which peek & take threads will block
      */
-    private final StoppableCondition blockCond;
+    protected final StoppableCondition blockCond;
 
     /**
      * 
@@ -2380,9 +2380,6 @@ public class HARegionQueue implements RegionQueue {
      * retrieval or throw an Exception.It can never return false.
      * 
      * @throws InterruptedException
-     * 
-     *         <p>
-     *         author Asif
      */
     @Override
     boolean waitForData() throws InterruptedException {
@@ -2460,6 +2457,23 @@ public class HARegionQueue implements RegionQueue {
     }
 
     @Override
+    boolean waitForData() throws InterruptedException {
+      region.getCache().getCancelCriterion().checkCancelInProgress(null);
+      boolean interrupted = Thread.currentThread().isInterrupted();
+      try {
+        blockCond.await(StoppableCondition.TIME_TO_WAIT);
+      } catch (InterruptedException ie) {
+        interrupted = true;
+        region.getCache().getCancelCriterion().checkCancelInProgress(ie);
+        throw new TimeoutException(ie);
+      } finally {
+        if (interrupted)
+          Thread.currentThread().interrupt();
+      }
+      return !this.internalIsEmpty();
+    }
+
+    @Override
     protected Object getNextAvailableIDFromList() throws InterruptedException {
       return this.getAndRemoveNextAvailableID();
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/f03925e2/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 562c0ef..d7b923c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2537,16 +2537,20 @@ public class CacheClientProxy implements ClientSession {
           if (isStopped()) {
             break;
           }
-          // Process the message
-          long start = getStatistics().startTime();
-          //// BUGFIX for BUG#38206 and BUG#37791
-          boolean isDispatched = dispatchMessage(clientMessage);
-          getStatistics().endMessage(start);
-          if (isDispatched) {
-            this._messageQueue.remove();
-            if (clientMessage instanceof ClientMarkerMessageImpl) {
-              getProxy().markerEnqueued = false;
+          if (clientMessage != null) {
+            // Process the message
+            long start = getStatistics().startTime();
+            //// BUGFIX for BUG#38206 and BUG#37791
+            boolean isDispatched = dispatchMessage(clientMessage);
+            getStatistics().endMessage(start);
+            if (isDispatched) {
+              this._messageQueue.remove();
+              if (clientMessage instanceof ClientMarkerMessageImpl) {
+                getProxy().markerEnqueued = false;
+              }
             }
+          } else {
+            this._messageQueue.remove();
           }
           clientMessage = null;
         } catch (MessageTooLargeException e) {

http://git-wip-us.apache.org/repos/asf/geode/blob/f03925e2/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientQueueSizeDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientQueueSizeDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientQueueSizeDUnitTest.java
index 472e79b..161ada4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientQueueSizeDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/DurableClientQueueSizeDUnitTest.java
@@ -14,12 +14,21 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.apache.geode.cache.query.functional.QueryREUpdateInProgressJUnitTest.regionName;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.apache.geode.util.JSR166TestCase.m2;
 import static org.junit.Assert.*;
 
+import com.jayway.awaitility.Awaitility;
+
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -51,6 +60,7 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 @SuppressWarnings("serial")
 public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
 
+  public static final String MY_DURABLE_CLIENT = "my-durable-client";
   private static VM vm0 = null;
   private static VM vm1 = null;
   private static VM vm2 = null;
@@ -133,6 +143,33 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
 
   }
 
+  // Slows down client consumption from queue, this should cause a delay from server queue dispatch
+  // and receiving an ack from the client. The server should still clean up acked events from the
+  // queue even if no more messages are being sent. Previously events would get stuck in the queue
+  // if no new messages were sent because clean up was only done while sending messages
+  @Test
+  public void ackedEventsShouldBeRemovedFromTheQueueEventuallyEvenIfNoNewMessagesAreSent()
+      throws Exception {
+    int num = 10;
+    CacheListener slowListener = new SlowListener();
+
+    vm1.invoke(() -> DurableClientQueueSizeDUnitTest.closeCache());
+
+    vm2.invoke(DurableClientQueueSizeDUnitTest.class, "createClientCache",
+        new Object[] {vm2.getHost(), new Integer[] {port0, port1}, "300", Boolean.TRUE,
+            Boolean.FALSE, slowListener});
+    vm2.invoke(() -> DurableClientQueueSizeDUnitTest.doRI());
+    vm2.invoke(() -> DurableClientQueueSizeDUnitTest.readyForEvents());
+
+    vm0.invoke(() -> DurableClientQueueSizeDUnitTest.doPutsIntoRegion(REGION_NAME, num));
+
+    vm0.invoke(() -> Awaitility.waitAtMost(45, TimeUnit.SECONDS).until(() -> {
+      CacheClientProxy ccp = DurableClientQueueSizeDUnitTest.getCacheClientProxy(MY_DURABLE_CLIENT);
+      assertEquals(0, ccp.getQueueSize());
+      assertEquals(0, ccp.getQueueSizeStat());
+    }));
+  }
+
   @Test
   public void testSinglePoolClientReconnectsAfterTimeOut() throws Exception {
     int num = 10;
@@ -296,7 +333,7 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
 
   public static void createClientCache(Host host, Integer[] ports, String timeoutMilis,
       Boolean durable) throws Exception {
-    createClientCache(host, ports, timeoutMilis, durable, false);
+    createClientCache(host, ports, timeoutMilis, durable, false, null);
   }
 
   public static void setSpecialDurable(Boolean bool) {
@@ -305,13 +342,13 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
 
   @SuppressWarnings("deprecation")
   public static void createClientCache(Host host, Integer[] ports, String timeoutSeconds,
-      Boolean durable, Boolean multiPool) throws Exception {
+      Boolean durable, Boolean multiPool, CacheListener cacheListener) throws Exception {
     if (multiPool) {
       System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "SPECIAL_DURABLE", "true");
     }
     Properties props = new Properties();
     if (durable) {
-      props.setProperty(DURABLE_CLIENT_ID, "my-durable-client");
+      props.setProperty(DURABLE_CLIENT_ID, MY_DURABLE_CLIENT);
       props.setProperty(DURABLE_CLIENT_TIMEOUT, timeoutSeconds);
     }
 
@@ -329,7 +366,9 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
 
     ClientRegionFactory<String, String> crf =
         cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-
+    if (cacheListener != null) {
+      crf.addCacheListener(cacheListener);
+    }
     crf.setPoolName(cache.getDefaultPool().getName());
     crf.create(REGION_NAME);
 
@@ -373,16 +412,35 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
     }
   }
 
-  public static void verifyQueueSizeAtServer(String poolName, Integer num) throws Exception {
+  public static void doPutsIntoRegion(String regionName, Integer numOfPuts) throws Exception {
+    Region<String, String> region = cache.getRegion(regionName);
+
+    for (int j = 0; j < numOfPuts; j++) {
+      region.put("KEY_" + j, "VALUE_" + j);
+    }
+  }
+
+  public static void verifyQueueSizeAtServer(String poolName, Integer num) {
     Iterator<CacheClientProxy> it = CacheClientNotifier.getInstance().getClientProxies().iterator();
     while (it.hasNext()) {
       CacheClientProxy ccp = it.next();
       if (ccp.getDurableId().contains(poolName)) {
-        assertEquals(num.intValue(), ccp.getQueueSize());
+        assertEquals(num.intValue(), ccp.getStatistics().getMessagesQueued());
       }
     }
   }
 
+  public static CacheClientProxy getCacheClientProxy(String durableClientName) {
+    Iterator<CacheClientProxy> it = CacheClientNotifier.getInstance().getClientProxies().iterator();
+    while (it.hasNext()) {
+      CacheClientProxy ccp = it.next();
+      if (ccp.getDurableId().contains(durableClientName)) {
+        return ccp;
+      }
+    }
+    return null;
+  }
+
   public static void verifyQueueSize(Integer num) throws Exception {
     verifyQueueSize(num, Integer.MIN_VALUE);
   }
@@ -401,4 +459,61 @@ public class DurableClientQueueSizeDUnitTest extends JUnit4DistributedTestCase {
       }
     }
   }
+
+  private class SlowListener implements CacheListener, Serializable {
+
+    @Override
+    public void afterCreate(final EntryEvent event) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void afterUpdate(final EntryEvent event) {
+
+    }
+
+    @Override
+    public void afterInvalidate(final EntryEvent event) {
+
+    }
+
+    @Override
+    public void afterDestroy(final EntryEvent event) {
+
+    }
+
+    @Override
+    public void afterRegionInvalidate(final RegionEvent event) {
+
+    }
+
+    @Override
+    public void afterRegionDestroy(final RegionEvent event) {
+
+    }
+
+    @Override
+    public void afterRegionClear(final RegionEvent event) {
+
+    }
+
+    @Override
+    public void afterRegionCreate(final RegionEvent event) {
+
+    }
+
+    @Override
+    public void afterRegionLive(final RegionEvent event) {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
 }


Mime
View raw message