geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject geode git commit: Revert "GEODE-2529: Rename LuceneFunction to LuceneQueryFunction"
Date Thu, 23 Feb 2017 00:34:59 GMT
Repository: geode
Updated Branches:
  refs/heads/develop a59a37a51 -> 2d72624cd


Revert "GEODE-2529: Rename LuceneFunction to LuceneQueryFunction"

This reverts commit a59a37a51deb89ccaf16728850687ead1657457a.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2d72624c
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2d72624c
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2d72624c

Branch: refs/heads/develop
Commit: 2d72624cda30af5a34b48e2c1dcc2f6b69680909
Parents: a59a37a
Author: Jason Huynh <huynhja@gmail.com>
Authored: Wed Feb 22 16:34:50 2017 -0800
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Wed Feb 22 16:34:50 2017 -0800

----------------------------------------------------------------------
 .../cache/lucene/internal/LuceneQueryImpl.java  |  32 +-
 .../lucene/internal/LuceneServiceImpl.java      |   5 +-
 .../internal/distributed/LuceneFunction.java    | 138 +++++++++
 .../internal/distributed/package-info.java      |   4 +-
 .../internal/LuceneQueryImplJUnitTest.java      |   4 +-
 .../LuceneServiceImplIntegrationTest.java       |   6 +-
 .../LuceneFunctionContextJUnitTest.java         |  59 ++++
 .../distributed/LuceneFunctionJUnitTest.java    | 300 +++++++++++++++++++
 8 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
index b50996b..b41bb5f 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneQueryImpl.java
@@ -32,12 +32,13 @@ import org.apache.geode.cache.lucene.LuceneQueryProvider;
 import org.apache.geode.cache.lucene.LuceneResultStruct;
 import org.apache.geode.cache.lucene.PageableLuceneQueryResults;
 import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
-import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneFunction;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneFunctionContext;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntries;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollectorManager;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesFunctionCollector;
+import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.logging.log4j.Logger;
 
@@ -106,20 +107,23 @@ public class LuceneQueryImpl<K, V> implements LuceneQuery<K,
V> {
 
     // TODO provide a timeout to the user?
     TopEntries<K> entries = null;
-    try {
-      TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(context);
-      ResultCollector<TopEntriesCollector, TopEntries<K>> rc =
-          (ResultCollector<TopEntriesCollector, TopEntries<K>>) onRegion().withArgs(context)
-              .withCollector(collector).execute(LuceneQueryFunction.ID);
-      entries = rc.getResult();
-    } catch (FunctionException e) {
-      if (e.getCause() instanceof LuceneQueryException) {
-        throw new LuceneQueryException(e);
-      } else {
-        e.printStackTrace();
-        throw e;
+    while (entries == null) {
+      try {
+        TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(context);
+        ResultCollector<TopEntriesCollector, TopEntries<K>> rc =
+            (ResultCollector<TopEntriesCollector, TopEntries<K>>) onRegion().withArgs(context)
+                .withCollector(collector).execute(LuceneFunction.ID);
+        entries = rc.getResult();
+      } catch (FunctionException e) {
+        if (e.getCause() instanceof BucketNotFoundException) {
+          entries = null;
+        } else if (e.getCause() instanceof LuceneQueryException) {
+          throw new LuceneQueryException(e);
+        } else {
+          e.printStackTrace();
+          throw e;
+        }
       }
-
     }
     return entries;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index cf7b2c9..f1b1861 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -18,7 +18,6 @@ package org.apache.geode.cache.lucene.internal;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
 import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
@@ -40,6 +39,7 @@ import org.apache.geode.cache.lucene.LuceneIndex;
 import org.apache.geode.cache.lucene.LuceneQueryFactory;
 import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
 import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneFunction;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneFunctionContext;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntries;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
@@ -55,6 +55,7 @@ import org.apache.geode.internal.cache.extension.Extensible;
 import org.apache.geode.internal.cache.CacheService;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.RegionListener;
 import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -88,7 +89,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
 
     this.cache = gfc;
 
-    FunctionService.registerFunction(new LuceneQueryFunction());
+    FunctionService.registerFunction(new LuceneFunction());
     FunctionService.registerFunction(new WaitUntilFlushedFunction());
     FunctionService.registerFunction(new DumpDirectoryFiles());
     registerDataSerializables();

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
new file mode 100644
index 0000000..a4c2c66
--- /dev/null
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunction.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
+import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
+import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.search.Query;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionAdapter;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.lucene.LuceneQueryException;
+import org.apache.geode.cache.lucene.LuceneQueryProvider;
+import org.apache.geode.cache.lucene.LuceneService;
+import org.apache.geode.cache.lucene.LuceneServiceProvider;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
+import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
+import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.internal.InternalEntity;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * {@link LuceneFunction} coordinates text search on a member. It receives text search query
from
+ * the coordinator and arguments like region and buckets. It invokes search on the local
index and
+ * provides a result collector. The locally collected results are sent to the search coordinator.
+ */
+public class LuceneFunction implements Function, InternalEntity {
+  private static final long serialVersionUID = 1L;
+  public static final String ID = LuceneFunction.class.getName();
+
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext ctx = (RegionFunctionContext) context;
+    ResultSender<TopEntriesCollector> resultSender = ctx.getResultSender();
+
+    Region region = ctx.getDataSet();
+
+    LuceneFunctionContext<IndexResultCollector> searchContext =
+        (LuceneFunctionContext) ctx.getArguments();
+    if (searchContext == null) {
+      throw new IllegalArgumentException("Missing search context");
+    }
+
+    LuceneQueryProvider queryProvider = searchContext.getQueryProvider();
+    if (queryProvider == null) {
+      throw new IllegalArgumentException("Missing query provider");
+    }
+
+    LuceneService service = LuceneServiceProvider.get(region.getCache());
+    LuceneIndexImpl index =
+        (LuceneIndexImpl) service.getIndex(searchContext.getIndexName(), region.getFullPath());
+    RepositoryManager repoManager = index.getRepositoryManager();
+    LuceneIndexStats stats = index.getIndexStats();
+
+    Query query = null;
+    try {
+      query = queryProvider.getQuery(index);
+    } catch (LuceneQueryException e) {
+      logger.warn("", e);
+      throw new FunctionException(e);
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Executing lucene query: {}, on region {}", query, region.getFullPath());
+    }
+
+    int resultLimit = searchContext.getLimit();
+    CollectorManager manager = (searchContext == null) ? null : searchContext.getCollectorManager();
+    if (manager == null) {
+      manager = new TopEntriesCollectorManager(null, resultLimit);
+    }
+
+    Collection<IndexResultCollector> results = new ArrayList<>();
+    TopEntriesCollector mergedResult = null;
+    try {
+      long start = stats.startQuery();
+      Collection<IndexRepository> repositories = null;
+
+      try {
+        repositories = repoManager.getRepositories(ctx);
+
+        for (IndexRepository repo : repositories) {
+          IndexResultCollector collector = manager.newCollector(repo.toString());
+          if (logger.isDebugEnabled()) {
+            logger.debug("Executing search on repo: " + repo.toString());
+          }
+          repo.query(query, resultLimit, collector);
+          results.add(collector);
+        }
+        mergedResult = (TopEntriesCollector) manager.reduce(results);
+      } finally {
+        stats.endQuery(start, mergedResult == null ? 0 : mergedResult.size());
+      }
+      stats.incNumberOfQueryExecuted();
+      resultSender.lastResult(mergedResult);
+    } catch (IOException | BucketNotFoundException e) {
+      logger.debug("Exception during lucene query function", e);
+      throw new FunctionException(e);
+    }
+  }
+
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
index b718c29..f6c1018 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/package-info.java
@@ -14,8 +14,8 @@
  */
 /**
  * Classes used for distributing lucene queries to geode nodes. Contains the lucene related
- * functions like {@link org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction}
as
- * well as objects that are passed between nodes like
+ * functions like {@link org.apache.geode.cache.lucene.internal.distributed.LuceneFunction}
as well
+ * as objects that are passed between nodes like
  * {@link org.apache.geode.cache.lucene.internal.distributed.EntryScore}
  */
 

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
index 9f826d5..4a31e96 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneQueryImplJUnitTest.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -41,6 +40,7 @@ import org.apache.geode.cache.lucene.LuceneQueryProvider;
 import org.apache.geode.cache.lucene.PageableLuceneQueryResults;
 import org.apache.geode.cache.lucene.LuceneResultStruct;
 import org.apache.geode.cache.lucene.internal.distributed.EntryScore;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneFunction;
 import org.apache.geode.cache.lucene.internal.distributed.LuceneFunctionContext;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntries;
 import org.apache.geode.cache.lucene.internal.distributed.TopEntriesCollector;
@@ -127,7 +127,7 @@ public class LuceneQueryImplJUnitTest {
     addValueToResults();
     PageableLuceneQueryResults<Object, Object> results = query.findPages();
 
-    verify(execution).execute(eq(LuceneQueryFunction.ID));
+    verify(execution).execute(eq(LuceneFunction.ID));
     ArgumentCaptor<LuceneFunctionContext> captor =
         ArgumentCaptor.forClass(LuceneFunctionContext.class);
     verify(execution).withArgs(captor.capture());

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplIntegrationTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplIntegrationTest.java
index 9b382e6..9e3a13c 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneServiceImplIntegrationTest.java
@@ -23,7 +23,7 @@ import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.lucene.LuceneService;
 import org.apache.geode.cache.lucene.LuceneServiceProvider;
-import org.apache.geode.cache.lucene.internal.distributed.LuceneQueryFunction;
+import org.apache.geode.cache.lucene.internal.distributed.LuceneFunction;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
 import org.junit.Rule;
@@ -55,13 +55,13 @@ public class LuceneServiceImplIntegrationTest {
   // lucene service will register query execution function on initialization
   @Test
   public void shouldRegisterQueryFunction() {
-    Function function = FunctionService.getFunction(LuceneQueryFunction.ID);
+    Function function = FunctionService.getFunction(LuceneFunction.ID);
     assertNull(function);
 
     cache = getCache();
     new LuceneServiceImpl().init(cache);
 
-    function = FunctionService.getFunction(LuceneQueryFunction.ID);
+    function = FunctionService.getFunction(LuceneFunction.ID);
     assertNotNull(function);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionContextJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionContextJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionContextJUnitTest.java
new file mode 100644
index 0000000..1a6ed59
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionContextJUnitTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal.distributed;
+
+import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.DEFAULT_FIELD;
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CopyHelper;
+import org.apache.geode.cache.lucene.LuceneQueryFactory;
+import org.apache.geode.cache.lucene.LuceneQueryProvider;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
+import org.apache.geode.cache.lucene.internal.StringQueryProvider;
+import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
+import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class LuceneFunctionContextJUnitTest {
+
+  @Test
+  public void testLuceneFunctionArgsDefaults() {
+    LuceneFunctionContext<IndexResultCollector> context = new LuceneFunctionContext<>();
+    assertEquals(LuceneQueryFactory.DEFAULT_LIMIT, context.getLimit());
+    assertEquals(DataSerializableFixedID.LUCENE_FUNCTION_CONTEXT, context.getDSFID());
+  }
+
+  @Test
+  public void testSerialization() {
+    LuceneServiceImpl.registerDataSerializables();
+
+    LuceneQueryProvider provider = new StringQueryProvider("text", DEFAULT_FIELD);
+    CollectorManager<TopEntriesCollector> manager = new TopEntriesCollectorManager("test");
+    LuceneFunctionContext<TopEntriesCollector> context =
+        new LuceneFunctionContext<>(provider, "testIndex", manager, 123);
+
+    LuceneFunctionContext<TopEntriesCollector> copy = CopyHelper.deepCopy(context);
+    assertEquals(123, copy.getLimit());
+    assertNotNull(copy.getQueryProvider());
+    assertEquals("text", ((StringQueryProvider) copy.getQueryProvider()).getQueryString());
+    assertEquals(TopEntriesCollectorManager.class, copy.getCollectorManager().getClass());
+    assertEquals("test", ((TopEntriesCollectorManager) copy.getCollectorManager()).getId());
+    assertEquals("testIndex", copy.getIndexName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/2d72624c/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
new file mode 100644
index 0000000..fe05248
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/LuceneFunctionJUnitTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
+package org.apache.geode.cache.lucene.internal.distributed;
+
+import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.DEFAULT_FIELD;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.lucene.LuceneQueryException;
+import org.apache.geode.cache.lucene.LuceneQueryFactory;
+import org.apache.geode.cache.lucene.LuceneQueryProvider;
+import org.apache.geode.cache.lucene.internal.InternalLuceneService;
+import org.apache.geode.cache.lucene.internal.LuceneIndexImpl;
+import org.apache.geode.cache.lucene.internal.LuceneIndexStats;
+import org.apache.geode.cache.lucene.internal.StringQueryProvider;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
+import org.apache.geode.cache.lucene.internal.repository.IndexResultCollector;
+import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.search.Query;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+@Category(UnitTest.class)
+public class LuceneFunctionJUnitTest {
+
+  String regionPath = "/region";
+  String indexName = "index";
+  final EntryScore<String> r1_1 = new EntryScore<String>("key-1-1", .5f);
+  final EntryScore<String> r1_2 = new EntryScore<String>("key-1-2", .4f);
+  final EntryScore<String> r1_3 = new EntryScore<String>("key-1-3", .3f);
+  final EntryScore<String> r2_1 = new EntryScore<String>("key-2-1", .45f);
+  final EntryScore<String> r2_2 = new EntryScore<String>("key-2-2", .35f);
+
+  InternalRegionFunctionContext mockContext;
+  ResultSender<TopEntriesCollector> mockResultSender;
+  Region<Object, Object> mockRegion;
+
+  RepositoryManager mockRepoManager;
+  IndexRepository mockRepository1;
+  IndexRepository mockRepository2;
+  IndexResultCollector mockCollector;
+  InternalLuceneService mockService;
+  LuceneIndexImpl mockIndex;
+  LuceneIndexStats mockStats;
+
+  ArrayList<IndexRepository> repos;
+  LuceneFunctionContext<IndexResultCollector> searchArgs;
+  LuceneQueryProvider queryProvider;
+  Query query;
+
+  private InternalCache mockCache;
+
+  @Test
+  public void testRepoQueryAndMerge() throws Exception {
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    doAnswer(invocation -> {
+      IndexResultCollector collector = invocation.getArgumentAt(2, IndexResultCollector.class);
+      collector.collect(r1_1.getKey(), r1_1.getScore());
+      collector.collect(r1_2.getKey(), r1_2.getScore());
+      collector.collect(r1_3.getKey(), r1_3.getScore());
+      return null;
+    }).when(mockRepository1).query(eq(query), eq(LuceneQueryFactory.DEFAULT_LIMIT),
+        any(IndexResultCollector.class));
+
+    doAnswer(invocation -> {
+      IndexResultCollector collector = invocation.getArgumentAt(2, IndexResultCollector.class);
+      collector.collect(r2_1.getKey(), r2_1.getScore());
+      collector.collect(r2_2.getKey(), r2_2.getScore());
+      return null;
+    }).when(mockRepository2).query(eq(query), eq(LuceneQueryFactory.DEFAULT_LIMIT),
+        any(IndexResultCollector.class));
+
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+
+    ArgumentCaptor<TopEntriesCollector> resultCaptor =
+        ArgumentCaptor.forClass(TopEntriesCollector.class);
+    verify(mockResultSender).lastResult(resultCaptor.capture());
+    TopEntriesCollector result = resultCaptor.getValue();
+
+
+    List<EntryScore> hits = result.getEntries().getHits();
+    assertEquals(5, hits.size());
+    TopEntriesJUnitTest.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2,
r2_2,
+        r1_3);
+  }
+
+  @Test
+  public void testResultLimitClause() throws Exception {
+
+    searchArgs =
+        new LuceneFunctionContext<IndexResultCollector>(queryProvider, "indexName",
null, 3);
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+
+    doAnswer(invocation -> {
+      IndexResultCollector collector = invocation.getArgumentAt(2, IndexResultCollector.class);
+      collector.collect(r1_1.getKey(), r1_1.getScore());
+      collector.collect(r1_2.getKey(), r1_2.getScore());
+      collector.collect(r1_3.getKey(), r1_3.getScore());
+      return null;
+    }).when(mockRepository1).query(eq(query), eq(3), any(IndexResultCollector.class));
+
+    doAnswer(invocation -> {
+      IndexResultCollector collector = invocation.getArgumentAt(2, IndexResultCollector.class);
+      collector.collect(r2_1.getKey(), r2_1.getScore());
+      collector.collect(r2_2.getKey(), r2_2.getScore());
+      return null;
+    }).when(mockRepository2).query(eq(query), eq(3), any(IndexResultCollector.class));
+
+
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+
+    ArgumentCaptor<TopEntriesCollector> resultCaptor =
+        ArgumentCaptor.forClass(TopEntriesCollector.class);
+    verify(mockResultSender).lastResult(resultCaptor.capture());
+    TopEntriesCollector result = resultCaptor.getValue();
+
+    List<EntryScore> hits = result.getEntries().getHits();
+    assertEquals(3, hits.size());
+    TopEntriesJUnitTest.verifyResultOrder(result.getEntries().getHits(), r1_1, r2_1, r1_2);
+  }
+
+  @Test
+  public void injectCustomCollectorManager() throws Exception {
+    final CollectorManager mockManager = mock(CollectorManager.class);
+    searchArgs =
+        new LuceneFunctionContext<IndexResultCollector>(queryProvider, "indexName",
mockManager);
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    repos.remove(0);
+    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockManager.newCollector(eq("repo2"))).thenReturn(mockCollector);
+    when(mockManager.reduce(any(Collection.class))).thenAnswer(invocation -> {
+      Collection<IndexResultCollector> collectors = invocation.getArgumentAt(0, Collection.class);
+      assertEquals(1, collectors.size());
+      assertEquals(mockCollector, collectors.iterator().next());
+      return new TopEntriesCollector(null);
+
+    });
+
+    doAnswer(invocation -> {
+      IndexResultCollector collector = invocation.getArgumentAt(2, IndexResultCollector.class);
+      collector.collect(r2_1.getKey(), r2_1.getScore());
+      return null;
+    }).when(mockRepository2).query(eq(query), eq(LuceneQueryFactory.DEFAULT_LIMIT),
+        any(IndexResultCollector.class));
+
+
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+
+    verify(mockCollector).collect(eq("key-2-1"), eq(.45f));
+    verify(mockResultSender).lastResult(any(TopEntriesCollector.class));
+  }
+
+  @Test(expected = FunctionException.class)
+  public void testIndexRepoQueryFails() throws Exception {
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    doThrow(IOException.class).when(mockRepository1).query(eq(query),
+        eq(LuceneQueryFactory.DEFAULT_LIMIT), any(IndexResultCollector.class));
+
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+  }
+
+  // Disabled currently as we are retrying the function if a bucket is not found
+  // @Test(expected = FunctionException.class)
+  // public void testBucketNotFound() throws Exception {
+  // when(mockContext.getDataSet()).thenReturn(mockRegion);
+  // when(mockContext.getArguments()).thenReturn(searchArgs);
+  // when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+  // when(mockRepoManager.getRepositories(eq(mockContext)))
+  // .thenThrow(new BucketNotFoundException(""));
+  // LuceneFunction function = new LuceneFunction();
+  //
+  // function.execute(mockContext);
+  //
+  // verify(mockResultSender).sendException(any(BucketNotFoundException.class));
+  // }
+
+  @Test(expected = FunctionException.class)
+  public void testReduceError() throws Exception {
+    final CollectorManager mockManager = mock(CollectorManager.class);
+    searchArgs =
+        new LuceneFunctionContext<IndexResultCollector>(queryProvider, "indexName",
mockManager);
+
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    repos.remove(1);
+    when(mockRepoManager.getRepositories(eq(mockContext))).thenReturn(repos);
+    when(mockManager.newCollector(eq("repo1"))).thenReturn(mockCollector);
+    when(mockManager.reduce(any(Collection.class))).thenThrow(IOException.class);
+
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+  }
+
+  @Test(expected = FunctionException.class)
+  public void queryProviderErrorIsHandled() throws Exception {
+    queryProvider = mock(LuceneQueryProvider.class);
+    searchArgs = new LuceneFunctionContext<IndexResultCollector>(queryProvider, "indexName");
+    when(mockContext.getDataSet()).thenReturn(mockRegion);
+    when(mockContext.getArguments()).thenReturn(searchArgs);
+    when(mockContext.<TopEntriesCollector>getResultSender()).thenReturn(mockResultSender);
+    when(queryProvider.getQuery(eq(mockIndex))).thenThrow(LuceneQueryException.class);
+    LuceneFunction function = new LuceneFunction();
+
+    function.execute(mockContext);
+  }
+
+  @Test
+  public void testQueryFunctionId() {
+    String id = new LuceneFunction().getId();
+    assertEquals(LuceneFunction.class.getName(), id);
+  }
+
+  @Before
+  public void createMocksAndCommonObjects() throws Exception {
+    mockContext = mock(InternalRegionFunctionContext.class);
+    mockResultSender = mock(ResultSender.class);
+    mockRegion = mock(Region.class);
+
+    mockRepoManager = mock(RepositoryManager.class);
+    mockRepository1 = mock(IndexRepository.class, "repo1");
+    mockRepository2 = mock(IndexRepository.class, "repo2");
+    mockCollector = mock(IndexResultCollector.class);
+    mockStats = mock(LuceneIndexStats.class);
+
+    repos = new ArrayList<IndexRepository>();
+    repos.add(mockRepository1);
+    repos.add(mockRepository2);
+
+    mockIndex = mock(LuceneIndexImpl.class);
+    mockService = mock(InternalLuceneService.class);
+    mockCache = mock(InternalCache.class);
+    Analyzer analyzer = new StandardAnalyzer();
+    Mockito.doReturn(analyzer).when(mockIndex).getAnalyzer();
+    queryProvider = new StringQueryProvider("gemfire:lucene", DEFAULT_FIELD);
+
+    searchArgs = new LuceneFunctionContext<IndexResultCollector>(queryProvider, "indexName");
+
+    when(mockRegion.getCache()).thenReturn(mockCache);
+    when(mockRegion.getFullPath()).thenReturn(regionPath);
+    when(mockCache.getService(any())).thenReturn(mockService);
+    when(mockService.getIndex(eq("indexName"), eq(regionPath))).thenReturn(mockIndex);
+    when(mockIndex.getRepositoryManager()).thenReturn(mockRepoManager);
+    when(mockIndex.getFieldNames()).thenReturn(new String[] {"gemfire"});
+    when(mockIndex.getIndexStats()).thenReturn(mockStats);
+
+    query = queryProvider.getQuery(mockIndex);
+  }
+}


Mime
View raw message