From commits-return-26149-archive-asf-public=cust-asf.ponee.io@geode.apache.org Wed Mar 21 19:56:59 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 02F6218067E for ; Wed, 21 Mar 2018 19:56:56 +0100 (CET) Received: (qmail 97150 invoked by uid 500); 21 Mar 2018 18:56:56 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 97133 invoked by uid 99); 21 Mar 2018 18:56:56 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Mar 2018 18:56:56 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4994C80800; Wed, 21 Mar 2018 18:56:55 +0000 (UTC) Date: Wed, 21 Mar 2018 18:56:55 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/01: Added LuceneIndexCreationInProgressException to break when a bug is still being created MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: udo@apache.org In-Reply-To: <152165861422.30612.9045237835544899273@gitbox.apache.org> References: <152165861422.30612.9045237835544899273@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/feature/GEODE-3926_2 X-Git-Reftype: branch X-Git-Rev: 69a2b18907fa3a12d2f1efd41bebe0d687adba44 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180321185655.4994C80800@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. 
udo pushed a commit to branch feature/GEODE-3926_2 in repository https://gitbox.apache.org/repos/asf/geode.git commit 69a2b18907fa3a12d2f1efd41bebe0d687adba44 Author: Udo AuthorDate: Tue Mar 13 15:46:05 2018 -0700 Added LuceneIndexCreationInProgressException to break when an index is still being created --- .../lucene/internal/IndexRepositoryFactory.java | 155 +------------- .../cache/lucene/internal/InternalLuceneIndex.java | 1 + .../LuceneIndexCreationInProgressException.java | 9 + .../lucene/internal/LuceneIndexFactoryImpl.java | 1 + .../internal/LuceneIndexForPartitionedRegion.java | 237 +++++++++++++++++++-- .../cache/lucene/internal/LuceneIndexImpl.java | 47 ++-- .../lucene/internal/LuceneIndexImplFactory.java | 15 +- .../cache/lucene/internal/LuceneRawIndex.java | 22 +- .../lucene/internal/LuceneRawIndexFactory.java | 16 +- .../lucene/internal/LuceneRegionListener.java | 2 + .../lucene/internal/LuceneResultStructImpl.java | 6 +- .../cache/lucene/internal/LuceneServiceImpl.java | 36 ++-- .../internal/PartitionedRepositoryManager.java | 38 +++- .../internal/RawLuceneRepositoryManager.java | 3 +- .../lucene/internal/LuceneIndexFactorySpy.java | 26 ++- .../LuceneIndexForPartitionedRegionTest.java | 98 +++++---- .../lucene/internal/LuceneIndexImplJUnitTest.java | 47 ---- .../LuceneIndexRecoveryHAIntegrationTest.java | 9 +- .../internal/LuceneServiceImplJUnitTest.java | 9 + .../PartitionedRepositoryManagerJUnitTest.java | 91 ++++++-- .../RawLuceneRepositoryManagerJUnitTest.java | 5 +- gradle.properties | 4 + 22 files changed, 543 insertions(+), 334 deletions(-) diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java index 618aa29..a44b365 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java +++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java @@ -42,158 +42,21 @@ import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.logging.LogService; public class IndexRepositoryFactory { - private static final Logger logger = LogService.getLogger(); - public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:"; public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE"; - public IndexRepositoryFactory() {} - - public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, - InternalLuceneIndex index, PartitionedRegion userRegion, final IndexRepository oldRepository, - PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { - LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; - final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion(); - - // We need to ensure that all members have created the fileAndChunk region before continuing - Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache()); - PartitionRegionConfig prConfig = - (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier()); - LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener = - new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId); - fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener); - IndexRepository repo = null; - if (prConfig.isColocationComplete()) { - repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository, index); - } - return repo; - } - - /* - * NOTE: The method finishComputingRepository must be called through computeIndexRepository. - * Executing finishComputingRepository outside of computeIndexRepository may result in race - * conditions. 
- * This is a util function just to not let computeIndexRepository be a huge chunk of code. - */ - private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer, - PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) - throws IOException { - LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index; - final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion(); - BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId); - BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); - boolean success = false; - if (fileAndChunkBucket == null) { - if (oldRepository != null) { - oldRepository.cleanup(); - } - return null; - } - if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) { - if (oldRepository != null) { - oldRepository.cleanup(); - } - return null; - } - - if (oldRepository != null && !oldRepository.isClosed()) { - return oldRepository; - } - - if (oldRepository != null) { - oldRepository.cleanup(); - } - DistributedLockService lockService = getLockService(); - String lockName = getLockName(fileAndChunkBucket); - while (!lockService.lock(lockName, 100, -1)) { - if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) { - return null; - } - } - - final IndexRepository repo; - InternalCache cache = (InternalCache) userRegion.getRegionService(); - boolean initialPdxReadSerializedFlag = cache.getPdxReadSerializedOverride(); - cache.setPdxReadSerializedOverride(true); - try { - // bucketTargetingMap handles partition resolver (via bucketId as callbackArg) - Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId); - RegionDirectory dir = - new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats()); - IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer()); - IndexWriter writer = new IndexWriter(dir, config); - repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer, - 
indexForPR.getIndexStats(), dataBucket, lockService, lockName, indexForPR); - success = false; - // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver - if (null != fileRegion.get(APACHE_GEODE_INDEX_COMPLETE, bucketId)) { - success = true; - return repo; - } else { - success = reindexUserDataRegion(bucketId, userRegion, fileRegion, dataBucket, repo); - } - return repo; - } catch (IOException e) { - logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId - + " for file region:" + fileAndChunkBucket.getFullPath()); - throw e; - } catch (CacheClosedException e) { - logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:" - + bucketId + " for file region:" + fileAndChunkBucket.getFullPath()); - throw e; - } finally { - if (!success) { - lockService.unlock(lockName); - } - cache.setPdxReadSerializedOverride(initialPdxReadSerializedFlag); - } + public IndexRepositoryFactory() { } - private boolean reindexUserDataRegion(Integer bucketId, PartitionedRegion userRegion, - PartitionedRegion fileRegion, BucketRegion dataBucket, IndexRepository repo) + public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, + InternalLuceneIndex index, + PartitionedRegion userRegion, + final IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { - Set affectedRepos = new HashSet(); - - for (Object key : dataBucket.keySet()) { - Object value = getValue(userRegion.getEntry(key)); - if (value != null) { - repo.update(key, value); - } else { - repo.delete(key); - } - affectedRepos.add(repo); - } - - for (IndexRepository affectedRepo : affectedRepos) { - affectedRepo.commit(); - } - // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver - fileRegion.put(APACHE_GEODE_INDEX_COMPLETE, APACHE_GEODE_INDEX_COMPLETE, bucketId); - return true; - } - - private Object getValue(Region.Entry entry) { - 
final EntrySnapshot es = (EntrySnapshot) entry; - Object value; - try { - value = es == null ? null : es.getRawValue(true); - } catch (EntryDestroyedException e) { - value = null; - } - return value; - } - - private Map getBucketTargetingMap(BucketRegion region, int bucketId) { - return new BucketTargetingMap(region, bucketId); - } - - private String getLockName(final BucketRegion fileAndChunkBucket) { - return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath(); - } - - private DistributedLockService getLockService() { - return DistributedLockService - .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); + System.out.println(this + ".computeIndexRepository bucketId: " + bucketId); + return ((LuceneIndexForPartitionedRegion) index).computeIndexRepository(bucketId, serializer, + userRegion, oldRepository, partitionedRepositoryManager); } /** diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java index 74e4ac8..d308a16 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java @@ -39,4 +39,5 @@ public interface InternalLuceneIndex extends LuceneIndex { void initialize(); + boolean isIndexAvailable(int id); } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java new file mode 100644 index 0000000..7077246 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java @@ -0,0 +1,9 @@ +package org.apache.geode.cache.lucene.internal; + +import org.apache.geode.GemFireException; + +public class 
LuceneIndexCreationInProgressException extends GemFireException { + public LuceneIndexCreationInProgressException(String message) { + super(message); + } +} diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java index 45d4fe1..a326e39 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactoryImpl.java @@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene.internal; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 577bdef..4246531 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -15,10 +15,23 @@ package org.apache.geode.cache.lucene.internal; +import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.geode.CancelException; import org.apache.geode.cache.AttributesFactory; +import org.apache.geode.cache.CacheClosedException; +import 
org.apache.geode.cache.EntryDestroyedException; import org.apache.geode.cache.FixedPartitionResolver; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; @@ -30,30 +43,60 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles; +import org.apache.geode.cache.lucene.internal.directory.RegionDirectory; import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver; +import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap; import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver; +import org.apache.geode.cache.lucene.internal.repository.IndexRepository; +import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; import org.apache.geode.cache.partition.PartitionListener; +import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.EntrySnapshot; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.PartitionRegionConfig; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.logging.LogService; public 
class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { - protected Region fileAndChunkRegion; - protected final FileSystemStats fileSystemStats; + private Region fileAndChunkRegion; + private final FileSystemStats fileSystemStats; public static final String FILES_REGION_SUFFIX = ".files"; + private static final Logger logger = LogService.getLogger(); + private static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:"; - public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) { - super(indexName, regionPath, cache); + private ExecutorService waitingThreadPool; + + // For Mocking only + LuceneIndexForPartitionedRegion() { + this.fileSystemStats = null; + } + public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache, + Analyzer analyzer, Map fieldAnalyzers, + LuceneSerializer serializer, + RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + super(indexName, regionPath, cache, serializer, fieldAnalyzers); final String statsName = indexName + "-" + regionPath; - this.fileSystemStats = new FileSystemStats(cache.getDistributedSystem(), statsName); + this.fileSystemStats = new FileSystemStats(getCache().getDistributedSystem(), statsName); + this.waitingThreadPool = waitingThreadPool; + this.setSearchableFields(fields); + this.setAnalyzer(analyzer); + if (aeqId == null) { + this.createAEQ(attributes); + } else { + this.createAEQ(attributes, aeqId); + } } protected RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer) { @@ -61,9 +104,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { if (mapper == null) { mapper = new HeterogeneousLuceneSerializer(); } - PartitionedRepositoryManager partitionedRepositoryManager = - new PartitionedRepositoryManager(this, mapper); - return partitionedRepositoryManager; + return new PartitionedRepositoryManager(this, mapper, 
waitingThreadPool); } protected void createLuceneListenersAndFileChunkRegions( @@ -97,6 +138,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { if (!fileRegionExists(fileRegionName)) { fileAndChunkRegion = createRegion(fileRegionName, regionShortCut, this.regionPath, partitionAttributes, regionAttributes, lucenePrimaryBucketListener); + } else { + fileAndChunkRegion = this.cache.getRegion(fileRegionName); } fileSystemStats @@ -140,9 +183,11 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } protected Region createRegion(final String regionName, - final RegionShortcut regionShortCut, final String colocatedWithRegionName, - final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes, - PartitionListener lucenePrimaryBucketListener) { + final RegionShortcut regionShortCut, + final String colocatedWithRegionName, + final PartitionAttributes partitionAttributes, + final RegionAttributes regionAttributes, + PartitionListener lucenePrimaryBucketListener) { PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory(); if (lucenePrimaryBucketListener != null) { partitionAttributesFactory.addPartitionListener(lucenePrimaryBucketListener); @@ -162,12 +207,13 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { return createRegion(regionName, attributes); } - public void close() {} + public void close() { + } @Override public void dumpFiles(final String directory) { ResultCollector results = FunctionService.onRegion(getDataRegion()) - .setArguments(new String[] {directory, indexName}).execute(DumpDirectoryFiles.ID); + .setArguments(new String[]{directory, indexName}).execute(DumpDirectoryFiles.ID); results.getResult(); } @@ -202,6 +248,15 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } } + @Override + public boolean isIndexAvailable(int id) { + PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion(); + if 
(fileAndChunkRegion != null) { + return fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null; + } + return false; + } + private void destroyOnRemoteMembers() { PartitionedRegion pr = (PartitionedRegion) getDataRegion(); DistributionManager dm = pr.getDistributionManager(); @@ -230,4 +285,160 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } } } + + public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, + PartitionedRegion userRegion, + final IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) + throws IOException { + + final PartitionedRegion fileRegion = this.getFileAndChunkRegion(); + + // We need to ensure that all members have created the fileAndChunk region before continuing + Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache()); + PartitionRegionConfig prConfig = + (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier()); + LuceneFileRegionColocationListener luceneFileRegionColocationCompleteListener = + new LuceneFileRegionColocationListener(partitionedRepositoryManager, bucketId); + fileRegion.addColocationListener(luceneFileRegionColocationCompleteListener); + IndexRepository repo = null; + if (prConfig.isColocationComplete()) { + repo = finishComputingRepository(bucketId, serializer, userRegion, oldRepository); + } + return repo; + } + + /* + * NOTE: The method finishComputingRepository must be called through computeIndexRepository. + * Executing finishComputingRepository outside of computeIndexRepository may result in race + * conditions. + * This is a util function just to not let computeIndexRepository be a huge chunk of code. 
+ */ + private IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer, + PartitionedRegion userRegion, + IndexRepository oldRepository) + throws IOException { + final PartitionedRegion fileRegion = this.getFileAndChunkRegion(); + BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId); + BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId); + boolean success = false; + if (fileAndChunkBucket == null) { + if (oldRepository != null) { + oldRepository.cleanup(); + } + return null; + } + if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) { + if (oldRepository != null) { + oldRepository.cleanup(); + } + return null; + } + + if (oldRepository != null && !oldRepository.isClosed()) { + return oldRepository; + } + + if (oldRepository != null) { + oldRepository.cleanup(); + } + DistributedLockService lockService = getLockService(); + String lockName = getLockName(fileAndChunkBucket); + while (!lockService.lock(lockName, 100, -1)) { + if (!fileAndChunkBucket.getBucketAdvisor().isPrimary()) { + return null; + } + } + final IndexRepository repo; + InternalCache cache = (InternalCache) userRegion.getRegionService(); + boolean initialPdxReadSerializedFlag = cache.getPdxReadSerializedOverride(); + cache.setPdxReadSerializedOverride(true); + try { + // bucketTargetingMap handles partition resolver (via bucketId as callbackArg) + Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId); + RegionDirectory dir = + new RegionDirectory(bucketTargetingMap, this.getFileSystemStats()); + IndexWriterConfig config = new IndexWriterConfig(this.getAnalyzer()); + IndexWriter writer = new IndexWriter(dir, config); + repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer, + this.getIndexStats(), dataBucket, lockService, lockName, this); + success = false; + // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver + if (null != fileRegion.get(APACHE_GEODE_INDEX_COMPLETE, 
bucketId)) { + success = true; + return repo; + } else { + success = reindexUserDataRegion(bucketId, userRegion, fileRegion, dataBucket, repo); + } + return repo; + } catch (IOException e) { + logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId + + " for file region:" + fileAndChunkBucket.getFullPath()); + throw e; + } catch (CacheClosedException e) { + logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:" + + bucketId + " for file region:" + fileAndChunkBucket.getFullPath()); + throw e; + } finally { + if (!success) { + lockService.unlock(lockName); + } + cache.setPdxReadSerializedOverride(initialPdxReadSerializedFlag); + } + } + + private boolean reindexUserDataRegion(Integer bucketId, PartitionedRegion userRegion, + PartitionedRegion fileRegion, BucketRegion dataBucket, + IndexRepository repo) + throws IOException { + Set affectedRepos = new HashSet(); + + for (Object key : dataBucket.keySet()) { + Object value = getValue(userRegion.getEntry(key)); + if (value != null) { + repo.update(key, value); + } else { + repo.delete(key); + } + affectedRepos.add(repo); + } + + for (IndexRepository affectedRepo : affectedRepos) { + affectedRepo.commit(); + } + // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver + fileRegion.put(APACHE_GEODE_INDEX_COMPLETE, APACHE_GEODE_INDEX_COMPLETE, bucketId); + return true; + } + + private Object getValue(Region.Entry entry) { + final EntrySnapshot es = (EntrySnapshot) entry; + Object value; + try { + value = es == null ? 
null : es.getRawValue(true); + } catch (EntryDestroyedException e) { + value = null; + } + return value; + } + + private Map getBucketTargetingMap(BucketRegion region, int bucketId) { + return new BucketTargetingMap(region, bucketId); + } + + private String getLockName(final BucketRegion fileAndChunkBucket) { + return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath(); + } + + private DistributedLockService getLockService() { + return DistributedLockService + .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); + } + + protected BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) { + // Force the bucket to be created if it is not already + region.getOrCreateNodeForBucketWrite(bucketId, null); + + return region.getDataStore().getLocalBucketById(bucketId); + } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index e58c21f..7d3aa5d 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -14,7 +14,6 @@ */ package org.apache.geode.cache.lucene.internal; -import java.util.Collections; import java.util.Map; import org.apache.logging.log4j.Logger; @@ -48,20 +47,33 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { protected final InternalCache cache; protected final LuceneIndexStats indexStats; - protected Map fieldAnalyzers; + protected final Map fieldAnalyzers; protected String[] searchableFieldNames; - protected RepositoryManager repositoryManager; + protected final RepositoryManager repositoryManager; protected Analyzer analyzer; protected LuceneSerializer luceneSerializer; protected LocalRegion dataRegion; - protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache) { + LuceneIndexImpl() { 
+ this(null, null, null, null, null); + } + + protected LuceneIndexImpl(String indexName, String regionPath, InternalCache cache, + LuceneSerializer serializer, Map fieldAnalyzers) { this.indexName = indexName; this.regionPath = regionPath; this.cache = cache; final String statsName = indexName + "-" + regionPath; - this.indexStats = new LuceneIndexStats(cache.getDistributedSystem(), statsName); + if (getCache() != null) { + this.indexStats = new LuceneIndexStats(getCache().getDistributedSystem(), statsName); + } else { + this.indexStats = null; + } + luceneSerializer = serializer; + repositoryManager = createRepositoryManager(luceneSerializer); + this.fieldAnalyzers = fieldAnalyzers; + } @Override @@ -123,19 +135,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { return this.luceneSerializer; } - public void setLuceneSerializer(LuceneSerializer serializer) { - this.luceneSerializer = serializer; - } - public Cache getCache() { return this.cache; } - public void setFieldAnalyzers(Map fieldAnalyzers) { - this.fieldAnalyzers = - fieldAnalyzers == null ? 
null : Collections.unmodifiableMap(fieldAnalyzers); - } - public LuceneIndexStats getIndexStats() { return indexStats; } @@ -143,32 +146,30 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { public void initialize() { /* create index region */ dataRegion = assignDataRegion(); + System.err.println(this + ".initialize DataRegion: " + dataRegion + " repositoryManager: " + + repositoryManager); createLuceneListenersAndFileChunkRegions((PartitionedRepositoryManager) repositoryManager); addExtension(dataRegion); } - protected void setupRepositoryManager(LuceneSerializer luceneSerializer) { - repositoryManager = createRepositoryManager(luceneSerializer); - } - protected abstract RepositoryManager createRepositoryManager(LuceneSerializer luceneSerializer); protected abstract void createLuceneListenersAndFileChunkRegions( PartitionedRepositoryManager partitionedRepositoryManager); - protected AsyncEventQueue createAEQ(Region dataRegion) { - String aeqId = LuceneServiceImpl.getUniqueIndexName(getName(), regionPath); - return createAEQ(createAEQFactory(dataRegion.getAttributes()), aeqId); + protected AsyncEventQueue createAEQ(RegionAttributes attributes) { + return createAEQ(attributes, LuceneServiceImpl.getUniqueIndexName(getName(), regionPath)); } protected AsyncEventQueue createAEQ(RegionAttributes attributes, String aeqId) { - if (attributes.getPartitionAttributes() != null) { + if (attributes != null && attributes.getPartitionAttributes() != null) { if (attributes.getPartitionAttributes().getLocalMaxMemory() == 0) { // accessor will not create AEQ return null; } + return createAEQ(createAEQFactory(attributes), aeqId); } - return createAEQ(createAEQFactory(attributes), aeqId); + return null; } private AsyncEventQueue createAEQ(AsyncEventQueueFactoryImpl factory, String aeqId) { diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java index c960daa..94819e5 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplFactory.java @@ -14,13 +14,24 @@ */ package org.apache.geode.cache.lucene.internal; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.lucene.analysis.Analyzer; + +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.internal.cache.InternalCache; public class LuceneIndexImplFactory { public LuceneIndexImplFactory() {} - public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) { - return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache); + public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache, + Analyzer analyzer, Map fieldAnalyzers, LuceneSerializer serializer, + RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + return new LuceneIndexForPartitionedRegion(indexName, regionPath, cache, analyzer, + fieldAnalyzers, serializer, attributes, aeqId, fields, waitingThreadPool); } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java index d4168bd..eb41eeb 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java @@ -14,6 +14,12 @@ */ package org.apache.geode.cache.lucene.internal; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.lucene.analysis.Analyzer; + +import org.apache.geode.cache.RegionAttributes; import 
org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer; @@ -21,9 +27,14 @@ import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; public class LuceneRawIndex extends LuceneIndexImpl { - - protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache) { - super(indexName, regionPath, cache); + public LuceneRawIndex(String indexName, String regionPath, InternalCache cache, Analyzer analyzer, + Map fieldAnalyzers, LuceneSerializer serializer, + RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + super(indexName, regionPath, cache, serializer, fieldAnalyzers); + this.setSearchableFields(fields); + this.setAnalyzer(analyzer); + this.createAEQ(attributes, aeqId); } @Override @@ -50,4 +61,9 @@ public class LuceneRawIndex extends LuceneIndexImpl { @Override public void destroy(boolean initiator) {} + + @Override + public boolean isIndexAvailable(int id) { + return true; + } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java index 4a92049..0e290b6 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java @@ -14,11 +14,21 @@ */ package org.apache.geode.cache.lucene.internal; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.lucene.analysis.Analyzer; + +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.internal.cache.InternalCache; public class LuceneRawIndexFactory extends 
LuceneIndexImplFactory { - @Override - public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) { - return new LuceneRawIndex(indexName, regionPath, cache); + public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache, + Analyzer analyzer, Map fieldAnalyzers, LuceneSerializer serializer, + RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + return new LuceneRawIndex(indexName, regionPath, cache, analyzer, fieldAnalyzers, serializer, + attributes, aeqId, fields, waitingThreadPool); } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java index 7313a82..06371c6 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRegionListener.java @@ -107,6 +107,8 @@ public class LuceneRegionListener implements RegionListener { public void afterCreate(Region region) { if (region.getFullPath().equals(this.regionPath) && this.afterCreateInvoked.compareAndSet(false, true)) { + System.err + .println(this + ".afterCreate Service: " + service + " LuceneIndex: " + luceneIndex); this.service.afterDataRegionCreated(this.luceneIndex); } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java index 25a6a78..30654ef 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneResultStructImpl.java @@ -26,9 +26,9 @@ import org.apache.geode.internal.Version; public class LuceneResultStructImpl implements LuceneResultStruct, DataSerializableFixedID { - 
K key; - V value; - float score; + private K key; + private V value; + private float score; public LuceneResultStructImpl() {} diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java index 5d0ea48..14705da 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.logging.log4j.Logger; @@ -66,6 +64,7 @@ import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction; import org.apache.geode.cache.lucene.internal.results.PageResults; import org.apache.geode.cache.lucene.internal.xml.LuceneServiceXmlGenerator; import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.internal.DSFIDFactory; import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.cache.BucketNotFoundException; @@ -84,7 +83,6 @@ import org.apache.geode.management.internal.beans.CacheServiceMBeanBase; /** * Implementation of LuceneService to create lucene index and query. 
* - * * @since GemFire 8.5 */ public class LuceneServiceImpl implements InternalLuceneService { @@ -97,6 +95,7 @@ public class LuceneServiceImpl implements InternalLuceneService { private IndexListener managementListener; public static boolean LUCENE_REINDEX = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "luceneReindex"); + private DistributionManager dm; public LuceneServiceImpl() {} @@ -117,6 +116,7 @@ public class LuceneServiceImpl implements InternalLuceneService { cache.getCancelCriterion().checkCancelInProgress(null); this.cache = (InternalCache) cache; + this.dm = ((InternalCache) cache).getDistributionManager(); FunctionService.registerFunction(new LuceneQueryFunction()); FunctionService.registerFunction(new LuceneGetPageFunction()); @@ -166,8 +166,7 @@ public class LuceneServiceImpl implements InternalLuceneService { if (!regionPath.startsWith("/")) { regionPath = "/" + regionPath; } - String name = indexName + "#" + regionPath.replace('/', '_'); - return name; + return indexName + "#" + regionPath.replace('/', '_'); } public static String getUniqueIndexRegionName(String indexName, String regionPath, @@ -232,6 +231,7 @@ public class LuceneServiceImpl implements InternalLuceneService { LuceneIndexImpl luceneIndex = beforeDataRegionCreated(indexName, regionPath, region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields); + System.out.println(this + ".createIndexOnExistingRegion index: " + luceneIndex); afterDataRegionCreated(luceneIndex); region.addAsyncEventQueueId(aeqId, true); @@ -324,19 +324,20 @@ public class LuceneServiceImpl implements InternalLuceneService { RegionAttributes attributes, final Analyzer analyzer, final Map fieldAnalyzers, String aeqId, final LuceneSerializer serializer, final String... 
fields) { - LuceneIndexImpl index = createIndexObject(indexName, regionPath); - index.setSearchableFields(fields); - index.setAnalyzer(analyzer); - index.setFieldAnalyzers(fieldAnalyzers); - index.setLuceneSerializer(serializer); - index.setupRepositoryManager(serializer); - index.createAEQ(attributes, aeqId); + LuceneIndexImpl index = createIndexObject(indexName, regionPath, fields, analyzer, + fieldAnalyzers, serializer, attributes, aeqId); + System.err.println(this + ".beforeDataRegionCreated Index: " + index); return index; - } - private LuceneIndexImpl createIndexObject(String indexName, String regionPath) { - return luceneIndexFactory.create(indexName, regionPath, cache); + private LuceneIndexImpl createIndexObject(String indexName, String regionPath, String[] fields, + Analyzer analyzer, Map fieldAnalyzers, LuceneSerializer serializer, + RegionAttributes attributes, String aeqId) { + System.err.println(this + ".createIndexObject LuceneIndexFactory: " + luceneIndexFactory); + LuceneIndexImpl index = luceneIndexFactory.create(indexName, regionPath, cache, analyzer, + fieldAnalyzers, serializer, attributes, aeqId, fields, dm.getWaitingThreadPool()); + System.err.println(this + ".createIndexObject Index: " + index); + return index; } private void registerDefinedIndex(final String indexName, final String regionPath, @@ -530,11 +531,12 @@ public class LuceneServiceImpl implements InternalLuceneService { } public void unregisterIndex(final String region) { - if (indexMap.containsKey(region)) + if (indexMap.containsKey(region)) { indexMap.remove(region); + } } - /** Public for test purposes */ + // Public for test purposes public static void registerDataSerializables() { DSFIDFactory.registerDSFID(DataSerializableFixedID.LUCENE_CHUNK_KEY, ChunkKey.class); diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java 
index f60f83b..b945226 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java @@ -20,6 +20,9 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.Region; @@ -32,8 +35,10 @@ import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; +import org.apache.geode.internal.logging.LogService; public class PartitionedRepositoryManager implements RepositoryManager { + private final Logger logger = LogService.getLogger(); public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory(); /** * map of the parent bucket region to the index repository @@ -47,17 +52,23 @@ public class PartitionedRepositoryManager implements RepositoryManager { protected final ConcurrentHashMap indexRepositories = new ConcurrentHashMap(); - /** The user region for this index */ + /** + * The user region for this index + */ protected PartitionedRegion userRegion = null; protected final LuceneSerializer serializer; protected final InternalLuceneIndex index; protected volatile boolean closed; private final CountDownLatch isDataRegionReady = new CountDownLatch(1); - public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer) { + private final ExecutorService waitingThreadPoolFromDM; + + public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer, + ExecutorService waitingThreadPool) { this.index = index; 
this.serializer = serializer; this.closed = false; + this.waitingThreadPoolFromDM = waitingThreadPool; } public void setUserRegionForRepositoryManager(PartitionedRegion userRegion) { @@ -66,7 +77,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { @Override public Collection getRepositories(RegionFunctionContext ctx) - throws BucketNotFoundException { + throws BucketNotFoundException, LuceneIndexCreationInProgressException { Region region = ctx.getDataSet(); Set buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region); ArrayList repos = new ArrayList(buckets.size()); @@ -76,7 +87,21 @@ public class PartitionedRepositoryManager implements RepositoryManager { throw new BucketNotFoundException( "User bucket was not found for region " + region + "bucket id " + bucketId); } else { - repos.add(getRepository(userBucket.getId())); + if (index.isIndexAvailable(userBucket.getId())) { + repos.add(getRepository(userBucket.getId())); + } else { + waitingThreadPoolFromDM.execute(() -> { + try { + IndexRepository repository = getRepository(userBucket.getId()); + repos.add(repository); + } catch (BucketNotFoundException e) { + logger.debug( + "Lucene Index creation still in progress. 
Catching BucketNotFoundException"); + } + }); + throw new LuceneIndexCreationInProgressException( + "Lucene Index creation still in progress for bucket: " + userBucket.getId()); + } } } @@ -137,7 +162,7 @@ public class PartitionedRepositoryManager implements RepositoryManager { } return computeRepository(bucketId, serializer, index, userRegion, oldRepository); } catch (IOException e) { - throw new InternalGemFireError("Unable to create index repository", e); + throw new InternalGemFireError("Unable to create index repository for bucket "+bucketId, e); } }); return repo; @@ -155,7 +180,8 @@ public class PartitionedRepositoryManager implements RepositoryManager { try { computeRepository(bucketId); } catch (LuceneIndexDestroyedException e) { - /* expected exception */} + /* expected exception */ + } } } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java index 0b38c45..f845898 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.lucene.internal; import java.io.IOException; +import java.util.concurrent.Executors; import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; @@ -25,7 +26,7 @@ public class RawLuceneRepositoryManager extends PartitionedRepositoryManager { public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory(); public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) { - super(index, serializer); + super(index, serializer, Executors.newSingleThreadExecutor()); } @Override diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java index 4985e90..90d3b26 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java @@ -14,10 +14,15 @@ */ package org.apache.geode.cache.lucene.internal; +import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import org.apache.lucene.analysis.Analyzer; import org.mockito.Mockito; +import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.internal.cache.InternalCache; public class LuceneIndexFactorySpy extends LuceneIndexImplFactory { @@ -36,9 +41,15 @@ public class LuceneIndexFactorySpy extends LuceneIndexImplFactory { }; @Override - public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) { + public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache, + Analyzer analyzer, Map fieldAnalyzers, LuceneSerializer serializer, + RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + LuceneIndexForPartitionedRegion index = - Mockito.spy(new ExtendedLuceneIndexForPartitionedRegion(indexName, regionPath, cache)); + Mockito.spy(new ExtendedLuceneIndexForPartitionedRegion(indexName, regionPath, cache, + analyzer, fieldAnalyzers, serializer, attributes, aeqId, fields, waitingThreadPool)); + System.err.println(this + ".create " + index); return index; } @@ -49,9 +60,14 @@ public class LuceneIndexFactorySpy extends LuceneIndexImplFactory { private static class ExtendedLuceneIndexForPartitionedRegion extends LuceneIndexForPartitionedRegion { - public ExtendedLuceneIndexForPartitionedRegion(final String indexName, final String 
regionPath, - final InternalCache cache) { - super(indexName, regionPath, cache); + + + public ExtendedLuceneIndexForPartitionedRegion(String indexName, String regionPath, + InternalCache cache, Analyzer analyzer, Map fieldAnalyzers, + LuceneSerializer serializer, RegionAttributes attributes, String aeqId, String[] fields, + ExecutorService waitingThreadPool) { + super(indexName, regionPath, cache, analyzer, fieldAnalyzers, serializer, attributes, aeqId, + fields, waitingThreadPool); } } diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java index 9ebde2c..99b6064 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegionTest.java @@ -18,7 +18,12 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -37,11 +42,13 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles; +import 
org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.PartitionedRegion; @@ -55,14 +62,25 @@ public class LuceneIndexForPartitionedRegionTest { @Rule public ExpectedException expectedExceptions = ExpectedException.none(); + private ExecutorService executorService; + + @Before + public void setup() { + executorService = Executors.newSingleThreadExecutor(); + } + + @After + public void teardown() { + executorService.shutdownNow(); + } @Test public void getIndexNameReturnsCorrectName() { String name = "indexName"; String regionPath = "regionName"; InternalCache cache = Fakes.cache(); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, null, null, null, executorService); assertEquals(name, index.getName()); } @@ -71,8 +89,8 @@ public class LuceneIndexForPartitionedRegionTest { String name = "indexName"; String regionPath = "regionName"; InternalCache cache = Fakes.cache(); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, null, null, null, executorService); assertEquals(regionPath, index.getRegionPath()); } @@ -82,8 +100,8 @@ public class LuceneIndexForPartitionedRegionTest { String regionPath = "regionName"; InternalCache cache = Fakes.cache(); PartitionedRegion region = mock(PartitionedRegion.class); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, region.getAttributes(), null, 
null, executorService); String fileRegionName = index.createFileRegionName(); when(cache.getRegion(fileRegionName)).thenReturn(region); @@ -95,8 +113,8 @@ public class LuceneIndexForPartitionedRegionTest { String name = "indexName"; String regionPath = "regionName"; InternalCache cache = Fakes.cache(); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, null, null, null, executorService); String fileRegionName = index.createFileRegionName(); when(cache.getRegion(fileRegionName)).thenReturn(null); @@ -104,6 +122,7 @@ public class LuceneIndexForPartitionedRegionTest { } @Test + @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor") public void createAEQWithPersistenceCallsCreateOnAEQFactory() { String name = "indexName"; String regionPath = "regionName"; @@ -114,28 +133,27 @@ public class LuceneIndexForPartitionedRegionTest { AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class); when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); - index.createAEQ(region); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, region.getAttributes(), null, null, executorService); verify(aeqFactory).setPersistent(eq(true)); verify(aeqFactory).create(any(), any()); } @Test + @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor") public void createRepositoryManagerWithNotNullSerializer() { String name = "indexName"; String regionPath = "regionName"; InternalCache cache = Fakes.cache(); LuceneSerializer serializer = mock(LuceneSerializer.class); - LuceneIndexForPartitionedRegion index = - new 
LuceneIndexForPartitionedRegion(name, regionPath, cache); - index = spy(index); - index.setupRepositoryManager(serializer); + LuceneIndexForPartitionedRegion index = spy(new LuceneIndexForPartitionedRegion(name, + regionPath, cache, null, null, serializer, null, null, null, executorService)); verify(index).createRepositoryManager(eq(serializer)); } @Test + @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor") public void createRepositoryManagerWithNullSerializer() { String name = "indexName"; String regionPath = "regionName"; @@ -143,17 +161,16 @@ public class LuceneIndexForPartitionedRegionTest { InternalCache cache = Fakes.cache(); ArgumentCaptor serializerCaptor = ArgumentCaptor.forClass(LuceneSerializer.class); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, null, null, fields, executorService); index = spy(index); - when(index.getFieldNames()).thenReturn(fields); - index.setupRepositoryManager(null); verify(index).createRepositoryManager(serializerCaptor.capture()); LuceneSerializer serializer = serializerCaptor.getValue(); assertNull(serializer); } @Test + @Ignore("Not sure this makes sense anymore since these calls are handled by the constructor") public void createAEQCallsCreateOnAEQFactory() { String name = "indexName"; String regionPath = "regionName"; @@ -162,9 +179,8 @@ public class LuceneIndexForPartitionedRegionTest { AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class); when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); - index.createAEQ(region); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, 
region.getAttributes(), null, null, executorService); verify(aeqFactory, never()).setPersistent(eq(true)); verify(aeqFactory).create(any(), any()); @@ -228,8 +244,10 @@ public class LuceneIndexForPartitionedRegionTest { String regionPath = "regionName"; InternalCache cache = Fakes.cache(); Region region = initializeScenario(withPersistence, regionPath, cache, 0); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + + RegionAttributes regionAttributes = region.getAttributes(); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, regionAttributes, "aeq", null, executorService); LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); spy.initialize(); } @@ -240,13 +258,12 @@ public class LuceneIndexForPartitionedRegionTest { String name = "indexName"; String regionPath = "regionName"; InternalCache cache = Fakes.cache(); + when(cache.createAsyncEventQueueFactory()).thenReturn(mock(AsyncEventQueueFactoryImpl.class)); Region region = initializeScenario(withPersistence, regionPath, cache); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); - LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); - - verify(spy).createAEQ(eq(region.getAttributes()), eq("aeq")); + RegionAttributes regionAttributes = region.getAttributes(); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, regionAttributes, "aeq", null, executorService); } protected LuceneIndexForPartitionedRegion setupSpy(final Region region, @@ -255,8 +272,6 @@ public class LuceneIndexForPartitionedRegionTest { LuceneIndexForPartitionedRegion spy = spy(index); doReturn(null).when(spy).createRegion(any(), any(), any(), any(), any(), any()); doReturn(null).when(spy).createAEQ(any(), any()); - spy.setupRepositoryManager(null); - 
spy.createAEQ(region.getAttributes(), aeq); spy.initialize(); return spy; } @@ -268,9 +283,10 @@ public class LuceneIndexForPartitionedRegionTest { String regionPath = "regionName"; InternalCache cache = Fakes.cache(); Region region = initializeScenario(withPersistence, regionPath, cache); - - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class); + when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, region.getAttributes(), "aeq", null, executorService); LuceneIndexForPartitionedRegion spy = setupSpy(region, index, "aeq"); verify(spy).createRegion(eq(index.createFileRegionName()), eq(RegionShortcut.PARTITION), any(), @@ -286,8 +302,8 @@ public class LuceneIndexForPartitionedRegionTest { RegionAttributes regionAttributes = mock(RegionAttributes.class); when(regionAttributes.getDataPolicy()).thenReturn(DataPolicy.PARTITION); PartitionAttributes partitionAttributes = initializeAttributes(cache); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, regionAttributes, null, null, executorService); LuceneIndexForPartitionedRegion indexSpy = spy(index); indexSpy.createRegion(index.createFileRegionName(), RegionShortcut.PARTITION, regionPath, partitionAttributes, regionAttributes, null); @@ -305,13 +321,12 @@ public class LuceneIndexForPartitionedRegionTest { InternalCache cache = Fakes.cache(); initializeScenario(withPersistence, regionPath, cache); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, 
regionPath, + cache, null, null, null, null, null, null, executorService); index.setSearchableFields(new String[] {"field"}); LuceneIndexForPartitionedRegion spy = spy(index); doReturn(null).when(spy).createRegion(any(), any(), any(), any(), any(), any()); doReturn(null).when(spy).createAEQ((RegionAttributes) any(), any()); - spy.setupRepositoryManager(null); spy.createAEQ(any(), any()); spy.initialize(); @@ -331,12 +346,11 @@ public class LuceneIndexForPartitionedRegionTest { AsyncEventQueue aeq = mock(AsyncEventQueue.class); DumpDirectoryFiles function = new DumpDirectoryFiles(); FunctionService.registerFunction(function); - LuceneIndexForPartitionedRegion index = - new LuceneIndexForPartitionedRegion(name, regionPath, cache); + LuceneIndexForPartitionedRegion index = new LuceneIndexForPartitionedRegion(name, regionPath, + cache, null, null, null, null, null, null, executorService); index = spy(index); when(index.getFieldNames()).thenReturn(fields); doReturn(aeq).when(index).createAEQ(any(), any()); - index.setupRepositoryManager(null); index.createAEQ(cache.getRegionAttributes(regionPath), aeq.getId()); index.initialize(); PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java deleted file mode 100644 index 5286bae..0000000 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.cache.lucene.internal; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; - -import org.apache.geode.cache.lucene.LuceneIndex; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.test.fake.Fakes; -import org.apache.geode.test.junit.categories.LuceneTest; -import org.apache.geode.test.junit.categories.UnitTest; - -@Category({UnitTest.class, LuceneTest.class}) -public class LuceneIndexImplJUnitTest { - - public static final String REGION = "region"; - public static final String INDEX = "index"; - public static final int MAX_WAIT = 30000; - - private InternalCache cache; - private LuceneIndex index; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Before - public void createLuceneIndex() { - cache = Fakes.cache(); - index = new LuceneIndexForPartitionedRegion(INDEX, REGION, cache); - } - -} diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java index 0f67cb6..fb739e7 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; 
import java.io.IOException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; @@ -91,7 +92,8 @@ public class LuceneIndexRecoveryHAIntegrationTest { userRegion.put("rebalance", "test"); service.waitUntilFlushed("index1", "userRegion", 30000, TimeUnit.MILLISECONDS); - RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper); + RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper, + Executors.newSingleThreadExecutor()); IndexRepository repo = manager.getRepository(userRegion, 0, null); assertNotNull(repo); @@ -106,14 +108,13 @@ public class LuceneIndexRecoveryHAIntegrationTest { userRegion = (PartitionedRegion) regionfactory.create("userRegion"); userRegion.put("rebalance", "test"); - manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper); + manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper, + Executors.newSingleThreadExecutor()); IndexRepository newRepo = manager.getRepository(userRegion, 0, null); Assert.assertNotEquals(newRepo, repo); } - - private void verifyIndexFinishFlushing(String indexName, String regionName) throws InterruptedException { LuceneService service = LuceneServiceProvider.get(cache); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java index 07c2d6c..d1e530f 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplJUnitTest.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; 
@@ -52,6 +53,7 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; import org.apache.geode.cache.lucene.LuceneIndexFactory; import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; @@ -110,16 +112,23 @@ public class LuceneServiceImplJUnitTest { @Test public void userRegionShouldNotBeSetBeforeIndexInitialized() throws Exception { + DistributionManager dm = mock(DistributionManager.class); TestLuceneServiceImpl testService = new TestLuceneServiceImpl(); Field f = LuceneServiceImpl.class.getDeclaredField("cache"); f.setAccessible(true); f.set(testService, cache); + f = LuceneServiceImpl.class.getDeclaredField("dm"); + f.setAccessible(true); + f.set(testService, dm); AsyncEventQueueFactoryImpl aeqFactory = mock(AsyncEventQueueFactoryImpl.class); when(cache.createAsyncEventQueueFactory()).thenReturn(aeqFactory); DistributedSystem ds = mock(DistributedSystem.class); + Statistics luceneIndexStats = mock(Statistics.class); when(cache.getDistributedSystem()).thenReturn(ds); + when(cache.getDistributionManager()).thenReturn(dm); + when(dm.getWaitingThreadPool()).thenReturn(Executors.newSingleThreadExecutor()); when(((StatisticsFactory) ds).createAtomicStatistics(any(), anyString())) .thenReturn(luceneIndexStats); when(cache.getRegion(anyString())).thenReturn(region); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index b2c30b1..ed7305d 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -26,16 +26,23 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.collections.map.HashedMap; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.index.IndexWriter; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -83,7 +90,6 @@ public class PartitionedRepositoryManagerJUnitTest { protected LuceneSerializer serializer; protected PartitionedRegionDataStore userDataStore; protected PartitionedRegionDataStore fileDataStore; - protected PartitionedRegionHelper prHelper; protected PartitionRegionConfig prConfig; protected LocalRegion prRoot; @@ -94,6 +100,8 @@ public class PartitionedRepositoryManagerJUnitTest { protected LuceneIndexImpl indexForPR; protected PartitionedRepositoryManager repoManager; protected GemFireCacheImpl cache; + private final Map isIndexAvailableMap = new HashedMap(); + @Before public void setUp() { @@ -127,7 +135,7 @@ public class PartitionedRepositoryManagerJUnitTest { when(fileAndChunkRegion.getRegionIdentifier()).thenReturn("rid"); indexStats = Mockito.mock(LuceneIndexStats.class); fileSystemStats = Mockito.mock(FileSystemStats.class); - indexForPR = Mockito.mock(LuceneIndexForPartitionedRegion.class); + indexForPR = Mockito.spy(LuceneIndexForPartitionedRegion.class); when(((LuceneIndexForPartitionedRegion) indexForPR).getFileAndChunkRegion()) .thenReturn(fileAndChunkRegion); 
when(((LuceneIndexForPartitionedRegion) indexForPR).getFileSystemStats()) @@ -142,7 +150,8 @@ public class PartitionedRepositoryManagerJUnitTest { when(prRoot.get("rid")).thenReturn(prConfig); PowerMockito.mockStatic(PartitionedRegionHelper.class); PowerMockito.when(PartitionedRegionHelper.getPRRoot(cache)).thenReturn(prRoot); - repoManager = new PartitionedRepositoryManager(indexForPR, serializer); + repoManager = new PartitionedRepositoryManager(indexForPR, serializer, + Executors.newSingleThreadExecutor()); repoManager.setUserRegionForRepositoryManager(userRegion); repoManager.allowRepositoryComputation(); } @@ -213,30 +222,50 @@ public class PartitionedRepositoryManagerJUnitTest { when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null); - when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), (RetryTimeKeeper) any())) - .then(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0)); - return null; - } + when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), any())) + .then((Answer) invocation -> { + when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0)); + return null; }); assertNotNull(repoManager.getRepository(userRegion, 0, null)); } + @Test(expected = LuceneIndexCreationInProgressException.class) + public void queryByRegionFailingWithInProgressException() + throws LuceneIndexCreationInProgressException, BucketNotFoundException { + setUpMockBucket(0); + setUpMockBucket(1); + + Set buckets = new LinkedHashSet<>(Arrays.asList(0, 1)); + InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class); + when(ctx.getLocalBucketSet((any()))).thenReturn(buckets); + repoManager.getRepositories(ctx); + } + @Test - public void getByRegion() throws BucketNotFoundException { + public void queryByRegionWaitingForRepoToBeCreated() + throws 
LuceneIndexCreationInProgressException { setUpMockBucket(0); setUpMockBucket(1); + setupIsIndexAvailable(); + Set buckets = new LinkedHashSet(Arrays.asList(0, 1)); InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class); when(ctx.getLocalBucketSet((any()))).thenReturn(buckets); - Collection repos = repoManager.getRepositories(ctx); - assertEquals(2, repos.size()); + final Collection repositories = new HashSet<>(); - Iterator itr = repos.iterator(); + Awaitility.await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) + .atMost(500, TimeUnit.SECONDS).until(() -> { + try { + repositories.addAll(repoManager.getRepositories(ctx)); + } catch (BucketNotFoundException | LuceneIndexCreationInProgressException e) { + } + return repositories.size() == 2; + }); + + Iterator itr = repositories.iterator(); IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next(); IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next(); @@ -246,13 +275,40 @@ public class PartitionedRepositoryManagerJUnitTest { checkRepository(repo0, 0); checkRepository(repo1, 1); + + } + + private void setupIsIndexAvailable() { + when(indexForPR.isIndexAvailable(1)).then((Answer) invocation -> { + boolean result; + Boolean isAvailable = isIndexAvailableMap.get(1); + if (isAvailable == null || !isAvailable) { + isIndexAvailableMap.put(1, true); + result = false; + } else { + result = true; + } + return result; + }); + when(indexForPR.isIndexAvailable(0)).then((Answer) invocation -> { + boolean result; + Boolean isAvailable = isIndexAvailableMap.get(0); + if (isAvailable == null || !isAvailable) { + isIndexAvailableMap.put(0, true); + result = false; + } else { + result = true; + } + return result; + }); } /** * Test that we get the expected exception when a user bucket is missing */ - @Test(expected = BucketNotFoundException.class) - public void getMissingBucketByRegion() throws BucketNotFoundException { + @Test(expected = 
LuceneIndexCreationInProgressException.class) + public void getMissingBucketByRegion() + throws LuceneIndexCreationInProgressException, BucketNotFoundException { setUpMockBucket(0); Set buckets = new LinkedHashSet(Arrays.asList(0, 1)); @@ -270,7 +326,7 @@ public class PartitionedRepositoryManagerJUnitTest { assertEquals(serializer, repo0.getSerializer()); } - protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException { + protected BucketRegion setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); BucketRegion fileAndChunkBucket = Mockito.mock(BucketRegion.class); // Allowing the fileAndChunkBucket to behave like a map so that the IndexWriter operations don't @@ -290,6 +346,7 @@ public class PartitionedRepositoryManagerJUnitTest { BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class); when(fileAndChunkBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor); when(mockBucketAdvisor.isPrimary()).thenReturn(true); + return mockBucket; } } diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java index a000d2f..73bbd76 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java @@ -53,7 +53,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa @After public void tearDown() { - ((RawLuceneRepositoryManager) repoManager).close(); + repoManager.close(); } protected void createIndexAndRepoManager() { @@ -66,6 +66,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa when(indexForPR.getCache()).thenReturn(cache); when(indexForPR.getRegionPath()).thenReturn("/testRegion"); 
when(indexForPR.withPersistence()).thenReturn(true); + when(indexForPR.getName()).thenReturn("rawLuceneTest"); repoManager = new RawLuceneRepositoryManager(indexForPR, serializer); repoManager.setUserRegionForRepositoryManager(userRegion); repoManager.allowRepositoryComputation(); @@ -85,7 +86,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa } @Override - protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException { + protected BucketRegion setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); when(mockBucket.getId()).thenReturn(id); when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket); diff --git a/gradle.properties b/gradle.properties index ad3dbce..fc53156 100755 --- a/gradle.properties +++ b/gradle.properties @@ -42,6 +42,10 @@ productOrg = Apache Software Foundation (ASF) org.gradle.daemon = true org.gradle.jvmargs = -Xmx2048m +org.gradle.parallel=true +org.gradle.configureondemand=true +org.gradle.workers.max=4 + minimumGradleVersion = 3.5.1 # Set this on the command line with -P or in ~/.gradle/gradle.properties # to change the buildDir location. Use an absolute path. -- To stop receiving notification emails like this one, please contact udo@apache.org.