geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ladyva...@apache.org
Subject [geode] branch develop updated: GEODE-5495: Destroy available ID before decrement in updateHAContainer()
Date Fri, 03 Aug 2018 16:36:46 GMT
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new acb50f9  GEODE-5495: Destroy available ID before decrement in updateHAContainer()
acb50f9 is described below

commit acb50f94157c25ada4163771758115a1cb108516
Author: Ryan McMahon <rmcmahon@pivotal.io>
AuthorDate: Wed Aug 1 11:33:38 2018 -0700

    GEODE-5495: Destroy available ID before decrement in updateHAContainer()
    
    Co-authored-by: Ryan McMahon <rmcmahon@pivotal.io>
    Co-authored-by: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
---
 .../internal/cache/ha/HARegionQueueDUnitTest.java  |   4 +-
 .../cache/ha/HARQAddOperationJUnitTest.java        |  36 ++---
 .../cache/ha/HARegionQueueIntegrationTest.java     | 173 ++++++++++++++++++---
 .../internal/cache/ha/HARegionQueueJUnitTest.java  |  24 +--
 .../geode/internal/cache/ha/HARegionQueue.java     |  20 +--
 5 files changed, 198 insertions(+), 59 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
index 19d2bf2..3d8c452 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
@@ -955,7 +955,7 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
     WaitCriterion ev = new WaitCriterion() {
       @Override
       public boolean done() {
-        return hrq.getAvalaibleIds().size() == 0;
+        return hrq.getAvailableIds().size() == 0;
       }
 
       @Override
@@ -964,7 +964,7 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
       }
     };
     Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    // assertIndexDetailsEquals(0, hrq.getAvalaibleIds().size());
+    // assertIndexDetailsEquals(0, hrq.getAvailableIds().size());
   }
 
   @Test
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
index cffd3e7..871fa09 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARQAddOperationJUnitTest.java
@@ -147,7 +147,7 @@ public class HARQAddOperationJUnitTest {
     Long cntr = (Long) conflationMap.get(KEY1);
     ConflatableObject retValue = (ConflatableObject) rq.getRegion().get(cntr);
     assertEquals(VALUE2, retValue.getValueToConflate());
-    assertEquals(1, rq.getAvalaibleIds().size());
+    assertEquals(1, rq.getAvailableIds().size());
 
     assertEquals(1, rq.getCurrentCounterSet(id1).size());
     this.logWriter.info("HARegionQueueJUnitTest : testQueueAddOperationWithConflation END");
@@ -171,13 +171,13 @@ public class HARQAddOperationJUnitTest {
     this.rq.put(c1);
 
     assertNull(rq.getConflationMapForTesting().get("region1"));
-    assertEquals(1, rq.getAvalaibleIds().size());
+    assertEquals(1, rq.getAvailableIds().size());
 
     assertEquals(1, rq.getCurrentCounterSet(id1).size());
 
     this.rq.put(c2);
     assertNull(rq.getConflationMapForTesting().get("region1"));
-    assertEquals(2, rq.getAvalaibleIds().size());
+    assertEquals(2, rq.getAvailableIds().size());
     assertEquals(2, rq.getCurrentCounterSet(id1).size());
 
     Iterator iter = rq.getCurrentCounterSet(id1).iterator();
@@ -212,7 +212,7 @@ public class HARQAddOperationJUnitTest {
     this.rq.put(obj);
     this.rq.take();
     assertNull(rq.getRegion().get(KEY1));
-    assertEquals(0, this.rq.getAvalaibleIds().size());
+    assertEquals(0, this.rq.getAvailableIds().size());
     Map eventsMap = this.rq.getEventsMapForTesting();
     assertEquals(1, eventsMap.size());
     assertEquals(0, rq.getCurrentCounterSet(id).size());
@@ -296,7 +296,7 @@ public class HARQAddOperationJUnitTest {
       // removed from Region, LastDispatchedWrapperSet should have size 0.
       Awaitility.await().atMost(60, TimeUnit.SECONDS)
           .until(() -> assertEquals(0, regionqueue.getRegion().entrySet(false).size()));
-      assertEquals(0, regionqueue.getAvalaibleIds().size());
+      assertEquals(0, regionqueue.getAvailableIds().size());
       assertNull(regionqueue.getCurrentCounterSet(id1));
 
     } catch (Exception e) {
@@ -323,11 +323,11 @@ public class HARQAddOperationJUnitTest {
     }
 
     // Available id size should be == 10 after putting ten entries
-    assertEquals(10, regionqueue.getAvalaibleIds().size());
+    assertEquals(10, regionqueue.getAvailableIds().size());
 
     // QRM message for thread id 1 and last sequence id 5
     regionqueue.removeDispatchedEvents(ids[4]);
-    assertEquals(5, regionqueue.getAvalaibleIds().size());
+    assertEquals(5, regionqueue.getAvailableIds().size());
     assertEquals(5, regionqueue.getCurrentCounterSet(ids[0]).size());
 
     Iterator iter = regionqueue.getCurrentCounterSet(ids[0]).iterator();
@@ -338,7 +338,7 @@ public class HARQAddOperationJUnitTest {
     }
 
     regionqueue.removeDispatchedEvents(ids[9]);
-    assertEquals(0, regionqueue.getAvalaibleIds().size());
+    assertEquals(0, regionqueue.getAvailableIds().size());
   }
 
   /**
@@ -388,7 +388,7 @@ public class HARQAddOperationJUnitTest {
     if (testFailed)
       fail("Test failed due to " + message);
 
-    assertEquals(0, regionqueue.getAvalaibleIds().size());
+    assertEquals(0, regionqueue.getAvailableIds().size());
     assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
   }
 
@@ -438,7 +438,7 @@ public class HARQAddOperationJUnitTest {
     if (testFailed)
       fail("Test failed due to " + message);
 
-    assertEquals(0, regionqueue.getAvalaibleIds().size());
+    assertEquals(0, regionqueue.getAvailableIds().size());
     assertEquals(2, regionqueue.getLastDispatchedSequenceId(id2));
   }
 
@@ -531,8 +531,8 @@ public class HARQAddOperationJUnitTest {
             + regionqueue.getEventsMapForTesting().size(),
         numOfThreads, regionqueue.getEventsMapForTesting().size());
     assertEquals(
-        "size of availableids should 1 but actual size " + regionqueue.getAvalaibleIds().size(), 1,
-        regionqueue.getAvalaibleIds().size());
+        "size of availableids should 1 but actual size " + regionqueue.getAvailableIds().size(), 1,
+        regionqueue.getAvailableIds().size());
     int count = 0;
     for (int i = 0; i < numOfThreads; i++) {
       if ((regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, i))).size() > 0) {
@@ -543,8 +543,8 @@ public class HARQAddOperationJUnitTest {
     assertEquals("size of the counter set is  1 but the actual size is " + count, 1, count);
 
     Long position = null;
-    if (regionqueue.getAvalaibleIds().size() == 1) {
-      position = (Long) regionqueue.getAvalaibleIds().iterator().next();
+    if (regionqueue.getAvailableIds().size() == 1) {
+      position = (Long) regionqueue.getAvailableIds().iterator().next();
     }
     ConflatableObject id = (ConflatableObject) regionqueue.getRegion().get(position);
     assertEquals(regionqueue.getCurrentCounterSet(id.getEventId()).size(), 1);
@@ -605,7 +605,7 @@ public class HARQAddOperationJUnitTest {
           regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
     }
 
-    assertEquals(0, regionqueue.getAvalaibleIds().size());
+    assertEquals(0, regionqueue.getAvailableIds().size());
 
     this.logWriter.info("testPeekAndRemoveWithoutConflation() completed successfully");
   }
@@ -666,7 +666,7 @@ public class HARQAddOperationJUnitTest {
           regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
     }
 
-    assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvalaibleIds().size());
+    assertEquals("size of availableIds map should be 0 ", 0, regionqueue.getAvailableIds().size());
     assertEquals("size of conflation map should be 0 ", 0,
         ((Map) regionqueue.getConflationMapForTesting().get("region1")).size());
 
@@ -770,7 +770,7 @@ public class HARQAddOperationJUnitTest {
           regionqueue.getCurrentCounterSet(new EventID(new byte[] {(byte) i}, i, 1)).size());
     }
 
-    assertEquals(0, regionqueue.getAvalaibleIds().size());
+    assertEquals(0, regionqueue.getAvailableIds().size());
 
     this.logWriter.info("testPeekForDiffBatchSizeAndRemoveAll() completed successfully");
   }
@@ -863,7 +863,7 @@ public class HARQAddOperationJUnitTest {
     if (testFailed)
       fail("Test failed due to " + message);
 
-    assertEquals(5, regionqueue.getAvalaibleIds().size());
+    assertEquals(5, regionqueue.getAvailableIds().size());
 
     this.logWriter.info("testPeekForDiffBatchSizeAndRemoveSome() completed successfully");
   }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index a1b71a1..0270177 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -39,6 +40,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -387,24 +389,7 @@ public class HARegionQueueIntegrationTest {
     List<HARegionQueue> regionQueues = new ArrayList<>();
 
     for (int i = 0; i < 2; ++i) {
-      HARegion haRegion = Mockito.mock(HARegion.class);
-      when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
-
-      ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
-
-      when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
-        Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
-        return existingValue;
-      });
-
-      when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
-        return mockRegion.get(answer.getArgument(0));
-      });
-
-      doAnswer(answer -> {
-        mockRegion.remove(answer.getArgument(0));
-        return null;
-      }).when(haRegion).localDestroy(Mockito.any(Object.class));
+      HARegion haRegion = createMockHARegion();
 
       regionQueues.add(createHARegionQueue(haContainerWrapper, i, haRegion, false));
     }
@@ -509,6 +494,158 @@ public class HARegionQueueIntegrationTest {
     Assert.assertEquals("Container size was not the expected value", haContainerWrapper.size(), 1);
   }
 
+  @Test
+  public void removeDispatchedEventsViaQRMAndDestroyQueueSimultaneouslySingleDecrement()
+      throws Exception {
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+    HARegion haRegion = createMockHARegion();
+    HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0, haRegion, false);
+
+    EventID eventID = new EventID(cache.getDistributedSystem());
+    ClientUpdateMessage clientUpdateMessage =
+        new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+            (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+            new ClientProxyMembershipID(), eventID);
+    HAEventWrapper haEventWrapper = new HAEventWrapper(clientUpdateMessage);
+    haEventWrapper.incrementPutInProgressCounter();
+    haEventWrapper.setHAContainer(haContainerWrapper);
+
+    haRegionQueue.put(haEventWrapper);
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+
+    List<Callable<Object>> callables = new ArrayList<>();
+
+    // In one thread, simulate processing a queue removal message
+    // by removing the dispatched event
+    callables.add(Executors.callable(() -> {
+      try {
+        haRegionQueue.removeDispatchedEvents(eventID);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+
+    // In another thread, simulate that the region is being destroyed, for instance
+    // when a SocketTimeoutException is thrown and we are cleaning up
+    callables.add(Executors.callable(() -> {
+      try {
+        haRegionQueue.destroy();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+
+    List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+    for (Future<Object> future : futures) {
+      try {
+        future.get();
+      } catch (Exception ex) {
+        throw new TestException(
+            "Exception thrown while executing queue removal and destroy region queue logic concurrently.",
+            ex);
+      }
+    }
+
+    try {
+      await().atMost(10, TimeUnit.SECONDS).until(() -> haEventWrapper.getReferenceCount() == 0);
+    } catch (ConditionTimeoutException conditionTimeoutException) {
+      throw new TestException(
+          "Expected HAEventWrapper reference count to be decremented to 0 by either the queue removal or destroy queue logic, but the actual reference count was "
+              + haEventWrapper.getReferenceCount());
+    }
+  }
+
+  @Test
+  public void removeDispatchedEventsViaMessageDispatcherAndDestroyQueueSimultaneouslySingleDecrement()
+      throws Exception {
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+    HARegion haRegion = createMockHARegion();
+    HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0, haRegion, false);
+
+    EventID eventID = new EventID(cache.getDistributedSystem());
+    ClientUpdateMessage clientUpdateMessage =
+        new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+            (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+            new ClientProxyMembershipID(), eventID);
+    HAEventWrapper haEventWrapper = new HAEventWrapper(clientUpdateMessage);
+    haEventWrapper.incrementPutInProgressCounter();
+    haEventWrapper.setHAContainer(haContainerWrapper);
+
+    haRegionQueue.put(haEventWrapper);
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+
+    List<Callable<Object>> callables = new ArrayList<>();
+
+    // In one thread, simulate the message dispatcher delivering the event
+    // by peeking at it and then removing it from the queue
+    callables.add(Executors.callable(() -> {
+      try {
+        // Simulate dispatching a message by peeking and removing the HAEventWrapper
+        haRegionQueue.peek();
+        haRegionQueue.remove();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+
+    // In another thread, simulate that the region is being destroyed, for instance
+    // when a SocketTimeoutException is thrown and we are cleaning up
+    callables.add(Executors.callable(() -> {
+      try {
+        haRegionQueue.destroy();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }));
+
+    List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+    for (Future<Object> future : futures) {
+      try {
+        future.get();
+      } catch (Exception ex) {
+        throw new TestException(
+            "Exception thrown while executing message dispatching and destroy region queue logic concurrently.",
+            ex);
+      }
+    }
+
+    try {
+      await().atMost(10, TimeUnit.SECONDS).until(() -> haEventWrapper.getReferenceCount() == 0);
+    } catch (ConditionTimeoutException conditionTimeoutException) {
+      throw new TestException(
+          "Expected HAEventWrapper reference count to be decremented to 0 by either the message dispatcher or destroy queue logic, but the actual reference count was "
+              + haEventWrapper.getReferenceCount());
+    }
+  }
+
+  private HARegion createMockHARegion() {
+    HARegion haRegion = Mockito.mock(HARegion.class);
+    when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+    ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
+
+    when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
+      Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
+      return existingValue;
+    });
+
+    when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
+      return mockRegion.get(answer.getArgument(0));
+    });
+
+    doAnswer(answer -> {
+      mockRegion.remove(answer.getArgument(0));
+      return null;
+    }).when(haRegion).localDestroy(Mockito.any(Object.class));
+    return haRegion;
+  }
+
   private HAContainerRegion createHAContainerRegion() throws Exception {
     Region haContainerRegionRegion = createHAContainerRegionRegion();
 
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index e1baf6c..9efb7ce 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -284,7 +284,7 @@ public class HARegionQueueJUnitTest {
         !regionQueue.isEmpty(), is(true));
     assertThat(
         " Expected the available id's size not  to be zero since expiry time has not  been
exceeded but it is not so ",
-        !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+        !regionQueue.getAvailableIds().isEmpty(), is(true));
     assertThat(
         " Expected conflation map size not  to be zero since expiry time has not been exceeded
but it is not so "
             + ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName()))
@@ -297,7 +297,7 @@ public class HARegionQueueJUnitTest {
 
     waitAtLeast(1000, start, () -> {
       assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
-      assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet()));
+      assertThat(regionQueue.getAvailableIds(), is(Collections.emptySet()));
       assertThat(regionQueue.getConflationMapForTesting().get(testName.getMethodName()),
           is(Collections.emptyMap()));
       assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap()));
@@ -330,9 +330,9 @@ public class HARegionQueueJUnitTest {
         " Expected region size not to be zero since expiry time has not been exceeded but
it is not so ",
         !regionQueue.isEmpty(), is(true));
     assertThat(" Expected the available id's size not  to have counter 1 but it has ",
-        !regionQueue.getAvalaibleIds().contains(1L), is(true));
+        !regionQueue.getAvailableIds().contains(1L), is(true));
     assertThat(" Expected the available id's size to have counter 2 but it does not have
",
-        regionQueue.getAvalaibleIds().contains(2L), is(true));
+        regionQueue.getAvailableIds().contains(2L), is(true));
     assertThat(" Expected eventID map not to have the first event, but it has",
         !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true));
     assertThat(" Expected eventID map to have the second event, but it does not",
@@ -393,7 +393,7 @@ public class HARegionQueueJUnitTest {
     assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(),
is(cf));
     assertThat(
         " Expected the available id's size not  to be zero since expiry time has not  been
exceeded but it is not so ",
-        !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+        !regionQueue.getAvailableIds().isEmpty(), is(true));
     assertThat(
         " Expected conflation map to have entry for this key since expiry time has not been
exceeded but it is not so ",
         ((Map) regionQueue.getConflationMapForTesting().get(testName.getMethodName())).get("key"),
@@ -458,12 +458,12 @@ public class HARegionQueueJUnitTest {
 
     // verify 1-5 not in available Id's map
     for (int i = 1; i < 6; i++) {
-      assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+      assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true));
     }
 
     // verify 6-10 in available id's map
     for (int i = 6; i < 11; i++) {
-      assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
+      assertThat(regionQueue.getAvailableIds().contains((long) i), is(true));
     }
   }
 
@@ -531,12 +531,12 @@ public class HARegionQueueJUnitTest {
 
     // verify 4-10 not in available Id's map
     for (int i = 4; i < 11; i++) {
-      assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+      assertThat(!regionQueue.getAvailableIds().contains((long) i), is(true));
     }
 
     // verify 1-3 in available id's map
     for (int i = 1; i < 4; i++) {
-      assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
+      assertThat(regionQueue.getAvailableIds().contains((long) i), is(true));
     }
   }
 
@@ -562,9 +562,9 @@ public class HARegionQueueJUnitTest {
     // the old key should not be present
     assertThat(!regionQueue.getRegion().containsKey(1L), is(true));
     // available ids should not contain the old id (the old position)
-    assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true));
+    assertThat(!regionQueue.getAvailableIds().contains(1L), is(true));
     // available id should have the new id (the new position)
-    assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true));
+    assertThat(regionQueue.getAvailableIds().contains(2L), is(true));
     // events map should not contain the old position
     assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true));
     // events map should contain the new position
@@ -589,7 +589,7 @@ public class HARegionQueueJUnitTest {
     Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
     assertThat(((Map) conflationMap.get(testName.getMethodName())).size(), is(5));
 
-    Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
+    Set availableIDs = ((HARegionQueue) regionqueue).getAvailableIds();
     Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
 
     assertThat(availableIDs.size(), is(5));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 8c241b2..860a410 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -1724,7 +1724,7 @@ public class HARegionQueue implements RegionQueue {
    * Used for testing purposes only
    *
    */
-  Set getAvalaibleIds() {
+  Set getAvailableIds() {
     acquireReadLock();
     try {
       return Collections.unmodifiableSet(this.idsAvailable);
@@ -1744,7 +1744,8 @@ public class HARegionQueue implements RegionQueue {
    * @param lastDispatched EventID containing the ThreadIdentifier and the last dispatched sequence
    *        Id
    */
-  void removeDispatchedEvents(EventID lastDispatched) throws CacheException, InterruptedException {
+  protected void removeDispatchedEvents(EventID lastDispatched)
+      throws CacheException, InterruptedException {
     ThreadIdentifier ti = getThreadIdentifier(lastDispatched);
     long sequenceID = lastDispatched.getSequenceID();
     // get the DispatchedAndCurrentEvents object for this threadID
@@ -3645,20 +3646,22 @@ public class HARegionQueue implements RegionQueue {
    */
   private void updateHAContainer() {
     try {
-      Object[] wrapperArray = null;
+      Object[] availableIdsArray = null;
       acquireReadLock();
       try {
         if (this.availableIDsSize() != 0) {
-          wrapperArray = this.availableIDsArray();
+          availableIdsArray = this.availableIDsArray();
         }
       } finally {
         releaseReadLock();
       }
-      if (wrapperArray != null) {
+      if (availableIdsArray != null) {
         final Set wrapperSet = new HashSet();
 
-        for (int i = 0; i < wrapperArray.length; i++) {
-          wrapperSet.add(this.region.get(wrapperArray[i]));
+        for (int i = 0; i < availableIdsArray.length; i++) {
+          if (destroyFromAvailableIDs((long) availableIdsArray[i])) {
+            wrapperSet.add(this.region.get(availableIdsArray[i]));
+          }
         }
 
         // Start a new thread which will update the clientMessagesRegion for
@@ -3671,8 +3674,7 @@ public class HARegionQueue implements RegionQueue {
               while (iter.hasNext()) {
                 Conflatable conflatable = (Conflatable) iter.next();
                 if (conflatable instanceof HAEventWrapper) {
-                  HARegionQueue.this
-                      .decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
+                  decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
                 }
               }
             } catch (CancelException ignore) {


Mime
View raw message