Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D63BC200BF6 for ; Tue, 10 Jan 2017 16:27:54 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D4D12160B3D; Tue, 10 Jan 2017 15:27:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D28C5160B29 for ; Tue, 10 Jan 2017 16:27:53 +0100 (CET) Received: (qmail 41156 invoked by uid 500); 10 Jan 2017 15:27:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 41146 invoked by uid 99); 10 Jan 2017 15:27:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Jan 2017 15:27:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CF0CADFA98; Tue, 10 Jan 2017 15:27:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <31a60842a0c6439ebc8659304a804880@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6555 Date: Tue, 10 Jan 2017 15:27:52 +0000 (UTC) archived-at: Tue, 10 Jan 2017 15:27:55 -0000 Repository: activemq Updated Branches: refs/heads/activemq-5.14.x 71f51dc4c -> e4da98bd7 https://issues.apache.org/jira/browse/AMQ-6555 Fixing Scheduler so that a rescheduled task will first cancel the existing task so it does not get orphaned from the task map. Also fixing Topic start so that it will only start once and not twice. (cherry picked from commit 2769298cf64a10cd74320ad132b3677bac20a6cc) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e4da98bd Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e4da98bd Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e4da98bd Branch: refs/heads/activemq-5.14.x Commit: e4da98bd790566ba2208246541f71a28b4a00c0f Parents: 71f51dc Author: Christopher L. Shannon (cshannon) Authored: Tue Jan 10 10:25:15 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Tue Jan 10 10:27:44 2017 -0500 ---------------------------------------------------------------------- .../activemq/broker/region/BaseDestination.java | 2 + .../apache/activemq/broker/region/Queue.java | 8 +- .../apache/activemq/broker/region/Topic.java | 40 ++++----- .../org/apache/activemq/thread/Scheduler.java | 8 ++ .../apache/activemq/thread/SchedulerTest.java | 85 ++++++++++++++++++++ 5 files changed, 120 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e4da98bd/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 75f2ee0..71642d5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -18,6 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ResourceAllocationException; @@ -59,6 +60,7 @@ public abstract class BaseDestination implements Destination { public static final int MAX_PRODUCERS_TO_AUDIT = 64; public static final int MAX_AUDIT_DEPTH = 10000; + protected final AtomicBoolean started = new AtomicBoolean(); protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; http://git-wip-us.apache.org/repos/asf/activemq/blob/e4da98bd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6a42ebc..b841b89 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.region; +import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -36,7 +38,6 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -96,8 +97,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import static org.apache.activemq.broker.region.cursors.AbstractStoreCursor.gotToTheStore; - /** * The Queue is a List of MessageEntry objects that are dispatched to matching * subscriptions. @@ -132,7 +131,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private CountDownLatch consumersBeforeStartsLatch; private final AtomicLong pendingWakeups = new AtomicLong(); private boolean allConsumersExclusiveByDefault = false; - private final AtomicBoolean started = new AtomicBoolean(); private volatile boolean resetNeeded; @@ -217,7 +215,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index LOG.debug(getName() + "Producer Flow Control Timeout Task is stopping"); } } - }; + } private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask(); http://git-wip-us.apache.org/repos/asf/activemq/blob/e4da98bd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 0842467..7707bf5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -83,7 +83,7 @@ public class Topic extends BaseDestination implements Task { Topic.this.taskRunner.wakeup(); } catch (InterruptedException e) { } - }; + } }; public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, @@ -598,30 +598,34 @@ public class Topic extends BaseDestination implements Task { @Override public void start() throws Exception { - this.subscriptionRecoveryPolicy.start(); - if (memoryUsage != null) { - memoryUsage.start(); - } + if (started.compareAndSet(false, true)) { + this.subscriptionRecoveryPolicy.start(); + if (memoryUsage != null) { + memoryUsage.start(); + } - if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { - scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); + if (getExpireMessagesPeriod() > 0 && !AdvisorySupport.isAdvisoryTopic(getActiveMQDestination())) { + scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); + } } } @Override public void stop() throws Exception { - if (taskRunner != null) { - taskRunner.shutdown(); - } - this.subscriptionRecoveryPolicy.stop(); - if (memoryUsage != null) { - memoryUsage.stop(); - } - if (this.topicStore != null) { - this.topicStore.stop(); - } + if (started.compareAndSet(true, false)) { + if (taskRunner != null) { + taskRunner.shutdown(); + } + this.subscriptionRecoveryPolicy.stop(); + if (memoryUsage != null) { + memoryUsage.stop(); + } + if (this.topicStore != null) { + this.topicStore.stop(); + } - scheduler.cancel(expireMessagesTask); + scheduler.cancel(expireMessagesTask); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/e4da98bd/activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java b/activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java index 2fdb11a..2e2de95 100755 --- a/activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java +++ b/activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java @@ -22,12 +22,15 @@ import java.util.TimerTask; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ public final class Scheduler extends ServiceSupport { + private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); private final String name; private Timer timer; private final HashMap timerTasks = new HashMap(); @@ -37,6 +40,11 @@ public final class Scheduler extends ServiceSupport { } public synchronized void executePeriodically(final Runnable task, long period) { + TimerTask existing = timerTasks.get(task); + if (existing != null) { + LOG.debug("Task {} already scheduled, cancelling and rescheduling", task); + cancel(task); + } TimerTask timerTask = new SchedulerTimerTask(task); timer.schedule(timerTask, period, period); timerTasks.put(task, timerTask); http://git-wip-us.apache.org/repos/asf/activemq/blob/e4da98bd/activemq-client/src/test/java/org/apache/activemq/thread/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/test/java/org/apache/activemq/thread/SchedulerTest.java b/activemq-client/src/test/java/org/apache/activemq/thread/SchedulerTest.java new file mode 100644 index 0000000..d63c831 --- /dev/null +++ b/activemq-client/src/test/java/org/apache/activemq/thread/SchedulerTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class SchedulerTest { + + private final static String schedulerName = "testScheduler"; + private Scheduler scheduler; + + @Before + public void before() throws Exception { + scheduler = new Scheduler(schedulerName); + scheduler.start(); + } + + @After + public void after() throws Exception { + scheduler.stop(); + } + + @Test + public void testExecutePeriodically() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + scheduler.executePeriodically(new CountDownRunnable(latch), 10); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + } + + @Test + public void executeAfterDelay() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + scheduler.executeAfterDelay(new CountDownRunnable(latch), 10); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + } + + @Test + public void testExecutePeriodicallyReplace() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownRunnable task = new CountDownRunnable(latch); + + scheduler.executePeriodically(task, 500); + scheduler.executePeriodically(task, 500); + scheduler.cancel(task); + + //make sure the task never runs + assertFalse(latch.await(1000, TimeUnit.MILLISECONDS)); + } + + private static class CountDownRunnable implements Runnable { + final CountDownLatch latch; + + CountDownRunnable(final CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + latch.countDown(); + } + } + +}