geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstew...@apache.org
Subject [46/50] [abbrv] geode git commit: GEODE-2553: Closed IndexRepositories when deleting an index
Date Mon, 13 Mar 2017 17:43:13 GMT
GEODE-2553: Closed IndexRepositories when deleting an index


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/38cf13ff
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/38cf13ff
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/38cf13ff

Branch: refs/heads/GEODE-2290
Commit: 38cf13ffbbc912f78bdacb1caf835508abe1b5e3
Parents: cfaa0e7
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Wed Mar 1 10:41:03 2017 -0800
Committer: Barry Oglesby <boglesby@pivotal.io>
Committed: Wed Mar 8 14:30:14 2017 -0800

----------------------------------------------------------------------
 .../AbstractPartitionedRepositoryManager.java   | 16 +++++-
 .../lucene/internal/LuceneBucketListener.java   |  7 +--
 .../LuceneIndexForPartitionedRegion.java        | 31 ++---------
 .../cache/lucene/internal/LuceneIndexImpl.java  | 36 ++++++++++---
 .../internal/RawLuceneRepositoryManager.java    |  6 ---
 .../internal/repository/RepositoryManager.java  |  5 ++
 .../lucene/LuceneIndexDestroyDUnitTest.java     | 56 +++++++++++++++++++-
 7 files changed, 107 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
index 97acea1..26179c7 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/AbstractPartitionedRepositoryManager.java
@@ -49,11 +49,13 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository
   protected final PartitionedRegion userRegion;
   protected final LuceneSerializer serializer;
   protected final LuceneIndexImpl index;
+  protected volatile boolean closed;
 
   public AbstractPartitionedRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) {
     this.index = index;
     this.userRegion = (PartitionedRegion) index.getCache().getRegion(index.getRegionPath());
     this.serializer = serializer;
+    this.closed = false;
   }
 
   @Override
@@ -91,9 +93,13 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository
       LuceneSerializer serializer, LuceneIndexImpl index, PartitionedRegion userRegion,
       IndexRepository oldRepository) throws IOException;
 
-  protected IndexRepository computeRepository(Integer bucketId) throws BucketNotFoundException {
+  protected IndexRepository computeRepository(Integer bucketId) {
     IndexRepository repo = indexRepositories.compute(bucketId, (key, oldRepository) -> {
       try {
+        if (closed && oldRepository != null) {
+          oldRepository.cleanup();
+          return null;
+        }
         return computeRepository(bucketId, serializer, index, userRegion, oldRepository);
       } catch (IOException e) {
         throw new InternalGemFireError("Unable to create index repository", e);
@@ -119,4 +125,12 @@ public abstract class AbstractPartitionedRepositoryManager implements Repository
     }
     return repo;
   }
+
+  @Override
+  public void close() {
+    this.closed = true;
+    for (Integer bucketId : indexRepositories.keySet()) {
+      computeRepository(bucketId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
index 9532249..32fb3fc 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneBucketListener.java
@@ -40,11 +40,6 @@ public class LuceneBucketListener extends PartitionListenerAdapter {
         lucenePartitionRepositoryManager.computeRepository(bucketId);
       } catch (PrimaryBucketException e) {
         logger.info("Index repository could not be created because we are no longer primary?", e);
-      } catch (BucketNotFoundException e) {
-        logger.info(
-            "Index repository could not be created when index chunk region bucket became primary. "
-                + "Deferring index repository to be created lazily during lucene query execution."
-                + e);
       }
     });
   }
@@ -57,7 +52,7 @@ public class LuceneBucketListener extends PartitionListenerAdapter {
     dm.getWaitingThreadPool().execute(() -> {
       try {
         lucenePartitionRepositoryManager.computeRepository(bucketId);
-      } catch (PrimaryBucketException | BucketNotFoundException | AlreadyClosedException e) {
+      } catch (PrimaryBucketException | AlreadyClosedException e) {
         logger.debug("Exception while cleaning up Lucene Index Repository", e);
       }
     });

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index 4aa24b5..f24c6d6 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -217,10 +217,6 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
     // Invoke super destroy to remove the extension
     super.destroy(initiator);
 
-    // Destroy the AsyncEventQueue
-    PartitionedRegion pr = (PartitionedRegion) getDataRegion();
-    destroyAsyncEventQueue(pr);
-
     // Destroy the chunk region (colocated with the file region)
     // localDestroyRegion can't be used because locally destroying regions is not supported on
     // colocated regions
@@ -243,7 +239,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
 
     // Destroy index on remote members if necessary
     if (initiator) {
-      destroyOnRemoteMembers(pr);
+      destroyOnRemoteMembers();
     }
 
     if (logger.isDebugEnabled()) {
@@ -252,29 +248,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
     }
   }
 
-  private void destroyAsyncEventQueue(PartitionedRegion pr) {
-    String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
-
-    // Get the AsyncEventQueue
-    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
-
-    // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender)
-    aeq.stop();
-
-    // Remove the id from the dataRegion's AsyncEventQueue ids
-    // Note: The region may already have been destroyed by a remote member
-    if (!pr.isDestroyed()) {
-      pr.getAttributesMutator().removeAsyncEventQueueId(aeqId);
-    }
-
-    // Destroy the aeq (this also removes it from the GemFireCacheImpl)
-    aeq.destroy();
-    if (logger.isDebugEnabled()) {
-      logger.debug("Destroyed aeqId=" + aeqId);
-    }
-  }
-
-  private void destroyOnRemoteMembers(PartitionedRegion pr) {
+  private void destroyOnRemoteMembers() {
+    PartitionedRegion pr = (PartitionedRegion) getDataRegion();
     DM dm = pr.getDistributionManager();
     Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseDataStore();
     if (!recipients.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index cf519be..b5b13c1 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -16,9 +16,7 @@
 package org.apache.geode.cache.lucene.internal;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.internal.cache.extension.Extension;
 import org.apache.logging.log4j.Logger;
@@ -33,11 +31,6 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.cache.execute.ResultCollector;
-import org.apache.geode.cache.lucene.internal.distributed.WaitUntilFlushedFunction;
-import org.apache.geode.cache.lucene.internal.distributed.WaitUntilFlushedFunctionContext;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -217,6 +210,12 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     if (extensionToDelete != null) {
       getDataRegion().getExtensionPoint().removeExtension(extensionToDelete);
     }
+
+    // Destroy the async event queue
+    destroyAsyncEventQueue();
+
+    // Close the repository manager
+    repositoryManager.close();
   }
 
   protected <K, V> Region<K, V> createRegion(final String regionName,
@@ -237,4 +236,27 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
       throw ige;
     }
   }
+
+  private void destroyAsyncEventQueue() {
+    String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
+
+    // Get the AsyncEventQueue
+    AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
+
+    // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender)
+    aeq.stop();
+
+    // Remove the id from the dataRegion's AsyncEventQueue ids
+    // Note: The region may already have been destroyed by a remote member
+    Region region = getDataRegion();
+    if (!region.isDestroyed()) {
+      region.getAttributesMutator().removeAsyncEventQueueId(aeqId);
+    }
+
+    // Destroy the aeq (this also removes it from the GemFireCacheImpl)
+    aeq.destroy();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Destroyed aeqId=" + aeqId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
index c31f19c..b9f4de8 100755
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java
@@ -28,12 +28,6 @@ public class RawLuceneRepositoryManager extends AbstractPartitionedRepositoryMan
     super(index, serializer);
   }
 
-  public void close() {
-    for (IndexRepository repo : indexRepositories.values()) {
-      repo.cleanup();
-    }
-  }
-
   @Override
   protected IndexRepository getRepository(Integer bucketId) throws BucketNotFoundException {
     IndexRepository repo = indexRepositories.get(bucketId);

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
index b569d70..7f4a6c6 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/RepositoryManager.java
@@ -39,4 +39,9 @@ public interface RepositoryManager {
    */
   Collection<IndexRepository> getRepositories(RegionFunctionContext context)
       throws BucketNotFoundException;
+
+  /**
+   * Closes this {@link RepositoryManager}
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/38cf13ff/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
index 6260075..1afde6a 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
@@ -23,6 +23,7 @@ import org.apache.geode.cache.lucene.test.TestObject;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.awaitility.Awaitility;
 import org.junit.Ignore;
@@ -30,6 +31,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.io.InterruptedIOException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.INDEX_NAME;
@@ -38,6 +40,7 @@ import static org.apache.geode.internal.Assert.fail;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 @Category(DistributedTest.class)
 @RunWith(JUnitParamsRunner.class)
@@ -124,7 +127,7 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
     dataStore2.invoke(() -> verifyIndexCreated());
 
     // Start puts
-    AsyncInvocation putter = dataStore1.invokeAsync(() -> doPuts());
+    AsyncInvocation putter = dataStore1.invokeAsync(() -> doPutsUntilStopped());
 
     // Wait until puts have started
     dataStore1.invoke(() -> waitUntilPutsHaveStarted());
@@ -141,6 +144,42 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
     putter.join();
   }
 
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void verifyDestroyRecreateSingleIndex(RegionTestableType regionType) {
+    // Create index and region
+    dataStore1.invoke(() -> initDataStore(createIndex(), regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex(), regionType));
+
+    // Verify index created
+    dataStore1.invoke(() -> verifyIndexCreated());
+    dataStore2.invoke(() -> verifyIndexCreated());
+
+    // Do puts to cause IndexRepositories to be created
+    dataStore1.invoke(() -> doPuts(10));
+
+    // Destroy indexes (only needs to be done on one member)
+    dataStore1.invoke(() -> destroyIndexes());
+
+    // Verify indexes destroyed
+    dataStore1.invoke(() -> verifyIndexesDestroyed());
+    dataStore2.invoke(() -> verifyIndexesDestroyed());
+
+    // Destroy data region
+    dataStore1.invoke(() -> destroyDataRegion(true));
+
+    // Recreate index and region
+    dataStore1.invoke(() -> initDataStore(createIndex(), regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex(), regionType));
+
+    // Do puts to cause IndexRepositories to be recreated
+    dataStore1.invoke(() -> doPuts(10));
+
+    // Wait until queue is flushed
+    // This verifies there are no deadlocks
+    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+  }
 
   private SerializableRunnableIF createIndex() {
     return () -> {
@@ -168,7 +207,20 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
     assertNotNull(luceneService.getIndex(INDEX_NAME + "1", REGION_NAME));
   }
 
-  private void doPuts() throws Exception {
+  private void waitUntilFlushed(String indexName) throws Exception {
+    LuceneService luceneService = LuceneServiceProvider.get(getCache());
+    assertTrue(
+        luceneService.waitUntilFlushed(indexName, REGION_NAME, 30000, TimeUnit.MILLISECONDS));
+  }
+
+  private void doPuts(int numPuts) throws Exception {
+    Region region = getCache().getRegion(REGION_NAME);
+    for (int i = 0; i < numPuts; i++) {
+      region.put(i, new TestObject());
+    }
+  }
+
+  private void doPutsUntilStopped() throws Exception {
     Region region = getCache().getRegion(REGION_NAME);
     int i = 0;
     while (!STOP_PUTS) {


Mime
View raw message