geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject geode git commit: GEODE-2175: wait for results from other members is not needed in result collector. It's already handled by Function framework.
Date Thu, 08 Dec 2016 22:50:03 GMT
Repository: geode
Updated Branches:
  refs/heads/develop f22809a44 -> 1d951e334


GEODE-2175: wait for results from other members is not needed in result collector. It's already
handled by Function framework.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1d951e33
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1d951e33
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1d951e33

Branch: refs/heads/develop
Commit: 1d951e334334393c2052307988a21c1cd1f11985
Parents: f22809a
Author: zhouxh <gzhou@pivotal.io>
Authored: Thu Dec 8 14:47:56 2016 -0800
Committer: zhouxh <gzhou@pivotal.io>
Committed: Thu Dec 8 14:47:56 2016 -0800

----------------------------------------------------------------------
 .../TopEntriesFunctionCollector.java            |  43 +-----
 .../TopEntriesFunctionCollectorJUnitTest.java   | 137 +------------------
 2 files changed, 4 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/1d951e33/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
index d4a7008..66c4c0a 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -47,9 +47,6 @@ public class TopEntriesFunctionCollector
   // Use this instance to perform reduce operation
   final CollectorManager<TopEntriesCollector> manager;
 
-  // latch to wait till all results are collected
-  private final CountDownLatch waitForResults = new CountDownLatch(1);
-
   final String id;
 
   // Instance of gemfire cache to check status and other utility methods
@@ -83,36 +80,11 @@ public class TopEntriesFunctionCollector
 
   @Override
   public TopEntries getResult() throws FunctionException {
-    try {
-      waitForResults.await();
-    } catch (InterruptedException e) {
-      logger.debug("Interrupted while waiting for result collection", e);
-      Thread.currentThread().interrupt();
-      if (cache != null) {
-        cache.getCancelCriterion().checkCancelInProgress(e);
-      }
-      throw new FunctionException(e);
-    }
-
     return aggregateResults();
   }
 
   @Override
   public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
-    try {
-      boolean result = waitForResults.await(timeout, unit);
-      if (!result) {
-        throw new FunctionException("Did not receive results from all members within wait time");
-      }
-    } catch (InterruptedException e) {
-      logger.debug("Interrupted while waiting for result collection", e);
-      Thread.currentThread().interrupt();
-      if (cache != null) {
-        cache.getCancelCriterion().checkCancelInProgress(e);
-      }
-      throw new FunctionException(e);
-    }
-
     return aggregateResults();
   }
 
@@ -128,20 +100,11 @@ public class TopEntriesFunctionCollector
   }
 
   @Override
-  public void endResults() {
-    synchronized (subResults) {
-      waitForResults.countDown();
-    }
-  }
+  public void endResults() {}
 
   @Override
   public void clearResults() {
     synchronized (subResults) {
-      if (waitForResults.getCount() == 0) {
-        throw new IllegalStateException(
-            "This collector is closed and cannot accept anymore results");
-      }
-
       subResults.clear();
     }
   }
@@ -149,10 +112,6 @@ public class TopEntriesFunctionCollector
   @Override
   public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
     synchronized (subResults) {
-      if (waitForResults.getCount() == 0) {
-        throw new IllegalStateException(
-            "This collector is closed and cannot accept anymore results");
-      }
       subResults.add(resultOfSingleExecution);
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/1d951e33/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
index af868e2..bf08877 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
@@ -63,25 +63,8 @@ public class TopEntriesFunctionCollectorJUnitTest {
   @Test
   public void testGetResultsBlocksTillEnd() throws Exception {
     final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
-    final CountDownLatch insideThread = new CountDownLatch(1);
-    final CountDownLatch resultReceived = new CountDownLatch(1);
-    Thread resultClient = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        insideThread.countDown();
-        collector.getResult();
-        resultReceived.countDown();
-      }
-    });
-    resultClient.start();
-
-    insideThread.await(1, TimeUnit.SECONDS);
-    assertEquals(0, insideThread.getCount());
-    assertEquals(1, resultReceived.getCount());
-
-    collector.endResults();
-    resultReceived.await(1, TimeUnit.SECONDS);
-    assertEquals(0, resultReceived.getCount());
+    TopEntries merged = collector.getResult();
+    assertEquals(0, merged.size());
   }
 
   @Test
@@ -94,111 +77,11 @@ public class TopEntriesFunctionCollectorJUnitTest {
     final CountDownLatch resultReceived = new CountDownLatch(1);
 
     final AtomicReference<TopEntries> result = new AtomicReference<>();
-
-    Thread resultClient = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        insideThread.countDown();
-        result.set(collector.getResult(1, TimeUnit.SECONDS));
-        resultReceived.countDown();
-      }
-    });
-    resultClient.start();
-
-    insideThread.await(1, TimeUnit.SECONDS);
-    assertEquals(0, insideThread.getCount());
-    assertEquals(1, resultReceived.getCount());
-
-    collector.endResults();
-
-    resultReceived.await(1, TimeUnit.SECONDS);
-    assertEquals(0, resultReceived.getCount());
-
-    TopEntries merged = result.get();
+    TopEntries merged = collector.getResult(1, TimeUnit.SECONDS);
     assertEquals(4, merged.size());
     TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
   }
 
-  @Test(expected = FunctionException.class)
-  public void testGetResultsWaitInterrupted() throws Exception {
-    interruptWhileWaiting(false);
-  }
-
-  @Test(expected = FunctionException.class)
-  public void testGetResultsTimedWaitInterrupted() throws Exception {
-    interruptWhileWaiting(false);
-  }
-
-  private void interruptWhileWaiting(final boolean timedWait)
-      throws InterruptedException, Exception {
-    GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);
-    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null, mockCache);
-
-    final CountDownLatch insideThread = new CountDownLatch(1);
-    final CountDownLatch endGetResult = new CountDownLatch(1);
-    final AtomicReference<Exception> exception = new AtomicReference<>();
-
-    Thread resultClient = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        insideThread.countDown();
-        try {
-          if (timedWait) {
-            collector.getResult(1, TimeUnit.SECONDS);
-          } else {
-            collector.getResult();
-          }
-        } catch (FunctionException e) {
-          exception.set(e);
-          endGetResult.countDown();
-        }
-      }
-    });
-    resultClient.start();
-
-    insideThread.await(1, TimeUnit.SECONDS);
-    assertEquals(0, insideThread.getCount());
-    assertEquals(1, endGetResult.getCount());
-
-    CancelCriterion mockCriterion = mock(CancelCriterion.class);
-    when(mockCache.getCancelCriterion()).thenReturn(mockCriterion);
-    resultClient.interrupt();
-    endGetResult.await(1, TimeUnit.SECONDS);
-    assertEquals(0, endGetResult.getCount());
-    throw exception.get();
-  }
-
-  @Test(expected = FunctionException.class)
-  public void expectErrorAfterWaitTime() throws Exception {
-    final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null);
-
-    final CountDownLatch insideThread = new CountDownLatch(1);
-    final CountDownLatch endGetResult = new CountDownLatch(1);
-    final AtomicReference<Exception> exception = new AtomicReference<>();
-
-    Thread resultClient = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        insideThread.countDown();
-        try {
-          collector.getResult(10, TimeUnit.MILLISECONDS);
-        } catch (FunctionException e) {
-          exception.set(e);
-          endGetResult.countDown();
-        }
-      }
-    });
-    resultClient.start();
-
-    insideThread.await(1, TimeUnit.SECONDS);
-    assertEquals(0, insideThread.getCount());
-    assertEquals(1, endGetResult.getCount());
-
-    endGetResult.await(1, TimeUnit.SECONDS);
-    assertEquals(0, endGetResult.getCount());
-    throw exception.get();
-  }
-
   @Test
   public void mergeShardAndLimitResults() throws Exception {
     LuceneFunctionContext<TopEntriesCollector> context =
@@ -299,20 +182,6 @@ public class TopEntriesFunctionCollectorJUnitTest {
     collector.getResult();
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void addResultDisallowedAfterEndResult() {
-    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
-    collector.endResults();
-    collector.addResult(null, new TopEntriesCollector(null));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void clearResultDisallowedAfterEndResult() {
-    TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
-    collector.endResults();
-    collector.clearResults();
-  }
-
   @Test
   public void testCollectorName() {
     GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);


Mime
View raw message