geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ladyva...@apache.org
Subject [geode] 01/01: GEM-2052: updates to giiQueueing and respective test.
Date Thu, 19 Jul 2018 23:58:54 GMT
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/giiQueueing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 41bbbc637ad3cba6320b026356d26e601856eda3
Author: ladyVader <lhughesgodfrey@pivotal.io>
AuthorDate: Thu Jul 19 11:06:05 2018 -0700

    GEM-2052: updates to giiQueueing and respective test.
---
 .../cache/ha/HARegionQueueIntegrationTest.java     | 11 ++---
 .../geode/internal/cache/ha/HARegionQueue.java     | 49 +++++-----------------
 .../cache/tier/sockets/HAEventWrapper.java         |  4 +-
 3 files changed, 19 insertions(+), 45 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 8a496d1..972d5f2 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -18,7 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.spy;
@@ -137,6 +136,7 @@ public class HARegionQueueIntegrationTest {
         new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
     HAEventWrapper wrapper = new HAEventWrapper(message);
     wrapper.setHAContainer(haContainerWrapper);
+    wrapper.incrementPutRefCount();
 
     // Create and update HARegionQueues forcing one queue to startGiiQueueing
     int numQueues = 10;
@@ -147,16 +147,17 @@ public class HARegionQueueIntegrationTest {
     assertEquals(1, haContainerWrapper.size());
 
     HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper);
-    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+    assertEquals(numQueues - 1, wrapperInContainer.getReferenceCount());
+    assertTrue(wrapperInContainer.getPutInProgress());
 
-    // Verify that the HAEventWrapper in the giiQueue now has msg = null
-    // this gets set to null when wrapper is added to HAContainer (for non-gii queues)
+    // Verify that the HAEventWrapper in the giiQueue now has msg != null
+    // We don't null this out while putInProgress > 0 (true)
     Queue giiQueue = targetQueue.getGiiQueue();
     assertEquals(1, giiQueue.size());
 
     HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
     assertNotNull(giiQueueEntry);
-    assertNull(giiQueueEntry.getClientUpdateMessage());
+    assertNotNull(giiQueueEntry.getClientUpdateMessage());
 
     // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper
     // not ClientUpdateMessageImpl
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 3624101..69dfd6d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -605,6 +605,8 @@ public class HARegionQueue implements RegionQueue {
     this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
     try {
       if (this.giiCount > 0) {
+        logger.info("{}: adding message to GII queue of size {}: {}", this.regionName,
+            giiQueue.size(), object);
         if (logger.isDebugEnabled()) {
           logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName,
               giiQueue.size(), object);
@@ -614,15 +616,8 @@ public class HARegionQueue implements RegionQueue {
         if (object instanceof HAEventWrapper) {
           HAEventWrapper wrapper = (HAEventWrapper) object;
 
-          boolean cumiNull = wrapper.getClientUpdateMessage() == null;
-          // bug #43609 - prevent loss of the message while in the queue
-          logger.info("RYGUY: GII Queueing - Putting conditionally into HA container. Event
ID: "
-              + wrapper.hashCode() + "; System ID: " + System.identityHashCode(wrapper)
-              + "; CUMI Null: "
-              + cumiNull + "; ToString: "
-              + wrapper);
+          // use to putRefCount to prevent loss of the message while in the queue
           wrapper.incrementPutRefCount();
-          putEntryConditionallyIntoHAContainer(wrapper);
         }
 
         this.giiQueue.add(object);
@@ -771,23 +766,16 @@ public class HARegionQueue implements RegionQueue {
           }
           actualCount++;
           try {
+            logger.info("{} draining #{}: {}", this.regionName, (actualCount + 1), value);
             if (isDebugEnabled) {
               logger.debug("{} draining #{}: {}", this.regionName, (actualCount + 1), value);
             }
             if (value instanceof HAEventWrapper) {
-              // TODO: RYGUY: This is a bandaid and we may not need it.
               if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
-                // if there is no wrapped message look for it in the HA container map
-                ClientUpdateMessageImpl haContainerMessage =
-                    (ClientUpdateMessageImpl) haContainer.get(value);
-                if (haContainerMessage != null) {
-                  ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage);
-                } else {
-                  logger.info(
-                      "RYGUY: {} ATTENTION: found gii queued event with null event message.
 Please see bug #44852: {}",
-                      this.regionName, value);
-                  continue;
-                }
+                logger.info(
+                    "RYGUY: {} ATTENTION: found gii queued event with null event message.
 Please see bug #44852: {}",
+                    this.regionName, value);
+                continue;
               }
             }
             basicPut(value);
@@ -795,26 +783,11 @@ public class HARegionQueue implements RegionQueue {
             // incremented when it was queued in giiQueue.
             if (value instanceof HAEventWrapper) {
               HAEventWrapper wrapper = (HAEventWrapper) value;
-
               wrapper.decrementPutRefCount();
-
+              // if putInProgress is false, the clientUpdateMessage is safely in the HAContainer
               if (!wrapper.getPutInProgress()) {
                 wrapper.setClientUpdateMessage(null);
               }
-
-              // TODO: RYGUY: The put ref count should cover us and we no longer need to
bump/dec
-              // the
-              // HAEventWrapper ref count. If the wrapper was removed from the container,
say by
-              // QRM, we will
-              // just do putIfAbsent and add it back in. The CUMI will not be null because
we had
-              // not decremented
-              // the putRefCount yet when we did basicPut().
-
-              logger.info("RYGUY: GII Decrementing Event ID: " + wrapper.hashCode() + ";
Region: "
-                  + this.regionName + "; System identity: "
-                  + System.identityHashCode(wrapper) + "; ToString: " + wrapper);
-
-              decAndRemoveFromHAContainer((HAEventWrapper) value);
             }
           } catch (NoSuchElementException ignore) {
             break;
@@ -3609,7 +3582,7 @@ public class HARegionQueue implements RegionQueue {
             // After the initial put to the container, the client update message is set to
null.
             // Therefore we check if it is null and only add client CQs and interest lists
if it
             // is not.
-            if (haContainerKey.getClientUpdateMessage() != null) {
+            if (inputHaEventWrapper.getClientUpdateMessage() != null) {
               addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper,
                   this.haContainer, this.regionName);
             }
@@ -3726,8 +3699,6 @@ public class HARegionQueue implements RegionQueue {
           wrapperSet.add(this.region.get(wrapperArray[i]));
         }
 
-        logger.info("RYGUY: Destroying HARegion in updateHAContainer()", new Exception());
-
         // Start a new thread which will update the clientMessagesRegion for
         // each of the HAEventWrapper instances present in the wrapperSet
         Thread regionCleanupTask = new Thread(new Runnable() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
index 2b5bd85..0884523 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
@@ -243,7 +243,8 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID,
Siz
   public String toString() {
     if (this.clientUpdateMessage != null) {
       return "HAEventWrapper[refCount=" + getReferenceCount() + "; putInProgress="
-          + putInProgressCountUpdater.get(this) + "; msg=" + this.clientUpdateMessage + "]";
+          + putInProgressCountUpdater.get(this) + "; msg=" + this.clientUpdateMessage
+          + " clientCqs=" + this.getClientCqs() + "]";
     } else {
       return "HAEventWrapper[region=" + this.regionName + "; key=" + this.keyOfInterest
           + "; refCount=" + getReferenceCount()
@@ -254,6 +255,7 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID,
Siz
               : ("; op=" + this.clientUpdateMessage.getOperation()))
           + ((this.clientUpdateMessage == null) ? ""
               : ("; version=" + this.clientUpdateMessage.getVersionTag()))
+          + " clientCqs=" + this.getClientCqs()
           + "]";
     }
   }


Mime
View raw message