geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ash...@apache.org
Subject [4/4] incubator-geode git commit: GEODE-11: Add search function ResultCollector
Date Fri, 11 Sep 2015 21:56:40 GMT
GEODE-11: Add search function ResultCollector

https://reviews.apache.org/r/38320/


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/026271b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/026271b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/026271b0

Branch: refs/heads/feature/GEODE-11
Commit: 026271b0ca9fac716285cdcd085a22b1f18c4e9f
Parents: 01c4bc9
Author: Ashvin Agrawal <ashvin@apache.org>
Authored: Fri Sep 11 11:24:05 2015 -0700
Committer: Ashvin Agrawal <ashvin@apache.org>
Committed: Fri Sep 11 14:55:18 2015 -0700

----------------------------------------------------------------------
 .../distributed/TopEntriesCollectorManager.java |   2 +-
 .../TopEntriesFunctionCollector.java            | 132 +++++++++
 .../TopEntriesFunctionCollectorJUnitTest.java   | 270 +++++++++++++++++++
 3 files changed, 403 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
index 21e11ab..f269b2b 100644
--- a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesCollectorManager.java
@@ -33,7 +33,7 @@ public class TopEntriesCollectorManager implements CollectorManager<TopEntriesCo
   }
 
   public TopEntriesCollectorManager(String id, int resultLimit) {
-    this.limit = resultLimit < 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
+    this.limit = resultLimit <= 0 ? LuceneQueryFactory.DEFAULT_LIMIT : resultLimit;
     this.id = id == null ? String.valueOf(this.hashCode()) : id;
     logger.debug("Max count of entries to be produced by {} is {}", id, limit);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
new file mode 100644
index 0000000..3b39538
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -0,0 +1,132 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.lucene.LuceneQuery;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * A {@link ResultCollector} implementation for collecting and ordering {@link TopEntries}. Each
+ * member that executes a {@link LuceneQuery} on its locally hosted data contributes one
+ * {@link TopEntriesCollector}; this class buffers those sub-results and merges them on demand.
+ * The member executing this logic must have sufficient space to hold all the {@link EntryScore}
+ * documents returned from the members.
+ *
+ * <p>
+ * The merge is lazy: {@link #addResult} only buffers the sub-result, and the reduce step is run
+ * by the {@link CollectorManager} each time {@code getResult} returns, which can only happen
+ * after {@link #endResults} has been called. No merging is performed while results are still
+ * being added.
+ *
+ * <p>
+ * Every read and write of the buffered sub-results happens while holding the buffer's monitor.
+ * Once {@link #endResults} has been called, {@link #addResult} and {@link #clearResults} throw
+ * {@link IllegalStateException}.
+ */
+public class TopEntriesFunctionCollector
+    implements ResultCollector<TopEntriesCollector, TopEntries> {
+  private static final Logger logger = LogService.getLogger();
+
+  // Use this instance to perform reduce operation
+  final CollectorManager<TopEntriesCollector> manager;
+
+  // Name of the cache, or this object's hash code as a string when no cache was supplied
+  final String id;
+
+  // Instance of gemfire cache to check cancellation status; may be null
+  private final GemFireCacheImpl cache;
+
+  // latch to wait till all results are collected; released by endResults()
+  private final CountDownLatch waitForResults = new CountDownLatch(1);
+
+  // Sub-results received so far. This collection is also the monitor guarding its own access.
+  private final Collection<TopEntriesCollector> subResults = new ArrayList<>();
+
+  /**
+   * Creates a collector with no cache and a default {@link TopEntriesCollectorManager}.
+   */
+  public TopEntriesFunctionCollector() {
+    this(null, null);
+  }
+
+  /**
+   * Creates a collector with no cache.
+   *
+   * @param manager performs the reduce step; when null a {@link TopEntriesCollectorManager} is
+   *        created
+   */
+  public TopEntriesFunctionCollector(CollectorManager<TopEntriesCollector> manager) {
+    this(manager, null);
+  }
+
+  /**
+   * @param manager performs the reduce step; when null a {@link TopEntriesCollectorManager} is
+   *        created with this collector's id
+   * @param cache used to name this collector and to check for cancellation when a waiting thread
+   *        is interrupted; may be null
+   */
+  public TopEntriesFunctionCollector(CollectorManager<TopEntriesCollector> manager,
+      GemFireCacheImpl cache) {
+    this.cache = cache;
+    id = cache == null ? String.valueOf(this.hashCode()) : cache.getName();
+    this.manager = manager == null ? new TopEntriesCollectorManager(id) : manager;
+  }
+
+  /**
+   * Blocks until {@link #endResults} has been called, then merges and returns the sub-results.
+   *
+   * @throws FunctionException if the calling thread is interrupted while waiting or the merge
+   *         fails with an {@link IOException}
+   */
+  @Override
+  public TopEntries getResult() throws FunctionException {
+    try {
+      waitForResults.await();
+    } catch (InterruptedException e) {
+      throw interruptedWhileWaiting(e);
+    }
+
+    return aggregateResults();
+  }
+
+  /**
+   * Waits at most the given time for {@link #endResults}, then merges and returns the
+   * sub-results.
+   *
+   * @throws FunctionException if the wait times out, the calling thread is interrupted while
+   *         waiting, or the merge fails with an {@link IOException}
+   */
+  @Override
+  public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
+    boolean completed;
+    try {
+      completed = waitForResults.await(timeout, unit);
+    } catch (InterruptedException e) {
+      throw interruptedWhileWaiting(e);
+    }
+
+    if (!completed) {
+      throw new FunctionException("Did not receive results from all members within wait time");
+    }
+
+    return aggregateResults();
+  }
+
+  /**
+   * Shared handling for an interrupted wait: logs, restores the thread's interrupt flag and, when
+   * a cache is available, invokes its cancel criterion check (which may itself throw).
+   *
+   * @return the {@link FunctionException} the caller should throw
+   */
+  private FunctionException interruptedWhileWaiting(InterruptedException e) {
+    logger.debug("Interrupted while waiting for result collection", e);
+    Thread.currentThread().interrupt();
+    if (cache != null) {
+      cache.getCancelCriterion().checkCancelInProgress(e);
+    }
+    return new FunctionException(e);
+  }
+
+  /**
+   * Reduces all buffered sub-results with the manager and returns the merged entries.
+   */
+  private TopEntries aggregateResults() {
+    synchronized (subResults) {
+      try {
+        TopEntriesCollector finalResult = manager.reduce(subResults);
+        return finalResult.getEntries();
+      } catch (IOException e) {
+        logger.debug("Error while merging function execution results", e);
+        throw new FunctionException(e);
+      }
+    }
+  }
+
+  /**
+   * Marks result collection as complete and releases any thread blocked in {@code getResult}.
+   */
+  @Override
+  public void endResults() {
+    synchronized (subResults) {
+      waitForResults.countDown();
+    }
+  }
+
+  /**
+   * Discards every sub-result buffered so far.
+   *
+   * @throws IllegalStateException if {@link #endResults} has already been called
+   */
+  @Override
+  public void clearResults() {
+    synchronized (subResults) {
+      ensureOpen();
+      subResults.clear();
+    }
+  }
+
+  /**
+   * Buffers the result of one member's execution; {@code memberID} is not used.
+   *
+   * @throws IllegalStateException if {@link #endResults} has already been called
+   */
+  @Override
+  public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
+    synchronized (subResults) {
+      ensureOpen();
+      subResults.add(resultOfSingleExecution);
+    }
+  }
+
+  /**
+   * Rejects modifications once the latch has been released. Callers hold the subResults monitor.
+   */
+  private void ensureOpen() {
+    if (waitForResults.getCount() == 0) {
+      throw new IllegalStateException("This collector is closed and cannot accept anymore results");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/026271b0/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
new file mode 100644
index 0000000..1c42912
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
@@ -0,0 +1,270 @@
+package com.gemstone.gemfire.cache.lucene.internal.distributed;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for {@link TopEntriesFunctionCollector}: blocking and timed result retrieval,
+ * interruption and timeout handling, merging through the default and a custom
+ * {@link CollectorManager}, and rejection of modifications after {@code endResults}.
+ */
+@Category(UnitTest.class)
+public class TopEntriesFunctionCollectorJUnitTest {
+  EntryScore r1_1, r1_2, r2_1, r2_2;
+  TopEntriesCollector result1, result2;
+
+  // Builds two sub-results whose merged order by descending score is r1_1, r2_1, r1_2, r2_2
+  @Before
+  public void initializeCommonObjects() {
+    r1_1 = new EntryScore("3", .9f);
+    r1_2 = new EntryScore("1", .8f);
+    r2_1 = new EntryScore("2", 0.85f);
+    r2_2 = new EntryScore("4", 0.1f);
+
+    result1 = new TopEntriesCollector(null);
+    result1.collect(r1_1);
+    result1.collect(r1_2);
+
+    result2 = new TopEntriesCollector(null);
+    result2.collect(r2_1);
+    result2.collect(r2_2);
+  }
+
+  // getResult() must not return before endResults() is called
+  @Test
+  public void testGetResultsBlocksTillEnd() throws Exception {
+    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
+    final CountDownLatch insideThread = new CountDownLatch(1);
+    final CountDownLatch resultReceived = new CountDownLatch(1);
+    Thread resultClient = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        insideThread.countDown();
+        collector.getResult();
+        resultReceived.countDown();
+      }
+    });
+    resultClient.start();
+
+    insideThread.await(1, TimeUnit.SECONDS);
+    assertEquals(0, insideThread.getCount());
+    assertEquals(1, resultReceived.getCount());
+
+    collector.endResults();
+    resultReceived.await(1, TimeUnit.SECONDS);
+    assertEquals(0, resultReceived.getCount());
+  }
+
+  // Timed getResult() returns the merged entries when endResults() arrives within the wait time
+  @Test
+  public void testGetResultsTimedWait() throws Exception {
+    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
+    collector.addResult(null, result1);
+    collector.addResult(null, result2);
+
+    final CountDownLatch insideThread = new CountDownLatch(1);
+    final CountDownLatch resultReceived = new CountDownLatch(1);
+
+    final AtomicReference<TopEntries> result = new AtomicReference<>();
+
+    Thread resultClient = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        insideThread.countDown();
+        result.set(collector.getResult(1, TimeUnit.SECONDS));
+        resultReceived.countDown();
+      }
+    });
+    resultClient.start();
+
+    insideThread.await(1, TimeUnit.SECONDS);
+    assertEquals(0, insideThread.getCount());
+    assertEquals(1, resultReceived.getCount());
+
+    collector.endResults();
+
+    resultReceived.await(1, TimeUnit.SECONDS);
+    assertEquals(0, resultReceived.getCount());
+
+    TopEntries merged = result.get();
+    assertEquals(4, merged.size());
+    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+  }
+
+  @Test(expected = FunctionException.class)
+  public void testGetResultsWaitInterrupted() throws Exception {
+    interruptWhileWaiting(false);
+  }
+
+  @Test(expected = FunctionException.class)
+  public void testGetResultsTimedWaitInterrupted() throws Exception {
+    // must request the timed variant; passing false would only repeat the untimed test
+    interruptWhileWaiting(true);
+  }
+
+  /**
+   * Starts a thread blocked in getResult (timed or untimed), interrupts it, and rethrows the
+   * FunctionException that the thread captured so the calling test can expect it.
+   */
+  private void interruptWhileWaiting(final boolean timedWait) throws Exception {
+    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);
+    // stub before the worker thread exists so the mock is never configured while in use
+    CancelCriterion mockCriterion = mock(CancelCriterion.class);
+    when(mockCache.getCancelCriterion()).thenReturn(mockCriterion);
+    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null, mockCache);
+
+    final CountDownLatch insideThread = new CountDownLatch(1);
+    final CountDownLatch endGetResult = new CountDownLatch(1);
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    Thread resultClient = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        insideThread.countDown();
+        try {
+          if (timedWait) {
+            collector.getResult(1, TimeUnit.SECONDS);
+          } else {
+            collector.getResult();
+          }
+        } catch (FunctionException e) {
+          // publish the exception before releasing the latch so the main thread never reads null
+          exception.set(e);
+          endGetResult.countDown();
+        }
+      }
+    });
+    resultClient.start();
+
+    insideThread.await(1, TimeUnit.SECONDS);
+    assertEquals(0, insideThread.getCount());
+    assertEquals(1, endGetResult.getCount());
+
+    resultClient.interrupt();
+    endGetResult.await(1, TimeUnit.SECONDS);
+    assertEquals(0, endGetResult.getCount());
+    throw exception.get();
+  }
+
+  // Timed getResult() fails with FunctionException when endResults() never arrives
+  @Test(expected = FunctionException.class)
+  public void expectErrorAfterWaitTime() throws Exception {
+    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null);
+
+    final CountDownLatch insideThread = new CountDownLatch(1);
+    final CountDownLatch endGetResult = new CountDownLatch(1);
+    final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    Thread resultClient = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        insideThread.countDown();
+        try {
+          collector.getResult(10, TimeUnit.MILLISECONDS);
+        } catch (FunctionException e) {
+          // publish the exception before releasing the latch so the main thread never reads null
+          exception.set(e);
+          endGetResult.countDown();
+        }
+      }
+    });
+    resultClient.start();
+
+    insideThread.await(1, TimeUnit.SECONDS);
+    assertEquals(0, insideThread.getCount());
+    // NOTE(review): this can race with the 10 ms timeout above on a slow machine - confirm
+    assertEquals(1, endGetResult.getCount());
+
+    endGetResult.await(1, TimeUnit.SECONDS);
+    assertEquals(0, endGetResult.getCount());
+    throw exception.get();
+  }
+
+  @Test
+  public void mergeResultsDefaultCollectorManager() throws Exception {
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null);
+    collector.addResult(null, result1);
+    collector.addResult(null, result2);
+    collector.endResults();
+
+    TopEntries merged = collector.getResult();
+    Assert.assertNotNull(merged);
+    assertEquals(4, merged.size());
+    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
+  }
+
+  // The collector must hand every added sub-result to the supplied manager and return its output
+  @Test
+  @SuppressWarnings("unchecked") // raw mock(CollectorManager.class) and matcher argument cast
+  public void mergeResultsCustomCollectorManager() throws Exception {
+    TopEntries resultEntries = new TopEntries();
+    TopEntriesCollector mockCollector = mock(TopEntriesCollector.class);
+    Mockito.doReturn(resultEntries).when(mockCollector).getEntries();
+
+    CollectorManager<TopEntriesCollector> mockManager = mock(CollectorManager.class);
+    Mockito.doReturn(mockCollector).when(mockManager)
+        .reduce(Mockito.argThat(new ArgumentMatcher<Collection<TopEntriesCollector>>() {
+          @Override
+          public boolean matches(Object argument) {
+            Collection<TopEntriesCollector> collectors =
+                (Collection<TopEntriesCollector>) argument;
+            return collectors.contains(result1) && collectors.contains(result2);
+          }
+        }));
+
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(mockManager);
+    collector.addResult(null, result1);
+    collector.addResult(null, result2);
+    collector.endResults();
+
+    TopEntries merged = collector.getResult();
+    assertEquals(resultEntries, merged);
+  }
+
+  // Results added before clearResults() must not appear in the merged output
+  @Test
+  public void mergeAfterClearResults() throws Exception {
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null);
+    collector.addResult(null, result1);
+    collector.clearResults();
+    collector.addResult(null, result2);
+    collector.endResults();
+
+    TopEntries merged = collector.getResult();
+    Assert.assertNotNull(merged);
+    assertEquals(2, merged.size());
+    TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r2_1, r2_2);
+  }
+
+  // An IOException from the manager's reduce step surfaces as FunctionException
+  @Test(expected = FunctionException.class)
+  public void testExceptionDuringMerge() throws Exception {
+    TopEntriesCollectorManager mockManager = mock(TopEntriesCollectorManager.class);
+    Mockito.doThrow(new IOException()).when(mockManager).reduce(any(Collection.class));
+
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(mockManager);
+    collector.endResults();
+    collector.getResult();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void addResultDisallowedAfterEndResult() {
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
+    collector.endResults();
+    collector.addResult(null, new TopEntriesCollector(null));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void clearResultDisallowedAfterEndResult() {
+    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
+    collector.endResults();
+    collector.clearResults();
+  }
+
+  // The collector takes its id from the cache name when a cache is supplied
+  @Test
+  public void testCollectorName() {
+    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);
+    Mockito.doReturn("server").when(mockCache).getName();
+
+    TopEntriesFunctionCollector function = new TopEntriesFunctionCollector(null, mockCache);
+    assertEquals("server", function.id);
+  }
+
+}


Mime
View raw message