From commits-return-28169-archive-asf-public=cust-asf.ponee.io@geode.apache.org Thu Aug 30 20:35:44 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8D00C180656 for ; Thu, 30 Aug 2018 20:35:43 +0200 (CEST) Received: (qmail 80441 invoked by uid 500); 30 Aug 2018 18:35:42 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 80432 invoked by uid 99); 30 Aug 2018 18:35:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Aug 2018 18:35:42 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 0BA318519C; Thu, 30 Aug 2018 18:35:42 +0000 (UTC) Date: Thu, 30 Aug 2018 18:35:41 +0000 To: "commits@geode.apache.org" Subject: [geode] branch develop updated: GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153565414081.17735.12470229606736827488@gitbox.apache.org> From: klund@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Oldrev: e76f1e369c3ab7bae5e89320d6492a37a8fbb660 X-Git-Newrev: 52a6e4279b74f6c374061b9132df42fa8cb7f89c X-Git-Rev: 52a6e4279b74f6c374061b9132df42fa8cb7f89c X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
klund pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git The following commit(s) were added to refs/heads/develop by this push: new 52a6e42 GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule 52a6e42 is described below commit 52a6e4279b74f6c374061b9132df42fa8cb7f89c Author: Kirk Lund AuthorDate: Tue Aug 28 14:57:52 2018 -0700 GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule --- .../cache/PartitionedRegionCreationDUnitTest.java | 1 - .../internal/locks/DLockServiceLeakTest.java | 5 +- .../internal/cache/InterruptDiskJUnitTest.java | 120 +++++++-------------- .../rules/ExecutorServiceRuleIntegrationTest.java | 10 +- .../test/junit/rules/ExecutorServiceRule.java | 68 ++---------- 5 files changed, 59 insertions(+), 145 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java index 5c4a546..e28df99 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java @@ -51,7 +51,6 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule; * will verify the functionality under distributed scenario. 
*/ @SuppressWarnings("serial") - public class PartitionedRegionCreationDUnitTest extends CacheTestCase { private VM vm0; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java index f95f8e7..61fc97c 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java @@ -44,15 +44,14 @@ import org.apache.geode.internal.cache.DistributedRegion; import org.apache.geode.test.junit.categories.DLockTest; import org.apache.geode.test.junit.rules.ExecutorServiceRule; -@Category({DLockTest.class}) +@Category(DLockTest.class) public class DLockServiceLeakTest { private Cache cache; private DistributedRegion testRegion; @Rule - public ExecutorServiceRule executorServiceRule = - ExecutorServiceRule.builder().threadCount(5).build(); + public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); @Before public void setUp() { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java index f68e615..bdc2f33 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java @@ -14,125 +14,87 @@ */ package org.apache.geode.internal.cache; -import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static 
org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE; -import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.io.File; +import java.time.Duration; import java.util.Properties; -import java.util.concurrent.Callable; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.test.junit.rules.ExecutorServiceRule; /** * Test of interrupting threads doing disk writes to see the effect. 
- * */ public class InterruptDiskJUnitTest { - private static volatile Thread puttingThread; - private static final long MAX_WAIT = 60 * 1000; - private DistributedSystem ds; + private final AtomicInteger nextValue = new AtomicInteger(); + private final AtomicReference puttingThread = new AtomicReference<>(); + private Cache cache; private Region region; - private AtomicLong nextValue = new AtomicLong(); @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); - @Test - @Ignore - public void testLoop() throws Throwable { - for (int i = 0; i < 100; i++) { - System.err.println("i=" + i); - System.out.println("i=" + i); - testDRPutWithInterrupt(); - tearDown(); - setUp(); - } - } - + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Before public void setUp() { - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller - props.setProperty(STATISTIC_SAMPLING_ENABLED, "true"); - props.setProperty(ENABLE_TIME_STATISTICS, "true"); - props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs"); - ds = DistributedSystem.connect(props); - cache = CacheFactory.create(ds); - File diskStore = new File("diskStore"); - diskStore.mkdir(); - cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore}) - .create("store"); - region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT) - .setDiskStoreName("store").create("region"); - } + String diskStoreName = getClass().getSimpleName() + "_diskStore"; + String regionName = getClass().getSimpleName() + "_region"; + Properties config = new Properties(); + config.setProperty(MCAST_PORT, "0"); + config.setProperty(LOCATORS, ""); - @After - public void tearDown() { - ds.disconnect(); - } + File diskDir = temporaryFolder.getRoot(); + cache = new CacheFactory(config).create(); - @Test - public void testDRPutWithInterrupt() throws 
Throwable { - Callable doPuts = new Callable() { - - @Override - public Object call() { - puttingThread = Thread.currentThread(); - long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT); - while (!Thread.currentThread().isInterrupted()) { - region.put(0, nextValue.incrementAndGet()); - if (System.nanoTime() > end) { - fail("Did not get interrupted in 60 seconds"); - } - } - return null; - } - }; + cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir}) + .create(diskStoreName); - Future result = executorServiceRule.submit(doPuts); + region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName) + .create(regionName); + } + @After + public void tearDown() { + cache.close(); + } - Thread.sleep(50); - long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT); - while (puttingThread == null) { - Thread.sleep(50); - if (System.nanoTime() > end) { - fail("Putting thread not set in 60 seconds"); + @Test + public void testDRPutWithInterrupt() throws Exception { + Future doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> { + puttingThread.set(Thread.currentThread()); + while (!Thread.currentThread().isInterrupted()) { + region.put(0, nextValue.incrementAndGet()); } - } - - puttingThread.interrupt(); - - result.get(60, TimeUnit.SECONDS); + }); - assertEquals(nextValue.get(), region.get(0)); + await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull()); + Thread.sleep(Duration.ofSeconds(1).toMillis()); + puttingThread.get().interrupt(); + doPutWhileNotInterrupted.get(2, MINUTES); + assertThat(region.get(0)).isEqualTo(nextValue.get()); } } diff --git a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java index d2b2d6b..05e3bd6 100644 --- 
a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java +++ b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java @@ -34,10 +34,10 @@ import org.apache.geode.test.junit.runners.TestRunner; public class ExecutorServiceRuleIntegrationTest { - static volatile CountDownLatch hangLatch; - static volatile CountDownLatch terminateLatch; - static volatile ExecutorService executorService; - static Awaits.Invocations invocations; + private static volatile CountDownLatch hangLatch; + private static volatile CountDownLatch terminateLatch; + private static volatile ExecutorService executorService; + private static Awaits.Invocations invocations; @Before public void setUp() throws Exception { @@ -64,7 +64,7 @@ public class ExecutorServiceRuleIntegrationTest { } @Test - public void awaitTermination() throws Exception { + public void awaitTermination() { Result result = TestRunner.runTest(Awaits.class); assertThat(result.wasSuccessful()).isTrue(); diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java index 934c3c1..f79d874 100644 --- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java +++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java @@ -14,8 +14,6 @@ */ package org.apache.geode.test.junit.rules; -import static org.assertj.core.api.Assertions.assertThat; - import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -32,22 +30,15 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour * creates an {@code ExecutorService} which is terminated after the scope of the {@code Rule}. This * {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops. * - *

- * By default, the {@code ExecutorService} is single-threaded. You can specify the thread count by
- * using {@link Builder#threadCount(int)} or {@link #ExecutorServiceRule(int)}.
- *
- * <p>
- * Example with default configuration (single-threaded and does not assert that tasks are done):
- *
  * <pre>
  * private CountDownLatch hangLatch = new CountDownLatch(1);
  *
  * {@literal @}Rule
- * public AsynchronousRule asynchronousRule = new AsynchronousRule();
+ * public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
  *
  * {@literal @}Test
  * public void doTest() throws Exception {
- *   Future result = asynchronousRule.runAsync(() -> {
+ *   Future result = executorServiceRule.runAsync(() -> {
  *     try {
  *       hangLatch.await();
  *     } catch (InterruptedException e) {
@@ -73,17 +64,13 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
  * private CountDownLatch hangLatch = new CountDownLatch(1);
  *
  * {@literal @}Rule
- * public ExecutorServiceRule asynchronousRule = ExecutorServiceRule.builder().threadCount(10).awaitTermination(10, MILLISECONDS).build();
+ * public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.builder().awaitTermination(10, SECONDS).build();
  *
  * {@literal @}Test
  * public void doTest() throws Exception {
  *   for (int i = 0; i < 10; i++) {
- *     asynchronousRule.runAsync(() -> {
- *       try {
- *         hangLatch.await();
- *       } catch (InterruptedException e) {
- *         // do nothing
- *       }
+ *     executorServiceRule.runAsync(() -> {
+ *       hangLatch.await();
  *     });
  *   }
  * }
@@ -92,7 +79,6 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
 @SuppressWarnings("unused")
 public class ExecutorServiceRule extends SerializableExternalResource {
 
-  protected final int threadCount;
   protected final boolean enableAwaitTermination;
   protected final long awaitTerminationTimeout;
   protected final TimeUnit awaitTerminationTimeUnit;
@@ -110,7 +96,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   protected ExecutorServiceRule(Builder builder) {
-    threadCount = builder.threadCount;
     enableAwaitTermination = builder.enableAwaitTermination;
     awaitTerminationTimeout = builder.awaitTerminationTimeout;
     awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
@@ -120,25 +105,10 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   /**
-   * Constructs a new single-threaded {@code ExecutorServiceRule} which invokes
-   * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
+   * Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+   * during {@code tearDown}.
    */
   public ExecutorServiceRule() {
-    threadCount = 1;
-    enableAwaitTermination = false;
-    awaitTerminationTimeout = 0;
-    awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
-    awaitTerminationBeforeShutdown = false;
-    useShutdown = false;
-    useShutdownNow = true;
-  }
-
-  /**
-   * Constructs a new multi-threaded {@code ExecutorServiceRule} which invokes
-   * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
-   */
-  public ExecutorServiceRule(int threadCount) {
-    this.threadCount = threadCount;
     enableAwaitTermination = false;
     awaitTerminationTimeout = 0;
     awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
@@ -149,11 +119,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
 
   @Override
   public void before() {
-    if (threadCount > 1) {
-      executor = Executors.newFixedThreadPool(threadCount);
-    } else {
-      executor = Executors.newSingleThreadExecutor();
-    }
+    executor = Executors.newCachedThreadPool();
   }
 
   @Override
@@ -272,12 +238,11 @@ public class ExecutorServiceRule extends SerializableExternalResource {
 
   public static class Builder {
 
-    protected int threadCount = 1;
-    protected boolean enableAwaitTermination = false;
-    protected long awaitTerminationTimeout = 0;
+    protected boolean enableAwaitTermination;
+    protected long awaitTerminationTimeout;
     protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
     protected boolean awaitTerminationBeforeShutdown = true;
-    protected boolean useShutdown = false;
+    protected boolean useShutdown;
     protected boolean useShutdownNow = true;
 
     protected Builder() {
@@ -285,16 +250,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     }
 
     /**
-     * Configures the number of threads. Default is one thread.
-     *
-     * @param threadCount the number of threads in the pool
-     */
-    public Builder threadCount(int threadCount) {
-      this.threadCount = threadCount;
-      return this;
-    }
-
-    /**
      * Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
      *
      * @param timeout the maximum time to wait
@@ -348,7 +303,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
      * Builds the instance of {@code ExecutorServiceRule}.
      */
     public ExecutorServiceRule build() {
-      assertThat(threadCount).isGreaterThan(0);
       return new ExecutorServiceRule(this);
     }
   }