camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/4] git commit: CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time
Date Wed, 01 Oct 2014 09:19:00 GMT
CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/17a908ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/17a908ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/17a908ad

Branch: refs/heads/camel-2.12.x
Commit: 17a908ad6510fe5e21a91f619aeadff4f5c7df5d
Parents: 598d2b3
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Wed Oct 1 11:17:14 2014 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Oct 1 11:18:48 2014 +0200

----------------------------------------------------------------------
 .../camel/component/timer/TimerComponent.java   | 39 ++++++++++++++++---
 .../camel/component/timer/TimerConsumer.java    | 13 ++++++-
 .../camel/component/timer/TimerEndpoint.java    | 41 ++++++++++++++------
 3 files changed, 74 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/17a908ad/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
index aed97e0..ec67f01 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
@@ -34,14 +35,15 @@ import org.apache.camel.impl.UriEndpointComponent;
  */
 public class TimerComponent extends UriEndpointComponent {
     private final Map<String, Timer> timers = new HashMap<String, Timer>();
+    private final Map<String, AtomicInteger> refCounts = new HashMap<>();
 
     public TimerComponent() {
         super(TimerEndpoint.class);
     }
 
-    public Timer getTimer(TimerEndpoint endpoint) {
-        String key = endpoint.getTimerName();
-        if (!endpoint.isDaemon()) {
+    public Timer getTimer(TimerConsumer consumer) {
+        String key = consumer.getEndpoint().getTimerName();
+        if (!consumer.getEndpoint().isDaemon()) {
             key = "nonDaemon:" + key;
         }
 
@@ -50,14 +52,40 @@ public class TimerComponent extends UriEndpointComponent {
             answer = timers.get(key);
             if (answer == null) {
                 // the timer name is also the thread name, so lets resolve a name to be used
-                String name = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + endpoint.getTimerName());
-                answer = new Timer(name, endpoint.isDaemon());
+                String name = consumer.getEndpoint().getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + consumer.getEndpoint().getTimerName());
+                answer = new Timer(name, consumer.getEndpoint().isDaemon());
                 timers.put(key, answer);
+                // store new reference counter
+                refCounts.put(key, new AtomicInteger(1));
+            } else {
+                // increase reference counter
+                AtomicInteger counter = refCounts.get(key);
+                counter.incrementAndGet();
             }
         }
         return answer;
     }
 
+    public void removeTimer(TimerConsumer consumer) {
+        String key = consumer.getEndpoint().getTimerName();
+        if (!consumer.getEndpoint().isDaemon()) {
+            key = "nonDaemon:" + key;
+        }
+
+        synchronized (timers) {
+            // decrease reference counter
+            AtomicInteger counter = refCounts.get(key);
+            if (counter.decrementAndGet() <= 0) {
+                refCounts.remove(key);
+                // remove timer as its no longer in use
+                Timer timer = timers.remove(key);
+                if (timer != null) {
+                    timer.cancel();
+                }
+            }
+        }
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         TimerEndpoint answer = new TimerEndpoint(uri, this, remaining);
@@ -89,5 +117,6 @@ public class TimerComponent extends UriEndpointComponent {
             timer.cancel();
         }
         timers.clear();
+        refCounts.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/17a908ad/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 66617d2..d0b2e6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StartupListener;
@@ -47,6 +48,11 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
     }
 
     @Override
+    public TimerEndpoint getEndpoint() {
+        return (TimerEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         task = new TimerTask() {
             // counter
@@ -81,7 +87,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
         // only configure task if CamelContext already started, otherwise the StartupListener
         // is configuring the task later
         if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
-            Timer timer = endpoint.getTimer();
+            Timer timer = endpoint.getTimer(this);
             configureTask(task, timer);
         }
     }
@@ -93,12 +99,15 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
         }
         task = null;
         configured = false;
+
+        // remove timer
+        endpoint.removeTimer(this);
     }
 
     @Override
     public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
         if (task != null && !configured) {
-            Timer timer = endpoint.getTimer();
+            Timer timer = endpoint.getTimer(this);
             configureTask(task, timer);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/17a908ad/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
index 61e3b88..7056ab3 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
@@ -64,6 +64,11 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
         this.timerName = timerName;
     }
 
+    @Override
+    public TimerComponent getComponent() {
+        return (TimerComponent) super.getComponent();
+    }
+
     public Producer createProducer() throws Exception {
         throw new RuntimeCamelException("Cannot produce to a TimerEndpoint: " + getEndpointUri());
     }
@@ -167,18 +172,6 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return true;
     }
 
-    public synchronized Timer getTimer() {
-        if (timer == null) {
-            TimerComponent tc = (TimerComponent)getComponent();
-            timer = tc.getTimer(this);
-        }
-        return timer;
-    }
-
-    public synchronized void setTimer(Timer timer) {
-        this.timer = timer;
-    }
-
     @ManagedAttribute(description = "Camel id")
     public String getCamelId() {
         return this.getCamelContext().getName();
@@ -193,4 +186,28 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public String getState() {
         return getStatus().name();
     }
+
+    protected TimerEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    public Timer getTimer(TimerConsumer consumer) {
+        if (timer != null) {
+            // use custom timer
+            return timer;
+        }
+        return getComponent().getTimer(consumer);
+    }
+
+    public void setTimer(Timer timer) {
+        this.timer = timer;
+    }
+
+    public void removeTimer(TimerConsumer consumer) {
+        if (timer == null) {
+            // only remove timer if we are not using a custom timer
+            getComponent().removeTimer(consumer);
+        }
+    }
+
 }


Mime
View raw message