Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CB0E5200B6F for ; Wed, 24 Aug 2016 18:52:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C99AC160AC1; Wed, 24 Aug 2016 16:52:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EA69C160A91 for ; Wed, 24 Aug 2016 18:52:41 +0200 (CEST) Received: (qmail 17666 invoked by uid 500); 24 Aug 2016 16:52:40 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 17657 invoked by uid 99); 24 Aug 2016 16:52:40 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2016 16:52:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 5519FCCF5D for ; Wed, 24 Aug 2016 16:52:40 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.739 X-Spam-Level: X-Spam-Status: No, score=-3.739 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.519] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id CE2sWl1MNQng for ; Wed, 24 Aug 2016 16:52:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id B7DEB5F474 for ; Wed, 24 Aug 2016 16:52:38 +0000 (UTC) Received: (qmail 17458 invoked by uid 99); 24 Aug 2016 16:52:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Aug 2016 16:52:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2AA5EE0230; Wed, 24 Aug 2016 16:52:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Wed, 24 Aug 2016 16:52:38 -0000 Message-Id: <4ea0dbe7972f480ca184bee9636d8450@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-geode git commit: GEODE-1813: Don't pass fixed partition attributes to BucketRegionQueue archived-at: Wed, 24 Aug 2016 16:52:43 -0000 Repository: incubator-geode Updated Branches: refs/heads/develop 37269735e -> fb39f889a GEODE-1813: Don't pass fixed partition attributes to BucketRegionQueue The BucketRegionQueue should not inherit the fixed partition attributes from the parent region. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f253ad2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f253ad2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f253ad2f Branch: refs/heads/develop Commit: f253ad2f1e7c725a6497c434e9f8a23028bda8de Parents: 3726973 Author: Dan Smith Authored: Mon Aug 22 18:04:20 2016 -0700 Committer: Dan Smith Committed: Wed Aug 24 09:32:52 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/PartitionedRegion.java | 13 +++-- .../cache/wan/AsyncEventQueueTestBase.java | 55 ++++++++++++++++++++ .../asyncqueue/AsyncEventListenerDUnitTest.java | 29 +++++++++++ 3 files changed, 93 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index df9ceba..175a284 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -660,18 +660,23 @@ public class PartitionedRegion extends LocalRegion implements this.partitionListeners = this.partitionAttributes.getPartitionListeners(); this.colocatedWithRegion = ColocationHelper.getColocatedRegion(this); - if (colocatedWithRegion != null) { - //In colocation chain, child region inherita the fixed partitin attributes from parent region. - this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl(); - this.fixedPASet = colocatedWithRegion.fixedPASet; + + if(colocatedWithRegion != null) { synchronized (colocatedWithRegion.colocatedByList) { colocatedWithRegion.colocatedByList.add(this); } } + + if (colocatedWithRegion != null && !internalRegionArgs.isUsedForParallelGatewaySenderQueue()) { + //In a colocation chain, the child region inherits the fixed partition attributes from parent region. + this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl(); + this.fixedPASet = colocatedWithRegion.fixedPASet; + } else { this.fixedPAttrs = this.partitionAttributes.getFixedPartitionAttributes(); this.fixedPASet = 0; } + if (logger.isDebugEnabled()) { logger.debug("Partitioned Region {} constructed {}", regionname, (this.haveCacheLoader ? "with a cache loader" : "")); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java index d7739c5..c66ef0b 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java @@ -53,6 +53,9 @@ import com.gemstone.gemfire.cache.Declarable; import com.gemstone.gemfire.cache.DiskStore; import com.gemstone.gemfire.cache.DiskStoreFactory; import com.gemstone.gemfire.cache.EntryEvent; +import com.gemstone.gemfire.cache.EntryOperation; +import com.gemstone.gemfire.cache.FixedPartitionAttributes; +import com.gemstone.gemfire.cache.FixedPartitionResolver; import com.gemstone.gemfire.cache.LoaderHelper; import com.gemstone.gemfire.cache.PartitionAttributesFactory; import com.gemstone.gemfire.cache.Region; @@ -628,6 +631,31 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { } } + public static void createFixedPartitionedRegionWithAsyncEventQueue( + String regionName, String asyncEventQueueId, String partitionName, final List allPartitions, boolean offHeap) { + IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class + .getName()); + IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class + .getName()); + try { + AttributesFactory fact = new AttributesFactory(); + + PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + pfact.setTotalNumBuckets(16); + pfact.addFixedPartitionAttributes(FixedPartitionAttributes.createFixedPartition(partitionName,true)); + pfact.setPartitionResolver(new MyFixedPartitionResolver(allPartitions)); + fact.setPartitionAttributes(pfact.create()); + fact.setOffHeap(offHeap); + Region r = cache.createRegionFactory(fact.create()) + .addAsyncEventQueueId(asyncEventQueueId).create(regionName); + assertNotNull(r); + } + finally { + exp.remove(); + exp1.remove(); + } + } + public static void createColocatedPartitionedRegionWithAsyncEventQueue( String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) { @@ -1616,6 +1644,33 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase { public boolean isOffHeap() { return false; } + + private static class MyFixedPartitionResolver implements FixedPartitionResolver { + + private final List allPartitions; + + public MyFixedPartitionResolver(final List allPartitions) { + this.allPartitions = allPartitions; + } + + @Override + public String getPartitionName(final EntryOperation opDetails, @Deprecated final Set targetPartitions) { + int hash = Math.abs(opDetails.getKey().hashCode() % allPartitions.size()); + return allPartitions.get(hash); + } + + @Override public Object getRoutingObject(final EntryOperation opDetails) { + return opDetails.getKey(); + } + + @Override public String getName() { + return getClass().getName(); + } + + @Override public void close() { + + } + } } class MyAsyncEventListener_CacheLoader implements AsyncEventListener { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index f090402..4c78bdc 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -20,6 +20,7 @@ import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; import static org.junit.Assert.*; import static org.mockito.Matchers.any; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -135,6 +136,34 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm2size); } + @Test + public void testParallelAsyncEventQueueWithFixedPartition() { + Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); + + vm1.invoke(createCacheRunnable(lnPort)); + vm2.invoke(createCacheRunnable(lnPort)); + + vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", + true, 100, 100, false, false, null, false )); + vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", + true, 100, 100, false, false, null, false )); + + List allPartitions = Arrays.asList("part1", "part2"); + vm1.invoke(() -> AsyncEventQueueTestBase.createFixedPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", "part1", allPartitions, isOffHeap())); + vm2.invoke(() -> AsyncEventQueueTestBase.createFixedPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", "part2", allPartitions, isOffHeap())); + + vm1.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", + 256 )); + + vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + + int vm1size = (Integer)vm1.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); + int vm2size = (Integer)vm2.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); + + assertEquals(vm1size + vm2size, 256); + } + protected SerializableRunnableIF pauseAsyncEventQueueRunnable() { return () -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ); }