lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hoss...@apache.org
Subject [lucene-solr] branch master updated: Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumptions about how many milliseconds something should take in another thread
Date Wed, 06 Feb 2019 21:32:56 GMT
This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new ea2956f  Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumptions about how many milliseconds something should take in another thread
ea2956f is described below

commit ea2956fda3c23695daa43c6cb6f1c7b2a345ce27
Author: Chris Hostetter <hossman@apache.org>
AuthorDate: Wed Feb 6 14:32:12 2019 -0700

    Harden OrderedExecutorTest to use concurrent latches/barriers for testing parallelism instead of making assumptions about how many milliseconds something should take in another thread
---
 .../org/apache/solr/util/OrderedExecutorTest.java  | 159 +++++++++++++++++----
 1 file changed, 133 insertions(+), 26 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 0211a11..929cc72 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -17,15 +17,29 @@
 
 package org.apache.solr.util;
 
+import java.lang.invoke.MethodHandles;
+
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class OrderedExecutorTest extends LuceneTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Test
   public void testExecutionInOrder() {
@@ -40,44 +54,137 @@ public class OrderedExecutorTest extends LuceneTestCase {
 
   @Test
   public void testLockWhenQueueIsFull() {
-    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
-    IntBox intBox = new IntBox();
-    long t = System.nanoTime();
-    orderedExecutor.execute(1, () -> {
+    final ExecutorService controlExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_control");
+    final OrderedExecutor orderedExecutor = new OrderedExecutor
+      (10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull_test"));
+    
+    try {
+      // AAA and BBB events will both depend on the use of the same lockId
+      final BlockingQueue<String> events = new ArrayBlockingQueue<>(2);
+      final Integer lockId = 1;
+      
+      // AAA enters executor first so it should execute first (even though it's waiting on latch)
+      final CountDownLatch latchAAA = new CountDownLatch(1);
+      orderedExecutor.execute(lockId, () -> {
+          try {
+            if (latchAAA.await(120, TimeUnit.SECONDS)) {
+              events.add("AAA");
+            } else {
+              events.add("AAA Timed Out");
+            }
+          } catch (InterruptedException e) {
+            log.error("Interrupt in AAA worker", e);
+            Thread.currentThread().interrupt();
+          }
+        });
+      // BBB doesn't care about the latch, but because it uses the same lockId, it's blocked on AAA
+      // so we execute it in a background thread...
+      controlExecutor.execute(() -> {
+          orderedExecutor.execute(lockId, () -> {
+              events.add("BBB");
+            });
+        });
+      
+      // now if we release the latchAAA, AAA should be guaranteed to fire first, then BBB
+      latchAAA.countDown();
       try {
-        Thread.sleep(500L);
+        assertEquals("AAA", events.poll(120, TimeUnit.SECONDS));
+        assertEquals("BBB", events.poll(120, TimeUnit.SECONDS));
       } catch (InterruptedException e) {
+        log.error("Interrupt polling event queue", e);
         Thread.currentThread().interrupt();
+        fail("interupt while trying to poll event queue");
       }
-      intBox.value++;
-    });
-    assertTrue(System.nanoTime() - t < 100 * 1000000);
-
-    t = System.nanoTime();
-    orderedExecutor.execute(1, () -> {
-      intBox.value++;
-    });
-    assertTrue(System.nanoTime() - t > 300 * 1000000);
-    orderedExecutor.shutdownAndAwaitTermination();
-    assertEquals(intBox.value, 2);
+    } finally {
+      ExecutorUtil.shutdownAndAwaitTermination(controlExecutor);
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
   }
 
   @Test
   public void testRunInParallel() {
-    OrderedExecutor orderedExecutor = new OrderedExecutor(10, ExecutorUtil.newMDCAwareCachedThreadPool("testLockWhenQueueIsFull"));
-    AtomicInteger atomicInteger = new AtomicInteger(0);
-    orderedExecutor.execute(1, () -> {
+    final int parallelism = atLeast(3);
+    
+    final ExecutorService controlExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_control");
+    final OrderedExecutor orderedExecutor = new OrderedExecutor
+      (parallelism, ExecutorUtil.newMDCAwareCachedThreadPool("testRunInParallel_test"));
+
+    try {
+      // distinct lockIds should be able to be used in parallel, up to the size of the executor,
+      // w/o any execute calls blocking... until the test Runnables being executed are all
+      // waiting on the same cyclic barrier...
+      final CyclicBarrier barrier = new CyclicBarrier(parallelism + 1);
+      final CountDownLatch preBarrierLatch = new CountDownLatch(parallelism);
+      final CountDownLatch postBarrierLatch = new CountDownLatch(parallelism);
+      
+      for (int i = 0; i < parallelism; i++) {
+        final int lockId = i;
+        controlExecutor.execute(() -> {
+            orderedExecutor.execute(lockId, () -> {
+                try {
+                  log.info("Worker #{} starting", lockId);
+                  preBarrierLatch.countDown();
+                  barrier.await(120, TimeUnit.SECONDS);
+                  postBarrierLatch.countDown();
+                } catch (TimeoutException t) {
+                  log.error("Timeout in worker#" + lockId + "awaiting barrier", t);
+                } catch (BrokenBarrierException b) {
+                  log.error("Broken Barrier in worker#" + lockId, b);
+                } catch (InterruptedException e) {
+                  log.error("Interrupt in worker#" + lockId + "awaiting barrier", e);
+                  Thread.currentThread().interrupt();
+                }
+              });
+          });
+      }
+
+      log.info("main thread: about to wait on pre-barrier latch, barrier={}, post-barrier latch={}",
+               barrier.getNumberWaiting(), postBarrierLatch.getCount());
+      
       try {
-        Thread.sleep(500L);
+        // this latch should have fully counted down by now
+        // (or with a small await for thread scheduling but no other external action)
+        assertTrue("Timeout awaiting pre barrier latch",
+                   preBarrierLatch.await(120, TimeUnit.SECONDS));
       } catch (InterruptedException e) {
+        log.error("Interrupt awwaiting pre barrier latch", e);
         Thread.currentThread().interrupt();
+        fail("interupt while trying to await the preBarrierLatch");
       }
-      if (atomicInteger.get() == 1) atomicInteger.incrementAndGet();
-    });
+      
+      log.info("main thread: pre-barrier latch done, barrier={}, post-barrier latch={}",
+               barrier.getNumberWaiting(), postBarrierLatch.getCount());
+      
+      // nothing should have counted down yet on the postBarrierLatch
+      assertEquals(parallelism, postBarrierLatch.getCount());
 
-    orderedExecutor.execute(2, atomicInteger::incrementAndGet);
-    orderedExecutor.shutdownAndAwaitTermination();
-    assertEquals(atomicInteger.get(), 2);
+      try {
+        // if we now await on the barrier, it should release
+        // (once all other threads get to the barrier as well, but no external action needed)
+        barrier.await(120, TimeUnit.SECONDS);
+        
+        log.info("main thread: barrier has released, post-barrier latch={}",
+                 postBarrierLatch.getCount());
+        
+        // and now the post-barrier latch should release immediately
+        // (or with a small await for thread scheduling but no other external action)
+        assertTrue("Timeout awaiting post barrier latch",
+                   postBarrierLatch.await(120, TimeUnit.SECONDS));
+      } catch (TimeoutException t) {
+        log.error("Timeout awaiting barrier", t);
+        fail("barrier timed out");
+      } catch (BrokenBarrierException b) {
+        log.error("Broken Barrier in main test thread", b);
+        fail("broken barrier while trying to release the barrier");
+      } catch (InterruptedException e) {
+        log.error("Interrupt awwaiting barrier / post barrier latch", e);
+        Thread.currentThread().interrupt();
+        fail("interupt while trying to release the barrier and await the postBarrierLatch");
+      }
+    } finally {
+      ExecutorUtil.shutdownAndAwaitTermination(controlExecutor);
+      orderedExecutor.shutdownAndAwaitTermination();
+    }
   }
 
   @Test


Mime
View raw message