geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [13/19] incubator-geode git commit: GEODE-716: Getting the bucket creation lock when setting the CacheLoader
Date Thu, 21 Jan 2016 19:08:59 GMT
GEODE-716: Getting the bucket creation lock when setting the CacheLoader

Modifying the CacheLoader using AttributesMutator had a race condition
with partitioned regions where a bucket might never get the new cache
loader. By getting the bucket creation lock, we ensure that the bucket
will either read the cache loader value after it has been set or the
cache loader modification thread will set the cache loader on the
bucket.

I've also indicated that setCacheLoader should not be used for
partitioned regions because of some of the consistency concerns with the
API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e497a4d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e497a4d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e497a4d5

Branch: refs/heads/feature/GEODE-715
Commit: e497a4d5b6eb0ec8020a7ef76106e17c7006af0f
Parents: 942b89e
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Tue Jan 5 15:12:51 2016 -0800
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Thu Jan 21 09:09:15 2016 -0800

----------------------------------------------------------------------
 .../gemfire/cache/AttributesMutator.java        | 11 ++-
 .../cache/PartitionedRegionDataStore.java       | 27 +++++---
 .../partitioned/PartitionedRegionObserver.java  |  5 ++
 .../PartitionedRegionObserverAdapter.java       |  4 ++
 .../PartitionedRegionDataStoreJUnitTest.java    | 73 ++++++++++++++++++++
 5 files changed, 108 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e497a4d5/gemfire-core/src/main/java/com/gemstone/gemfire/cache/AttributesMutator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/AttributesMutator.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/AttributesMutator.java
index 7e4111e..1d36536 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/AttributesMutator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/AttributesMutator.java
@@ -146,8 +146,15 @@ public interface AttributesMutator<K,V> {
    */
   public CacheWriter<K,V> setCacheWriter(CacheWriter<K,V> cacheWriter);
   
-  /** Changes the cache loader for the region.
-   * @param cacheLoader the cache loader
+  /**
+   * Changes the cache loader for the region.
+   * 
+   * Changing the cache loader for partitioned regions is not recommended due to
+   * the fact that it can result in an inconsistent cache loader configuration.
+   * This feature may be removed in future releases.
+   * 
+   * @param cacheLoader
+   *          the cache loader
    * @return the previous CacheLoader
    */
   public CacheLoader<K,V> setCacheLoader(CacheLoader<K,V> cacheLoader);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e497a4d5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index 31ba0db..a4480f9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -491,6 +491,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
                   // so that other VMs which discover the real bucket via a
                   // profile exchange can send messages to the data store and
                   // safely use the bucket.
+                  observer.beforeAssignBucket(this.partitionedRegion, possiblyFreeBucketId);
                   assignBucketRegion(bukReg.getId(), bukReg);
                   buk.setHosting(true);
                   bukReg.invokePartitionListenerAfterBucketCreated();
@@ -997,17 +998,23 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    * sent by the partitioned region when its loader has changed
    */
   protected void cacheLoaderChanged(final CacheLoader newLoader, final CacheLoader oldLoader)
{
-    this.loader = newLoader;
-    visitBuckets(new BucketVisitor() {
-      @Override
-      public void visit(Integer bucketId, Region r) {
-        AttributesMutator mut = r.getAttributesMutator();
-        if (logger.isDebugEnabled()) {
-          logger.debug("setting new cache loader in bucket region: {}", newLoader);
+    StoppableWriteLock lock = this.bucketCreationLock.writeLock();
+    lock.lock();
+    try {
+      this.loader = newLoader;
+      visitBuckets(new BucketVisitor() {
+        @Override
+        public void visit(Integer bucketId, Region r) {
+          AttributesMutator mut = r.getAttributesMutator();
+          if (logger.isDebugEnabled()) {
+            logger.debug("setting new cache loader in bucket region: {}", newLoader);
+          }
+          mut.setCacheLoader(newLoader);
         }
-        mut.setCacheLoader(newLoader);
-      }
-    });
+      });
+    } finally {
+      lock.unlock();
+    }
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e497a4d5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserver.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserver.java
index 0ca82fe..b70dd8a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserver.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserver.java
@@ -36,4 +36,9 @@ public interface PartitionedRegionObserver {
   public void beforeCalculatingStartingBucketId();
   
   public void beforeBucketCreation(PartitionedRegion region, int bucketId);
+  /**
+   * Called after a bucket region is created, but before it is added to the 
+   * map of buckets.
+   */
+  public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e497a4d5/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserverAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserverAdapter.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserverAdapter.java
index 95273b2..f2f08f2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserverAdapter.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionObserverAdapter.java
@@ -40,4 +40,8 @@ public class PartitionedRegionObserverAdapter implements PartitionedRegionObserv
   @Override
   public void beforeBucketCreation(PartitionedRegion region, int bucketId) {
   }
+
+  @Override
+  public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e497a4d5/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStoreJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStoreJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStoreJUnitTest.java
index 7bc8346..bdfec29 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStoreJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStoreJUnitTest.java
@@ -26,10 +26,16 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.RegionAttributes;
@@ -37,6 +43,14 @@ import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.PureLogWriter;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserverAdapter;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserverHolder;
 
 /**
  * This test checks functionality of the PartitionedRegionDatastore on a sinle
@@ -120,6 +134,65 @@ public class PartitionedRegionDataStoreJUnitTest
     assertEquals(pr.get(key), value);
 
   }
+  
+  @Test
+  public void testChangeCacheLoaderDuringBucketCreation() throws Exception
+  {
+    final PartitionedRegion pr = (PartitionedRegion)cache.createRegionFactory(RegionShortcut.PARTITION)
+        .create("testChangeCacheLoaderDuringBucketCreation");
+
+    //Add an observer which will block bucket creation and wait for a loader to be added
+    final CountDownLatch loaderAdded = new CountDownLatch(1);
+    final CountDownLatch bucketCreated = new CountDownLatch(1);
+    PartitionedRegionObserverHolder.setInstance(new PartitionedRegionObserverAdapter() {
+      @Override
+      public void beforeAssignBucket(PartitionedRegion partitionedRegion, int bucketId) {
+        try {
+          //Indicate that the bucket has been created
+          bucketCreated.countDown();
+          
+          //Wait for the loader to be added. if the synchronization
+          //is correct, this would wait for ever because setting the
+          //cache loader will wait for this method. So time out after
+          //1 second, which should be good enough to cause a failure
+          //if the synchronization is broken.
+          loaderAdded.await(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted");
+        }
+      }
+    });
+    
+    Thread createBuckets = new Thread() {
+      public void run() {
+        PartitionRegionHelper.assignBucketsToPartitions(pr);
+      }
+    };
+    
+    createBuckets.start();
+    
+    CacheLoader loader = new CacheLoader() {
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Object load(LoaderHelper helper) throws CacheLoaderException {
+        return null;
+      }
+    };
+    
+    bucketCreated.await();
+    pr.getAttributesMutator().setCacheLoader(loader);
+    loaderAdded.countDown();
+    createBuckets.join();
+    
+
+    //Assert that all buckets have received the cache loader
+    for(BucketRegion bucket: pr.getDataStore().getAllLocalBucketRegions()) {
+      assertEquals(loader, bucket.getCacheLoader()); 
+    }
+  }
 
   /**
    * This method checks whether the canAccomodateMoreBytesSafely returns false


Mime
View raw message