geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [1/3] incubator-geode git commit: Changing how PartitionRepositoryManager handles destroyed buckets
Date Tue, 22 Sep 2015 22:17:00 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 8bd006ab8 -> 74512e59b


Changing how PartitionedRepositoryManager handles destroyed buckets

This class was using a weak hash map with BucketRegions as keys. Instead
of that, use the bucket id as the key. I added support to
IndexRepository for the repository to indicate if the underlying
BucketRegion has been destroyed. If the IndexRepository is destroyed, we
will create a new IndexRepository in PartitionRepositoryManager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dc3e8f75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dc3e8f75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dc3e8f75

Branch: refs/heads/feature/GEODE-11
Commit: dc3e8f75c4bb26d0b3223709acffa426e208f301
Parents: 3ad1fe7
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Tue Sep 22 14:22:35 2015 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Tue Sep 22 14:25:14 2015 -0700

----------------------------------------------------------------------
 .../internal/PartitionedRepositoryManager.java  | 33 +++++++++++++-------
 .../internal/repository/IndexRepository.java    |  7 +++++
 .../repository/IndexRepositoryImpl.java         | 12 +++++--
 .../PartitionedRepositoryManagerJUnitTest.java  | 26 +++++++++++++++
 .../IndexRepositoryImplJUnitTest.java           |  9 ++++--
 5 files changed, 70 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
index e301482..bcec1c9 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -20,7 +20,7 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.LocalDataSet;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteWeakHashMap;
+import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
 
 /**
  * Manages index repositories for partitioned regions.
@@ -38,7 +38,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
    * 
    * It is weak so that the old BucketRegion will be garbage collected. 
    */
-  CopyOnWriteWeakHashMap<BucketRegion, IndexRepository> indexRepositories = new CopyOnWriteWeakHashMap<BucketRegion,
IndexRepository>();
+  CopyOnWriteHashMap<Integer, IndexRepository> indexRepositories = new CopyOnWriteHashMap<Integer,
IndexRepository>();
   
   /** The user region for this index */
   private final PartitionedRegion userRegion;
@@ -73,7 +73,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
       throw new BucketNotFoundException("User bucket was not found for region " + region
+ "key " +  key + " callbackarg " + callbackArg);
     }
     
-    return getRepository(userBucket);
+    return getRepository(userBucket.getId());
   }
   
   @Override
@@ -90,7 +90,7 @@ public class PartitionedRepositoryManager implements RepositoryManager {
       if(userBucket == null) {
         throw new BucketNotFoundException("User bucket was not found for region " + region
+ "bucket id " + bucketId);
       } else {
-        repos.add(getRepository(userBucket));
+        repos.add(getRepository(userBucket.getId()));
       }
     }
 
@@ -100,15 +100,24 @@ public class PartitionedRepositoryManager implements RepositoryManager
{
   /**
    * Return the repository for a given user bucket
    */
-  private IndexRepository getRepository(BucketRegion userBucket) throws BucketNotFoundException
{
-    IndexRepository repo = indexRepositories.get(userBucket);
+  private IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException
{
+    IndexRepository repo = indexRepositories.get(bucketId);
+    
+    //Remove the repository if it has been destroyed (due to rebalancing)
+    if(repo != null && repo.isClosed()) {
+      indexRepositories.remove(bucketId, repo);
+      repo = null;
+    }
+    
     if(repo == null) {
       try {
-        RegionDirectory dir = new RegionDirectory(getMatchingBucket(userBucket, fileRegion),
getMatchingBucket(userBucket, chunkRegion));
+        BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
+        BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
+        RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket);
         IndexWriterConfig config = new IndexWriterConfig(analyzer);
         IndexWriter writer = new IndexWriter(dir, config);
-        repo = new IndexRepositoryImpl(writer, serializer);
-        IndexRepository oldRepo = indexRepositories.putIfAbsent(userBucket, repo);
+        repo = new IndexRepositoryImpl(fileBucket, writer, serializer);
+        IndexRepository oldRepo = indexRepositories.putIfAbsent(bucketId, repo);
         if(oldRepo != null) {
           repo = oldRepo;
         }
@@ -123,10 +132,10 @@ public class PartitionedRepositoryManager implements RepositoryManager
{
   /**
    * Find the bucket in region2 that matches the bucket id from region1.
    */
-  private BucketRegion getMatchingBucket(BucketRegion region1, PartitionedRegion region2)
throws BucketNotFoundException {
-    BucketRegion result = region2.getDataStore().getLocalBucketById(region1.getId());
+  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) throws
BucketNotFoundException {
+    BucketRegion result = region.getDataStore().getLocalBucketById(bucketId);
     if(result == null) {
-      throw new BucketNotFoundException("Bucket not found for region " + region2 + " bucekt
id " + region1.getId());
+      throw new BucketNotFoundException("Bucket not found for region " + region + " bucekt
id " + bucketId);
     }
     
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java
index 549bf21..b852b82 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepository.java
@@ -45,4 +45,11 @@ public interface IndexRepository {
    * @throws IOException 
    */
   void commit() throws IOException;
+  
+  /**
+   * Check to see if this repository is closed due to
+   * underlying resources being closed or destroyed
+   * @return true if this repository is closed.
+   */
+  public boolean isClosed();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 5c248cf..fbbc5db 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -11,6 +11,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.TopDocs;
 
+import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.SerializerUtil;
 
@@ -26,8 +27,10 @@ public class IndexRepositoryImpl implements IndexRepository {
   private final IndexWriter writer;
   private final LuceneSerializer serializer;
   private final SearcherManager searcherManager;
+  private Region<?,?> region;
   
-  public IndexRepositoryImpl(IndexWriter writer, LuceneSerializer serializer) throws IOException
{
+  public IndexRepositoryImpl(Region<?,?> region, IndexWriter writer, LuceneSerializer
serializer) throws IOException {
+    this.region = region;
     this.writer = writer;
     searcherManager = new SearcherManager(writer, APPLY_ALL_DELETES, null);
     this.serializer = serializer;
@@ -83,6 +86,9 @@ public class IndexRepositoryImpl implements IndexRepository {
   public LuceneSerializer getSerializer() {
     return serializer;
   }
-  
-  
+
+  @Override
+  public boolean isClosed() {
+    return region.isDestroyed();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index db1085a..367f4f2 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -83,6 +83,32 @@ public class PartitionedRepositoryManagerJUnitTest {
   }
   
   /**
+   * Test what happens when a bucket is destroyed.
+   */
+  @Test
+  public void destroyBucket() throws BucketNotFoundException, IOException {
+    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion,
fileRegion, chunkRegion, serializer, new StandardAnalyzer());
+    
+    BucketRegion mockBucket0 = getMockBucket(0);
+    
+    IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion,
0, null);
+
+    assertNotNull(repo0);
+    checkRepository(repo0, 0);
+    
+    BucketRegion fileBucket0 = fileBuckets.get(0);
+    
+    //Simulate rebalancing of a bucket by marking the old bucket is destroyed
+    //and creating a new bucket
+    Mockito.when(fileBucket0.isDestroyed()).thenReturn(true);
+    mockBucket0 = getMockBucket(0);
+    
+    IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion,
0, null);
+    assertNotEquals(repo0, newRepo0);
+    checkRepository(newRepo0, 0);
+  }
+  
+  /**
    * Test that we get the expected exception when a user bucket is missing
    */
   @Test(expected = BucketNotFoundException.class)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dc3e8f75/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
index 0b4a4cd..3a25c97 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
@@ -1,6 +1,6 @@
 package com.gemstone.gemfire.cache.lucene.internal.repository;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -18,7 +18,9 @@ import org.apache.lucene.queryparser.classic.QueryParser;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
+import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
@@ -38,6 +40,7 @@ public class IndexRepositoryImplJUnitTest {
   private HeterogenousLuceneSerializer mapper;
   private StandardAnalyzer analyzer = new StandardAnalyzer();
   private IndexWriter writer;
+  private Region region;
 
   @Before
   public void setUp() throws IOException {
@@ -48,7 +51,9 @@ public class IndexRepositoryImplJUnitTest {
     writer = new IndexWriter(dir, config);
     String[] indexedFields= new String[] {"s", "i", "l", "d", "f", "s2", "missing"};
     mapper = new HeterogenousLuceneSerializer(indexedFields);
-    repo = new IndexRepositoryImpl(writer, mapper);
+    region = Mockito.mock(Region.class);
+    Mockito.when(region.isDestroyed()).thenReturn(false);
+    repo = new IndexRepositoryImpl(region, writer, mapper);
   }
   
   @Test


Mime
View raw message