Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 097FA17228 for ; Fri, 17 Oct 2014 08:40:09 +0000 (UTC) Received: (qmail 74116 invoked by uid 500); 17 Oct 2014 08:39:56 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 74005 invoked by uid 500); 17 Oct 2014 08:39:56 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 73762 invoked by uid 99); 17 Oct 2014 08:39:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Oct 2014 08:39:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5D9D1A09C67; Fri, 17 Oct 2014 08:39:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cmoulliard@apache.org To: commits@camel.apache.org Date: Fri, 17 Oct 2014 08:40:28 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [35/50] git commit: CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/077e1e21 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/077e1e21 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/077e1e21 Branch: refs/remotes/origin/camel-2.13.x Commit: 077e1e21795684159d7723a17298b6f84a533da7 Parents: b9ca479 Author: Claus Ibsen Authored: Wed Oct 1 11:17:14 2014 +0200 Committer: Claus Ibsen Committed: Wed Oct 1 11:18:40 2014 +0200 ---------------------------------------------------------------------- .../camel/component/timer/TimerComponent.java | 39 ++++++++++++++++--- .../camel/component/timer/TimerConsumer.java | 13 ++++++- .../camel/component/timer/TimerEndpoint.java | 41 ++++++++++++++------ 3 files changed, 74 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/077e1e21/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java index aed97e0..ec67f01 100644 --- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java +++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Timer; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; @@ -34,14 +35,15 @@ import org.apache.camel.impl.UriEndpointComponent; */ public class TimerComponent extends UriEndpointComponent { private final Map timers = new HashMap(); + private final Map refCounts = new HashMap<>(); public TimerComponent() { super(TimerEndpoint.class); } - public Timer getTimer(TimerEndpoint endpoint) { - String key = endpoint.getTimerName(); - if (!endpoint.isDaemon()) { + public Timer getTimer(TimerConsumer consumer) { + String key = consumer.getEndpoint().getTimerName(); + if (!consumer.getEndpoint().isDaemon()) { key = "nonDaemon:" + key; } @@ -50,14 +52,40 @@ public class TimerComponent extends UriEndpointComponent { answer = timers.get(key); if (answer == null) { // the timer name is also the thread name, so lets resolve a name to be used - String name = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + endpoint.getTimerName()); - answer = new Timer(name, endpoint.isDaemon()); + String name = consumer.getEndpoint().getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + consumer.getEndpoint().getTimerName()); + answer = new Timer(name, consumer.getEndpoint().isDaemon()); timers.put(key, answer); + // store new reference counter + refCounts.put(key, new AtomicInteger(1)); + } else { + // increase reference counter + AtomicInteger counter = refCounts.get(key); + counter.incrementAndGet(); } } return answer; } + public void removeTimer(TimerConsumer consumer) { + String key = consumer.getEndpoint().getTimerName(); + if (!consumer.getEndpoint().isDaemon()) { + key = "nonDaemon:" + key; + } + + synchronized (timers) { + // decrease reference counter + AtomicInteger counter = refCounts.get(key); + if (counter.decrementAndGet() <= 0) { + refCounts.remove(key); + // remove timer as its no longer in use + Timer timer = timers.remove(key); + if (timer != null) { + timer.cancel(); + } + } + } + } + @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { TimerEndpoint answer = new TimerEndpoint(uri, this, remaining); @@ -89,5 +117,6 @@ public class TimerComponent extends UriEndpointComponent { timer.cancel(); } timers.clear(); + refCounts.clear(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/077e1e21/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java index 66617d2..d0b2e6b 100644 --- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.StartupListener; @@ -47,6 +48,11 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { } @Override + public TimerEndpoint getEndpoint() { + return (TimerEndpoint) super.getEndpoint(); + } + + @Override protected void doStart() throws Exception { task = new TimerTask() { // counter @@ -81,7 +87,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { // only configure task if CamelContext already started, otherwise the StartupListener // is configuring the task later if (!configured && endpoint.getCamelContext().getStatus().isStarted()) { - Timer timer = endpoint.getTimer(); + Timer timer = endpoint.getTimer(this); configureTask(task, timer); } } @@ -93,12 +99,15 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener { } task = null; configured = false; + + // remove timer + endpoint.removeTimer(this); } @Override public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { if (task != null && !configured) { - Timer timer = endpoint.getTimer(); + Timer timer = endpoint.getTimer(this); configureTask(task, timer); } } http://git-wip-us.apache.org/repos/asf/camel/blob/077e1e21/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java index 5b4dc3f..89c170e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java @@ -64,6 +64,11 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS this.timerName = timerName; } + @Override + public TimerComponent getComponent() { + return (TimerComponent) super.getComponent(); + } + public Producer createProducer() throws Exception { throw new RuntimeCamelException("Cannot produce to a TimerEndpoint: " + getEndpointUri()); } @@ -167,18 +172,6 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS return true; } - public synchronized Timer getTimer() { - if (timer == null) { - TimerComponent tc = (TimerComponent)getComponent(); - timer = tc.getTimer(this); - } - return timer; - } - - public synchronized void setTimer(Timer timer) { - this.timer = timer; - } - @ManagedAttribute(description = "Camel id") public String getCamelId() { return this.getCamelContext().getName(); @@ -198,4 +191,28 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS public String getState() { return getStatus().name(); } + + protected TimerEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + public Timer getTimer(TimerConsumer consumer) { + if (timer != null) { + // use custom timer + return timer; + } + return getComponent().getTimer(consumer); + } + + public void setTimer(Timer timer) { + this.timer = timer; + } + + public void removeTimer(TimerConsumer consumer) { + if (timer == null) { + // only remove timer if we are not using a custom timer + getComponent().removeTimer(consumer); + } + } + }