geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ladyva...@apache.org
Subject [1/2] geode git commit: Add dunit test for cache close while enqueuing event in AEQ. Rethrow CacheClosedException encountered while enqueuing event.
Date Wed, 24 May 2017 21:25:49 GMT
Repository: geode
Updated Branches:
  refs/heads/feature/close_cache_aeq_enqueue 070494d37 -> 4c02fe35c


Add dunit test for cache close while enqueuing event in AEQ.
Rethrow CacheClosedException encountered while enqueuing event.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f84159ab
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f84159ab
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f84159ab

Branch: refs/heads/feature/close_cache_aeq_enqueue
Commit: f84159ab3b7a59d423ff6afc0f456a98ab30c235
Parents: 070494d
Author: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
Authored: Wed May 24 13:43:25 2017 -0700
Committer: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
Committed: Wed May 24 13:43:25 2017 -0700

----------------------------------------------------------------------
 .../cache/wan/AbstractGatewaySender.java        |   1 +
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 120 ++++++++++++-------
 2 files changed, 79 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7ed9b51..c38d547 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -973,6 +973,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
           ev.enqueueEvent(operation, clonedEvent, substituteValue);
         } catch (CancelException e) {
           logger.debug("caught cancel exception", e);
+          throw e;
         } catch (RegionDestroyedException e) {
           logger.warn(LocalizedMessage.create(
               LocalizedStrings.GatewayImpl_0_AN_EXCEPTION_OCCURRED_WHILE_QUEUEING_1_TO_PERFORM_OPERATION_2_FOR_3,

http://git-wip-us.apache.org/repos/asf/geode/blob/f84159ab/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index ac89b48..74aa776 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -41,6 +41,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
@@ -49,11 +50,14 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
+import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
@@ -1688,51 +1692,43 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
 
     vm1.invoke(createCacheRunnable(lnPort));
     vm2.invoke(createCacheRunnable(lnPort));
+    vm3.invoke(createCacheRunnable(lnPort));
     final DistributedMember member1 =
         vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
 
-    vm1.invoke(() -> {
-        cache.createAsyncEventQueueFactory().addGatewayEventFilter(new GatewayEventFilter() {
-          @Override
-          public boolean beforeEnqueue(final GatewayQueueEvent event) {
-//            if (event.getOperation().isDestroy()) {
-            if (event.getOperation().isRemoveAll()) {
-              new Thread(() -> cache.close()).start();
-              try {
-                Thread.sleep(1000);
-              } catch (InterruptedException e) {
-                //ignore
-              }
-              throw new CacheClosedException();
-            }
-            return true;
-          };
-
-          @Override
-          public boolean beforeTransmit(final GatewayQueueEvent event) {
-            return false;
-          }
-
-          @Override
-          public void afterAcknowledgement(final GatewayQueueEvent event) {
-
-          }
-
-          @Override
-          public void close() {
-
-          }
-        }).create("ln",  new MyAsyncEventListener());
-    });
+    vm1.invoke(() -> addAEQWithCacheCloseFilter());
+    vm2.invoke(() -> addAEQWithCacheCloseFilter());
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
-        getTestMethodName() + "_PR", "ln", isOffHeap()));
+    //vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
+        //getTestMethodName() + "_PR", "ln", isOffHeap()));
+
+    vm1.invoke(() -> {
+      AttributesFactory fact = new AttributesFactory();
 
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+      fact.setOffHeap(isOffHeap());
+      Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+          .create(getTestMethodName() + "_PR");
+    });
     vm2.invoke(() -> {
       AttributesFactory fact = new AttributesFactory();
 
       PartitionAttributesFactory pfact = new PartitionAttributesFactory();
       pfact.setTotalNumBuckets(16);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+      fact.setOffHeap(isOffHeap());
+      Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln")
+          .create(getTestMethodName() + "_PR");
+    });
+    vm3.invoke(() -> {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
       pfact.setLocalMaxMemory(0);
       fact.setPartitionAttributes(pfact.create());
       fact.setOffHeap(isOffHeap());
@@ -1741,20 +1737,27 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
 
     });
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
-//    vm2.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 3));
-    vm2.invoke(() -> {
+    vm3.invoke(() -> {
       Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_PR");
       r.put(1, 1);
       r.put(2, 2);
-      r.removeAll(Collections.singleton(1));
-      r.remove(1);
+      //This will trigger the gateway event filter to close the cache
+      try {
+        r.removeAll(Collections.singleton(1));
+        fail("Should have received a partition offline exception");
+      } catch(PartitionOfflineException expected) {
+    	  
+      }
     });
 
     vm1.invoke(() -> Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> cache.isClosed()));
-//    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
   }
 
+private void addAEQWithCacheCloseFilter() {
+	cache.createAsyncEventQueueFactory().addGatewayEventFilter(new CloseCacheGatewayFilter()).setPersistent(true).setParallel(true)
+	    .create("ln", new MyAsyncEventListener());
+}
+
   private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
       final BucketMovingAsyncEventListener listener =
@@ -1771,7 +1774,40 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     });
   }
 
-  private static class BucketMovingAsyncEventListener implements AsyncEventListener {
+  private final class CloseCacheGatewayFilter implements GatewayEventFilter {
+	@Override
+	public boolean beforeEnqueue(final GatewayQueueEvent event) {
+	  // if (event.getOperation().isDestroy()) {
+	  if (event.getOperation().isRemoveAll()) {
+		System.out.println("cacheCloseFilter: isRemoveAll = true");
+	    new Thread(() -> cache.close()).start();
+	    try {
+	      Thread.sleep(1000);
+	    } catch (InterruptedException e) {
+	      // ignore
+	    }
+	    throw new CacheClosedException();
+	  }
+	  return true;
+	}
+
+	@Override
+	public boolean beforeTransmit(final GatewayQueueEvent event) {
+	  return false;
+	}
+
+	@Override
+	public void afterAcknowledgement(final GatewayQueueEvent event) {
+
+	}
+
+	@Override
+	public void close() {
+
+	}
+}
+
+private static class BucketMovingAsyncEventListener implements AsyncEventListener {
     private final DistributedMember destination;
     private boolean moved;
     private Set<Object> keysSeen = new HashSet<Object>();


Mime
View raw message