geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [49/50] [abbrv] geode git commit: GEODE-2612: Added option to invoke callbacks during snapshot
Date Thu, 09 Mar 2017 20:49:28 GMT
GEODE-2612: Added option to invoke callbacks during snapshot


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/63d565b5
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/63d565b5
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/63d565b5

Branch: refs/heads/feature/GEODE-1969
Commit: 63d565b5a02d1a4246f5a40f6acf7e47897a73e2
Parents: f1d4291
Author: Barry Oglesby <boglesby@pivotal.io>
Authored: Wed Mar 8 14:35:23 2017 -0800
Committer: Barry Oglesby <boglesby@pivotal.io>
Committed: Thu Mar 9 09:59:07 2017 -0800

----------------------------------------------------------------------
 .../geode/cache/snapshot/SnapshotOptions.java   |  16 ++
 .../snapshot/RegionSnapshotServiceImpl.java     |   4 +-
 .../cache/snapshot/SnapshotOptionsImpl.java     |  15 ++
 .../geode/cache/snapshot/SnapshotDUnitTest.java | 208 +++++++++++++++++--
 .../codeAnalysis/sanctionedSerializables.txt    |   2 +-
 .../LuceneIndexForPartitionedRegion.java        |   2 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |  10 +-
 .../lucene/LuceneIndexDestroyDUnitTest.java     | 201 ++++++++++++++++--
 .../geode/cache/lucene/test/TestObject.java     |   4 +
 9 files changed, 424 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotOptions.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotOptions.java b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotOptions.java
index 1562b3a..4c819b8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotOptions.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotOptions.java
@@ -55,4 +55,20 @@ public interface SnapshotOptions<K, V> extends Serializable {
    * @return the filter, or null if the filter is not set
    */
   SnapshotFilter<K, V> getFilter();
+
+  /**
+   * Sets whether to invoke callbacks when loading a snapshot. The default is false.
+   *
+   * @param invokeCallbacks true to invoke callbacks when loading a snapshot
+   *
+   * @return the snapshot options
+   */
+  SnapshotOptions<K, V> invokeCallbacks(boolean invokeCallbacks);
+
+  /**
+   * Returns whether loading a snapshot causes callbacks to be invoked
+   *
+   * @return whether loading a snapshot causes callbacks to be invoked
+   */
+  boolean shouldInvokeCallbacks();
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
index e8b28c4..bfdef05 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
@@ -257,7 +257,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
                 .getDistributionManager().getWaitingThreadPool().submit(new Runnable() {
                   @Override
                   public void run() {
-                    local.basicImportPutAll(copy, true);
+                    local.basicImportPutAll(copy, !options.shouldInvokeCallbacks());
                   }
                 });
 
@@ -270,7 +270,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
 
       // send off any remaining entries
       if (!buffer.isEmpty()) {
-        local.basicImportPutAll(buffer, true);
+        local.basicImportPutAll(buffer, !options.shouldInvokeCallbacks());
       }
 
       // wait for completion and check for errors

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl.java
index d56535f..9f4fcd8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl.java
@@ -30,6 +30,9 @@ public class SnapshotOptionsImpl<K, V> implements SnapshotOptions<K, V> {
   /** the entry filter */
   private volatile SnapshotFilter<K, V> filter;
 
+  /** true if callbacks should be invoked on load */
+  private volatile boolean invokeCallbacks;
+
   /** true if parallel mode is enabled */
   private volatile boolean parallel;
 
@@ -38,6 +41,7 @@ public class SnapshotOptionsImpl<K, V> implements SnapshotOptions<K, V> {
 
   public SnapshotOptionsImpl() {
     filter = null;
+    invokeCallbacks = false;
   }
 
   @Override
@@ -51,6 +55,17 @@ public class SnapshotOptionsImpl<K, V> implements SnapshotOptions<K, V> {
     return filter;
   }
 
+  @Override
+  public SnapshotOptions<K, V> invokeCallbacks(boolean invokeCallbacks) {
+    this.invokeCallbacks = invokeCallbacks;
+    return this;
+  }
+
+  @Override
+  public boolean shouldInvokeCallbacks() {
+    return this.invokeCallbacks;
+  }
+
   /**
    * Enables parallel mode for snapshot operations. This will cause each member of a partitioned
   * region to save its local data set (ignoring redundant copies) to a separate snapshot file.

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotDUnitTest.java
index 1b6d426..196a88b 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotDUnitTest.java
@@ -14,21 +14,29 @@
  */
 package org.apache.geode.cache.snapshot;
 
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.awaitility.Awaitility;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
 import java.io.File;
-import java.io.FilenameFilter;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.examples.snapshot.MyObject;
 import com.examples.snapshot.MyPdxSerializer;
@@ -42,12 +50,14 @@ import org.apache.geode.cache.snapshot.RegionGenerator.SerializationType;
 import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.cache.util.CacheWriterAdapter;
-import org.apache.geode.cache30.CacheTestCase;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableCallable;
 
 @Category(DistributedTest.class)
 public class SnapshotDUnitTest extends JUnit4CacheTestCase {
+
+  private static final int NUM_ENTRIES = 1000;
+
   public SnapshotDUnitTest() {
     super();
   }
@@ -60,17 +70,8 @@ public class SnapshotDUnitTest extends JUnit4CacheTestCase {
     // save all regions
     getCache().getSnapshotService().save(dir, SnapshotFormat.GEMFIRE);
 
-    for (final RegionType rt : RegionType.values()) {
-      for (final SerializationType st : SerializationType.values()) {
-        String name = "test-" + rt.name() + "-" + st.name();
-
-        // overwrite region with bad data
-        Region<Integer, MyObject> region = getCache().getRegion(name);
-        for (Entry<Integer, MyObject> entry : region.entrySet()) {
-          region.put(entry.getKey(), new MyObject(Integer.MAX_VALUE, "bad!!"));
-        }
-      }
-    }
+    // update regions with data to be overwritten by import
+    updateRegions();
 
     SerializableCallable callbacks = new SerializableCallable() {
       @Override
@@ -106,8 +107,83 @@ public class SnapshotDUnitTest extends JUnit4CacheTestCase {
     forEachVm(callbacks, true);
 
     // load all regions
+    loadRegions(dir, null);
+  }
+
+  @Test
+  public void testExportAndImportWithInvokeCallbacksEnabled() throws Exception {
+    File dir = new File(getDiskDirs()[0], "callbacks");
+    dir.mkdir();
+
+    // save all regions
+    CacheSnapshotService service = getCache().getSnapshotService();
+    service.save(dir, SnapshotFormat.GEMFIRE);
+
+    // update regions with data to be overwritten by import
+    updateRegions();
+
+    SerializableCallable callbacks = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        for (final RegionType rt : RegionType.values()) {
+          for (final SerializationType st : SerializationType.values()) {
+            String name = "test-" + rt.name() + "-" + st.name();
+            Cache cache = getCache();
+            Region<Integer, MyObject> region = cache.getRegion(name);
+            // add CacheWriter and CacheListener
+            AttributesMutator mutator = region.getAttributesMutator();
+            mutator.setCacheWriter(new CountingCacheWriter());
+            mutator.addCacheListener(new CountingCacheListener());
+            // add AsyncEventQueue
+            addAsyncEventQueue(region, name);
+          }
+        }
+        return null;
+      }
+    };
+
+    // add callbacks
+    forEachVm(callbacks, true);
+
+    // load all regions with invoke callbacks enabled
+    SnapshotOptions options = service.createOptions();
+    options.invokeCallbacks(true);
+    loadRegions(dir, options);
+
+    // verify callbacks were invoked
+    verifyCallbacksInvoked();
+  }
+
+  private void addAsyncEventQueue(Region region, String name) {
+    DiskStoreFactory dsFactory = getCache().createDiskStoreFactory();
+    dsFactory.create(name);
+    AsyncEventQueueFactory aeqFactory = getCache().createAsyncEventQueueFactory();
+    aeqFactory.setDiskStoreName(name);
+    aeqFactory.create(name, new CountingAsyncEventListener());
+    region.getAttributesMutator().addAsyncEventQueueId(name);
+  }
+
+  private void updateRegions() {
+    for (final RegionType rt : RegionType.values()) {
+      for (final SerializationType st : SerializationType.values()) {
+        String name = "test-" + rt.name() + "-" + st.name();
+
+        // overwrite region with bad data
+        Region<Integer, MyObject> region = getCache().getRegion(name);
+        for (Entry<Integer, MyObject> entry : region.entrySet()) {
+          region.put(entry.getKey(), new MyObject(Integer.MAX_VALUE, "bad!!"));
+        }
+      }
+    }
+  }
+
+  private void loadRegions(File dir, SnapshotOptions options) throws Exception {
     RegionGenerator rgen = new RegionGenerator();
-    getCache().getSnapshotService().load(dir, SnapshotFormat.GEMFIRE);
+    if (options != null) {
+      getCache().getSnapshotService().load(dir.listFiles(), SnapshotFormat.GEMFIRE, options);
+    } else {
+      getCache().getSnapshotService().load(dir, SnapshotFormat.GEMFIRE);
+    }
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         Region<Integer, MyObject> region =
@@ -120,6 +196,51 @@ public class SnapshotDUnitTest extends JUnit4CacheTestCase {
     }
   }
 
+  private void verifyCallbacksInvoked() throws Exception {
+    for (final RegionType rt : RegionType.values()) {
+      for (final SerializationType st : SerializationType.values()) {
+        SerializableCallable counts = new SerializableCallable() {
+          @Override
+          public Object call() throws Exception {
+            String name = "test-" + rt.name() + "-" + st.name();
+            Region<Integer, MyObject> region = getCache().getRegion(name);
+            // get CacheWriter and CacheListener events
+            CountingCacheWriter writer =
+                (CountingCacheWriter) region.getAttributes().getCacheWriter();
+            CountingCacheListener listener =
+                (CountingCacheListener) region.getAttributes().getCacheListener();
+            // get AsyncEventListener events
+            int numAeqEvents = 0;
+            AsyncEventQueue aeq = getCache().getAsyncEventQueue(name);
+            CountingAsyncEventListener aeqListener =
+                (CountingAsyncEventListener) aeq.getAsyncEventListener();
+            if (aeq.isPrimary()) {
+              Awaitility.waitAtMost(60, TimeUnit.SECONDS)
+                  .until(() -> aeqListener.getEvents() == NUM_ENTRIES);
+              numAeqEvents = aeqListener.getEvents();
+            }
+            return new int[] {writer.getEvents(), listener.getEvents(), numAeqEvents};
+          }
+        };
+        Object result = forEachVm(counts, true);
+        int totalWriterUpdates = 0, totalListenerUpdates = 0, totalAeqEvents = 0;
+        List<int[]> list = (List) result;
+        for (int[] vmResult : list) {
+          totalWriterUpdates += vmResult[0];
+          totalListenerUpdates += vmResult[1];
+          totalAeqEvents += vmResult[2];
+        }
+        if (rt.name().contains("PARTITION")) {
+          assertEquals(NUM_ENTRIES, totalListenerUpdates);
+        } else {
+          assertEquals(NUM_ENTRIES * (Host.getHost(0).getVMCount() + 1), totalListenerUpdates);
+        }
+        assertEquals(NUM_ENTRIES, totalWriterUpdates);
+        assertEquals(NUM_ENTRIES, totalAeqEvents);
+      }
+    }
+  }
+
   @Test
   public void testCacheExportFilterException() throws Exception {
     SnapshotFilter<Object, Object> oops = new SnapshotFilter<Object, Object>() {
@@ -190,7 +311,7 @@ public class SnapshotDUnitTest extends JUnit4CacheTestCase {
   public static Map<Integer, MyObject> createExpected(SerializationType type,
       RegionGenerator rgen) {
     Map<Integer, MyObject> expected = new HashMap<Integer, MyObject>();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < NUM_ENTRIES; i++) {
       expected.put(i, rgen.createData(type, i, "The number is " + i));
     }
     return expected;
@@ -222,16 +343,63 @@ public class SnapshotDUnitTest extends JUnit4CacheTestCase {
   }
 
  public static Object forEachVm(SerializableCallable call, boolean local) throws Exception {
+    List result = new ArrayList();
     Host host = Host.getHost(0);
     int vms = host.getVMCount();
 
     for (int i = 0; i < vms; ++i) {
-      host.getVM(i).invoke(call);
+      result.add(host.getVM(i).invoke(call));
     }
 
     if (local) {
-      return call.call();
+      result.add(call.call());
     }
-    return null;
+    return result;
+  }
+
+  private static class CountingCacheListener extends CacheListenerAdapter<Integer, MyObject> {
+
+    private final AtomicInteger events = new AtomicInteger();
+
+    @Override
+    public void afterUpdate(EntryEvent<Integer, MyObject> event) {
+      events.incrementAndGet();
+    }
+
+    private int getEvents() {
+      return events.get();
+    }
+  }
+
+  private static class CountingCacheWriter extends CacheWriterAdapter<Integer, MyObject> {
+
+    private final AtomicInteger events = new AtomicInteger();
+
+    @Override
+    public void beforeUpdate(EntryEvent<Integer, MyObject> event) {
+      events.incrementAndGet();
+    }
+
+    private int getEvents() {
+      return events.get();
+    }
+  }
+
+  private static final class CountingAsyncEventListener implements AsyncEventListener {
+
+    private final AtomicInteger events = new AtomicInteger();
+
+    @Override
+    public boolean processEvents(final List<AsyncEvent> list) {
+      events.addAndGet(list.size());
+      return true;
+    }
+
+    private int getEvents() {
+      return events.get();
+    }
+
+    @Override
+    public void close() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedSerializables.txt
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedSerializables.txt
index a11c7ad..397ec14 100755
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedSerializables.txt
@@ -335,7 +335,7 @@ org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl$1,true,1
 org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl$ParallelArgs,true,1,file:java/io/File,format:org/apache/geode/cache/snapshot/SnapshotOptions$SnapshotFormat,options:org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl
 org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl$ParallelExportFunction,false
 org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl$ParallelImportFunction,false
-org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl,true,1,filter:org/apache/geode/cache/snapshot/SnapshotFilter,mapper:org/apache/geode/internal/cache/snapshot/SnapshotFileMapper,parallel:boolean
+org/apache/geode/internal/cache/snapshot/SnapshotOptionsImpl,true,1,filter:org/apache/geode/cache/snapshot/SnapshotFilter,invokeCallbacks:boolean,mapper:org/apache/geode/internal/cache/snapshot/SnapshotFileMapper,parallel:boolean
 org/apache/geode/internal/cache/snapshot/WindowedExporter$WindowedArgs,true,1,exporter:org/apache/geode/distributed/DistributedMember,options:org/apache/geode/cache/snapshot/SnapshotOptions
 org/apache/geode/internal/cache/snapshot/WindowedExporter$WindowedExportFunction,true,1
 org/apache/geode/internal/cache/tier/BatchException,true,-6707074107791305564,_index:int

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index f24c6d6..7274d6a 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -251,7 +251,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
   private void destroyOnRemoteMembers() {
     PartitionedRegion pr = (PartitionedRegion) getDataRegion();
     DM dm = pr.getDistributionManager();
-    Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseDataStore();
+    Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseAllPRNodes();
     if (!recipients.isEmpty()) {
       if (logger.isDebugEnabled()) {
         logger.debug("LuceneIndexForPartitionedRegion: About to send destroy message recipients="

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index b5b13c1..0c9d220 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -244,7 +244,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
 
     // Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender)
-    aeq.stop();
+    // The AsyncEventQueue can be null in an accessor member
+    if (aeq != null) {
+      aeq.stop();
+    }
 
     // Remove the id from the dataRegion's AsyncEventQueue ids
     // Note: The region may already have been destroyed by a remote member
@@ -254,7 +257,10 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     }
 
     // Destroy the aeq (this also removes it from the GemFireCacheImpl)
-    aeq.destroy();
+    // The AsyncEventQueue can be null in an accessor member
+    if (aeq != null) {
+      aeq.destroy();
+    }
     if (logger.isDebugEnabled()) {
       logger.debug("Destroyed aeqId=" + aeqId);
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
index 1afde6a..496cdeb 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexDestroyDUnitTest.java
@@ -20,10 +20,13 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.lucene.internal.LuceneIndexForPartitionedRegion;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.lucene.test.TestObject;
+import org.apache.geode.cache.snapshot.RegionSnapshotService;
+import org.apache.geode.cache.snapshot.SnapshotOptions;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
-import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.awaitility.Awaitility;
 import org.junit.Ignore;
@@ -31,12 +34,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import java.io.InterruptedIOException;
+import java.io.File;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.INDEX_NAME;
 import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
 import static org.apache.geode.internal.Assert.fail;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -48,6 +53,14 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
 
   private volatile boolean STOP_PUTS = false;
 
+  protected VM accessor;
+
+  @Override
+  public void postSetUp() throws Exception {
+    super.postSetUp();
+    accessor = Host.getHost(0).getVM(3);
+  }
+
   private final Object[] parametersForIndexDestroys() {
     String[] destroyDataRegionParameters = {"true", "false"};
     RegionTestableType[] regionTestTypes = getListOfRegionTestTypes();
@@ -146,17 +159,31 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
 
   @Test
   @Parameters(method = "getListOfRegionTestTypes")
-  public void verifyDestroyRecreateSingleIndex(RegionTestableType regionType) {
+  public void verifyDestroyRecreateIndexSameName(RegionTestableType regionType) {
     // Create index and region
-    dataStore1.invoke(() -> initDataStore(createIndex(), regionType));
-    dataStore2.invoke(() -> initDataStore(createIndex(), regionType));
+    SerializableRunnableIF createIndex = createIndex();
+    dataStore1.invoke(() -> initDataStore(createIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionType));
+    accessor.invoke(() -> initAccessor(createIndex, regionType));
 
     // Verify index created
     dataStore1.invoke(() -> verifyIndexCreated());
     dataStore2.invoke(() -> verifyIndexCreated());
+    accessor.invoke(() -> verifyIndexCreated());
 
     // Do puts to cause IndexRepositories to be created
-    dataStore1.invoke(() -> doPuts(10));
+    int numPuts = 10;
+    accessor.invoke(() -> doPuts(numPuts));
+
+    // Wait until queue is flushed
+    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // Execute query and verify results
+    accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts));
+
+    // Export entries from region
+    accessor.invoke(() -> exportData(regionType));
 
     // Destroy indexes (only needs to be done on one member)
     dataStore1.invoke(() -> destroyIndexes());
@@ -169,22 +196,142 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
     dataStore1.invoke(() -> destroyDataRegion(true));
 
     // Recreate index and region
-    dataStore1.invoke(() -> initDataStore(createIndex(), regionType));
-    dataStore2.invoke(() -> initDataStore(createIndex(), regionType));
+    dataStore1.invoke(() -> initDataStore(createIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionType));
+    accessor.invoke(() -> initAccessor(createIndex, regionType));
 
-    // Do puts to cause IndexRepositories to be recreated
-    dataStore1.invoke(() -> doPuts(10));
+    // Import entries into region
+    accessor.invoke(() -> importData(regionType, numPuts));
 
     // Wait until queue is flushed
     // This verifies there are no deadlocks
     dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
     dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // re-execute query and verify results
+    accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts));
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void verifyDestroyRecreateIndexDifferentName(RegionTestableType regionType) {
+    // Create index and region
+    SerializableRunnableIF createIndex = createIndex();
+    dataStore1.invoke(() -> initDataStore(createIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionType));
+    accessor.invoke(() -> initAccessor(createIndex, regionType));
+
+    // Verify index created
+    dataStore1.invoke(() -> verifyIndexCreated());
+    dataStore2.invoke(() -> verifyIndexCreated());
+    accessor.invoke(() -> verifyIndexCreated());
+
+    // Do puts to cause IndexRepositories to be created
+    int numPuts = 10;
+    accessor.invoke(() -> doPuts(numPuts));
+
+    // Wait until queue is flushed
+    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // Execute query and verify results
+    accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts));
+
+    // Export entries from region
+    accessor.invoke(() -> exportData(regionType));
+
+    // Destroy indexes (only needs to be done on one member)
+    dataStore1.invoke(() -> destroyIndexes());
+
+    // Verify indexes destroyed
+    dataStore1.invoke(() -> verifyIndexesDestroyed());
+    dataStore2.invoke(() -> verifyIndexesDestroyed());
+
+    // Destroy data region
+    dataStore1.invoke(() -> destroyDataRegion(true));
+
+    // Recreate index and region
+    String newIndexName = INDEX_NAME + "_1";
+    SerializableRunnableIF createIndexNewName = createIndex(newIndexName, "field1");
+    dataStore1.invoke(() -> initDataStore(createIndexNewName, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndexNewName, regionType));
+    accessor.invoke(() -> initAccessor(createIndexNewName, regionType));
+
+    // Import entries into region
+    accessor.invoke(() -> importData(regionType, numPuts));
+
+    // Wait until queue is flushed
+    // This verifies there are no deadlocks
+    dataStore1.invoke(() -> waitUntilFlushed(newIndexName));
+    dataStore2.invoke(() -> waitUntilFlushed(newIndexName));
+
+    // re-execute query and verify results
+    accessor.invoke(() -> executeQuery(newIndexName, "field1Value", "field1", numPuts));
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void verifyDestroyRecreateDifferentIndex(RegionTestableType regionType) {
+    SerializableRunnableIF createIndex = createIndex();
+    dataStore1.invoke(() -> initDataStore(createIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionType));
+    accessor.invoke(() -> initAccessor(createIndex, regionType));
+
+    // Verify index created
+    dataStore1.invoke(() -> verifyIndexCreated());
+    dataStore2.invoke(() -> verifyIndexCreated());
+    accessor.invoke(() -> verifyIndexCreated());
+
+    // Do puts to cause IndexRepositories to be created
+    int numPuts = 10;
+    accessor.invoke(() -> doPuts(numPuts));
+
+    // Wait until queue is flushed
+    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // Execute query and verify results
+    accessor.invoke(() -> executeQuery(INDEX_NAME, "field1Value", "field1", numPuts));
+
+    // Export entries from region
+    accessor.invoke(() -> exportData(regionType));
+
+    // Destroy indexes (only needs to be done on one member)
+    dataStore1.invoke(() -> destroyIndexes());
+
+    // Verify indexes destroyed
+    dataStore1.invoke(() -> verifyIndexesDestroyed());
+    dataStore2.invoke(() -> verifyIndexesDestroyed());
+
+    // Destroy data region
+    dataStore1.invoke(() -> destroyDataRegion(true));
+
+    // Create new index and region
+    SerializableRunnableIF createNewIndex = createIndex(INDEX_NAME, "field2");
+    dataStore1.invoke(() -> initDataStore(createNewIndex, regionType));
+    dataStore2.invoke(() -> initDataStore(createNewIndex, regionType));
+    accessor.invoke(() -> initAccessor(createNewIndex, regionType));
+
+    // Import entries into region
+    accessor.invoke(() -> importData(regionType, numPuts));
+
+    // Wait until queue is flushed
+    // This verifies there are no deadlocks
+    dataStore1.invoke(() -> waitUntilFlushed(INDEX_NAME));
+    dataStore2.invoke(() -> waitUntilFlushed(INDEX_NAME));
+
+    // re-execute query and verify results
+    accessor.invoke(() -> executeQuery(INDEX_NAME, "field2Value", "field2", numPuts));
   }
 
   private SerializableRunnableIF createIndex() {
+    return createIndex(INDEX_NAME, "field1");
+  }
+
+  private SerializableRunnableIF createIndex(String indexName, String field) {
     return () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
-      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+      luceneService.createIndex(indexName, REGION_NAME, field);
     };
   }
 
@@ -216,7 +363,7 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
   private void doPuts(int numPuts) throws Exception {
     Region region = getCache().getRegion(REGION_NAME);
     for (int i = 0; i < numPuts; i++) {
-      region.put(i, new TestObject());
+      region.put(i, new TestObject("field1Value", "field2Value"));
     }
   }
 
@@ -238,6 +385,15 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
         .until(() -> getCache().getRegion(REGION_NAME).size() > 0);
   }
 
+  private void executeQuery(String indexName, String queryString, String field,
+      int expectedResultsSize) throws LuceneQueryException {
+    LuceneService luceneService = LuceneServiceProvider.get(getCache());
+    LuceneQuery query =
+        luceneService.createLuceneQueryFactory().create(indexName, REGION_NAME, queryString, field);
+    Collection results = query.findValues();
+    assertEquals(expectedResultsSize, results.size());
+  }
+
   private void destroyDataRegion(boolean shouldSucceed) {
     Region region = getCache().getRegion(REGION_NAME);
     assertNotNull(region);
@@ -296,4 +452,25 @@ public class LuceneIndexDestroyDUnitTest extends LuceneDUnitTest {
     LocalRegion region = (LocalRegion) getCache().getRegion(REGION_NAME);
     assertFalse(region.getExtensionPoint().getExtensions().iterator().hasNext());
   }
+
+  private void exportData(RegionTestableType regionType) throws Exception {
+    Region region = getCache().getRegion(REGION_NAME);
+    RegionSnapshotService service = region.getSnapshotService();
+    service.save(getSnapshotFile(getDiskDirs()[0], regionType),
+        SnapshotOptions.SnapshotFormat.GEMFIRE);
+  }
+
+  private void importData(RegionTestableType regionType, int expectedRegionSize) throws Exception {
+    Region region = getCache().getRegion(REGION_NAME);
+    RegionSnapshotService service = region.getSnapshotService();
+    SnapshotOptions options = service.createOptions();
+    options.invokeCallbacks(true);
+    service.load(getSnapshotFile(getDiskDirs()[0], regionType),
+        SnapshotOptions.SnapshotFormat.GEMFIRE, options);
+    assertEquals(expectedRegionSize, region.size());
+  }
+
+  private File getSnapshotFile(File baseDirectory, RegionTestableType regionType) {
+    return new File(baseDirectory, REGION_NAME + "_" + regionType.name() + "_snapshot.gfd");
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/63d565b5/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/TestObject.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/TestObject.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/TestObject.java
index be91b6d..24e330b 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/TestObject.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/TestObject.java
@@ -52,4 +52,8 @@ public class TestObject implements Serializable {
     return (testObject.field1.equals(field1) && testObject.field2.equals(field2));
   }
 
+  public String toString() {
+    return new StringBuilder().append(getClass().getSimpleName()).append("[").append("field1=")
+        .append(field1).append("; field2=").append(field2).append("]").toString();
+  }
 }


Mime
View raw message