geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [3/5] geode git commit: GEODE-2520: Add a lucene query test where a bucket is moved while a query is executing
Date Wed, 22 Feb 2017 21:38:22 GMT
GEODE-2520: Add a lucene query test where a bucket is moved while a query is executing

*  Added LuceneIndexServiceSpy


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0d2397b2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0d2397b2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0d2397b2

Branch: refs/heads/feature/GEODE-2460
Commit: 0d2397b203060f02b95ece30bf4c7130685b71cd
Parents: 8065b67
Author: Jason Huynh <huynhja@gmail.com>
Authored: Fri Feb 17 13:03:29 2017 -0800
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Wed Feb 22 11:33:59 2017 -0800

----------------------------------------------------------------------
 .../internal/distributed/LuceneFunction.java    |  1 +
 .../distributed/WaitUntilFlushedFunction.java   | 10 ++-
 .../geode/cache/lucene/LuceneQueriesPRBase.java | 32 +++++++-
 .../lucene/internal/LuceneIndexFactorySpy.java  | 80 ++++++++++++++++++++
 4 files changed, 116 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0d2397b2/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
index 76c5893..a4c2c66 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
 import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
+import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.Query;
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0d2397b2/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
index 90f1b9f..e11384c 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/WaitUntilFlushedFunction.java
@@ -20,6 +20,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
 import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
 import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
@@ -49,7 +51,7 @@ import org.apache.geode.internal.logging.LogService;
  * in current AEQs are flushed into index. This function enables an accessor and client to call to
  * make sure the current events are processed.
  */
-public class WaitUntilFlushedFunction extends FunctionAdapter implements InternalEntity {
+public class WaitUntilFlushedFunction implements Function, InternalEntity {
   private static final long serialVersionUID = 1L;
   public static final String ID = WaitUntilFlushedFunction.class.getName();
 
@@ -61,7 +63,7 @@ public class WaitUntilFlushedFunction extends FunctionAdapter implements Interna
     ResultSender<Boolean> resultSender = ctx.getResultSender();
 
     Region region = ctx.getDataSet();
-
+    Cache cache = region.getCache();
     WaitUntilFlushedFunctionContext arg = (WaitUntilFlushedFunctionContext) ctx.getArguments();
     String indexName = arg.getIndexName();
     if (indexName == null) {
@@ -70,12 +72,12 @@ public class WaitUntilFlushedFunction extends FunctionAdapter implements Interna
     long timeout = arg.getTimeout();
     TimeUnit unit = arg.getTimeunit();
 
-    LuceneService service = LuceneServiceProvider.get(region.getCache());
+    LuceneService service = LuceneServiceProvider.get(cache);
     LuceneIndexImpl index = (LuceneIndexImpl) service.getIndex(indexName, region.getFullPath());
 
     boolean result = false;
     String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, region.getFullPath());
-    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) region.getCache().getAsyncEventQueue(aeqId);
+    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);
     if (queue != null) {
       try {
         result = queue.waitUntilFlushed(timeout, unit);

http://git-wip-us.apache.org/repos/asf/geode/blob/0d2397b2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
index 25a4927..9c2f6ed 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPRBase.java
@@ -20,9 +20,13 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 
-import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
+import org.apache.geode.cache.lucene.internal.LuceneIndexFactorySpy;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
@@ -30,6 +34,11 @@ import org.apache.geode.internal.cache.InitialImageOperation;
 import org.apache.geode.internal.cache.InitialImageOperation.GIITestHook;
 import org.apache.geode.internal.cache.InitialImageOperation.GIITestHookType;
 import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage;
+import org.apache.geode.cache.lucene.internal.LuceneIndexFactorySpy;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.junit.After;
 import org.junit.Test;
 
@@ -37,10 +46,8 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.control.RebalanceOperation;
 import org.apache.geode.cache.control.RebalanceResults;
-import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
 import org.apache.geode.cache.lucene.test.IndexRepositorySpy;
 import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
@@ -79,6 +86,15 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
   }
 
   @Test
+  public void returnCorrectResultsWhenMoveBucketHappensOnQuery() throws InterruptedException {
+    final DistributedMember member2 =
+        dataStore2.invoke(() -> getCache().getDistributedSystem().getDistributedMember());
+    addCallbackToMovePrimaryOnQuery(dataStore1, member2);
+
+    putEntriesAndValidateQueryResults();
+  }
+
+  @Test
   public void returnCorrectResultsWhenBucketIsMovedAndMovedBackOnIndexUpdate()
       throws InterruptedException {
     final DistributedMember member1 =
@@ -186,9 +202,18 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
     });
   }
 
+  protected void addCallbackToMovePrimaryOnQuery(VM vm, final DistributedMember destination) {
+    vm.invoke(() -> {
+      LuceneIndexFactorySpy factorySpy = LuceneIndexFactorySpy.injectSpy();
+
+      factorySpy.setGetRespositoryConsumer(doOnce(key -> moveBucket(destination, key)));
+    });
+  }
+
   private void moveBucket(final DistributedMember destination, final Object key) {
     Region<Object, Object> region = getCache().getRegion(REGION_NAME);
     DistributedMember source = getCache().getDistributedSystem().getDistributedMember();
+    System.out.println("WHAT");
     PartitionRegionHelper.moveBucketByKey(region, source, destination, key);
   }
 
@@ -196,6 +221,7 @@ public abstract class LuceneQueriesPRBase extends LuceneQueriesBase {
     vm.invoke(() -> {
       IndexRepositorySpy.remove();
       InitialImageOperation.resetAllGIITestHooks();
+      LuceneIndexFactorySpy.remove();
     });
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0d2397b2/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
new file mode 100644
index 0000000..5c6d256
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexFactorySpy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import java.util.function.Consumer;
+
+public class LuceneIndexFactorySpy extends LuceneIndexFactory {
+
+  public static LuceneIndexFactorySpy injectSpy() {
+    LuceneIndexFactorySpy factory = new LuceneIndexFactorySpy();
+    LuceneServiceImpl.luceneIndexFactory = factory;
+    return factory;
+  }
+
+  public static void remove() {
+    LuceneServiceImpl.luceneIndexFactory = new LuceneIndexFactory();
+  }
+
+
+  private Consumer<Object> getRepositoryConsumer = key -> {
+  };
+
+  @Override
+  public LuceneIndexImpl create(String indexName, String regionPath, GemFireCacheImpl cache) {
+    LuceneIndexForPartitionedRegion index =
+        Mockito.spy(new ExtendedLuceneIndexForPartitionedRegion(indexName, regionPath, cache));
+    return index;
+  }
+
+
+  public void setGetRespositoryConsumer(Consumer<Object> getRepositoryConsumer) {
+    this.getRepositoryConsumer = getRepositoryConsumer;
+  }
+
+
+  private class ExtendedLuceneIndexForPartitionedRegion extends LuceneIndexForPartitionedRegion {
+    public ExtendedLuceneIndexForPartitionedRegion(final String indexName, final String regionPath,
+        final Cache cache) {
+      super(indexName, regionPath, cache);
+    }
+
+    @Override
+    public RepositoryManager createRepositoryManager() {
+      RepositoryManager repositoryManagerSpy = Mockito.spy(super.createRepositoryManager());
+      Answer getRepositoryAnswer = invocation -> {
+        getRepositoryConsumer.accept(invocation.getArgumentAt(0, Object.class));
+        return invocation.callRealMethod();
+      };
+      try {
+        doAnswer(getRepositoryAnswer).when(repositoryManagerSpy).getRepositories(any());
+      } catch (BucketNotFoundException e) {
+        e.printStackTrace();
+      }
+      return repositoryManagerSpy;
+    }
+  }
+}
+


Mime
View raw message