Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 621B4200C24 for ; Thu, 23 Feb 2017 16:20:34 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 60AC0160B62; Thu, 23 Feb 2017 15:20:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 704A2160B50 for ; Thu, 23 Feb 2017 16:20:33 +0100 (CET) Received: (qmail 26687 invoked by uid 500); 23 Feb 2017 15:20:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 26678 invoked by uid 99); 23 Feb 2017 15:20:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Feb 2017 15:20:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 53665DFD9E; Thu, 23 Feb 2017 15:20:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <2f5526e0c29f438ba25c79c2617a3be4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: AMQ-6602: Fix race condition in TaskRunnerFactory Date: Thu, 23 Feb 2017 15:20:32 +0000 (UTC) archived-at: Thu, 23 Feb 2017 15:20:34 -0000 Repository: activemq Updated Branches: refs/heads/master 816f81e60 -> fe5164a40 AMQ-6602: Fix race condition in TaskRunnerFactory Fixing a race condition in TaskRunnerFactory where if multiple threads call createTaskRunner() at the same time some threads might see the executor as null (if it hasn't finished initializing) leading to the creation of extra DedicatedTaskRunner objects instead of sharing a PooledTaskRunner. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fe5164a4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fe5164a4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fe5164a4 Branch: refs/heads/master Commit: fe5164a404ebcb0879c1b769e16c00f475320419 Parents: 816f81e Author: Christopher L. Shannon (cshannon) Authored: Thu Feb 23 10:18:22 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Thu Feb 23 10:20:13 2017 -0500 ---------------------------------------------------------------------- .../activemq/thread/TaskRunnerFactory.java | 50 ++++++++++----- .../activemq/thread/TaskRunnerFactoryTest.java | 67 ++++++++++++++++++++ 2 files changed, 102 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fe5164a4/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index ab49375..002aec7 100644 --- a/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.util.ThreadPoolUtils; import org.slf4j.Logger; @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory; public class TaskRunnerFactory implements Executor { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class); - private ExecutorService executor; + private final AtomicReference executorRef = new AtomicReference<>(); private int maxIterationsPerRun; private String name; private int priority; @@ -81,15 +82,23 @@ public class TaskRunnerFactory implements Executor { } public void init() { - if (initDone.compareAndSet(false, true)) { + if (!initDone.get()) { // If your OS/JVM combination has a good thread model, you may want to // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead. - if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { - executor = null; - } else if (executor == null) { - executor = createDefaultExecutor(); + //AMQ-6602 - lock instead of using compareAndSet to prevent threads from seeing a null value + //for executorRef inside createTaskRunner() on contention and creating a DedicatedTaskRunner + synchronized(this) { + //need to recheck if initDone is true under the lock + if (!initDone.get()) { + if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { + executorRef.set(null); + } else { + executorRef.compareAndSet(null, createDefaultExecutor()); + } + LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executorRef.get()); + initDone.set(true); + } } - LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor); } } @@ -99,11 +108,11 @@ public class TaskRunnerFactory implements Executor { * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) */ public void shutdown() { + ExecutorService executor = executorRef.get(); if (executor != null) { ThreadPoolUtils.shutdown(executor); - executor = null; } - initDone.set(false); + clearExecutor(); } /** @@ -112,11 +121,11 @@ public class TaskRunnerFactory implements Executor { * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) */ public void shutdownNow() { + ExecutorService executor = executorRef.get(); if (executor != null) { ThreadPoolUtils.shutdownNow(executor); - executor = null; } - initDone.set(false); + clearExecutor(); } /** @@ -125,15 +134,25 @@ public class TaskRunnerFactory implements Executor { * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) */ public void shutdownGraceful() { + ExecutorService executor = executorRef.get(); if (executor != null) { ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); - executor = null; } - initDone.set(false); + clearExecutor(); + } + + private void clearExecutor() { + //clear under a lock to prevent threads from seeing initDone == true + //but then getting null from executorRef + synchronized(this) { + executorRef.set(null); + initDone.set(false); + } } public TaskRunner createTaskRunner(Task task, String name) { init(); + ExecutorService executor = executorRef.get(); if (executor != null) { return new PooledTaskRunner(executor, task, maxIterationsPerRun); } else { @@ -149,6 +168,7 @@ public class TaskRunnerFactory implements Executor { public void execute(Runnable runnable, String name) { init(); LOG.trace("Execute[{}] runnable: {}", name, runnable); + ExecutorService executor = executorRef.get(); if (executor != null) { executor.execute(runnable); } else { @@ -198,11 +218,11 @@ public class TaskRunnerFactory implements Executor { } public ExecutorService getExecutor() { - return executor; + return executorRef.get(); } public void setExecutor(ExecutorService executor) { - this.executor = executor; + this.executorRef.set(executor); } public int getMaxIterationsPerRun() { http://git-wip-us.apache.org/repos/asf/activemq/blob/fe5164a4/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerFactoryTest.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerFactoryTest.java b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerFactoryTest.java new file mode 100644 index 0000000..4926041 --- /dev/null +++ b/activemq-client/src/test/java/org/apache/activemq/thread/TaskRunnerFactoryTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.junit.Test; + +public class TaskRunnerFactoryTest { + + /** + * AMQ-6602 test + * Test contention on createTaskRunner() to make sure that all threads end up + * using a PooledTaskRunner + * + * @throws Exception + */ + @Test + public void testConcurrentTaskRunnerCreaction() throws Exception { + + final TaskRunnerFactory factory = new TaskRunnerFactory(); + final ExecutorService service = Executors.newFixedThreadPool(10); + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(10); + final List runners = Collections.synchronizedList(new ArrayList<>(10)); + + for (int i = 0; i < 10; i++) { + service.execute(() -> { + try { + latch1.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + runners.add(factory.createTaskRunner(() -> true, "task") ); + latch2.countDown(); + }); + } + + latch1.countDown(); + latch2.await(); + + for (TaskRunner runner : runners) { + assertTrue(runner instanceof PooledTaskRunner); + } + } +}