geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [05/15] geode git commit: GEODE-2367: Lucene indexes do not handle ha scenarios
Date Mon, 30 Jan 2017 20:52:52 GMT
GEODE-2367: Lucene indexes do not handle ha scenarios

* Added afterSecondary callback to partition listener to allow cleaning up
  of the index repo when the bucket loses primary
* Added lock prior to creating the bucket indexes to prevent multiple index
  writers from being open at the same time
* Changed to a single point of Lucene index creation; indexes are no longer
  created on the fly


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/8062e9a7
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/8062e9a7
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/8062e9a7

Branch: refs/heads/feature/GEODE-1930-2
Commit: 8062e9a7c9c3574f0168aaa88f61887717cb7b1a
Parents: c395853
Author: Jason Huynh <huynhja@gmail.com>
Authored: Wed Jan 25 17:12:23 2017 -0800
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Fri Jan 27 14:34:33 2017 -0800

----------------------------------------------------------------------
 .../cache/partition/PartitionListener.java      | 11 ++++
 .../geode/internal/cache/BucketAdvisor.java     | 15 ++++-
 .../AbstractPartitionedRepositoryManager.java   | 37 +++++++-----
 .../lucene/internal/IndexRepositoryFactory.java | 44 +++++++++++---
 .../lucene/internal/LuceneBucketListener.java   | 63 ++++++++++++++++++++
 .../lucene/internal/LuceneEventListener.java    |  4 +-
 .../LuceneIndexForPartitionedRegion.java        |  7 +--
 .../internal/LucenePrimaryBucketListener.java   | 49 ---------------
 .../cache/lucene/internal/LuceneQueryImpl.java  | 36 +++++++----
 .../internal/RawIndexRepositoryFactory.java     |  3 +-
 .../internal/RawLuceneRepositoryManager.java    | 19 ++++++
 .../internal/distributed/LuceneFunction.java    |  5 +-
 .../repository/IndexRepositoryImpl.java         | 37 ++++++++++--
 .../PartitionedRepositoryManagerJUnitTest.java  | 12 +++-
 .../RawLuceneRepositoryManagerJUnitTest.java    |  6 +-
 .../DistributedScoringJUnitTest.java            |  2 +-
 .../distributed/LuceneFunctionJUnitTest.java    | 27 +++++----
 .../IndexRepositoryImplJUnitTest.java           |  4 +-
 18 files changed, 267 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-core/src/main/java/org/apache/geode/cache/partition/PartitionListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/partition/PartitionListener.java
b/geode-core/src/main/java/org/apache/geode/cache/partition/PartitionListener.java
index a534e50..deb319f 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/partition/PartitionListener.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/partition/PartitionListener.java
@@ -71,6 +71,16 @@ public interface PartitionListener {
   public void afterPrimary(int bucketId);
 
   /**
+   * Callback invoked when any bucket in a partitioned region stops being primary
+   *
+   * @param bucketId id of the bucket which stopped being primary
+   * @since Geode 1.1
+   */
+  default public void afterSecondary(int bucketId) {
+
+  }
+
+  /**
    * Callback invoked when a partition region is created
    * 
    * @param region handle of the region which is created
@@ -99,4 +109,5 @@ public interface PartitionListener {
    * @since GemFire 6.6.1
    */
   public void afterBucketCreated(int bucketId, Iterable<?> keys);
+
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 7b79bfb..8b8705a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -46,7 +46,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -953,6 +952,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
       }
     } finally {
       if (lostPrimary) {
+        invokeAfterSecondaryInPartitionListeners();
         Bucket br = this.regionAdvisor.getBucket(getBucket().getId());
         if (br != null && br instanceof BucketRegion) {
           ((BucketRegion) br).beforeReleasingPrimaryLockDuringDemotion();
@@ -1283,6 +1283,19 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
     }
   }
 
+  private void invokeAfterSecondaryInPartitionListeners() {
+    PartitionListener[] listeners = this.pRegion.getPartitionListeners();
+    if (listeners == null || listeners.length == 0) {
+      return;
+    }
+    for (int i = 0; i < listeners.length; i++) {
+      PartitionListener listener = listeners[i];
+      if (listener != null) {
+        listener.afterSecondary(getBucket().getId());
+      }
+    }
+  }
+
   /**
    * Lazily gets the lock for acquiring primary lock. Caller must handle null. If DLS, Cache, or
    * DistributedSystem are shutting down then null will be returned. If DLS does not yet exist and

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
index aa29e1b..9e055f0 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
@@ -18,7 +18,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.Region;
@@ -91,16 +94,8 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository
       LuceneSerializer serializer, LuceneIndexImpl index, PartitionedRegion userRegion)
       throws IOException;
 
-  /**
-   * Return the repository for a given user bucket
-   */
-  protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException
{
-    IndexRepository repo = indexRepositories.get(bucketId);
-    if (repo != null && !repo.isClosed()) {
-      return repo;
-    }
-
-    repo = indexRepositories.compute(bucketId, (key, oldRepository) -> {
+  protected IndexRepository createRepository(Integer bucketId) throws BucketNotFoundException
{
+    IndexRepository repo = indexRepositories.compute(bucketId, (key, oldRepository) ->
{
       if (oldRepository != null && !oldRepository.isClosed()) {
         return oldRepository;
       }
@@ -115,12 +110,26 @@ public abstract class AbstractPartitionedRepositoryManager implements
Repository
       }
 
     });
+    return repo;
+  }
 
-    if (repo == null) {
-      throw new BucketNotFoundException(
-          "Colocated index buckets not found for bucket id " + bucketId);
+  /**
+   * Return the repository for a given user bucket
+   */
+  protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException
{
+    IndexRepository repo = indexRepositories.get(bucketId);
+    if (repo != null && !repo.isClosed()) {
+      return repo;
     }
 
-    return repo;
+    throw new BucketNotFoundException(
+        "Colocated index buckets not found for bucket id " + bucketId);
+  }
+
+  protected void cleanRepository(Integer bucketId) throws BucketNotFoundException {
+    IndexRepository repo = indexRepositories.remove(bucketId);
+    if (repo != null) {
+      repo.cleanup();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index c73d64a..5be17e3 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -20,15 +20,24 @@ import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
 import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
 
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.jgroups.blocks.locking.LockService;
 
 public class IndexRepositoryFactory {
 
+  private static final Logger logger = LogService.getLogger();
+  public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
+
   public IndexRepositoryFactory() {}
 
   public IndexRepository createIndexRepository(final Integer bucketId, LuceneSerializer serializer,
@@ -38,16 +47,37 @@ public class IndexRepositoryFactory {
     BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId);
     BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId);
     BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
+    boolean success = false;
     if (fileBucket == null || chunkBucket == null) {
       return null;
     }
-    RegionDirectory dir =
-        new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats());
-    IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
-    IndexWriter writer = new IndexWriter(dir, config);
-    repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(),
-        dataBucket);
-    return repo;
+    if (!fileBucket.getBucketAdvisor().isPrimary()) {
+      throw new IOException("Not creating the index because we are not the primary");
+    }
+    DistributedLockService lockService =
+        DistributedLockService.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+    String lockName = FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath() + bucketId;
+    if (lockService != null) {
+      // lockService will be null for testing at this point
+      lockService.lock(lockName, -1, -1);
+    }
+    try {
+      RegionDirectory dir =
+          new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats());
+      IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
+      IndexWriter writer = new IndexWriter(dir, config);
+      repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(),
+          dataBucket, lockService, lockName);
+      success = true;
+      return repo;
+    } finally {
+      if (!success) {
+        if (lockService != null) {
+          lockService.unlock(lockName);
+        }
+      }
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
new file mode 100644
index 0000000..da0c2c2
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.partition.PartitionListenerAdapter;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+public class LuceneBucketListener extends PartitionListenerAdapter {
+  private static final Logger logger = LogService.getLogger();
+  private PartitionedRepositoryManager lucenePartitionRepositoryManager;
+  private final DM dm;
+
+  public LuceneBucketListener(PartitionedRepositoryManager partitionedRepositoryManager,
+      final DM dm) {
+    lucenePartitionRepositoryManager = partitionedRepositoryManager;
+    this.dm = dm;
+  }
+
+  @Override
+  public void afterPrimary(int bucketId) {
+    dm.getWaitingThreadPool().execute(() -> {
+      try {
+        lucenePartitionRepositoryManager.createRepository(bucketId);
+      } catch (BucketNotFoundException e) {
+        logger.warn(
+            "Index repository could not be created when index chunk region bucket became
primary. "
+                + "Deferring index repository to be created lazily during lucene query execution."
+                + e);
+      }
+    });
+  }
+
+  public void afterBucketRemoved(int bucketId, Iterable<?> keys) {
+    afterSecondary(bucketId);
+  }
+
+  public void afterSecondary(int bucketId) {
+    dm.getWaitingThreadPool().execute(() -> {
+      try {
+        lucenePartitionRepositoryManager.cleanRepository(bucketId);
+      } catch (Exception e) {
+        logger.warn("Exception while cleaning up Lucene Index Repository", e);
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
index f2c7c8f..44453e4 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneEventListener.java
@@ -21,7 +21,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
-
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
@@ -91,7 +91,7 @@ public class LuceneEventListener implements AsyncEventListener {
     } catch (BucketNotFoundException | RegionDestroyedException | PrimaryBucketException
e) {
       logger.debug("Bucket not found while saving to lucene index: " + e.getMessage());
       return false;
-    } catch (IOException e) {
+    } catch (IOException | CacheClosedException e) {
       logger.error("Unable to save to lucene index", e);
       return false;
     } finally {

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 06dcb09..53b4e08 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -17,14 +17,11 @@ package org.apache.geode.cache.lucene.internal;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
-import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
@@ -86,8 +83,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
     PartitionedRepositoryManager partitionedRepositoryManager =
         new PartitionedRepositoryManager(this, mapper);
     DM dm = ((GemFireCacheImpl) getCache()).getDistributedSystem().getDistributionManager();
-    LucenePrimaryBucketListener lucenePrimaryBucketListener =
-        new LucenePrimaryBucketListener(partitionedRepositoryManager, dm);
+    LuceneBucketListener lucenePrimaryBucketListener =
+        new LuceneBucketListener(partitionedRepositoryManager, dm);
     if (!chunkRegionExists(chunkRegionName)) {
       chunkRegion = createChunkRegion(regionShortCut, fileRegionName, partitionAttributes,
           chunkRegionName, regionAttributes, lucenePrimaryBucketListener);

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LucenePrimaryBucketListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LucenePrimaryBucketListener.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LucenePrimaryBucketListener.java
deleted file mode 100644
index d17b5f2..0000000
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LucenePrimaryBucketListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information
regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain
a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
- * or implied. See the License for the specific language governing permissions and limitations
under
- * the License.
- */
-package org.apache.geode.cache.lucene.internal;
-
-import org.apache.geode.GemFireException;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.cache.partition.PartitionListenerAdapter;
-import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.internal.cache.BucketNotFoundException;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-public class LucenePrimaryBucketListener extends PartitionListenerAdapter {
-  private static final Logger logger = LogService.getLogger();
-  private PartitionedRepositoryManager lucenePartitionRepositoryManager;
-  private final DM dm;
-
-  public LucenePrimaryBucketListener(PartitionedRepositoryManager partitionedRepositoryManager,
-      final DM dm) {
-    lucenePartitionRepositoryManager = partitionedRepositoryManager;
-    this.dm = dm;
-  }
-
-  @Override
-  public void afterPrimary(int bucketId) {
-    dm.getWaitingThreadPool().execute(() -> {
-      try {
-        lucenePartitionRepositoryManager.getRepository(bucketId);
-      } catch (BucketNotFoundException e) {
-        logger.warn(
-            "Index repository could not be created when index chunk region bucket became
primary. "
-                + "Deferring index repository to be created lazily during lucene query execution."
-                + e);
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
index 77333d4..d73acc6 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
@@ -38,8 +38,14 @@ import org.apache.geode.cache.lucene.internal.distributed.TopEntries;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollectorManager;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesFunctionCollector;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
 
 public class LuceneQueryImpl<K, V> implements LuceneQuery<K, V> {
+  Logger logger = LogService.getLogger();
+  private final static int MAX_TRIES = 500;
+
   private int limit = LuceneQueryFactory.DEFAULT_LIMIT;
   private int pageSize = LuceneQueryFactory.DEFAULT_PAGESIZE;
   private String indexName;
@@ -101,19 +107,27 @@ public class LuceneQueryImpl<K, V> implements LuceneQuery<K,
V> {
         new LuceneFunctionContext<>(query, indexName, manager, limit);
     TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(context);
 
-    ResultCollector<TopEntriesCollector, TopEntries<K>> rc =
-        (ResultCollector<TopEntriesCollector, TopEntries<K>>) onRegion().withArgs(context)
-            .withCollector(collector).execute(LuceneFunction.ID);
+
 
     // TODO provide a timeout to the user?
-    TopEntries<K> entries;
-    try {
-      entries = rc.getResult();
-    } catch (FunctionException e) {
-      if (e.getCause() instanceof LuceneQueryException) {
-        throw new LuceneQueryException(e);
-      } else {
-        throw e;
+    TopEntries<K> entries = null;
+    int numTries = 0;
+    while (entries == null && numTries++ < MAX_TRIES) {
+      try {
+        ResultCollector<TopEntriesCollector, TopEntries<K>> rc =
+            (ResultCollector<TopEntriesCollector, TopEntries<K>>) onRegion().withArgs(context)
+                .withCollector(collector).execute(LuceneFunction.ID);
+        entries = rc.getResult();
+      } catch (FunctionException e) {
+        if (e.getCause() instanceof LuceneQueryException) {
+          throw new LuceneQueryException(e);
+        } else if (e.getCause() instanceof BucketNotFoundException) {
+          logger.debug("Retrying due to index on bucket not found:" + e);
+          // throw e;
+        } else {
+          e.printStackTrace();
+          throw e;
+        }
       }
     }
     return entries;

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
index 2afccf9..2f61913 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java
@@ -53,7 +53,8 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory {
     }
     IndexWriterConfig config = new IndexWriterConfig(indexForRaw.getAnalyzer());
     IndexWriter writer = new IndexWriter(dir, config);
+
     return new IndexRepositoryImpl(null, writer, serializer, indexForRaw.getIndexStats(),
-        dataBucket);
+        dataBucket, null, "");
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index 64f2e56..b503692 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene.internal;
 
 import java.io.IOException;
 
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import org.apache.geode.internal.cache.BucketNotFoundException;
@@ -35,6 +36,24 @@ public class RawLuceneRepositoryManager extends AbstractPartitionedRepositoryMan
   }
 
   @Override
+  protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException
{
+    IndexRepository repo = indexRepositories.get(bucketId);
+    if (repo != null && !repo.isClosed()) {
+      return repo;
+    }
+
+    try {
+      repo = createOneIndexRepository(bucketId, this.serializer, this.index, this.userRegion);
+      return repo;
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    throw new BucketNotFoundException(
+        "Colocated index buckets not found for bucket id " + bucketId);
+  }
+
+  @Override
   public IndexRepository createOneIndexRepository(Integer bucketId, LuceneSerializer serializer,
       LuceneIndexImpl index, PartitionedRegion userRegion) throws IOException {
     return indexRepositoryFactory.createIndexRepository(bucketId, serializer, index, userRegion);

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
index ec94469..5271a2f 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
@@ -98,8 +98,11 @@ public class LuceneFunction extends FunctionAdapter implements InternalEntity
{
     TopEntriesCollector mergedResult = null;
     try {
       long start = stats.startQuery();
+      Collection<IndexRepository> repositories = null;
+
       try {
-        Collection<IndexRepository> repositories = repoManager.getRepositories(ctx);
+        repositories = repoManager.getRepositories(ctx);
+
         for (IndexRepository repo : repositories) {
           IndexResultCollector collector = manager.newCollector(repo.toString());
           if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 4e86eb5..f1ee987 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -19,8 +19,10 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
 import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import org.apache.geode.cache.lucene.internal.repository.serializer.SerializerUtil;
+import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.Document;
@@ -28,6 +30,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.geode.distributed.LockNotHeldException;
 
 import java.io.IOException;
 import java.util.function.IntSupplier;
@@ -48,11 +51,21 @@ public class IndexRepositoryImpl implements IndexRepository {
   private Region<?, ?> userRegion;
   private LuceneIndexStats stats;
   private DocumentCountSupplier documentCountSupplier;
+  private DistributedLockService lockService;
+  private String lockName;
 
   private static final Logger logger = LogService.getLogger();
 
-  public IndexRepositoryImpl(Region<?, ?> region, IndexWriter writer, LuceneSerializer
serializer,
+  // For test purposes
+  IndexRepositoryImpl(Region<?, ?> region, IndexWriter writer, LuceneSerializer serializer,
       LuceneIndexStats stats, Region<?, ?> userRegion) throws IOException {
+    this(region, writer, serializer, stats, userRegion,
+        ((DistributedRegion) region).getLockService(), "NoLockFile");
+  }
+
+  public IndexRepositoryImpl(Region<?, ?> region, IndexWriter writer, LuceneSerializer
serializer,
+      LuceneIndexStats stats, Region<?, ?> userRegion, DistributedLockService lockService,
+      String lockName) throws IOException {
     this.region = region;
     this.userRegion = userRegion;
     this.writer = writer;
@@ -61,6 +74,8 @@ public class IndexRepositoryImpl implements IndexRepository {
     this.stats = stats;
     documentCountSupplier = new DocumentCountSupplier();
     stats.addDocumentsSupplier(documentCountSupplier);
+    this.lockService = lockService;
+    this.lockName = lockName;
   }
 
   @Override
@@ -148,16 +163,26 @@ public class IndexRepositoryImpl implements IndexRepository {
 
   @Override
   public boolean isClosed() {
-    return userRegion.isDestroyed();
+    return userRegion.isDestroyed() || !writer.isOpen();
   }
 
   @Override
   public void cleanup() {
-    stats.removeDocumentsSupplier(documentCountSupplier);
     try {
-      writer.close();
-    } catch (IOException e) {
-      logger.warn("Unable to clean up index repository", e);
+      stats.removeDocumentsSupplier(documentCountSupplier);
+      try {
+        writer.close();
+      } catch (IOException e) {
+        logger.warn("Unable to clean up index repository", e);
+      }
+    } finally {
+      try {
+        if (lockService != null) {
+          lockService.unlock(lockName);
+        }
+      } catch (LockNotHeldException e) {
+        logger.debug("Tried to unlock file region lock(" + lockName + ") that we did not hold", e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
index 960d794..1c47e89 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java
@@ -27,6 +27,9 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.internal.cache.BucketAdvisor;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.index.IndexWriter;
 import org.junit.Before;
@@ -228,7 +231,7 @@ public class PartitionedRepositoryManagerJUnitTest {
     assertEquals(serializer, repo0.getSerializer());
   }
 
-  protected BucketRegion setUpMockBucket(int id) {
+  protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
     BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
     BucketRegion fileBucket = Mockito.mock(BucketRegion.class);
     // Allowing the fileBucket to behave like a map so that the IndexWriter operations don't fail
@@ -245,6 +248,13 @@ public class PartitionedRepositoryManagerJUnitTest {
     fileBuckets.put(id, fileBucket);
     chunkBuckets.put(id, chunkBucket);
     dataBuckets.put(id, mockBucket);
+
+    BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class);
+    DistributedLockService lockService = Mockito.mock(DistributedLockService.class);
+    when(fileBucket.getLockService()).thenReturn(lockService);
+    when(fileBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor);
+    when(mockBucketAdvisor.isPrimary()).thenReturn(true);
+    repoManager.createRepository(mockBucket.getId());
     return mockBucket;
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
index 9201180..a9fb52b 100755
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java
@@ -19,6 +19,8 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
@@ -73,7 +75,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
   }
 
   @Override
-  protected BucketRegion setUpMockBucket(int id) {
+  protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException {
     BucketRegion mockBucket = Mockito.mock(BucketRegion.class);
     when(mockBucket.getId()).thenReturn(id);
     when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket);
@@ -81,6 +83,8 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa
     when(userRegion.getBucketRegion(eq(id + 113), eq(null))).thenReturn(mockBucket);
     when(userDataStore.getLocalBucketById(eq(id + 113))).thenReturn(mockBucket);
     dataBuckets.put(id, mockBucket);
+
+    repoManager.createRepository(mockBucket.getId());
     return mockBucket;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
index 225f6ac..6062904 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/DistributedScoringJUnitTest.java
@@ -139,7 +139,7 @@ public class DistributedScoringJUnitTest {
     IndexWriterConfig config = new IndexWriterConfig(analyzer);
     IndexWriter writer = new IndexWriter(dir, config);
 
-    return new IndexRepositoryImpl(region, writer, mapper, indexStats, null);
+    return new IndexRepositoryImpl(region, writer, mapper, indexStats, null, null, "");
   }
 
   private static class TestType {

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
index 71172f0..fe05248 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
@@ -209,19 +209,20 @@ public class LuceneFunctionJUnitTest {
     function.execute(mockContext);
   }
 
-  @Test(expected = FunctionException.class)
-  public void testBucketNotFound() throws Exception {
-    when(mockContext.getDataSet()).thenReturn(mockRegion);
-    when(mockContext.getArguments()).thenReturn(searchArgs);
-    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
-    when(mockRepoManager.getRepositories(eq(mockContext)))
-        .thenThrow(new BucketNotFoundException(""));
-    LuceneFunction function = new LuceneFunction();
-
-    function.execute(mockContext);
-
-    verify(mockResultSender).sendException(any(BucketNotFoundException.class));
-  }
+  // Disabled currently as we are retrying the function if a bucket is not found
+  // @Test(expected = FunctionException.class)
+  // public void testBucketNotFound() throws Exception {
+  // when(mockContext.getDataSet()).thenReturn(mockRegion);
+  // when(mockContext.getArguments()).thenReturn(searchArgs);
+  // when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+  // when(mockRepoManager.getRepositories(eq(mockContext)))
+  // .thenThrow(new BucketNotFoundException(""));
+  // LuceneFunction function = new LuceneFunction();
+  //
+  // function.execute(mockContext);
+  //
+  // verify(mockResultSender).sendException(any(BucketNotFoundException.class));
+  // }
 
   @Test(expected = FunctionException.class)
   public void testReduceError() throws Exception {

http://git-wip-us.apache.org/repos/asf/geode/blob/8062e9a7/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
index 7426fa5..42cc2bc 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImplJUnitTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.IntSupplier;
 
+import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
@@ -85,7 +86,8 @@ public class IndexRepositoryImplJUnitTest {
     Mockito.when(((BucketRegion) userRegion).getBucketAdvisor().isPrimary()).thenReturn(true);
     stats = Mockito.mock(LuceneIndexStats.class);
     Mockito.when(userRegion.isDestroyed()).thenReturn(false);
-    repo = new IndexRepositoryImpl(region, writer, mapper, stats, userRegion);
+    repo = new IndexRepositoryImpl(region, writer, mapper, stats, userRegion,
+        mock(DistributedLockService.class), "lockName");
   }
 
   @Test


Mime
View raw message