geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject incubator-geode git commit: Adding an implementation of RepositoryManager for PartitionedRegions
Date Mon, 21 Sep 2015 17:36:28 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-11 ac7c9286d -> 378176869


Adding an implementation of RepositoryManager for PartitionedRegions

Adding a RepositoryManager that lazily creates IndexRepository instances
for partitioned region buckets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/37817686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/37817686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/37817686

Branch: refs/heads/feature/GEODE-11
Commit: 37817686922d6ac993fdabc581bc3f911340c895
Parents: ac7c928
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Fri Sep 18 17:46:00 2015 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Mon Sep 21 10:34:19 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/BucketRegion.java    |   2 +-
 .../cache/PartitionedRegionDataStore.java       |   2 +-
 .../util/concurrent/CopyOnWriteWeakHashMap.java |  12 ++
 .../lucene/internal/LuceneEventListener.java    |   6 +-
 .../LuceneIndexForPartitionedRegion.java        |   5 +-
 .../internal/PartitionedRepositoryManager.java  | 134 ++++++++++++++++
 .../internal/directory/RegionDirectory.java     |   9 ++
 .../internal/distributed/LuceneFunction.java    |   2 +-
 .../lucene/internal/filesystem/FileSystem.java  |   8 +
 .../repository/IndexRepositoryImpl.java         |  10 ++
 .../internal/repository/RepositoryManager.java  |   7 +-
 .../internal/LuceneEventListenerJUnitTest.java  |  13 +-
 .../PartitionedRepositoryManagerJUnitTest.java  | 157 +++++++++++++++++++
 .../directory/RegionDirectoryJUnitTest.java     |   9 --
 .../distributed/LuceneFunctionJUnitTest.java    |  12 +-
 15 files changed, 356 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index e9e1523..865a372 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -1992,7 +1992,7 @@ implements Bucket
     }
   }
 
-  public final int getId() {
+  public int getId() {
     return getBucketAdvisor().getProxyBucketRegion().getId();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index d567b5a..2e2b0d7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -111,7 +111,7 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQue
  * @author tapshank
  * @author Mitch Thomas
  */
-public final class PartitionedRegionDataStore implements HasCachePerfStats
+public class PartitionedRegionDataStore implements HasCachePerfStats
 {
   private static final Logger logger = LogService.getLogger();
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java
index e4dc57a..982a3af 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java
@@ -44,6 +44,18 @@ public class CopyOnWriteWeakHashMap<K,V> extends AbstractMap<K, V> {
     map = Collections.unmodifiableMap(tmp);
     return result;
   }
+  
+  public synchronized V putIfAbsent(K key, V value) {
+    V oldValue = map.get(key);
+    if(oldValue != null) {
+      return oldValue;
+    }
+    
+    WeakHashMap<K, V> tmp = new WeakHashMap<K, V>(map);
+    V result = tmp.put(key, value);
+    map = Collections.unmodifiableMap(tmp);
+    return result;
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
index 4c289bf..049fb64 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -15,6 +15,7 @@ import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.logging.LogService;
 
 /**
@@ -45,8 +46,9 @@ public class LuceneEventListener implements AsyncEventListener {
       for (AsyncEvent event : events) {
         Region region = event.getRegion();
         Object key = event.getKey();
+        Object callbackArgument = event.getCallbackArgument();
         
-        IndexRepository repository = repositoryManager.getRepository(region, key);
+        IndexRepository repository = repositoryManager.getRepository(region, key, callbackArgument);
 
         Operation op = event.getOperation();
 
@@ -68,7 +70,7 @@ public class LuceneEventListener implements AsyncEventListener {
         repo.commit();
       }
       return true;
-    } catch(IOException e) {
+    } catch(IOException | BucketNotFoundException e) {
       logger.error("Unable to save to lucene index", e);
       return false;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index faddbbc..f648c9a 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -27,10 +27,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 /* wrapper of IndexWriter */
 public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
 
-
-  Cache cache;
-  // map to map bucketId to repo, -1 means its DR
-  HashMap<Integer, IndexRepository> indexRepositories = new HashMap<Integer, IndexRepository>();
+  private final Cache cache;
 
   public LuceneIndexForPartitionedRegion(String indexName, String regionPath, Cache cache) {
     this.indexName = indexName;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
new file mode 100644
index 0000000..ba89a40
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -0,0 +1,134 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.LocalDataSet;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteWeakHashMap;
+
+/**
+ * Manages index repositories for partitioned regions.
+ * 
+ * This class lazily creates the IndexRepository for each individual
+ * bucket. If a Bucket is rebalanced, this class will create a new
+ * index repository when the bucket returns to this node.
+ */
+public class PartitionedRepositoryManager implements RepositoryManager {
+  /** map of the parent bucket region to the index repository 
+   * 
+   * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to 
+   * return a stale index repository. If a bucket moves off of this node and
+   * comes back, it will have a new BucketRegion object.
+   * 
+   * It is weak so that the old BucketRegion will be garbage collected. 
+   */
+  CopyOnWriteWeakHashMap<BucketRegion, IndexRepository> indexRepositories = new CopyOnWriteWeakHashMap<BucketRegion, IndexRepository>();
+  
+  /** The user region for this index */
+  private final PartitionedRegion userRegion;
+  
+  private final PartitionedRegion fileRegion;
+  private final PartitionedRegion chunkRegion;
+
+  private final LuceneSerializer serializer;
+  
+  /**
+   * 
+   * @param userRegion The user partition region
+   * @param fileRegion The partition region used for file metadata. Should be colocated with the user pr
+   * @param chunkRegion The partition region users for chunk metadata.
+   * @param serializer The serializer that should be used for converting objects to lucene docs.
+   */
+  public PartitionedRepositoryManager(PartitionedRegion userRegion, PartitionedRegion fileRegion,
+      PartitionedRegion chunkRegion,
+      LuceneSerializer serializer) {
+    this.userRegion = userRegion;
+    this.fileRegion = fileRegion;
+    this.chunkRegion = chunkRegion;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException {
+    BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg);
+    if(userBucket == null) {
+      throw new BucketNotFoundException("User bucket was not found for region " + region + "key " +  key + " callbackarg " + callbackArg);
+    }
+    
+    return getRepository(userBucket);
+  }
+  
+  @Override
+  public Collection<IndexRepository> getRepositories(Region region) throws BucketNotFoundException {
+    if(!(region instanceof LocalDataSet)) {
+      throw new IllegalStateException("Trying to find the repositories for a region which is not the local data set of a function");
+    }
+    
+    LocalDataSet dataSet = (LocalDataSet) region;
+    Set<Integer> buckets = dataSet.getBucketSet();
+    ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size());
+    for(Integer bucketId : buckets) {
+      BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
+      if(userBucket == null) {
+        throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId);
+      } else {
+        repos.add(getRepository(userBucket));
+      }
+    }
+
+    return repos;
+  }
+
+  /**
+   * Return the repository for a given user bucket
+   */
+  private IndexRepository getRepository(BucketRegion userBucket) throws BucketNotFoundException {
+    IndexRepository repo = indexRepositories.get(userBucket);
+    if(repo == null) {
+      try {
+        Analyzer analyzer = new StandardAnalyzer();
+        RegionDirectory dir = new RegionDirectory(getMatchingBucket(userBucket, fileRegion), getMatchingBucket(userBucket, chunkRegion));
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        IndexWriter writer = new IndexWriter(dir, config);
+        repo = new IndexRepositoryImpl(writer, serializer);
+        IndexRepository oldRepo = indexRepositories.putIfAbsent(userBucket, repo);
+        if(oldRepo != null) {
+          repo = oldRepo;
+        }
+      } catch(IOException e) {
+        throw new InternalGemFireError("Unable to create index repository", e);
+      }
+    }
+    
+    return repo;
+  }
+
+  /**
+   * Find the bucket in region2 that matches the bucket id from region1.
+   */
+  private BucketRegion getMatchingBucket(BucketRegion region1, PartitionedRegion region2) throws BucketNotFoundException {
+    BucketRegion result = region2.getDataStore().getLocalBucketById(region1.getId());
+    if(result == null) {
+      throw new BucketNotFoundException("Bucket not found for region " + region2 + " bucekt id " + region1.getId());
+    }
+    
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
index f72dfec..38e7714 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java
@@ -101,4 +101,13 @@ public class RegionDirectory extends BaseDirectory {
     isOpen = false;
   }
 
+  /**
+   * For testing, the file system
+   */
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+  
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java
index ca28154..e2e33f3 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java
@@ -103,7 +103,7 @@ public class LuceneFunction extends FunctionAdapter {
 
   private Collection<IndexRepository> getIndexRepositories(RegionFunctionContext ctx, Region region) throws BucketNotFoundException {
     synchronized (LuceneFunction.class) {
-      return repoManager.getRepositories(region, ctx);
+      return repoManager.getRepositories(region);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
index 2a7c22b..50b9f50 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java
@@ -125,4 +125,12 @@ public class FileSystem {
   void updateFile(File file) {
     fileRegion.put(file.getName(), file);
   }
+
+  public ConcurrentMap<String, File> getFileRegion() {
+    return fileRegion;
+  }
+
+  public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() {
+    return chunkRegion;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 4c0b21b..5c248cf 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -75,4 +75,14 @@ public class IndexRepositoryImpl implements IndexRepository {
     writer.commit();
     searcherManager.maybeRefresh();
   }
+
+  public IndexWriter getWriter() {
+    return writer;
+  }
+
+  public LuceneSerializer getSerializer() {
+    return serializer;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
index a1d0f86..b61e8be 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
@@ -12,16 +12,15 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
  */
 public interface RepositoryManager {
 
-  IndexRepository getRepository(Region region, Object key);
+  IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException;
 
   /**
    * Returns a collection of {@link IndexRepository} instances hosting index data of the input list of bucket ids. The
    * bucket needs to be present on this member.
    * 
-   * @param region
-   * @param ctx function context for which {@link IndexRepository}s needs to be discovered.
+   * @param localDataSet The local data set of a function
    * @return a collection of {@link IndexRepository} instances
    * @throws BucketNotFoundException if any of the requested buckets is not found on this member
    */
-  Collection<IndexRepository> getRepositories(Region region, RegionFunctionContext ctx) throws BucketNotFoundException;
+  Collection<IndexRepository> getRepositories(Region localDataSet) throws BucketNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
index f296fd5..617020b 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
@@ -31,15 +32,17 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest;
 public class LuceneEventListenerJUnitTest {
 
   @Test
-  public void testProcessBatch() throws IOException {
+  public void testProcessBatch() throws IOException, BucketNotFoundException {
     RepositoryManager manager = Mockito.mock(RepositoryManager.class);
     IndexRepository repo1 = Mockito.mock(IndexRepository.class);
     IndexRepository repo2 = Mockito.mock(IndexRepository.class);
     Region region1 = Mockito.mock(Region.class);
     Region region2 = Mockito.mock(Region.class);
-
-    Mockito.when(manager.getRepository(eq(region1), any())).thenReturn(repo1);
-    Mockito.when(manager.getRepository(eq(region2), any())).thenReturn(repo2);
+    
+    Object callback1 = new Object();
+    
+    Mockito.when(manager.getRepository(eq(region1), any(), eq(callback1))).thenReturn(repo1);
+    Mockito.when(manager.getRepository(eq(region2), any(), eq(null))).thenReturn(repo2);
 
     LuceneEventListener listener = new LuceneEventListener(manager);
 
@@ -50,8 +53,10 @@ public class LuceneEventListenerJUnitTest {
       AsyncEvent event = Mockito.mock(AsyncEvent.class);
 
       Region region = i % 2 == 0 ? region1 : region2;
+      Object callback = i % 2 == 0 ? callback1 : null;
       Mockito.when(event.getRegion()).thenReturn(region);
       Mockito.when(event.getKey()).thenReturn(i);
+      Mockito.when(event.getCallbackArgument()).thenReturn(callback);
 
       switch (i % 3) {
       case 0:

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
new file mode 100644
index 0000000..c7a2362
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -0,0 +1,157 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.index.IndexWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.LocalDataSet;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionedRepositoryManagerJUnitTest {
+
+  
+  private PartitionedRegion userRegion;
+  private PartitionedRegion fileRegion;
+  private PartitionedRegion chunkRegion;
+  private LuceneSerializer serializer;
+  private PartitionedRegionDataStore userDataStore;
+  private PartitionedRegionDataStore fileDataStore;
+  private PartitionedRegionDataStore chunkDataStore;
+  
+  private Map<Integer, BucketRegion> fileBuckets = new HashMap<Integer, BucketRegion>();
+  private Map<Integer, BucketRegion> chunkBuckets= new HashMap<Integer, BucketRegion>();
+
+  @Before
+  public void setUp() {
+    userRegion = Mockito.mock(PartitionedRegion.class);
+    userDataStore = Mockito.mock(PartitionedRegionDataStore.class);
+    Mockito.when(userRegion.getDataStore()).thenReturn(userDataStore);
+    
+    fileRegion = Mockito.mock(PartitionedRegion.class);
+    fileDataStore = Mockito.mock(PartitionedRegionDataStore.class);
+    Mockito.when(fileRegion.getDataStore()).thenReturn(fileDataStore);
+    chunkRegion = Mockito.mock(PartitionedRegion.class);
+    chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class);
+    Mockito.when(chunkRegion.getDataStore()).thenReturn(chunkDataStore);
+    serializer = new HeterogenousLuceneSerializer(new String[] {"a", "b"} );  
+  }
+  
+  @Test
+  public void getByKey() throws BucketNotFoundException, IOException {
+    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer);
+    
+    BucketRegion mockBucket0 = getMockBucket(0);
+    BucketRegion mockBucket1 = getMockBucket(1);
+    
+    IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null);
+    IndexRepositoryImpl repo1 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 1, null);
+    IndexRepositoryImpl repo113 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 113, null);
+
+    assertNotNull(repo0);
+    assertNotNull(repo1);
+    assertNotNull(repo113);
+    assertEquals(repo0, repo113);
+    assertNotEquals(repo0, repo1);
+    
+    checkRepository(repo0, 0);
+    checkRepository(repo1, 1);
+  }
+  
+  /**
+   * Test that we get the expected exception when a user bucket is missing
+   */
+  @Test(expected = BucketNotFoundException.class)
+  public void getMissingBucketByKey() throws BucketNotFoundException {
+    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer);
+    repoManager.getRepository(userRegion, 0, null);
+  }
+  
+  @Test
+  public void getByRegion() throws BucketNotFoundException {
+
+    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer);
+    
+    BucketRegion mockBucket0 = getMockBucket(0);
+    BucketRegion mockBucket1 = getMockBucket(1);
+
+    Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
+    LocalDataSet ldr = new LocalDataSet(null, buckets);
+    Collection<IndexRepository> repos = repoManager.getRepositories(ldr);
+    assertEquals(2, repos.size());
+
+    Iterator<IndexRepository> itr = repos.iterator();
+    IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next();
+    IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next();
+    
+    assertNotNull(repo0);
+    assertNotNull(repo1);
+    assertNotEquals(repo0, repo1);
+    
+    checkRepository(repo0, 0);
+    checkRepository(repo1, 1);
+  }
+  
+  /**
+   * Test that we get the expected exception when a user bucket is missing
+   */
+  @Test(expected = BucketNotFoundException.class)
+  public void getMissingBucketByRegion() throws BucketNotFoundException {
+    PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer);
+    
+    BucketRegion mockBucket0 = getMockBucket(0);
+
+    Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1));
+    LocalDataSet ldr = new LocalDataSet(userRegion, buckets);
+
+    repoManager.getRepositories(ldr);
+  }
+  
+  private void checkRepository(IndexRepositoryImpl repo0, int bucketId) {
+    IndexWriter writer0 = repo0.getWriter();
+    RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory();
+    assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion());
+    assertEquals(chunkBuckets.get(bucketId), dir0.getFileSystem().getChunkRegion());
+    assertEquals(serializer, repo0.getSerializer());
+  }
+  
+  private BucketRegion getMockBucket(int id) {
+    BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
+    BucketRegion fileBucket = Mockito.mock(BucketRegion.class);
+    BucketRegion chunkBucket = Mockito.mock(BucketRegion.class);
+    Mockito.when(mockBucket.getId()).thenReturn(id);
+    Mockito.when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket);
+    Mockito.when(userDataStore.getLocalBucketById(eq(id))).thenReturn(mockBucket);
+    Mockito.when(userRegion.getBucketRegion(eq(id + 113), eq(null))).thenReturn(mockBucket);
+    Mockito.when(userDataStore.getLocalBucketById(eq(id + 113))).thenReturn(mockBucket);
+    Mockito.when(fileDataStore.getLocalBucketById(eq(id))).thenReturn(fileBucket);
+    Mockito.when(chunkDataStore.getLocalBucketById(eq(id))).thenReturn(chunkBucket);
+    
+    fileBuckets.put(id, fileBucket);
+    chunkBuckets.put(id, chunkBucket);
+    return mockBucket;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java
index cd88a46..9dd1d6b 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java
@@ -6,7 +6,6 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.store.BaseDirectoryTestCase;
 import org.apache.lucene.store.Directory;
-import org.junit.After;
 import org.junit.Rule;
 import org.junit.experimental.categories.Category;
 
@@ -27,14 +26,6 @@ public class RegionDirectoryJUnitTest extends BaseDirectoryTestCase {
   @Rule
   public SystemPropertiesRestoreRule restoreProps = new SystemPropertiesRestoreRule();
   
-//  @After
-//  public void clearLog4J() {
-//    //The lucene test ensures that no system properties
-//    //have been modified by the test. GFE leaves this property
-//    //set
-//    System.clearProperty("log4j.configurationFile");
-//  }
-  
   protected Directory getDirectory(Path path) throws IOException {
     
     //This is super lame, but log4j automatically sets the system property, and the lucene

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
index 24f8ad0..c354411 100644
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
@@ -68,7 +68,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockContext).getArguments();
         will(returnValue(searchArgs));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         will(returnValue(repos));
 
         oneOf(mockContext).getResultSender();
@@ -133,7 +133,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockContext).getResultSender();
         will(returnValue(mockResultSender));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         will(returnValue(repos));
 
         oneOf(mockRepository1).query(with(query), with(equal(0)), with(any(IndexResultCollector.class)));
@@ -192,7 +192,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockContext).getResultSender();
         will(returnValue(mockResultSender));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         repos.remove(0);
         will(returnValue(repos));
 
@@ -240,7 +240,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockContext).getArguments();
         will(returnValue(searchArgs));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         will(returnValue(repos));
 
         oneOf(mockContext).getResultSender();
@@ -267,7 +267,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockContext).getArguments();
         will(returnValue(searchArgs));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         will(throwException(new BucketNotFoundException("")));
 
         oneOf(mockContext).getResultSender();
@@ -300,7 +300,7 @@ public class LuceneFunctionJUnitTest {
         oneOf(mockManager).reduce(with(any(Collection.class)));
         will(throwException(new IOException()));
 
-        oneOf(mockRepoManager).getRepositories(mockRegion, mockContext);
+        oneOf(mockRepoManager).getRepositories(mockRegion);
         repos.remove(1);
         will(returnValue(repos));
 


Mime
View raw message