geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ladyva...@apache.org
Subject geode git commit: GEODE-2852: Enforce lucene waitUntilFlushed timeout for all buckets
Date Thu, 04 May 2017 20:34:51 GMT
Repository: geode
Updated Branches:
  refs/heads/develop 8c4a36916 -> b47673239


GEODE-2852: Enforce lucene waitUntilFlushed timeout for all buckets

* Since we are now batching waitUntilFlushed threads, do not submit more WaitUntilFlushed
Callables if timeout exceeded
* create the WaitUntilFlushed Callables just prior to submitting for execution so the remaining
nanoSeconds can be passed in and applied for each bucket.
* updated tests to accommodate changes


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b4767323
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b4767323
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b4767323

Branch: refs/heads/develop
Commit: b47673239c9e7dca362de08964326d20a9e1f7bc
Parents: 8c4a369
Author: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
Authored: Thu May 4 11:53:48 2017 -0700
Committer: Lynn Hughes-Godfrey <lhughesgodfrey@pivotal.io>
Committed: Thu May 4 11:53:48 2017 -0700

----------------------------------------------------------------------
 ...ParallelGatewaySenderFlushedCoordinator.java | 50 ++++++----
 ...atewaySenderFlushedCoordinatorJUnitTest.java | 96 ++++++++++++++------
 2 files changed, 98 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b4767323/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
index 1388dd0..42ce68c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -23,8 +23,10 @@ import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordina
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.*;
 
 public class WaitUntilParallelGatewaySenderFlushedCoordinator
@@ -46,22 +48,30 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
       sender.getCancelCriterion().checkCancelInProgress(null);
     }
 
-    // Create callables for local buckets
-    List<WaitUntilBucketRegionQueueFlushedCallable> callables =
-        buildWaitUntilBucketRegionQueueFlushedCallables(pr);
-
-    // Submit local callables for execution
     ExecutorService service = this.sender.getDistributionManager().getWaitingThreadPool();
     List<Future<Boolean>> callableFutures = new ArrayList<>();
     int callableCount = 0;
-    if (logger.isDebugEnabled()) {
-      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created and being submitted
"
-          + callables.size() + " callables=" + callables);
-    }
-    for (Callable<Boolean> callable : callables) {
+    long nanosRemaining = unit.toNanos(timeout);
+    long endTime = System.nanoTime() + nanosRemaining;
+    Set<BucketRegion> localBucketRegions = getLocalBucketRegions(pr);
+    for (BucketRegion br : localBucketRegions) {
+      // timeout exceeded, do not submit more callables, return localResult false
+      if (System.nanoTime() >= endTime) {
+        localResult = false;
+        break;
+      }
+      // create and submit callable with updated timeout
+      Callable<Boolean> callable = createWaitUntilBucketRegionQueueFlushedCallable(
+          (BucketRegionQueue) br, nanosRemaining, TimeUnit.NANOSECONDS);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "WaitUntilParallelGatewaySenderFlushedCoordinator: Submitting callable for bucket
"
+                + br.getId() + " callable=" + callable + " nanosRemaining=" + nanosRemaining);
+      }
       callableFutures.add(service.submit(callable));
       callableCount++;
-      if ((callableCount % CALLABLES_CHUNK_SIZE) == 0 || callableCount == callables.size())
{
+      if ((callableCount % CALLABLES_CHUNK_SIZE) == 0
+          || callableCount == localBucketRegions.size()) {
         CallablesChunkResults callablesChunkResults =
             new CallablesChunkResults(localResult, exceptionToThrow, callableFutures).invoke();
         localResult = callablesChunkResults.getLocalResult();
@@ -74,6 +84,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
           throw exceptionToThrow;
         }
       }
+      nanosRemaining = endTime - System.nanoTime();
     }
 
     // Return the full result
@@ -84,16 +95,17 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinator
     return localResult;
   }
 
-  protected List<WaitUntilBucketRegionQueueFlushedCallable> buildWaitUntilBucketRegionQueueFlushedCallables(
-      PartitionedRegion pr) {
-    List<WaitUntilBucketRegionQueueFlushedCallable> callables = new ArrayList<>();
+  protected Set<BucketRegion> getLocalBucketRegions(PartitionedRegion pr) {
+    Set<BucketRegion> localBucketRegions = new HashSet<BucketRegion>();
     if (pr.isDataStore()) {
-      for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
-        callables.add(new WaitUntilBucketRegionQueueFlushedCallable((BucketRegionQueue) br,
-            this.timeout, this.unit));
-      }
+      localBucketRegions = pr.getDataStore().getAllLocalBucketRegions();
     }
-    return callables;
+    return localBucketRegions;
+  }
+
+  protected WaitUntilBucketRegionQueueFlushedCallable createWaitUntilBucketRegionQueueFlushedCallable(
+      BucketRegionQueue br, long timeout, TimeUnit unit) {
+    return new WaitUntilBucketRegionQueueFlushedCallable(br, timeout, unit);
   }
 
   public static class WaitUntilBucketRegionQueueFlushedCallable implements Callable<Boolean>
{

http://git-wip-us.apache.org/repos/asf/geode/blob/b4767323/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
index d957f91..5e12ed5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinatorJUnitTest;
@@ -21,9 +23,12 @@ import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySend
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
@@ -32,12 +37,14 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 @Category(IntegrationTest.class)
 public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     extends WaitUntilGatewaySenderFlushedCoordinatorJUnitTest {
 
   private PartitionedRegion region;
+  private BucketRegionQueue brq;
 
   protected void createGatewaySender() {
     super.createGatewaySender();
@@ -46,6 +53,7 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
     doReturn(queue).when(this.sender).getQueue();
     this.region = mock(PartitionedRegion.class);
     doReturn(this.region).when(queue).getRegion();
+    this.brq = mock(BucketRegionQueue.class);
   }
 
   protected AbstractGatewaySenderEventProcessor getEventProcessor() {
@@ -56,69 +64,99 @@ public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
 
   @Test
   public void testWaitUntilParallelGatewaySenderFlushedSuccessfulNotInitiator() throws Throwable
{
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l,
-            TimeUnit.MILLISECONDS, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit,
false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator);
-    doReturn(getSuccessfulCallables(true)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(true)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertTrue(result);
   }
 
   @Test
+  public void waitUntilFlushShouldExitEarlyIfTimeoutElapses() throws Throwable {
+    long timeout = 500;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit,
false);
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator);
+
+    Set<BucketRegion> bucketRegions = new HashSet<>();
+    for (int i = 0; i < 5000; i++) {
+      bucketRegions.add(mock(BucketRegionQueue.class));
+    }
+    doReturn(bucketRegions).when(coordinatorSpy).getLocalBucketRegions(any());
+
+    WaitUntilBucketRegionQueueFlushedCallable callable =
+        mock(WaitUntilBucketRegionQueueFlushedCallable.class);
+    when(callable.call()).then(invocation -> {
+      Thread.sleep(100);
+      return true;
+    });
+    doReturn(callable).when(coordinatorSpy).createWaitUntilBucketRegionQueueFlushedCallable(any(),
+        anyLong(), any());
+    boolean result = coordinatorSpy.waitUntilFlushed();
+    assertFalse(result);
+
+    verify(callable, atMost(50)).call();
+  }
+
+  @Test
   public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulNotInitiator() throws
Throwable {
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l,
-            TimeUnit.MILLISECONDS, false);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit,
false);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator);
-    doReturn(getUnsuccessfulCallables()).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(false)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertFalse(result);
   }
 
   @Test
   public void testWaitUntilParallelGatewaySenderFlushedSuccessfulInitiator() throws Throwable
{
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l,
-            TimeUnit.MILLISECONDS, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit,
true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator);
-    doReturn(getSuccessfulCallables(true)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(true)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertTrue(result);
   }
 
   @Test
   public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulInitiator() throws Throwable
{
+    long timeout = 5000;
+    TimeUnit unit = TimeUnit.MILLISECONDS;
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
-        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l,
-            TimeUnit.MILLISECONDS, true);
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, timeout, unit,
true);
     WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator);
-    doReturn(getSuccessfulCallables(false)).when(coordinatorSpy)
-        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    doReturn(getLocalBucketRegions()).when(coordinatorSpy).getLocalBucketRegions(any());
+    doReturn(getCallableResult(false)).when(coordinatorSpy)
+        .createWaitUntilBucketRegionQueueFlushedCallable(any(), anyLong(), any());
     boolean result = coordinatorSpy.waitUntilFlushed();
     assertFalse(result);
   }
 
-  private List<WaitUntilBucketRegionQueueFlushedCallable> getSuccessfulCallables(
-      boolean expectedResult) throws Exception {
-    List callables = new ArrayList();
+  private WaitUntilBucketRegionQueueFlushedCallable getCallableResult(boolean expectedResult)
+      throws Exception {
     WaitUntilBucketRegionQueueFlushedCallable callable =
         mock(WaitUntilBucketRegionQueueFlushedCallable.class);
     when(callable.call()).thenReturn(expectedResult);
-    callables.add(callable);
-    return callables;
+    return callable;
   }
 
-  private List<WaitUntilBucketRegionQueueFlushedCallable> getUnsuccessfulCallables()
-      throws Exception {
-    List callables = new ArrayList();
-    WaitUntilBucketRegionQueueFlushedCallable callable =
-        mock(WaitUntilBucketRegionQueueFlushedCallable.class);
-    when(callable.call()).thenReturn(false);
-    callables.add(callable);
-    return callables;
+  private Set<BucketRegionQueue> getLocalBucketRegions() {
+    Set<BucketRegionQueue> localBucketRegions = new HashSet<BucketRegionQueue>();
+    localBucketRegions.add(this.brq);
+    return localBucketRegions;
   }
 }


Mime
View raw message