From commits-return-32877-archive-asf-public=cust-asf.ponee.io@geode.apache.org Thu Oct 3 20:52:53 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1F3C018065B for ; Thu, 3 Oct 2019 22:52:53 +0200 (CEST) Received: (qmail 62393 invoked by uid 500); 3 Oct 2019 20:52:52 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 62384 invoked by uid 99); 3 Oct 2019 20:52:52 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Oct 2019 20:52:52 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5CA77890A3; Thu, 3 Oct 2019 20:52:52 +0000 (UTC) Date: Thu, 03 Oct 2019 20:52:45 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/11: GEODE-7124: Added new API to create AEQ with paused event processing MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: jensdeppe@apache.org In-Reply-To: <157013596241.2977.14540781347280343954@gitbox.apache.org> References: <157013596241.2977.14540781347280343954@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/release/1.9.2 X-Git-Reftype: branch X-Git-Rev: 24047c47f7dbe42809ecc77eeb8b2f8ff5e1769c X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20191003205252.5CA77890A3@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. jensdeppe pushed a commit to branch release/1.9.2 in repository https://gitbox.apache.org/repos/asf/geode.git commit 24047c47f7dbe42809ecc77eeb8b2f8ff5e1769c Author: Naburun Nag AuthorDate: Tue Sep 3 13:07:53 2019 -0700 GEODE-7124: Added new API to create AEQ with paused event processing * New API to pause the event processor when AEQ is created * Events will still be queued but will not be processed --- .../AsyncEventQueueValidationsJUnitTest.java | 14 ++++++++++ .../cache/asyncqueue/AsyncEventQueueFactory.java | 8 ++++++ .../internal/AsyncEventQueueFactoryImpl.java | 20 ++++++++++++++ .../internal/ParallelAsyncEventQueueImpl.java | 3 +++ .../internal/SerialAsyncEventQueueImpl.java | 3 +++ .../internal/cache/wan/AbstractGatewaySender.java | 31 ++++++++++++++++++++++ .../internal/cache/wan/InternalGatewaySender.java | 2 ++ .../internal/AsyncEventQueueFactoryImplTest.java | 9 +++++++ .../wan/parallel/ParallelGatewaySenderImpl.java | 3 +++ .../cache/wan/serial/SerialGatewaySenderImpl.java | 3 +++ 10 files changed, 96 insertions(+) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java index 9eadeef..3c10e36 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueValidationsJUnitTest.java @@ -35,6 +35,7 @@ import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException; @@ -72,6 +73,19 @@ public class AsyncEventQueueValidationsJUnitTest { } @Test + @Parameters({"true", "false"}) + public void whenAEQCreatedInPausedStateThenSenderIsStartedInPausedState(boolean isParallel) { + cache = new CacheFactory().set(MCAST_PORT, "0").create(); + AsyncEventQueueFactory fact = cache.createAsyncEventQueueFactory() + .setParallel(isParallel) + .pauseEventDispatchingToListener() + .setDispatcherThreads(5); + AsyncEventQueue aeq = + fact.create("aeqID", new org.apache.geode.internal.cache.wan.MyAsyncEventListener()); + assertTrue(((AsyncEventQueueImpl) aeq).getSender().isPaused()); + } + + @Test public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyThread() { cache = new CacheFactory().set(MCAST_PORT, "0").create(); try { diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java index e0329a2..1a5145e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/AsyncEventQueueFactory.java @@ -150,6 +150,12 @@ public interface AsyncEventQueueFactory { AsyncEventQueueFactory setForwardExpirationDestroy(boolean forward); /** + * Pauses the dispatching of the queued events to the listener. + * + */ + AsyncEventQueueFactory pauseEventDispatchingToListener(); + + /** * Creates the AsyncEventQueue. It accepts Id of AsyncEventQueue and instance of * AsyncEventListener. Multiple queues can be created using Same listener instance. So, the * instance of AsyncEventListener should be thread safe in that case. The @@ -162,4 +168,6 @@ public interface AsyncEventQueueFactory { * to use this queue. */ AsyncEventQueue create(String id, AsyncEventListener listener); + + } diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java index 70f0c2c..26f29ee 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImpl.java @@ -18,6 +18,7 @@ import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.get import org.apache.logging.log4j.Logger; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; @@ -46,6 +47,8 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { private final InternalCache cache; + private boolean pauseEventsDispatchingToListener = false; + /** * Used internally to pass the attributes from this factory to the real GatewaySender it is * creating. @@ -159,6 +162,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { if (cache instanceof CacheCreation) { asyncEventQueue = new AsyncEventQueueCreation(asyncQueueId, gatewaySenderAttributes, listener); + if (pauseEventsDispatchingToListener) { + ((AsyncEventQueueCreation) asyncEventQueue).setPauseEventDispatching(true); + } ((CacheCreation) cache).addAsyncEventQueue(asyncEventQueue); } else { if (logger.isDebugEnabled()) { @@ -171,6 +177,9 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { AsyncEventQueueImpl asyncEventQueueImpl = new AsyncEventQueueImpl(sender, listener); asyncEventQueue = asyncEventQueueImpl; cache.addAsyncEventQueue(asyncEventQueueImpl); + if (pauseEventsDispatchingToListener) { + sender.setStartEventProcessorInPausedState(); + } if (!gatewaySenderAttributes.isManualStart()) { sender.start(); } @@ -267,4 +276,15 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory { gatewaySenderAttributes.forwardExpirationDestroy = forward; return this; } + + @Override + public AsyncEventQueueFactory pauseEventDispatchingToListener() { + pauseEventsDispatchingToListener = true; + return this; + } + + @VisibleForTesting + protected boolean isPauseEventsDispatchingToListener() { + return pauseEventsDispatchingToListener; + } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java index 222f00b..a6d9799 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java @@ -78,6 +78,9 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender { */ eventProcessor = new ConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj()); + if (startEventProcessorInPausedState) { + pauseEvenIfProcessorStopped(); + } eventProcessor.start(); waitForRunningStatus(); diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java index 259ae81..478a20a 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java @@ -89,6 +89,9 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender { eventProcessor = new SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this, getId(), getThreadMonitorObj()); } + if (startEventProcessorInPausedState) { + pauseEvenIfProcessorStopped(); + } eventProcessor.start(); waitForRunningStatus(); this.startTime = System.currentTimeMillis(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 8563e8d..ad7b3a4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -207,6 +207,8 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di */ protected static final String META_DATA_REGION_NAME = "gatewayEventIdIndexMetaData"; + protected boolean startEventProcessorInPausedState = false; + protected int myDSId = DEFAULT_DISTRIBUTED_SYSTEM_ID; protected int connectionIdleTimeOut = GATEWAY_CONNECTION_IDLE_TIMEOUT; @@ -790,6 +792,35 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di } } + public boolean isStartEventProcessorInPausedState() { + return startEventProcessorInPausedState; + } + + public void setStartEventProcessorInPausedState() { + startEventProcessorInPausedState = true; + } + + /** + * This pause will set the pause flag even if the + * processor has not yet started. + */ + public void pauseEvenIfProcessorStopped() { + if (this.eventProcessor != null) { + this.getLifeCycleLock().writeLock().lock(); + try { + this.eventProcessor.pauseDispatching(); + InternalDistributedSystem system = + (InternalDistributedSystem) this.cache.getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_PAUSE, this); + logger.info("Paused {}", this); + + enqueueTempEvents(); + } finally { + this.getLifeCycleLock().writeLock().unlock(); + } + } + } + @Override public void pause() { if (this.eventProcessor != null) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java index 783e1d9..ba4f617 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/InternalGatewaySender.java @@ -42,4 +42,6 @@ public interface InternalGatewaySender extends GatewaySender { InternalCache getCache(); void destroy(boolean initiator); + + void setStartEventProcessorInPausedState(); } diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java index 2ecb459..9535782 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueFactoryImplTest.java @@ -17,6 +17,7 @@ package org.apache.geode.cache.asyncqueue.internal; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; @@ -43,6 +44,14 @@ public class AsyncEventQueueFactoryImplTest { } + @Test + public void whenAsyncEventQueueIsStartedInPausedStateThenSenderMustBePaused() { + asyncEventQueueFactory = new AsyncEventQueueFactoryImpl(cache); + asyncEventQueueFactory.pauseEventDispatchingToListener(); + assertTrue( + ((AsyncEventQueueFactoryImpl) asyncEventQueueFactory).isPauseEventsDispatchingToListener()); + } + /** * Test to verify that AsyncEventQueue can not be created when null listener is passed. */ diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java index a5c19d2..4f67933 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java @@ -72,6 +72,9 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender { */ eventProcessor = new RemoteConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj()); + if (isStartEventProcessorInPausedState()) { + this.pauseEvenIfProcessorStopped(); + } eventProcessor.start(); waitForRunningStatus(); diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java index d5cfe31..548b4cb 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java @@ -83,6 +83,9 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender { eventProcessor = new RemoteSerialGatewaySenderEventProcessor(SerialGatewaySenderImpl.this, getId(), getThreadMonitorObj()); } + if (isStartEventProcessorInPausedState()) { + this.pauseEvenIfProcessorStopped(); + } eventProcessor.start(); waitForRunningStatus(); this.startTime = System.currentTimeMillis();