Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 752C318E3C for ; Mon, 21 Sep 2015 17:36:37 +0000 (UTC) Received: (qmail 51572 invoked by uid 500); 21 Sep 2015 17:36:37 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 51543 invoked by uid 500); 21 Sep 2015 17:36:37 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 51534 invoked by uid 99); 21 Sep 2015 17:36:37 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Sep 2015 17:36:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A6F821A51C3 for ; Mon, 21 Sep 2015 17:36:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.226 X-Spam-Level: X-Spam-Status: No, score=-3.226 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id s7LEEAkW2chX for ; Mon, 21 Sep 2015 17:36:30 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 86E52209B7 for ; Mon, 21 Sep 2015 17:36:29 +0000 (UTC) Received: (qmail 51427 invoked by uid 99); 21 Sep 2015 17:36:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Sep 2015 17:36:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9DCA7E03C8; Mon, 21 Sep 2015 17:36:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: Adding an implementation of RepositoryManager for PartitionedRegions Date: Mon, 21 Sep 2015 17:36:28 +0000 (UTC) Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-11 ac7c9286d -> 378176869 Adding an implementation of RepositoryManager for PartitionedRegions Adding a RepositoryManager that lazily creates IndexRepository instances for partitioned region buckets. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/37817686 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/37817686 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/37817686 Branch: refs/heads/feature/GEODE-11 Commit: 37817686922d6ac993fdabc581bc3f911340c895 Parents: ac7c928 Author: Dan Smith Authored: Fri Sep 18 17:46:00 2015 -0700 Committer: Dan Smith Committed: Mon Sep 21 10:34:19 2015 -0700 ---------------------------------------------------------------------- .../gemfire/internal/cache/BucketRegion.java | 2 +- .../cache/PartitionedRegionDataStore.java | 2 +- .../util/concurrent/CopyOnWriteWeakHashMap.java | 12 ++ .../lucene/internal/LuceneEventListener.java | 6 +- .../LuceneIndexForPartitionedRegion.java | 5 +- .../internal/PartitionedRepositoryManager.java | 134 ++++++++++++++++ .../internal/directory/RegionDirectory.java | 9 ++ .../internal/distributed/LuceneFunction.java | 2 +- .../lucene/internal/filesystem/FileSystem.java | 8 + .../repository/IndexRepositoryImpl.java | 10 ++ .../internal/repository/RepositoryManager.java | 7 +- .../internal/LuceneEventListenerJUnitTest.java | 13 +- .../PartitionedRepositoryManagerJUnitTest.java | 157 +++++++++++++++++++ .../directory/RegionDirectoryJUnitTest.java | 9 -- .../distributed/LuceneFunctionJUnitTest.java | 12 +- 15 files changed, 356 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java index e9e1523..865a372 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java @@ -1992,7 +1992,7 @@ implements Bucket } } - public final int getId() { + public int getId() { return getBucketAdvisor().getProxyBucketRegion().getId(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java index d567b5a..2e2b0d7 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java @@ -111,7 +111,7 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQue * @author tapshank * @author Mitch Thomas */ -public final class PartitionedRegionDataStore implements HasCachePerfStats +public class PartitionedRegionDataStore implements HasCachePerfStats { private static final Logger logger = LogService.getLogger(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java index e4dc57a..982a3af 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/concurrent/CopyOnWriteWeakHashMap.java @@ -44,6 +44,18 @@ public class CopyOnWriteWeakHashMap extends AbstractMap { map = Collections.unmodifiableMap(tmp); return result; } + + public synchronized V putIfAbsent(K key, V value) { + V oldValue = map.get(key); + if(oldValue != null) { + return oldValue; + } + + WeakHashMap tmp = new WeakHashMap(map); + V result = tmp.put(key, value); + map = Collections.unmodifiableMap(tmp); + return result; + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java index 4c289bf..049fb64 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java @@ -15,6 +15,7 @@ import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.logging.LogService; /** @@ -45,8 +46,9 @@ public class LuceneEventListener implements AsyncEventListener { for (AsyncEvent event : events) { Region region = event.getRegion(); Object key = event.getKey(); + Object callbackArgument = event.getCallbackArgument(); - IndexRepository repository = repositoryManager.getRepository(region, key); + IndexRepository repository = repositoryManager.getRepository(region, key, callbackArgument); Operation op = event.getOperation(); @@ -68,7 +70,7 @@ public class LuceneEventListener implements AsyncEventListener { repo.commit(); } return true; - } catch(IOException e) { + } catch(IOException | BucketNotFoundException e) { logger.error("Unable to save to lucene index", e); return false; } finally { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index faddbbc..f648c9a 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -27,10 +27,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings; /* wrapper of IndexWriter */ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { - - Cache cache; - // map to map bucketId to repo, -1 means its DR - HashMap indexRepositories = new HashMap(); + private final Cache cache; public LuceneIndexForPartitionedRegion(String indexName, String regionPath, Cache cache) { this.indexName = indexName; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java new file mode 100644 index 0000000..ba89a40 --- /dev/null +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java @@ -0,0 +1,134 @@ +package com.gemstone.gemfire.cache.lucene.internal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; + +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; +import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.LocalDataSet; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteWeakHashMap; + +/** + * Manages index repositories for partitioned regions. + * + * This class lazily creates the IndexRepository for each individual + * bucket. If a Bucket is rebalanced, this class will create a new + * index repository when the bucket returns to this node. + */ +public class PartitionedRepositoryManager implements RepositoryManager { + /** map of the parent bucket region to the index repository + * + * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to + * return a stale index repository. If a bucket moves off of this node and + * comes back, it will have a new BucketRegion object. + * + * It is weak so that the old BucketRegion will be garbage collected. + */ + CopyOnWriteWeakHashMap indexRepositories = new CopyOnWriteWeakHashMap(); + + /** The user region for this index */ + private final PartitionedRegion userRegion; + + private final PartitionedRegion fileRegion; + private final PartitionedRegion chunkRegion; + + private final LuceneSerializer serializer; + + /** + * + * @param userRegion The user partition region + * @param fileRegion The partition region used for file metadata. Should be colocated with the user pr + * @param chunkRegion The partition region users for chunk metadata. + * @param serializer The serializer that should be used for converting objects to lucene docs. + */ + public PartitionedRepositoryManager(PartitionedRegion userRegion, PartitionedRegion fileRegion, + PartitionedRegion chunkRegion, + LuceneSerializer serializer) { + this.userRegion = userRegion; + this.fileRegion = fileRegion; + this.chunkRegion = chunkRegion; + this.serializer = serializer; + } + + @Override + public IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException { + BucketRegion userBucket = userRegion.getBucketRegion(key, callbackArg); + if(userBucket == null) { + throw new BucketNotFoundException("User bucket was not found for region " + region + "key " + key + " callbackarg " + callbackArg); + } + + return getRepository(userBucket); + } + + @Override + public Collection getRepositories(Region region) throws BucketNotFoundException { + if(!(region instanceof LocalDataSet)) { + throw new IllegalStateException("Trying to find the repositories for a region which is not the local data set of a function"); + } + + LocalDataSet dataSet = (LocalDataSet) region; + Set buckets = dataSet.getBucketSet(); + ArrayList repos = new ArrayList(buckets.size()); + for(Integer bucketId : buckets) { + BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId); + if(userBucket == null) { + throw new BucketNotFoundException("User bucket was not found for region " + region + "bucket id " + bucketId); + } else { + repos.add(getRepository(userBucket)); + } + } + + return repos; + } + + /** + * Return the repository for a given user bucket + */ + private IndexRepository getRepository(BucketRegion userBucket) throws BucketNotFoundException { + IndexRepository repo = indexRepositories.get(userBucket); + if(repo == null) { + try { + Analyzer analyzer = new StandardAnalyzer(); + RegionDirectory dir = new RegionDirectory(getMatchingBucket(userBucket, fileRegion), getMatchingBucket(userBucket, chunkRegion)); + IndexWriterConfig config = new IndexWriterConfig(analyzer); + IndexWriter writer = new IndexWriter(dir, config); + repo = new IndexRepositoryImpl(writer, serializer); + IndexRepository oldRepo = indexRepositories.putIfAbsent(userBucket, repo); + if(oldRepo != null) { + repo = oldRepo; + } + } catch(IOException e) { + throw new InternalGemFireError("Unable to create index repository", e); + } + } + + return repo; + } + + /** + * Find the bucket in region2 that matches the bucket id from region1. + */ + private BucketRegion getMatchingBucket(BucketRegion region1, PartitionedRegion region2) throws BucketNotFoundException { + BucketRegion result = region2.getDataStore().getLocalBucketById(region1.getId()); + if(result == null) { + throw new BucketNotFoundException("Bucket not found for region " + region2 + " bucekt id " + region1.getId()); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java index f72dfec..38e7714 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectory.java @@ -101,4 +101,13 @@ public class RegionDirectory extends BaseDirectory { isOpen = false; } + /** + * For testing, the file system + */ + public FileSystem getFileSystem() { + return fs; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java index ca28154..e2e33f3 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunction.java @@ -103,7 +103,7 @@ public class LuceneFunction extends FunctionAdapter { private Collection getIndexRepositories(RegionFunctionContext ctx, Region region) throws BucketNotFoundException { synchronized (LuceneFunction.class) { - return repoManager.getRepositories(region, ctx); + return repoManager.getRepositories(region); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java index 2a7c22b..50b9f50 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/filesystem/FileSystem.java @@ -125,4 +125,12 @@ public class FileSystem { void updateFile(File file) { fileRegion.put(file.getName(), file); } + + public ConcurrentMap getFileRegion() { + return fileRegion; + } + + public ConcurrentMap getChunkRegion() { + return chunkRegion; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java index 4c0b21b..5c248cf 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java @@ -75,4 +75,14 @@ public class IndexRepositoryImpl implements IndexRepository { writer.commit(); searcherManager.maybeRefresh(); } + + public IndexWriter getWriter() { + return writer; + } + + public LuceneSerializer getSerializer() { + return serializer; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java index a1d0f86..b61e8be 100644 --- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java +++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java @@ -12,16 +12,15 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException; */ public interface RepositoryManager { - IndexRepository getRepository(Region region, Object key); + IndexRepository getRepository(Region region, Object key, Object callbackArg) throws BucketNotFoundException; /** * Returns a collection of {@link IndexRepository} instances hosting index data of the input list of bucket ids. The * bucket needs to be present on this member. * - * @param region - * @param ctx function context for which {@link IndexRepository}s needs to be discovered. + * @param localDataSet The local data set of a function * @return a collection of {@link IndexRepository} instances * @throws BucketNotFoundException if any of the requested buckets is not found on this member */ - Collection getRepositories(Region region, RegionFunctionContext ctx) throws BucketNotFoundException; + Collection getRepositories(Region localDataSet) throws BucketNotFoundException; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java index f296fd5..617020b 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java @@ -20,6 +20,7 @@ import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent; import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.test.junit.categories.UnitTest; @@ -31,15 +32,17 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest; public class LuceneEventListenerJUnitTest { @Test - public void testProcessBatch() throws IOException { + public void testProcessBatch() throws IOException, BucketNotFoundException { RepositoryManager manager = Mockito.mock(RepositoryManager.class); IndexRepository repo1 = Mockito.mock(IndexRepository.class); IndexRepository repo2 = Mockito.mock(IndexRepository.class); Region region1 = Mockito.mock(Region.class); Region region2 = Mockito.mock(Region.class); - - Mockito.when(manager.getRepository(eq(region1), any())).thenReturn(repo1); - Mockito.when(manager.getRepository(eq(region2), any())).thenReturn(repo2); + + Object callback1 = new Object(); + + Mockito.when(manager.getRepository(eq(region1), any(), eq(callback1))).thenReturn(repo1); + Mockito.when(manager.getRepository(eq(region2), any(), eq(null))).thenReturn(repo2); LuceneEventListener listener = new LuceneEventListener(manager); @@ -50,8 +53,10 @@ public class LuceneEventListenerJUnitTest { AsyncEvent event = Mockito.mock(AsyncEvent.class); Region region = i % 2 == 0 ? region1 : region2; + Object callback = i % 2 == 0 ? callback1 : null; Mockito.when(event.getRegion()).thenReturn(region); Mockito.when(event.getKey()).thenReturn(i); + Mockito.when(event.getCallbackArgument()).thenReturn(callback); switch (i % 3) { case 0: http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java new file mode 100644 index 0000000..c7a2362 --- /dev/null +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -0,0 +1,157 @@ +package com.gemstone.gemfire.cache.lucene.internal; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.lucene.index.IndexWriter; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; +import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer; +import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer; +import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.LocalDataSet; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class PartitionedRepositoryManagerJUnitTest { + + + private PartitionedRegion userRegion; + private PartitionedRegion fileRegion; + private PartitionedRegion chunkRegion; + private LuceneSerializer serializer; + private PartitionedRegionDataStore userDataStore; + private PartitionedRegionDataStore fileDataStore; + private PartitionedRegionDataStore chunkDataStore; + + private Map fileBuckets = new HashMap(); + private Map chunkBuckets= new HashMap(); + + @Before + public void setUp() { + userRegion = Mockito.mock(PartitionedRegion.class); + userDataStore = Mockito.mock(PartitionedRegionDataStore.class); + Mockito.when(userRegion.getDataStore()).thenReturn(userDataStore); + + fileRegion = Mockito.mock(PartitionedRegion.class); + fileDataStore = Mockito.mock(PartitionedRegionDataStore.class); + Mockito.when(fileRegion.getDataStore()).thenReturn(fileDataStore); + chunkRegion = Mockito.mock(PartitionedRegion.class); + chunkDataStore = Mockito.mock(PartitionedRegionDataStore.class); + Mockito.when(chunkRegion.getDataStore()).thenReturn(chunkDataStore); + serializer = new HeterogenousLuceneSerializer(new String[] {"a", "b"} ); + } + + @Test + public void getByKey() throws BucketNotFoundException, IOException { + PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer); + + BucketRegion mockBucket0 = getMockBucket(0); + BucketRegion mockBucket1 = getMockBucket(1); + + IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); + IndexRepositoryImpl repo1 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 1, null); + IndexRepositoryImpl repo113 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 113, null); + + assertNotNull(repo0); + assertNotNull(repo1); + assertNotNull(repo113); + assertEquals(repo0, repo113); + assertNotEquals(repo0, repo1); + + checkRepository(repo0, 0); + checkRepository(repo1, 1); + } + + /** + * Test that we get the expected exception when a user bucket is missing + */ + @Test(expected = BucketNotFoundException.class) + public void getMissingBucketByKey() throws BucketNotFoundException { + PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer); + repoManager.getRepository(userRegion, 0, null); + } + + @Test + public void getByRegion() throws BucketNotFoundException { + + PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer); + + BucketRegion mockBucket0 = getMockBucket(0); + BucketRegion mockBucket1 = getMockBucket(1); + + Set buckets = new LinkedHashSet(Arrays.asList(0, 1)); + LocalDataSet ldr = new LocalDataSet(null, buckets); + Collection repos = repoManager.getRepositories(ldr); + assertEquals(2, repos.size()); + + Iterator itr = repos.iterator(); + IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next(); + IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next(); + + assertNotNull(repo0); + assertNotNull(repo1); + assertNotEquals(repo0, repo1); + + checkRepository(repo0, 0); + checkRepository(repo1, 1); + } + + /** + * Test that we get the expected exception when a user bucket is missing + */ + @Test(expected = BucketNotFoundException.class) + public void getMissingBucketByRegion() throws BucketNotFoundException { + PartitionedRepositoryManager repoManager = new PartitionedRepositoryManager(userRegion, fileRegion, chunkRegion, serializer); + + BucketRegion mockBucket0 = getMockBucket(0); + + Set buckets = new LinkedHashSet(Arrays.asList(0, 1)); + LocalDataSet ldr = new LocalDataSet(userRegion, buckets); + + repoManager.getRepositories(ldr); + } + + private void checkRepository(IndexRepositoryImpl repo0, int bucketId) { + IndexWriter writer0 = repo0.getWriter(); + RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory(); + assertEquals(fileBuckets.get(bucketId), dir0.getFileSystem().getFileRegion()); + assertEquals(chunkBuckets.get(bucketId), dir0.getFileSystem().getChunkRegion()); + assertEquals(serializer, repo0.getSerializer()); + } + + private BucketRegion getMockBucket(int id) { + BucketRegion mockBucket = Mockito.mock(BucketRegion.class); + BucketRegion fileBucket = Mockito.mock(BucketRegion.class); + BucketRegion chunkBucket = Mockito.mock(BucketRegion.class); + Mockito.when(mockBucket.getId()).thenReturn(id); + Mockito.when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket); + Mockito.when(userDataStore.getLocalBucketById(eq(id))).thenReturn(mockBucket); + Mockito.when(userRegion.getBucketRegion(eq(id + 113), eq(null))).thenReturn(mockBucket); + Mockito.when(userDataStore.getLocalBucketById(eq(id + 113))).thenReturn(mockBucket); + Mockito.when(fileDataStore.getLocalBucketById(eq(id))).thenReturn(fileBucket); + Mockito.when(chunkDataStore.getLocalBucketById(eq(id))).thenReturn(chunkBucket); + + fileBuckets.put(id, fileBucket); + chunkBuckets.put(id, chunkBucket); + return mockBucket; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java index cd88a46..9dd1d6b 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/directory/RegionDirectoryJUnitTest.java @@ -6,7 +6,6 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.lucene.store.BaseDirectoryTestCase; import org.apache.lucene.store.Directory; -import org.junit.After; import org.junit.Rule; import org.junit.experimental.categories.Category; @@ -27,14 +26,6 @@ public class RegionDirectoryJUnitTest extends BaseDirectoryTestCase { @Rule public SystemPropertiesRestoreRule restoreProps = new SystemPropertiesRestoreRule(); -// @After -// public void clearLog4J() { -// //The lucene test ensures that no system properties -// //have been modified by the test. GFE leaves this property -// //set -// System.clearProperty("log4j.configurationFile"); -// } - protected Directory getDirectory(Path path) throws IOException { //This is super lame, but log4j automatically sets the system property, and the lucene http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/37817686/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java index 24f8ad0..c354411 100644 --- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java +++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java @@ -68,7 +68,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockContext).getArguments(); will(returnValue(searchArgs)); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); will(returnValue(repos)); oneOf(mockContext).getResultSender(); @@ -133,7 +133,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockContext).getResultSender(); will(returnValue(mockResultSender)); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); will(returnValue(repos)); oneOf(mockRepository1).query(with(query), with(equal(0)), with(any(IndexResultCollector.class))); @@ -192,7 +192,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockContext).getResultSender(); will(returnValue(mockResultSender)); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); repos.remove(0); will(returnValue(repos)); @@ -240,7 +240,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockContext).getArguments(); will(returnValue(searchArgs)); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); will(returnValue(repos)); oneOf(mockContext).getResultSender(); @@ -267,7 +267,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockContext).getArguments(); will(returnValue(searchArgs)); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); will(throwException(new BucketNotFoundException(""))); oneOf(mockContext).getResultSender(); @@ -300,7 +300,7 @@ public class LuceneFunctionJUnitTest { oneOf(mockManager).reduce(with(any(Collection.class))); will(throwException(new IOException())); - oneOf(mockRepoManager).getRepositories(mockRegion, mockContext); + oneOf(mockRepoManager).getRepositories(mockRegion); repos.remove(1); will(returnValue(repos));