geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [35/50] [abbrv] geode git commit: GEODE-3030: Set possibleDuplicate=true for all bucket events after failover
Date Tue, 01 Aug 2017 21:03:29 GMT
GEODE-3030: Set possibleDuplicate=true for all bucket events after failover


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/5ec40698
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/5ec40698
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/5ec40698

Branch: refs/heads/feature/GEODE-3299
Commit: 5ec4069848a0ec48466240c17003b8f25fe2fcdd
Parents: 0b4a1a2
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Thu Jul 27 17:02:45 2017 -0700
Committer: Barry Oglesby <boglesby@pivotal.io>
Committed: Fri Jul 28 12:19:43 2017 -0700

----------------------------------------------------------------------
 .../cache/AbstractBucketRegionQueue.java        | 10 +-
 .../geode/internal/cache/BucketRegionQueue.java |  3 +-
 .../PossibleDuplicateAsyncEventListener.java    | 78 ++++++++++++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 96 ++++++++++++++++++++
 4 files changed, 180 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index eacb8fd..3674474 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -298,13 +298,13 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion
{
   }
 
   /**
-   * Marks batchSize number of events in the iterator as duplicate
+   * Marks all events in the iterator as duplicate
    */
-  protected void markEventsAsDuplicate(int batchSize, Iterator itr) {
+  protected void markEventsAsDuplicate(Iterator itr) {
     int i = 0;
-    // mark number of event equal to the batchSize for setPossibleDuplicate to
-    // true before this bucket becomes primary on the node
-    while (i < batchSize && itr.hasNext()) {
+    // mark setPossibleDuplicate to true for all events in this bucket before it becomes
primary on
+    // the node
+    while (itr.hasNext()) {
       Object key = itr.next();
       Object senderEvent = getNoLRU(key, true, false, false);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 5ce5963..567f46f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -203,9 +203,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
 
   @Override
   public void beforeAcquiringPrimaryState() {
-    int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
     Iterator<Object> itr = eventSeqNumDeque.iterator();
-    markEventsAsDuplicate(batchSize, itr);
+    markEventsAsDuplicate(itr);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
new file mode 100644
index 0000000..45f3dc5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/PossibleDuplicateAsyncEventListener.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PossibleDuplicateAsyncEventListener implements AsyncEventListener {
+
+  private final CountDownLatch latch = new CountDownLatch(1);
+
+  private final AtomicInteger numberOfEvents = new AtomicInteger();
+
+  private final AtomicInteger numberOfPossibleDuplicateEvents = new AtomicInteger();
+
+  public boolean processEvents(List<AsyncEvent> events) {
+    try {
+      waitToStartProcessingEvents();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+          "PossibleDuplicateAsyncEventListener processEvents was interrupted");
+    }
+    for (AsyncEvent event : events) {
+      process(event);
+    }
+    return true;
+  }
+
+  private void process(AsyncEvent event) {
+    if (event.getPossibleDuplicate()) {
+      incrementTotalPossibleDuplicateEvents();
+    }
+    incrementTotalEvents();
+  }
+
+  private void waitToStartProcessingEvents() throws InterruptedException {
+    this.latch.await(60, TimeUnit.SECONDS);
+  }
+
+  public void startProcessingEvents() {
+    this.latch.countDown();
+  }
+
+  private int incrementTotalEvents() {
+    return this.numberOfEvents.incrementAndGet();
+  }
+
+  public int getTotalEvents() {
+    return this.numberOfEvents.get();
+  }
+
+  private void incrementTotalPossibleDuplicateEvents() {
+    this.numberOfPossibleDuplicateEvents.incrementAndGet();
+  }
+
+  public int getTotalPossibleDuplicateEvents() {
+    return this.numberOfPossibleDuplicateEvents.get();
+  }
+
+  public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/5ec40698/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 1a57bde..6964edf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -27,6 +27,8 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
@@ -44,6 +46,7 @@ import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
 import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
 import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.internal.cache.wan.PossibleDuplicateAsyncEventListener;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
@@ -1728,6 +1731,99 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase
{
     });
   }
 
+  @Test
+  public void testParallelAsyncEventQueueWithPossibleDuplicateEvents() {
+    // Set disable move primaries on start up
+    vm1.invoke(() -> setDisableMovePrimary());
+    vm2.invoke(() -> setDisableMovePrimary());
+
+    try {
+      // Create locator
+      Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+
+      // Create cache and async event queue in member 1
+      String aeqId = "ln";
+      vm1.invoke(() -> createCache(lnPort));
+      vm1.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null,
true,
+          "PossibleDuplicateAsyncEventListener"));
+
+      // Create region with async event queue in member 1
+      String regionName = getTestMethodName() + "_PR";
+      vm1.invoke(
+          () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap()));
+
+      // Do puts so that all primaries are in member 1
+      int numPuts = 30;
+      vm1.invoke(() -> doPuts(regionName, numPuts));
+
+      // Create cache and async event queue in member 2
+      vm2.invoke(() -> createCache(lnPort));
+      vm2.invoke(() -> createAsyncEventQueue(aeqId, true, 100, 1, false, false, null,
true,
+          "PossibleDuplicateAsyncEventListener"));
+
+      // Create region with paused async event queue in member 2
+      vm2.invoke(
+          () -> createPRWithRedundantCopyWithAsyncEventQueue(regionName, aeqId, isOffHeap()));
+      vm2.invoke(() -> pauseAsyncEventQueue(aeqId));
+
+      // Close cache in member 1 (all AEQ buckets will fail over to member 2)
+      vm1.invoke(() -> closeCache());
+
+      // Start processing async event queue in member 2
+      vm2.invoke(() -> resumeAsyncEventQueue(aeqId));
+      vm2.invoke(() -> startProcessingAsyncEvents(aeqId));
+
+      // Wait for queue to be empty
+      vm2.invoke(() -> waitForAsyncQueueToGetEmpty(aeqId));
+
+      // Verify all events were processed in member 2
+      vm2.invoke(() -> verifyAsyncEventProcessing(aeqId, numPuts));
+    } finally {
+      // Clear disable move primaries on start up
+      vm1.invoke(() -> clearDisableMovePrimary());
+      vm2.invoke(() -> clearDisableMovePrimary());
+    }
+  }
+
+  public static void setDisableMovePrimary() {
+    System.setProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP", "true");
+  }
+
+  public static void clearDisableMovePrimary() {
+    System.clearProperty("gemfire.DISABLE_MOVE_PRIMARIES_ON_STARTUP");
+  }
+
+  public static void startProcessingAsyncEvents(String aeqId) {
+    // Get the async event listener
+    PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId);
+
+    // Start processing waiting events
+    listener.startProcessingEvents();
+  }
+
+  public static void verifyAsyncEventProcessing(String aeqId, int numEvents) {
+    // Get the async event listener
+    PossibleDuplicateAsyncEventListener listener = getPossibleDuplicateAsyncEventListener(aeqId);
+
+    // Verify all events were processed
+    assertEquals(numEvents, listener.getTotalEvents());
+
+    // Verify all events are possibleDuplicate
+    assertEquals(numEvents, listener.getTotalPossibleDuplicateEvents());
+  }
+
+  private static PossibleDuplicateAsyncEventListener getPossibleDuplicateAsyncEventListener(
+      String aeqId) {
+    // Get the async event queue
+    AsyncEventQueue aeq = cache.getAsyncEventQueue(aeqId);
+    assertNotNull(aeq);
+
+    // Get and return the async event listener
+    AsyncEventListener aeqListener = aeq.getAsyncEventListener();
+    assertTrue(aeqListener instanceof PossibleDuplicateAsyncEventListener);
+    return (PossibleDuplicateAsyncEventListener) aeqListener;
+  }
+
   private void createPersistentPartitionRegion() {
     AttributesFactory fact = new AttributesFactory();
 


Mime
View raw message