geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jensde...@apache.org
Subject [1/7] incubator-geode git commit: GEODE-1813: Don't pass fixed partition attributes to BucketRegionQueue
Date Thu, 25 Aug 2016 18:27:00 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1817 1cc598809 -> 67de9b94e


GEODE-1813: Don't pass fixed partition attributes to BucketRegionQueue

The BucketRegionQueue should not inherit the fixed partition attributes
from the parent region.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f253ad2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f253ad2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f253ad2f

Branch: refs/heads/feature/GEODE-1817
Commit: f253ad2f1e7c725a6497c434e9f8a23028bda8de
Parents: 3726973
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Mon Aug 22 18:04:20 2016 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Wed Aug 24 09:32:52 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/PartitionedRegion.java       | 13 +++--
 .../cache/wan/AsyncEventQueueTestBase.java      | 55 ++++++++++++++++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 29 +++++++++++
 3 files changed, 93 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index df9ceba..175a284 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -660,18 +660,23 @@ public class PartitionedRegion extends LocalRegion implements
     this.partitionListeners = this.partitionAttributes.getPartitionListeners(); 
 
     this.colocatedWithRegion = ColocationHelper.getColocatedRegion(this);
-    if (colocatedWithRegion != null) {
-      //In colocation chain, child region inherita the fixed partitin attributes from parent region.
-      this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl();
-      this.fixedPASet = colocatedWithRegion.fixedPASet;
+
+    if(colocatedWithRegion != null) {
       synchronized (colocatedWithRegion.colocatedByList) {
         colocatedWithRegion.colocatedByList.add(this);
       }
     }
+
+    if (colocatedWithRegion != null && !internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
+      //In a colocation chain, the child region inherits the fixed partition attributes from parent region.
+      this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl();
+      this.fixedPASet = colocatedWithRegion.fixedPASet;
+    }
     else {
       this.fixedPAttrs = this.partitionAttributes.getFixedPartitionAttributes();
       this.fixedPASet = 0;
     }
+
     if (logger.isDebugEnabled()) {
       logger.debug("Partitioned Region {} constructed {}", regionname, (this.haveCacheLoader ? "with a cache loader" : ""));
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index d7739c5..c66ef0b 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -53,6 +53,9 @@ import com.gemstone.gemfire.cache.Declarable;
 import com.gemstone.gemfire.cache.DiskStore;
 import com.gemstone.gemfire.cache.DiskStoreFactory;
 import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
+import com.gemstone.gemfire.cache.FixedPartitionResolver;
 import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.Region;
@@ -628,6 +631,31 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
     }
   }
 
+  public static void createFixedPartitionedRegionWithAsyncEventQueue(
+    String regionName, String asyncEventQueueId, String partitionName, final List<String> allPartitions, boolean offHeap) {
+    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+      .getName());
+    IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
+      .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.addFixedPartitionAttributes(FixedPartitionAttributes.createFixedPartition(partitionName,true));
+      pfact.setPartitionResolver(new MyFixedPartitionResolver(allPartitions));
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
   public static void createColocatedPartitionedRegionWithAsyncEventQueue(
       String regionName, String asyncEventQueueId, Integer totalNumBuckets,
       String colocatedWith) {
@@ -1616,6 +1644,33 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
   public boolean isOffHeap() {
     return false;
   }
+
+  private static class MyFixedPartitionResolver implements FixedPartitionResolver {
+
+    private final List<String> allPartitions;
+
+    public MyFixedPartitionResolver(final List<String> allPartitions) {
+      this.allPartitions = allPartitions;
+    }
+
+    @Override
+    public String getPartitionName(final EntryOperation opDetails, @Deprecated final Set targetPartitions) {
+      int hash =  Math.abs(opDetails.getKey().hashCode() % allPartitions.size());
+      return allPartitions.get(hash);
+    }
+
+    @Override public Object getRoutingObject(final EntryOperation opDetails) {
+      return opDetails.getKey();
+    }
+
+    @Override public String getName() {
+      return getClass().getName();
+    }
+
+    @Override public void close() {
+
+    }
+  }
 }
 
 class MyAsyncEventListener_CacheLoader implements AsyncEventListener {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f253ad2f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index f090402..4c78bdc 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -20,6 +20,7 @@ import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -135,6 +136,34 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm2size);
   }
 
+  @Test
+  public void testParallelAsyncEventQueueWithFixedPartition() {
+    Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
+      true, 100, 100, false, false, null, false ));
+    vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
+      true, 100, 100, false, false, null, false ));
+
+    List<String> allPartitions = Arrays.asList("part1", "part2");
+    vm1.invoke(() -> AsyncEventQueueTestBase.createFixedPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", "part1", allPartitions, isOffHeap()));
+    vm2.invoke(() -> AsyncEventQueueTestBase.createFixedPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", "part2", allPartitions, isOffHeap()));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR",
+      256 ));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+
+    int vm1size = (Integer)vm1.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
+    int vm2size = (Integer)vm2.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
+
+    assertEquals(vm1size + vm2size, 256);
+  }
+
   protected SerializableRunnableIF pauseAsyncEventQueueRunnable() {
     return () -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" );
   }


Mime
View raw message