geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jensde...@apache.org
Subject [geode] 02/11: GEODE-7126: Added new API to resume AEQ event processing
Date Thu, 03 Oct 2019 20:52:46 GMT
This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 4ccd5ff89b90436862cea9babc7299f0e4b2a307
Author: Naburun Nag <nag@cs.wisc.edu>
AuthorDate: Tue Sep 3 13:09:34 2019 -0700

    GEODE-7126: Added new API to resume AEQ event processing
    
             * New API to resume event processing when event processor is paused
             * All queued events will be processed
---
 .../asyncqueue/AsyncEventQueuePausedDUnitTest.java | 242 +++++++++++++++++++++
 .../AsyncEventQueueValidationsJUnitTest.java       |  21 ++
 .../geode/cache/asyncqueue/AsyncEventQueue.java    |  10 +
 .../asyncqueue/internal/AsyncEventQueueImpl.java   |   9 +
 .../cache/xmlcache/AsyncEventQueueCreation.java    |  15 ++
 5 files changed, 297 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java
new file mode 100644
index 0000000..385afca
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueuePausedDUnitTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.core.ConditionTimeoutException;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.AEQTest;
+
+@Category({AEQTest.class})
+@RunWith(Parameterized.class)
+public class AsyncEventQueuePausedDUnitTest implements Serializable {
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        // RegionType , isParallel AEQ
+        {RegionShortcut.PARTITION, true},
+        {RegionShortcut.PARTITION, false},
+        {RegionShortcut.REPLICATE, false},
+        {RegionShortcut.PARTITION_PERSISTENT, true},
+        {RegionShortcut.PARTITION_PERSISTENT, false},
+        {RegionShortcut.PARTITION_REDUNDANT, true},
+        {RegionShortcut.PARTITION_REDUNDANT, false},
+        {RegionShortcut.REPLICATE_PERSISTENT, false},
+        {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, true},
+        {RegionShortcut.PARTITION_REDUNDANT_PERSISTENT, false}
+    });
+  }
+
+  @Parameterized.Parameter
+  public static RegionShortcut regionShortcut;
+
+  @Parameterized.Parameter(1)
+  public static boolean isParallel;
+
+  @Rule
+  public ClusterStartupRule lsRule = new ClusterStartupRule(4);
+
+  private static MemberVM locator, server1, server2;
+  private static ClientVM client;
+
+  @Before
+  public void beforeClass() throws Exception {
+    locator = lsRule.startLocatorVM(0);
+    server1 = lsRule.startServerVM(1, "group1", locator.getPort());
+    server2 = lsRule.startServerVM(2, "group1", locator.getPort());
+    int serverPort = server1.getPort();
+    client =
+        lsRule.startClientVM(3, new Properties(),
+            clientCacheFactory -> configureClientCacheFactory(clientCacheFactory, serverPort));
+  }
+
+  private static void configureClientCacheFactory(ClientCacheFactory ccf, int... serverPorts) {
+    for (int serverPort : serverPorts) {
+      ccf.addPoolServer("localhost", serverPort);
+    }
+    ccf.setPoolReadTimeout(10 * 60 * 1000); // 10 min
+    ccf.setPoolSubscriptionEnabled(true);
+  }
+
+  @Test
+  public void whenAEQCreatedInPausedStateThenListenersMustNotBeInvoked() {
+    final AEQandRegionProperties props = new AEQandRegionProperties(regionShortcut, isParallel);
+    server1.invoke(() -> {
+      createRegionAndDispatchingPausedAEQ(props);
+    });
+    server2.invoke(() -> {
+      createRegionAndDispatchingPausedAEQ(props);
+    });
+    client.invoke(() -> {
+      createClientRegion();
+    });
+
+    server1.invoke(() -> {
+      validateAEQDispatchingIsPaused();
+    });
+
+    server2.invoke(() -> {
+      validateAEQDispatchingIsPaused();
+    });
+
+    // Resume dispatching.
+    server1.invoke(() -> {
+      ClusterStartupRule.getCache().getAsyncEventQueue("aeqID").resumeEventDispatching();
+    });
+
+    server2.invoke(() -> {
+      ClusterStartupRule.getCache().getAsyncEventQueue("aeqID").resumeEventDispatching();
+    });
+
+    // Validate dispatching resumed.
+    await().atMost(1, TimeUnit.MINUTES).until(() -> {
+
+      final int count1 = server1.invoke(() -> getEventDispatchedSize());
+      final int count2 = server2.invoke(() -> getEventDispatchedSize());
+      if ((count1 + count2) == 1000) {
+        return true;
+      } else {
+        return false;
+      }
+    });
+
+  }
+
+  @NotNull
+  private static Integer getEventDispatchedSize() {
+    Cache cache = ClusterStartupRule.getCache();
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue("aeqID");
+    MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener();
+    return listener.getEventsMap().size();
+  }
+
+  private static void createClientRegion() {
+    ClientCache cache = ClusterStartupRule.getClientCache();
+    Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+        .create("region");
+    for (int i = 0; i < 1000; i++) {
+      region.put(i, i);
+    }
+  }
+
+  private static void createRegionAndDispatchingPausedAEQ(AEQandRegionProperties props) {
+    Cache cache = ClusterStartupRule.getCache();
+    cache.createAsyncEventQueueFactory()
+        .pauseEventDispatchingToListener()
+        .setParallel(props.isParallel())
+        .setPersistent(isPersistent(props))
+        .create("aeqID", new MyAsyncEventListener());
+    cache.createRegionFactory(props.getRegionShortcut())
+        .addAsyncEventQueueId("aeqID")
+        .create("region");
+  }
+
+  private static boolean isPersistent(AEQandRegionProperties props) {
+    switch (props.getRegionShortcut()) {
+      case PARTITION:
+      case REPLICATE:
+      case PARTITION_REDUNDANT:
+        return false;
+      default:
+        return true;
+    }
+  }
+
+  private static void validateAEQDispatchingIsPaused() {
+    Cache cache = ClusterStartupRule.getCache();
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue("aeqID");
+    assertTrue(aeq.isDispatchingPaused());
+    MyAsyncEventListener listener = (MyAsyncEventListener) aeq.getAsyncEventListener();
+    try {
+      await().atMost(10, TimeUnit.SECONDS).until(() -> listener.getEventsMap().size() > 0);
+    } catch (ConditionTimeoutException ex) {
+      // Expected Exception
+    }
+    // Ensure that the queues are filling up
+    assertTrue(aeq.getSender().getQueues().stream().mapToInt(i -> i.size()).sum() == 1000);
+  }
+
+  class AEQandRegionProperties implements DataSerializable, Serializable {
+    RegionShortcut regionShortcut;
+    boolean isParallel;
+
+    public AEQandRegionProperties(RegionShortcut regionShortcut, boolean isParallel) {
+      this.regionShortcut = regionShortcut;
+      this.isParallel = isParallel;
+    }
+
+    public RegionShortcut getRegionShortcut() {
+      return regionShortcut;
+    }
+
+    public void setRegionShortcut(RegionShortcut regionShortcut) {
+      this.regionShortcut = regionShortcut;
+    }
+
+    public boolean isParallel() {
+      return isParallel;
+    }
+
+    public void setParallel(boolean parallel) {
+      isParallel = parallel;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeObject(regionShortcut, out);
+      out.writeBoolean(isParallel);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      regionShortcut = DataSerializer.readObject(in);
+      isParallel = in.readBoolean();
+    }
+  }
+
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
index 3c10e36..6cd325a 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java
@@ -18,6 +18,7 @@ import static junitparams.JUnitParamsRunner.$;
 import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -86,6 +87,26 @@ public class AsyncEventQueueValidationsJUnitTest {
   }
 
   @Test
+  @Parameters({"true", "false"})
+  public void whenAEQCreatedInPausedStateIsUnPausedThenSenderIsResumed(boolean isParallel) {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    AsyncEventQueueFactory fact = cache.createAsyncEventQueueFactory()
+        .setParallel(isParallel)
+        .pauseEventDispatchingToListener()
+        .setDispatcherThreads(5);
+    AsyncEventQueue aeq =
+        fact.create("aeqID", new org.apache.geode.internal.cache.wan.MyAsyncEventListener());
+    assertTrue(aeq.isDispatchingPaused());
+    assertTrue(((AsyncEventQueueImpl) aeq).getSender().isPaused());
+
+    aeq.resumeEventDispatching();
+
+    assertFalse(aeq.isDispatchingPaused());
+    assertFalse(((AsyncEventQueueImpl) aeq).getSender().isPaused());
+
+  }
+
+  @Test
   public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyThread() {
     cache = new CacheFactory().set(MCAST_PORT, "0").create();
     try {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java
index a1097ff..b06137a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueue.java
@@ -147,4 +147,14 @@ public interface AsyncEventQueue {
    */
   boolean isForwardExpirationDestroy();
 
+  /**
+   * Resumes the dispatching of the events queued to the listener.
+   */
+  void resumeEventDispatching();
+
+  /**
+   * Returns true if dispatching of queued events is paused, false if they are being processed.
+   */
+  boolean isDispatchingPaused();
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index ae93b0a..df7c908 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -36,6 +36,15 @@ public class AsyncEventQueueImpl implements InternalAsyncEventQueue {
     this.asyncEventListener = asyncEventListener;
   }
 
+  public void resumeEventDispatching() {
+    this.sender.resume();
+  }
+
+  @Override
+  public boolean isDispatchingPaused() {
+    return sender.isPaused();
+  }
+
   @Override
   public String getId() {
     return getAsyncEventQueueIdFromSenderId(sender.getId());
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java
index 7c315da..eb29f01 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/AsyncEventQueueCreation.java
@@ -42,6 +42,7 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   private int dispatcherThreads = 1;
   private OrderPolicy orderPolicy = OrderPolicy.KEY;
   private boolean forwardExpirationDestroy = false;
+  private boolean pauseEventDispatching = false;
 
   public AsyncEventQueueCreation() {}
 
@@ -74,6 +75,15 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
     this.asyncEventListener = eventListener;
   }
 
+  @Override
+  public boolean isDispatchingPaused() {
+    return pauseEventDispatching;
+  }
+
+  public void setPauseEventDispatching(boolean pauseEventDispatching) {
+    this.pauseEventDispatching = pauseEventDispatching;
+  }
+
   public void addGatewayEventFilter(GatewayEventFilter filter) {
     this.gatewayEventFilters.add(filter);
   }
@@ -227,4 +237,9 @@ public class AsyncEventQueueCreation implements AsyncEventQueue {
   public boolean isForwardExpirationDestroy() {
     return this.forwardExpirationDestroy;
   }
+
+  @Override
+  public void resumeEventDispatching() {
+    this.pauseEventDispatching = false;
+  }
 }


Mime
View raw message