Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 444FB200C7F for ; Wed, 24 May 2017 23:25:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 42FE4160BB6; Wed, 24 May 2017 21:25:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 622FD160BA5 for ; Wed, 24 May 2017 23:25:50 +0200 (CEST) Received: (qmail 66460 invoked by uid 500); 24 May 2017 21:25:49 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 66448 invoked by uid 99); 24 May 2017 21:25:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 May 2017 21:25:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 62FCEDFAEB; Wed, 24 May 2017 21:25:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ladyvader@apache.org To: commits@geode.apache.org Date: Wed, 24 May 2017 21:25:49 -0000 Message-Id: <66f4b648e3434e51a76165de7a99df91@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] geode git commit: Add dunit test for cache close while enqueuing event in AEQ. Rethrow CacheClosedException encountered while enqueuing event. archived-at: Wed, 24 May 2017 21:25:51 -0000 Repository: geode Updated Branches: refs/heads/feature/close_cache_aeq_enqueue 070494d37 -> 4c02fe35c Add dunit test for cache close while enqueuing event in AEQ. Rethrow CacheClosedException encountered while enqueuing event. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f84159ab Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f84159ab Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f84159ab Branch: refs/heads/feature/close_cache_aeq_enqueue Commit: f84159ab3b7a59d423ff6afc0f456a98ab30c235 Parents: 070494d Author: Lynn Hughes-Godfrey Authored: Wed May 24 13:43:25 2017 -0700 Committer: Lynn Hughes-Godfrey Committed: Wed May 24 13:43:25 2017 -0700 ---------------------------------------------------------------------- .../cache/wan/AbstractGatewaySender.java | 1 + .../asyncqueue/AsyncEventListenerDUnitTest.java | 120 ++++++++++++------- 2 files changed, 79 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 7ed9b51..c38d547 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi ev.enqueueEvent(operation, clonedEvent, substituteValue); } catch (CancelException e) { logger.debug("caught cancel exception", e); + throw e; } catch (RegionDestroyedException e) { logger.warn(LocalizedMessage.create( LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3, http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index ac89b48..74aa776 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -41,6 +41,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; @@ -49,11 +50,14 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.cache.persistence.PartitionOfflineException; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase; +import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; @@ -1688,51 +1692,43 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm1.invoke(createCacheRunnable(lnPort)); vm2.invoke(createCacheRunnable(lnPort)); + vm3.invoke(createCacheRunnable(lnPort)); final DistributedMember member1 = vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember()); - vm1.invoke(() -> { - cache.createAsyncEventQueueFactory().addGatewayEventFilter(new GatewayEventFilter() { - @Override - public boolean beforeEnqueue(final GatewayQueueEvent event) { -// if (event.getOperation().isDestroy()) { - if (event.getOperation().isRemoveAll()) { - new Thread(() -> cache.close()).start(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - //ignore - } - throw new CacheClosedException(); - } - return true; - }; - - @Override - public boolean beforeTransmit(final GatewayQueueEvent event) { - return false; - } - - @Override - public void afterAcknowledgement(final GatewayQueueEvent event) { - - } - - @Override - public void close() { - - } - }).create("ln", new MyAsyncEventListener()); - }); + vm1.invoke(() -> addAEQWithCacheCloseFilter()); + vm2.invoke(() -> addAEQWithCacheCloseFilter()); - vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( - getTestMethodName() + "_PR", "ln", isOffHeap())); + //vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( + //getTestMethodName() + "_PR", "ln", isOffHeap())); + + vm1.invoke(() -> { + AttributesFactory fact = new AttributesFactory(); + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + }); vm2.invoke(() -> { AttributesFactory fact = new AttributesFactory(); PartitionAttributesFactory pfact = new PartitionAttributesFactory(); pfact.setTotalNumBuckets(16); + fact.setPartitionAttributes(pfact.create()); + fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); + fact.setOffHeap(isOffHeap()); + Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln") + .create(getTestMethodName() + "_PR"); + }); + vm3.invoke(() -> { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); pfact.setLocalMaxMemory(0); fact.setPartitionAttributes(pfact.create()); fact.setOffHeap(isOffHeap()); @@ -1741,20 +1737,27 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); - vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln")); -// vm2.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 3)); - vm2.invoke(() -> { + vm3.invoke(() -> { Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR"); r.put(1, 1); r.put(2, 2); - r.removeAll(Collections.singleton(1)); - r.remove(1); + //This will trigger the gateway event filter to close the cache + try { + r.removeAll(Collections.singleton(1)); + fail("Should have received a partition offline exception"); + } catch(PartitionOfflineException expected) { + + } }); vm1.invoke(() -> Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> cache.isClosed())); -// vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln")); } +private void addAEQWithCacheCloseFilter() { + cache.createAsyncEventQueueFactory().addGatewayEventFilter(new CloseCacheGatewayFilter()).setPersistent(true).setParallel(true) + .create("ln", new MyAsyncEventListener()); +} + private static Set getKeysSeen(VM vm, String asyncEventQueueId) { return vm.invoke(() -> { final BucketMovingAsyncEventListener listener = @@ -1771,7 +1774,40 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { }); } - private static class BucketMovingAsyncEventListener implements AsyncEventListener { + private final class CloseCacheGatewayFilter implements GatewayEventFilter { + @Override + public boolean beforeEnqueue(final GatewayQueueEvent event) { + // if (event.getOperation().isDestroy()) { + if (event.getOperation().isRemoveAll()) { + System.out.println("cacheCloseFilter: isRemoveAll = true"); + new Thread(() -> cache.close()).start(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + throw new CacheClosedException(); + } + return true; + } + + @Override + public boolean beforeTransmit(final GatewayQueueEvent event) { + return false; + } + + @Override + public void afterAcknowledgement(final GatewayQueueEvent event) { + + } + + @Override + public void close() { + + } +} + +private static class BucketMovingAsyncEventListener implements AsyncEventListener { private final DistributedMember destination; private boolean moved; private Set keysSeen = new HashSet();