geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject [03/50] [abbrv] geode git commit: GEODE-2900: DUnit test of moving primary during AEQ dispatching
Date Sat, 29 Jul 2017 00:52:14 GMT
GEODE-2900: DUnit test of moving primary during AEQ dispatching

Adding a dunit test that moves the primary and moves it again during AEQ
dispatching.

This closes #506


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/acdf2e80
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/acdf2e80
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/acdf2e80

Branch: refs/heads/feature/GEM-1483
Commit: acdf2e80184edf9c3ba934a6f0fa760a83020953
Parents: 64eab45
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Wed May 10 16:32:05 2017 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Fri Jul 21 17:17:11 2017 -0700

----------------------------------------------------------------------
 .../AbstractMovingAsyncEventListener.java       |  57 +++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 169 +++++++++++++------
 2 files changed, 173 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/acdf2e80/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java
new file mode 100644
index 0000000..2e746ae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.distributed.DistributedMember;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Created by dan on 5/10/17.
+ */
+public abstract class AbstractMovingAsyncEventListener implements AsyncEventListener {
+  protected final DistributedMember destination;
+  boolean moved;
+  Set<Object> keysSeen = new HashSet<Object>();
+
+  public AbstractMovingAsyncEventListener(final DistributedMember destination) {
+    this.destination = destination;
+  }
+
+  @Override
+  public boolean processEvents(final List<AsyncEvent> events) {
+    if (!moved) {
+
+      AsyncEvent event1 = events.get(0);
+      move(event1);
+      moved = true;
+      return false;
+    }
+
+    events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
+    return true;
+  }
+
+  protected abstract void move(AsyncEvent event1);
+
+  @Override
+  public void close() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/acdf2e80/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 795af36..1a57bde 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -14,57 +14,57 @@
  */
 package org.apache.geode.internal.cache.wan.asyncqueue;
 
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.wan.GatewayEventFilter;
-import org.apache.geode.cache.wan.GatewayQueueEvent;
-import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
-import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase;
-import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 import org.awaitility.Awaitility;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 @Category(DistributedTest.class)
 public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
@@ -1745,18 +1745,78 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase
{
         .setPersistent(true).setParallel(true).create("ln", new MyAsyncEventListener());
   }
 
+  public void testParallelAsyncEventQueueMovePrimaryAndMoveItBackDuringDispatching() {
+    Integer lnPort =
+        (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1));
+
+    String regionName = getTestMethodName() + "_PR";
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+
+    SerializableRunnableIF createPartitionedRegion = () -> {
+      AttributesFactory fact = new AttributesFactory();
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(1);
+      pfact.setRedundantCopies(1);
+      fact.setPartitionAttributes(pfact.create());
+      Region r =
+          cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln").create(regionName);
+
+    };
+
+    final DistributedMember member1 =
+        vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+    final DistributedMember member2 =
+        vm2.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+
+
+    // Create a PR with 1 bucket in vm1. Pause the sender and put some data in it
+    vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln", true, 100, 10,
false,
+        false, null, false, new PrimaryMovingAsyncEventListener(member2)));
+    vm1.invoke(createPartitionedRegion);
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 113));
+
+    // Create the PR in vm2. This will create a redundant copy, but will be the secondary
+    vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln", true, 100, 10,
false,
+        false, null, false, new PrimaryMovingAsyncEventListener(member1)));
+    vm2.invoke(createPartitionedRegion);
+    // do a rebalance just to make sure we have restored redundancy
+    vm2.invoke(() -> {
+      cache.getResourceManager().createRebalanceFactory().start().getResults();
+    });
+
+    // Resume the AEQ. This should trigger the primary to move to vm2, which will then move
it back
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
+
+    Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm1,
"ln"));
+    Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm2,
"ln"));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));
+
+    Set<Object> allKeys = new HashSet<Object>();
+    allKeys.addAll(getKeysSeen(vm1, "ln"));
+    allKeys.addAll(getKeysSeen(vm2, "ln"));
+
+    final Set<Long> expectedKeys =
+        LongStream.range(0, 113).mapToObj(Long::valueOf).collect(Collectors.toSet());
+    assertEquals(expectedKeys, allKeys);
+
+  }
+
   private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
-      final BucketMovingAsyncEventListener listener =
-          (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      final AbstractMovingAsyncEventListener listener =
+          (AbstractMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
       return listener.keysSeen;
     });
   }
 
   private static boolean getBucketMoved(VM vm, String asyncEventQueueId) {
     return vm.invoke(() -> {
-      final BucketMovingAsyncEventListener listener =
-          (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      final AbstractMovingAsyncEventListener listener =
+          (AbstractMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
       return listener.moved;
     });
   }
@@ -1792,40 +1852,43 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase
{
     }
   }
 
-  private static class BucketMovingAsyncEventListener implements AsyncEventListener {
-    private final DistributedMember destination;
-    private boolean moved;
-    private Set<Object> keysSeen = new HashSet<Object>();
+  private static final class BucketMovingAsyncEventListener
+      extends AbstractMovingAsyncEventListener {
 
     public BucketMovingAsyncEventListener(final DistributedMember destination) {
-      this.destination = destination;
+      super(destination);
     }
 
     @Override
-    public boolean processEvents(final List<AsyncEvent> events) {
-      if (!moved) {
-
-        AsyncEvent event1 = events.get(0);
-        moveBucket(destination, event1.getKey());
-        moved = true;
-        return false;
-      }
-
-      events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
-      return true;
+    protected void move(final AsyncEvent event1) {
+      Object key = event1.getKey();
+      Region<Object, Object> region = cache.getRegion(getTestMethodName() + "_PR");
+      DistributedMember source = cache.getDistributedSystem().getDistributedMember();
+      PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
     }
+  }
 
-    @Override
-    public void close() {
+  private static final class PrimaryMovingAsyncEventListener
+      extends AbstractMovingAsyncEventListener {
 
+    public PrimaryMovingAsyncEventListener(final DistributedMember destination) {
+      super(destination);
     }
 
-    private static void moveBucket(final DistributedMember destination, final Object key)
{
-      Region<Object, Object> region = cache.getRegion(getTestMethodName() + "_PR");
+    @Override
+    protected void move(final AsyncEvent event1) {
+      Object key = event1.getKey();
+      PartitionedRegion region = (PartitionedRegion) event1.getRegion();
       DistributedMember source = cache.getDistributedSystem().getDistributedMember();
-      PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
+
+      BecomePrimaryBucketResponse response =
+          BecomePrimaryBucketMessage.send((InternalDistributedMember) destination, region,
+              region.getKeyInfo(key).getBucketId(), true);
+      assertNotNull(response);
+      assertTrue(response.waitForResponse());
     }
   }
 
 
+
 }


Mime
View raw message