geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject incubator-geode git commit: GEODE-1558, GEODE-1609: Fixing failures due to rebalancing buckets during lucene index updates
Date Thu, 07 Jul 2016 17:09:01 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 4612a193f -> 7b28a8d4e


GEODE-1558, GEODE-1609: Fixing failures due to rebalancing buckets during lucene index updates

There are two failures we're fixing here:
1) Calling repository.create for create events was leaving duplicate events in the
index because when a bucket fails over, the event is dispatched again on the new primary.
Using the isPossibleDuplicate flag did not work because it was not consistently set to
true for duplicate events. Changed the code to call repository.update even for create events.

2) The async event queue was repeatedly dispatching the same events even after
a bucket moved to another node. We changed the async event queue code to filter out
events for buckets that are no longer present on this dispatching member.

Cleaning up the rebalancing test and adding new tests to make these scenarios
more reproducible.

This closes #176


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7b28a8d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7b28a8d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7b28a8d4

Branch: refs/heads/develop
Commit: 7b28a8d4e9dc4359d4d11d286f895537864f3a03
Parents: 4612a19
Author: Dan Smith <dsmith@pivotal.io>
Authored: Tue Jun 28 15:33:51 2016 -0700
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Thu Jul 7 10:03:04 2016 -0700

----------------------------------------------------------------------
 .../parallel/ParallelGatewaySenderQueue.java    |  18 +-
 .../cache/wan/AsyncEventQueueTestBase.java      |  32 ++-
 .../asyncqueue/AsyncEventListenerDUnitTest.java |  98 +++++++++
 .../lucene/internal/IndexRepositoryFactory.java |  71 ++++++
 .../lucene/internal/LuceneEventListener.java    |   8 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |   2 +-
 .../internal/PartitionedRepositoryManager.java  |  28 +--
 .../repository/IndexRepositoryImpl.java         |   8 +-
 .../gemfire/cache/lucene/LuceneQueriesBase.java |  14 +-
 .../cache/lucene/LuceneQueriesPRBase.java       | 216 +++++++++++++++++--
 .../lucene/LuceneQueriesPeerPRDUnitTest.java    |   8 +-
 .../LuceneQueriesPeerPRRedundancyDUnitTest.java |  54 ++++-
 .../internal/LuceneEventListenerJUnitTest.java  |   6 +-
 13 files changed, 501 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index e0f8b6f..453e7f0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1277,7 +1277,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     PartitionedRegion prQ = getRandomShadowPR();
-    List batch = new ArrayList();
+    List<GatewaySenderEventImpl> batch = new ArrayList<>();
     if (prQ == null || prQ.getLocalMaxMemory() == 0) {
       try {
         Thread.sleep(50);
@@ -1370,8 +1370,20 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return batch;
   }
 
-  private void addPeekedEvents(List batch, int batchSize) {
+  private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
     if (this.resetLastPeeked) {
+
+      //Remove all entries from peekedEvents for buckets that are not longer primary
+      //This will prevent repeatedly trying to dispatch non-primary events
+      for(Iterator<GatewaySenderEventImpl> iterator = peekedEvents.iterator(); iterator.hasNext();
) {
+        GatewaySenderEventImpl event = iterator.next();
+        final int bucketId = event.getBucketId();
+        final PartitionedRegion region = (PartitionedRegion) event.getRegion();
+        if(!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
+          iterator.remove();
+        }
+      }
+
       if (this.peekedEventsProcessingInProgress) {
         // Peeked event processing is in progress. This means that the original peekedEvents
         // contained > batch size events due to a reduction in the batch size. Create
a batch
@@ -1400,7 +1412,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  private void addPreviouslyPeekedEvents(List batch, int batchSize) {
+  private void addPreviouslyPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize)
{
     for (int i=0; i<batchSize; i++) {
       batch.add(this.peekedEventsProcessing.remove());
       if (this.peekedEventsProcessing.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
index 596756f..d7739c5 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -258,13 +258,26 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase
{
   }
 
   public static void createAsyncEventQueue(String asyncChannelId,
-      boolean isParallel, Integer maxMemory, Integer batchSize,
-      boolean isConflation, boolean isPersistent, String diskStoreName,
-      boolean isDiskSynchronous) {
+                                           boolean isParallel, Integer maxMemory, Integer
batchSize,
+                                           boolean isConflation, boolean isPersistent, String
diskStoreName,
+                                           boolean isDiskSynchronous)
+  {
+    createAsyncEventQueue(asyncChannelId, isParallel, maxMemory, batchSize, isConflation,
isPersistent, diskStoreName,
+      isDiskSynchronous, new MyAsyncEventListener());
+  }
+
+  public static void createAsyncEventQueue(
+    String asyncChannelId,
+    boolean isParallel,
+    Integer maxMemory,
+    Integer batchSize,
+    boolean isConflation,
+    boolean isPersistent,
+    String diskStoreName,
+    boolean isDiskSynchronous,
+    final AsyncEventListener asyncEventListener) {
     createDiskStore(asyncChannelId, diskStoreName);
 
-    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
     AsyncEventQueueFactory factory = getInitialAsyncEventQueueFactory(isParallel, maxMemory,
batchSize, isPersistent, diskStoreName);
     factory.setDiskSynchronous(isDiskSynchronous);
     factory.setBatchConflationEnabled(isConflation);
@@ -1387,15 +1400,20 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase
{
     assertEquals(expectedToDataInvoations, filter.getNumToDataInvocations());
   }
 
-  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+  public static AsyncEventListener getAsyncEventListener(String asyncEventQueueId) {
     AsyncEventListener theListener = null;
 
     Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncQueue : asyncEventQueues) {
       if (asyncEventQueueId.equals(asyncQueue.getId())) {
-        theListener = asyncQueue.getAsyncEventListener();
+        return asyncQueue.getAsyncEventListener();
       }
     }
+    return null;
+  }
+
+  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+    AsyncEventListener theListener = getAsyncEventListener(asyncEventQueueId);
 
     final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
     assertNotNull(eventsMap);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index f96926f..f090402 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -18,26 +18,38 @@ package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
 
 import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
 import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
 import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.dunit.Wait;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 import com.gemstone.gemfire.test.junit.categories.FlakyTest;
@@ -1562,4 +1574,90 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase
{
     int vm3size = (Integer)vm3.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize(
"ln" ));
     assertEquals(vm3size, 1000);
   }
+
+  @Test
+  public void testParallelAsyncEventQueueMoveBucketAndMoveItBackDuringDispatching() {
+    Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId(
1 ));
+
+    vm1.invoke(createCacheRunnable(lnPort));
+    vm2.invoke(createCacheRunnable(lnPort));
+    final DistributedMember member1 = vm1.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+    final DistributedMember member2 = vm2.invoke(() -> cache.getDistributedSystem().getDistributedMember());
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln",
+        true, 100, 10, false, false, null, false, new BucketMovingAsyncEventListener(member2)));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR",
+      113 ));
+
+    vm2.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue("ln",
+      true, 100, 10, false, false, null, false, new BucketMovingAsyncEventListener(member1)));
+
+    vm2.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue(
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+    vm2.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" ));
+
+    Set<Object> allKeys = new HashSet<Object>();
+    allKeys.addAll(getKeysSeen(vm1, "ln"));
+    allKeys.addAll(getKeysSeen(vm2, "ln"));
+
+    final Set<Long> expectedKeys = LongStream.range(0, 113).mapToObj(Long::valueOf).collect(Collectors.toSet());
+    assertEquals(expectedKeys, allKeys);
+
+    assertTrue(getBucketMoved(vm1, "ln"));
+    assertTrue(getBucketMoved(vm2, "ln"));
+  }
+
+  private static Set<Object> getKeysSeen(VM vm, String asyncEventQueueId) {
+    return vm.invoke(() -> {
+      final BucketMovingAsyncEventListener listener = (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      return listener.keysSeen;
+    });
+  }
+
+  private static boolean getBucketMoved(VM vm, String asyncEventQueueId) {
+    return vm.invoke(() -> {
+      final BucketMovingAsyncEventListener listener = (BucketMovingAsyncEventListener) getAsyncEventListener(asyncEventQueueId);
+      return listener.moved;
+    });
+  }
+
+  private static final class BucketMovingAsyncEventListener implements AsyncEventListener
{
+    private final DistributedMember destination;
+    private boolean moved;
+    private Set<Object> keysSeen = new HashSet<Object>();
+
+    public BucketMovingAsyncEventListener(final DistributedMember destination) {
+      this.destination = destination;
+    }
+
+    @Override public boolean processEvents(final List<AsyncEvent> events) {
+      if(!moved) {
+
+        AsyncEvent event1 = events.get(0);
+        moveBucket(destination, event1.getKey());
+        moved = true;
+        return false;
+      }
+
+      events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
+      return true;
+    }
+
+    @Override public void close() {
+
+    }
+    private static void moveBucket(final DistributedMember destination, final Object key)
{
+      Region<Object, Object> region = cache.getRegion(getTestMethodName() + "_PR");
+      DistributedMember source = cache.getDistributedSystem().getDistributedMember();
+      PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
new file mode 100644
index 0000000..12f12ad
--- /dev/null
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/IndexRepositoryFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+
+public class IndexRepositoryFactory {
+
+  public IndexRepositoryFactory() {
+  }
+
+  public IndexRepository createIndexRepository(final Integer bucketId,
+                                        PartitionedRegion userRegion,
+                                        PartitionedRegion fileRegion,
+                                        PartitionedRegion chunkRegion,
+                                        LuceneSerializer serializer,
+                                        Analyzer analyzer,
+                                        LuceneIndexStats indexStats,
+                                        FileSystemStats fileSystemStats)
+    throws IOException
+  {
+    final IndexRepository repo;
+    BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
+    BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
+    if(fileBucket == null || chunkBucket == null) {
+      return null;
+    }
+    RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats);
+    IndexWriterConfig config = new IndexWriterConfig(analyzer);
+    IndexWriter writer = new IndexWriter(dir, config);
+    repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats);
+    return repo;
+  }
+
+  /**
+   * Find the bucket in region2 that matches the bucket id from region1.
+   */
+  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) {
+    //Force the bucket to be created if it is not already
+    region.getOrCreateNodeForBucketWrite(bucketId, null);
+
+    return region.getDataStore().getLocalBucketById(bucketId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
index a7150c0..29fb159 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
@@ -36,6 +37,7 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
 import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
 import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.TestHook;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -63,7 +65,7 @@ public class LuceneEventListener implements AsyncEventListener {
     DefaultQuery.setPdxReadSerialized(true);
 
     Set<IndexRepository> affectedRepos = new HashSet<IndexRepository>();
-    
+
     try {
       for (AsyncEvent event : events) {
         Region region = event.getRegion();
@@ -75,7 +77,7 @@ public class LuceneEventListener implements AsyncEventListener {
         Operation op = event.getOperation();
 
         if (op.isCreate()) {
-          repository.create(key, event.getDeserializedValue());
+          repository.update(key, event.getDeserializedValue());
         } else if (op.isUpdate()) {
           repository.update(key, event.getDeserializedValue());
         } else if (op.isDestroy()) {
@@ -92,7 +94,7 @@ public class LuceneEventListener implements AsyncEventListener {
         repo.commit();
       }
       return true;
-    } catch(BucketNotFoundException e) {
+    } catch(BucketNotFoundException | RegionDestroyedException | PrimaryBucketException e)
{
       logger.debug("Bucket not found while saving to lucene index: " + e.getMessage());
       return false;
     } catch(IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
index acd3765..ff31c49 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java
@@ -93,7 +93,6 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
       long start = System.nanoTime();
       while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond))
{
         if (0 == queue.size()) {
-          logger.debug("waitUntilFlushed: Queue size is 0");
           flushed = true;
           break;
         } else {
@@ -106,6 +105,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex {
     } else { 
       throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+"
region "+regionPath);
     }
+
     return flushed;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
index a119157..3cc713b 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/PartitionedRepositoryManager.java
@@ -26,16 +26,12 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
 
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
-import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
 import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
 import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
-import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepositoryImpl;
 import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
 import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
 import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
@@ -53,6 +49,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CopyOnWriteHashMap;
  */
 public class PartitionedRepositoryManager implements RepositoryManager {
 
+  public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory();
+
   /** map of the parent bucket region to the index repository
    * 
    * This is based on the BucketRegion in case a bucket is rebalanced, we don't want to 
@@ -142,16 +140,8 @@ public class PartitionedRepositoryManager implements RepositoryManager
{
       }
 
       try {
-        BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
-        BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
-        if(fileBucket == null || chunkBucket == null) {
-          return null;
-        }
-        RegionDirectory dir = new RegionDirectory(fileBucket, chunkBucket, fileSystemStats);
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
-        IndexWriter writer = new IndexWriter(dir, config);
-        return new IndexRepositoryImpl(fileBucket, writer, serializer, indexStats);
-
+        return indexRepositoryFactory.createIndexRepository(bucketId, userRegion, fileRegion,
chunkRegion, serializer,
+          analyzer, indexStats, fileSystemStats);
       } catch(IOException e) {
         throw new InternalGemFireError("Unable to create index repository", e);
       }
@@ -164,14 +154,4 @@ public class PartitionedRepositoryManager implements RepositoryManager
{
 
     return repo;
   }
-
-  /**
-   * Find the bucket in region2 that matches the bucket id from region1.
-   */
-  private BucketRegion getMatchingBucket(PartitionedRegion region, Integer bucketId) {
-    //Force the bucket to be created if it is not already
-    region.getOrCreateNodeForBucketWrite(bucketId, null);
-    
-    return region.getDataStore().getLocalBucketById(bucketId);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
index 563e382..0b70542 100644
--- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -30,6 +30,7 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.*;
+import org.apache.lucene.store.AlreadyClosedException;
 
 import java.io.IOException;
 import java.util.function.IntSupplier;
@@ -168,7 +169,12 @@ public class IndexRepositoryImpl implements IndexRepository {
         stats.removeDocumentsSupplier(this);
         return 0;
       }
-      return writer.numDocs();
+      try {
+        return writer.numDocs();
+      } catch(AlreadyClosedException e) {
+        //ignore
+        return 0;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
index 1f3795c..e817d3b 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java
@@ -22,9 +22,11 @@ import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
@@ -170,9 +172,13 @@ public abstract class LuceneQueriesBase extends LuceneDUnitTest {
 
       LuceneService service = LuceneServiceProvider.get(cache);
       LuceneQuery<Integer, TestObject> query;
-      query = service.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, queryString,
defaultField);
-      PageableLuceneQueryResults<Integer, TestObject> results = query.findPages();
-      assertEquals(results.size(), expectedResultsSize);
+      query = service.createLuceneQueryFactory()
+        .setResultLimit(1000)
+        .setPageSize(1000)
+        .create(INDEX_NAME, REGION_NAME, queryString, defaultField);
+      Collection<?> results = query.findKeys();
+
+      assertEquals(expectedResultsSize, results.size());
     });
   }
 
@@ -186,7 +192,7 @@ public abstract class LuceneQueriesBase extends LuceneDUnitTest {
     });
   }
 
-  private static class TestObject implements Serializable {
+  protected static class TestObject implements Serializable {
     private static final long serialVersionUID = 1L;
     private String text;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
index 4d5a0b7..889b16f 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPRBase.java
@@ -21,25 +21,38 @@ package com.gemstone.gemfire.cache.lucene;
 
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.IOException;
+import java.util.concurrent.Callable;
 import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
+import org.apache.lucene.analysis.Analyzer;
+import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.control.RebalanceOperation;
 import com.gemstone.gemfire.cache.control.RebalanceResults;
-import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.cache.lucene.internal.IndexRepositoryFactory;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexStats;
+import com.gemstone.gemfire.cache.lucene.internal.PartitionedRepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.FileSystemStats;
+import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.LuceneSerializer;
+import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketNotFoundException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
 /**
  * This test class adds more basic tests of lucene functionality
@@ -49,20 +62,130 @@ import com.gemstone.gemfire.test.junit.categories.DistributedTest;
  */
 public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
 
+  @After
+  public void cleanupRebalanceCallback() {
+    removeCallback(dataStore1);
+    removeCallback(dataStore2);
+  }
+
+
+
+  @Test
+  public void returnCorrectResultsWhenRebalanceHappensOnIndexUpdate() throws InterruptedException
{
+    addCallbackToTriggerRebalance(dataStore1);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  @Test
+  public void returnCorrectResultsWhenMoveBucketHappensOnIndexUpdate() throws InterruptedException
{
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMoveBucket(dataStore1, member2);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  @Test
+  public void returnCorrectResultsWhenBucketIsMovedAndMovedBackOnIndexUpdate() throws InterruptedException
{
+    final DistributedMember member1 = dataStore1.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMoveBucket(dataStore1, member2);
+    addCallbackToMoveBucket(dataStore2, member1);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  protected void putEntriesAndValidateQueryResults() {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  @Test
+  public void returnCorrectResultsWhenRebalanceHappensAfterUpdates() throws InterruptedException
{
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+
+    put113Entries();
+
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+
+    rebalanceRegion(dataStore2);
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
   @Test
-  public void returnCorrectResultsAfterRebalance() {
+  public void returnCorrectResultsWhenRebalanceHappensWhileSenderIsPaused() throws InterruptedException
{
     SerializableRunnableIF createIndex = () -> {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
     };
     dataStore1.invoke(() -> initDataStore(createIndex));
     accessor.invoke(() -> initAccessor(createIndex));
-    putDataInRegion(accessor);
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
     dataStore2.invoke(() -> initDataStore(createIndex));
+    rebalanceRegion(dataStore2);
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
 
-    rebalanceRegion(dataStore1);
     assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
-    executeTextSearch(accessor);
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  protected void put113Entries() {
+    accessor.invoke(() -> {
+      final Cache cache = getCache();
+      Region<Object, Object> region = cache.getRegion(REGION_NAME);
+      IntStream.range(0,113).forEach(i -> region.put(i, new TestObject("hello world")));
+    });
+  }
+
+  private void addCallbackToTriggerRebalance(VM vm) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> rebalanceRegion(vm)));
+    });
+  }
+
+  protected void addCallbackToMoveBucket(VM vm, final DistributedMember destination) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+    });
+  }
+
+  private void moveBucket(final DistributedMember destination, final Object key) {
+    Region<Object, Object> region = getCache().getRegion(REGION_NAME);
+    DistributedMember source = getCache().getDistributedSystem().getDistributedMember();
+    PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
+  }
+
+  private void removeCallback(VM vm) {
+    vm.invoke(IndexRepositorySpy::remove);
   }
 
   private void rebalanceRegion(VM vm) {
@@ -70,8 +193,75 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
     vm.invoke(() -> {
         RebalanceOperation op = getCache().getResourceManager().createRebalanceFactory().start();
         RebalanceResults results = op.getResults();
-        assertTrue("Transferred " + results.getTotalBucketTransfersCompleted(), 1 < results.getTotalBucketTransfersCompleted());
     });
   }
 
+  protected static class IndexRepositorySpy extends IndexRepositoryFactory {
+
+    private Consumer<Object> beforeWrite = key -> {};
+
+    public static IndexRepositorySpy injectSpy() {
+      IndexRepositorySpy factory = new IndexRepositorySpy();
+      PartitionedRepositoryManager.indexRepositoryFactory = factory;
+      return factory;
+    }
+
+    public static void remove() {
+      PartitionedRepositoryManager.indexRepositoryFactory = new IndexRepositoryFactory();
+    }
+
+    private IndexRepositorySpy() {
+    }
+
+    @Override
+    public IndexRepository createIndexRepository(final Integer bucketId,
+                                                 final PartitionedRegion userRegion,
+                                                 final PartitionedRegion fileRegion,
+                                                 final PartitionedRegion chunkRegion,
+                                                 final LuceneSerializer serializer,
+                                                 final Analyzer analyzer,
+                                                 final LuceneIndexStats indexStats,
+                                                 final FileSystemStats fileSystemStats)
+      throws IOException
+    {
+      final IndexRepository indexRepo = super.createIndexRepository(bucketId, userRegion,
fileRegion, chunkRegion,
+        serializer, analyzer,
+        indexStats,
+        fileSystemStats);
+      final IndexRepository spy = Mockito.spy(indexRepo);
+
+      Answer invokeBeforeWrite = invocation -> {
+        beforeWrite.accept(invocation.getArgumentAt(0, Object.class));
+        invocation.callRealMethod();
+        return null;
+      };
+      doAnswer(invokeBeforeWrite).when(spy).update(any(), any());
+      doAnswer(invokeBeforeWrite).when(spy).create(any(), any());
+      doAnswer(invokeBeforeWrite).when(spy).delete(any());
+
+      return spy;
+    }
+
+    /**
+     * Add a callback that runs before each call to update, create, or delete
+     * on the spied {@link IndexRepository}; it receives the call's first argument
+     */
+    public void beforeWrite(Consumer<Object> action) {
+      this.beforeWrite = action;
+    }
+  }
+
+  protected static <T> Consumer<T> doOnce(Consumer<T> consumer) {
+    return new Consumer<T>() {
+      boolean done;
+
+      @Override
+      public void accept(final T t) {
+        if (!done) {
+          done = true;
+          consumer.accept(t);
+        }
+      }
+    };
+  };
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
index 830ca26..00b8254 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRDUnitTest.java
@@ -17,6 +17,8 @@
 package com.gemstone.gemfire.cache.lucene;
 
 import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
+
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 import com.gemstone.gemfire.cache.RegionShortcut;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
@@ -28,7 +30,11 @@ public class LuceneQueriesPeerPRDUnitTest extends LuceneQueriesPRBase {
 
   @Override protected void initDataStore(final SerializableRunnableIF createIndex) throws
Exception {
     createIndex.run();
-    getCache().createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+    PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
+    partitionAttributesFactory.setLocalMaxMemory(100);
+    getCache().createRegionFactory(RegionShortcut.PARTITION)
+      .setPartitionAttributes(partitionAttributesFactory.create())
+      .create(REGION_NAME);
   }
 
   @Override protected void initAccessor(final SerializableRunnableIF createIndex) throws
Exception {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
index 494cc9f..0a7bb67 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesPeerPRRedundancyDUnitTest.java
@@ -16,11 +16,23 @@
  */
 package com.gemstone.gemfire.cache.lucene;
 
-import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
+import static com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities.*;
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.lucene.test.LuceneTestUtilities;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(DistributedTest.class)
@@ -34,4 +46,44 @@ public class LuceneQueriesPeerPRRedundancyDUnitTest extends LuceneQueriesPRBase
   @Override protected void initAccessor(final SerializableRunnableIF createIndex) throws
Exception {
     initDataStore(createIndex);
   }
+
+  @Test
+  public void returnCorrectResultsWhenMovePrimaryHappensOnIndexUpdate() throws InterruptedException
{
+    final DistributedMember member2 = dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMovePrimary(dataStore1, member2);
+
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndex(INDEX_NAME, REGION_NAME, "text");
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex));
+    dataStore2.invoke(() -> initDataStore(createIndex));
+    accessor.invoke(() -> initAccessor(createIndex));
+    dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
+
+    put113Entries();
+
+    dataStore1.invoke(() -> LuceneTestUtilities.resumeSender(getCache()));
+
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+
+    executeTextSearch(accessor, "world", "text", 113);
+  }
+
+  protected void addCallbackToMovePrimary(VM vm, final DistributedMember destination) {
+    vm.invoke(() -> {
+      IndexRepositorySpy spy = IndexRepositorySpy.injectSpy();
+
+      spy.beforeWrite(doOnce(key -> moveBucket(destination, key)));
+    });
+  }
+
+  private void moveBucket(final DistributedMember destination, final Object key) {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(REGION_NAME);
+
+    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send(
+      (InternalDistributedMember) destination, region, region.getKeyInfo(key).getBucketId(),
true);
+    assertNotNull(response);
+    assertTrue(response.waitForResponse());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7b28a8d4/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
index 86ed481..e3331de 100644
--- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
+++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -96,12 +96,10 @@ public class LuceneEventListenerJUnitTest {
 
     listener.processEvents(events);
 
-    verify(repo1, atLeast(numEntries / 6)).create(any(), any());
     verify(repo1, atLeast(numEntries / 6)).delete(any());
-    verify(repo1, atLeast(numEntries / 6)).update(any(), any());
-    verify(repo2, atLeast(numEntries / 6)).create(any(), any());
+    verify(repo1, atLeast(numEntries / 3)).update(any(), any());
     verify(repo2, atLeast(numEntries / 6)).delete(any());
-    verify(repo2, atLeast(numEntries / 6)).update(any(), any());
+    verify(repo2, atLeast(numEntries / 3)).update(any(), any());
     verify(repo1, times(1)).commit();
     verify(repo2, times(1)).commit();
   }



Mime
View raw message