Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 62DE4200CFB for ; Sat, 29 Jul 2017 02:52:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5E31E16DCCD; Sat, 29 Jul 2017 00:52:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5F14B16DCCC for ; Sat, 29 Jul 2017 02:52:14 +0200 (CEST) Received: (qmail 51677 invoked by uid 500); 29 Jul 2017 00:52:13 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 51647 invoked by uid 99); 29 Jul 2017 00:52:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 29 Jul 2017 00:52:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E2BDF321C; Sat, 29 Jul 2017 00:52:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhouxj@apache.org To: commits@geode.apache.org Date: Sat, 29 Jul 2017 00:52:14 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] geode git commit: GEODE-2900: DUnit test of moving primary during AEQ dispatching archived-at: Sat, 29 Jul 2017 00:52:15 -0000 GEODE-2900: DUnit test of moving primary during AEQ dispatching Adding a dunit test that moves the primary and moves it again during AEQ dispatching. This closes #506 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/acdf2e80 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/acdf2e80 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/acdf2e80 Branch: refs/heads/feature/GEM-1483 Commit: acdf2e80184edf9c3ba934a6f0fa760a83020953 Parents: 64eab45 Author: Dan Smith Authored: Wed May 10 16:32:05 2017 -0700 Committer: Dan Smith Committed: Fri Jul 21 17:17:11 2017 -0700 ---------------------------------------------------------------------- .../AbstractMovingAsyncEventListener.java | 57 +++++++ .../asyncqueue/AsyncEventListenerDUnitTest.java | 169 +++++++++++++------ 2 files changed, 173 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/acdf2e80/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java new file mode 100644 index 0000000..2e746ae --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AbstractMovingAsyncEventListener.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.asyncqueue; + +import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; +import org.apache.geode.distributed.DistributedMember; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Created by dan on 5/10/17. + */ +public abstract class AbstractMovingAsyncEventListener implements AsyncEventListener { + protected final DistributedMember destination; + boolean moved; + Set keysSeen = new HashSet(); + + public AbstractMovingAsyncEventListener(final DistributedMember destination) { + this.destination = destination; + } + + @Override + public boolean processEvents(final List events) { + if (!moved) { + + AsyncEvent event1 = events.get(0); + move(event1); + moved = true; + return false; + } + + events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add); + return true; + } + + protected abstract void move(AsyncEvent event1); + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/acdf2e80/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index 795af36..1a57bde 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -14,57 +14,57 @@ */ package org.apache.geode.internal.cache.wan.asyncqueue; -import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.LongStream; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.CacheClosedException; -import org.apache.geode.cache.PartitionAttributesFactory; -import org.apache.geode.cache.wan.GatewayEventFilter; -import org.apache.geode.cache.wan.GatewayQueueEvent; -import org.apache.geode.internal.cache.wan.MyAsyncEventListener; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.asyncqueue.AsyncEvent; -import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; import org.apache.geode.cache.persistence.PartitionOfflineException; +import org.apache.geode.cache.wan.GatewayEventFilter; +import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.AvailablePortHelper; -import org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage; +import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse; import org.apache.geode.internal.cache.wan.AsyncEventQueueTestBase; -import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.test.dunit.LogWriterUtils; import org.apache.geode.test.dunit.SerializableRunnableIF; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; import org.awaitility.Awaitility; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; @Category(DistributedTest.class) public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { @@ -1745,18 +1745,78 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { .setPersistent(true).setParallel(true).create("ln", new MyAsyncEventListener()); } + public void testParallelAsyncEventQueueMovePrimaryAndMoveItBackDuringDispatching() { + Integer lnPort = + (Integer) vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(1)); + + String regionName = getTestMethodName() + "_PR"; + vm1.invoke(createCacheRunnable(lnPort)); + vm2.invoke(createCacheRunnable(lnPort)); + + SerializableRunnableIF createPartitionedRegion = () -> { + AttributesFactory fact = new AttributesFactory(); + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(1); + pfact.setRedundantCopies(1); + fact.setPartitionAttributes(pfact.create()); + Region r = + cache.createRegionFactory(fact.create()).addAsyncEventQueueId("ln").create(regionName); + + }; + + final DistributedMember member1 = + vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember()); + final DistributedMember member2 = + vm2.invoke(() -> cache.getDistributedSystem().getDistributedMember()); + + + // Create a PR with 1 bucket in vm1. Pause the sender and put some data in it + vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln", true, 100, 10, false, + false, null, false, new PrimaryMovingAsyncEventListener(member2))); + vm1.invoke(createPartitionedRegion); + vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln")); + vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_PR", 113)); + + // Create the PR in vm2. This will create a redundant copy, but will be the secondary + vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln", true, 100, 10, false, + false, null, false, new PrimaryMovingAsyncEventListener(member1))); + vm2.invoke(createPartitionedRegion); + // do a rebalance just to make sure we have restored redundancy + vm2.invoke(() -> { + cache.getResourceManager().createRebalanceFactory().start().getResults(); + }); + + // Resume the AEQ. This should trigger the primary to move to vm2, which will then move it back + vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln")); + + Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm1, "ln")); + Awaitility.waitAtMost(10000, TimeUnit.MILLISECONDS).until(() -> getBucketMoved(vm2, "ln")); + + vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln")); + vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln")); + + Set allKeys = new HashSet(); + allKeys.addAll(getKeysSeen(vm1, "ln")); + allKeys.addAll(getKeysSeen(vm2, "ln")); + + final Set expectedKeys = + LongStream.range(0, 113).mapToObj(Long::valueOf).collect(Collectors.toSet()); + assertEquals(expectedKeys, allKeys); + + } + private static Set getKeysSeen(VM vm, String asyncEventQueueId) { return vm.invoke(() -> { - final BucketMovingAsyncEventListener listener = - (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId); + final AbstractMovingAsyncEventListener listener = + (AbstractMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId); return listener.keysSeen; }); } private static boolean getBucketMoved(VM vm, String asyncEventQueueId) { return vm.invoke(() -> { - final BucketMovingAsyncEventListener listener = - (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId); + final AbstractMovingAsyncEventListener listener = + (AbstractMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId); return listener.moved; }); } @@ -1792,40 +1852,43 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { } } - private static class BucketMovingAsyncEventListener implements AsyncEventListener { - private final DistributedMember destination; - private boolean moved; - private Set keysSeen = new HashSet(); + private static final class BucketMovingAsyncEventListener + extends AbstractMovingAsyncEventListener { public BucketMovingAsyncEventListener(final DistributedMember destination) { - this.destination = destination; + super(destination); } @Override - public boolean processEvents(final List events) { - if (!moved) { - - AsyncEvent event1 = events.get(0); - moveBucket(destination, event1.getKey()); - moved = true; - return false; - } - - events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add); - return true; + protected void move(final AsyncEvent event1) { + Object key = event1.getKey(); + Region region = cache.getRegion(getTestMethodName() + "_PR"); + DistributedMember source = cache.getDistributedSystem().getDistributedMember(); + PartitionRegionHelper.moveBucketByKey(region, source, destination, key); } + } - @Override - public void close() { + private static final class PrimaryMovingAsyncEventListener + extends AbstractMovingAsyncEventListener { + public PrimaryMovingAsyncEventListener(final DistributedMember destination) { + super(destination); } - private static void moveBucket(final DistributedMember destination, final Object key) { - Region region = cache.getRegion(getTestMethodName() + "_PR"); + @Override + protected void move(final AsyncEvent event1) { + Object key = event1.getKey(); + PartitionedRegion region = (PartitionedRegion) event1.getRegion(); DistributedMember source = cache.getDistributedSystem().getDistributedMember(); - PartitionRegionHelper.moveBucketByKey(region, source, destination, key); + + BecomePrimaryBucketResponse response = + BecomePrimaryBucketMessage.send((InternalDistributedMember) destination, region, + region.getKeyInfo(key).getBucketId(), true); + assertNotNull(response); + assertTrue(response.waitForResponse()); } } + }