geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ladyva...@apache.org
Subject [geode] 01/01: GEODE-5187: clients can miss events when servers recycled
Date Fri, 11 May 2018 17:52:15 GMT
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-5187
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 98c7eaec4cb656e36a1a165a7bb0ec57b0603c90
Author: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
AuthorDate: Wed May 9 17:27:32 2018 -0700

    GEODE-5187: clients can miss events when servers recycled
    
    * When draining events from the giiQueue, the msg field of HAEventWrapper may be null.
      Update the HAEventWrapper to point to the message in the HAContainer vs. calling
      putEventInHARegion with the original HAContainer message (a ClientUpdateMessageImpl).
      This is necessary as the ClientUpdateMessageImpl does not have the eventId (this is not
      serialized/deserialized in toData/fromData).  The HAEventWrapper is required on the
      remote side to reconstruct the event.
    
    * Updated log messages to include the HARegionQueue.regionName
    
    * Added corresponding IntegrationTest
---
 .../geode/internal/cache/ha/HARegionQueue.java     | 50 ++++++-----
 .../cache/ha/HARegionQueueIntegrationTest.java     | 98 +++++++++++++++++++++-
 2 files changed, 125 insertions(+), 23 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index a49adfb..8001ffd 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -484,7 +484,8 @@ public class HARegionQueue implements RegionQueue {
           entry = (Region.Entry) iterator.next();
           key = entry.getKey();
           if (isDebugEnabled) {
-            logger.debug("processing queue key {} and value {}", key, entry.getValue());
+            logger.debug("{} processing queue key {} and value {}", this.regionName, key,
+                entry.getValue());
           }
           if (key instanceof Long) {
             if (!(entry.getValue() instanceof ClientMarkerMessageImpl)) {
@@ -510,8 +511,8 @@ public class HARegionQueue implements RegionQueue {
               this.put(val);
             } else if (isDebugEnabled) {
               logger.debug(
-                  "bug 44959 encountered: HARegion.putGIIDataInRegion found null eventId
in {}",
-                  val);
+                  "{} bug 44959 encountered: HARegion.putGIIDataInRegion found null eventId
in {}",
+                  this.regionName, val);
             }
           }
         }
@@ -537,8 +538,8 @@ public class HARegionQueue implements RegionQueue {
     if (val instanceof HAEventWrapper && ((HAEventWrapper) val).getClientUpdateMessage()
== null) {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so not
putting it into the ha queue.",
-            ((Conflatable) val).getKeyToConflate());
+            "{} HARegionQueue.putGIIDataInRegion(): key={} was removed at sender side, so
not putting it into the ha queue.",
+            this.regionName, ((Conflatable) val).getKeyToConflate());
       }
     } else {
       this.put(val);
@@ -605,7 +606,7 @@ public class HARegionQueue implements RegionQueue {
     try {
       if (this.giiCount > 0) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{}: adding message to GII queue of size {}: {}", this.region.getName(),
+          logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName,
               giiQueue.size(), object);
         }
         if (object instanceof HAEventWrapper) {
@@ -615,7 +616,7 @@ public class HARegionQueue implements RegionQueue {
         this.giiQueue.add(object);
       } else {
         if (logger.isTraceEnabled()) {
-          logger.trace("{}: adding message to HA queue: {}", this.region.getName(), object);
+          logger.trace("{}: adding message to HA queue: {}", this.regionName, object);
         }
         basicPut(object);
       }
@@ -717,7 +718,7 @@ public class HARegionQueue implements RegionQueue {
     this.giiLock.writeLock().lock();
     this.giiCount++; // TODO: non-atomic operation on volatile!
     if (logger.isDebugEnabled()) {
-      logger.debug("{}: startGiiQueueing count is now {}", this.region.getName(), this.giiCount);
+      logger.debug("{}: startGiiQueueing count is now {}", this.regionName, this.giiCount);
     }
     this.giiLock.writeLock().unlock();
   }
@@ -733,17 +734,18 @@ public class HARegionQueue implements RegionQueue {
     try {
       this.giiCount--; // TODO: non-atomic operation on volatile!
       if (isDebugEnabled) {
-        logger.debug("{}: endGiiQueueing count is now {}", this.region.getName(), this.giiCount);
+        logger.debug("{}: endGiiQueueing count is now {}", this.regionName, this.giiCount);
       }
       if (this.giiCount < 0) {
         if (isDebugEnabled) {
-          logger.debug("{} found giiCount to be {}", this.region.getName(), this.giiCount);
+          logger.debug("{} found giiCount to be {}", this.regionName, this.giiCount);
         }
         this.giiCount = 0;
       }
       if (this.giiCount == 0) {
         if (isDebugEnabled) {
-          logger.debug("all GII requests completed - draining {} messages", this.giiQueue.size());
+          logger.debug("{} all GII requests completed - draining {} messages", this.regionName,
+              this.giiQueue.size());
         }
         boolean interrupted = false;
         int expectedCount = this.giiQueue.size();
@@ -758,17 +760,20 @@ public class HARegionQueue implements RegionQueue {
           actualCount++;
           try {
             if (isDebugEnabled) {
-              logger.debug("draining #{}: {}", (actualCount + 1), value);
+              logger.debug("{} draining #{}: {}", this.regionName, (actualCount + 1), value);
             }
             if (value instanceof HAEventWrapper) {
               if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
                 // if there is no wrapped message look for it in the HA container map
-                value = haContainer.get(value);
-                if (value == null) {
+                ClientUpdateMessageImpl haContainerMessage =
+                    (ClientUpdateMessageImpl) haContainer.get(value);
+                if (haContainerMessage != null) {
+                  ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage);
+                } else {
                   if (isDebugEnabled) {
                     logger.debug(
-                        "ATTENTION: found gii queued event with null event message.  Please
see bug #44852: {}",
-                        value);
+                        "{} ATTENTION: found gii queued event with null event message.  Please
see bug #44852: {}",
+                        this.regionName, value);
                   }
                   continue;
                 }
@@ -800,7 +805,7 @@ public class HARegionQueue implements RegionQueue {
       throw t;
     } finally {
       if (logger.isTraceEnabled()) {
-        logger.trace("endGiiQueueing completed");
+        logger.trace("{} endGiiQueueing completed", this.regionName);
       }
       this.giiLock.writeLock().unlock();
     }
@@ -2097,7 +2102,6 @@ public class HARegionQueue implements RegionQueue {
         return null;
       }
       HAEventWrapper entryHaEventWrapper = null;
-      // synchronized (haContainer) {
       do {
         ClientUpdateMessageImpl entryMessage = (ClientUpdateMessageImpl) haContainer
             .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
@@ -3474,12 +3478,14 @@ public class HARegionQueue implements RegionQueue {
       }
       // Put the reference to the HAEventWrapper instance into the
       // HA queue.
+      // logger.info("adding inputHaEventWrapper to HARegion at " + position + ":" +
+      // inputHaEventWrapper + " for " + this.regionName);
       this.region.put(position, inputHaEventWrapper);
-      // logger.info(LocalizedStrings.DEBUG, "added message at position " + position);
     } else { // (event instanceof ClientMarkerMessageImpl OR ConflatableObject OR
              // ClientInstantiatorMessage)
+      // logger.info("adding ClientUpdateMessage to HARegion at " + position + ":" + event
+ " for "
+      // + this.regionName);
       this.region.put(position, event);
-      // logger.info(LocalizedStrings.DEBUG, "added non-msg at position " + position);
     }
   }
 
@@ -3882,4 +3888,8 @@ public class HARegionQueue implements RegionQueue {
       return expiryTime.orElse(DEFAULT_THREAD_ID_EXPIRY_TIME);
     }
   }
+
+  public Queue getGiiQueue() {
+    return this.giiQueue;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 6ce921e..0d0e278 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -16,17 +16,28 @@ package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.geode.internal.cache.Conflatable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.powermock.api.mockito.PowerMockito;
@@ -120,6 +131,51 @@ public class HARegionQueueIntegrationTest {
   }
 
   @Test
+  public void verifyEndGiiQueueingPutsHAEventWrapperNotClientUpdateMessage() throws Exception
{
+    // Create a HAContainerRegion
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+    // create message and HAEventWrapper
+    ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+    HAEventWrapper wrapper = new HAEventWrapper(message);
+    wrapper.setHAContainer(haContainerWrapper);
+
+    // Create and update HARegionQueues forcing one queue to startGiiQueueing
+    int numQueues = 10;
+    HARegionQueue targetQueue = createAndUpdateHARegionQueuesWithGiiQueueing(haContainerWrapper,
+        wrapper, message, numQueues);
+
+    // Verify HAContainerWrapper (1) and refCount (numQueues(10))
+    assertEquals(1, haContainerWrapper.size());
+
+    HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper);
+    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+
+    // Verify that the HAEventWrapper in the giiQueue now has msg = null
+    // this gets set to null when wrapper is added to HAContainer (for non-gii queues)
+    Queue giiQueue = targetQueue.getGiiQueue();
+    assertEquals(1, giiQueue.size());
+
+    HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
+    assertNotNull(giiQueueEntry);
+    assertNull(giiQueueEntry.getClientUpdateMessage());
+
+    // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper
+    // not ClientUpdateMessageImpl
+    HARegionQueue spyTargetQueue = spy(targetQueue);
+    spyTargetQueue.endGiiQueueing();
+    assertEquals(0, giiQueue.size());
+
+    ArgumentCaptor<Conflatable> eventCaptor = ArgumentCaptor.forClass(Conflatable.class);
+    verify(spyTargetQueue).putEventInHARegion(eventCaptor.capture(), anyLong());
+    Conflatable capturedEvent = eventCaptor.getValue();
+    assertTrue(capturedEvent instanceof HAEventWrapper);
+    assertNotNull(((HAEventWrapper)capturedEvent).getClientUpdateMessage());
+  }
+
+  @Test
   public void verifySequentialUpdateHAEventWrapperWithMap() throws Exception {
     // Create a HAContainerMap to be used by the CacheClientNotifier
     HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
@@ -210,9 +266,23 @@ public class HARegionQueueIntegrationTest {
   }
 
   private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception
{
-    return new HARegionQueue("haRegion+" + index, mock(HARegion.class), (InternalCache) cache,
-        haContainer, null, (byte) 1, true, mock(HARegionQueueStats.class),
-        mock(StoppableReentrantReadWriteLock.class), mock(StoppableReentrantReadWriteLock.class),
+    StoppableReentrantReadWriteLock giiLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
+    when(giiLock.writeLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
+    when(giiLock.readLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+
+    StoppableReentrantReadWriteLock rwLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
+    when(rwLock.writeLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
+    when(rwLock.readLock())
+        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+
+    HARegion haRegion = Mockito.mock(HARegion.class);
+    when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+    return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
+        null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
         mock(CancelCriterion.class), false);
   }
 
@@ -244,6 +314,28 @@ public class HARegionQueueIntegrationTest {
     }
   }
 
+  private HARegionQueue createAndUpdateHARegionQueuesWithGiiQueueing(
+      HAContainerWrapper haContainerWrapper, HAEventWrapper wrapper, ClientUpdateMessage
message,
+      int numQueues) throws Exception {
+
+    HARegionQueue targetQueue = null;
+    int startGiiQueueingIndex = numQueues / 2;
+
+    // create HARegionQueues and startGiiQueuing on a region about half way through
+    for (int i = 0; i < numQueues; i++) {
+      HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+
+      // start GII Queueing (targetRegionQueue)
+      if (i == startGiiQueueingIndex) {
+        targetQueue = haRegionQueue;
+        targetQueue.startGiiQueueing();
+      }
+
+      haRegionQueue.put(wrapper);
+    }
+    return targetQueue;
+  }
+
   private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
       CachedDeserializable cd, int numQueues) throws Exception {
     // Create some HARegionQueues

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.

Mime
View raw message