camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/5] git commit: CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.
Date Fri, 11 Oct 2013 13:09:11 GMT
CAMEL-6670: Added current throttled number of exchanges to throttler eip jmx. Thanks to Christian Posta for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c698df6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c698df6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c698df6e

Branch: refs/heads/master
Commit: c698df6e7a679e8e391baa75f2ac63523e740bb6
Parents: 3506c3e
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri Oct 11 15:08:28 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Oct 11 15:08:28 2013 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedThrottlerMBean.java |   3 +
 .../management/mbean/ManagedThrottler.java      |   4 +
 .../camel/processor/DelayProcessorSupport.java  |  20 ++
 .../org/apache/camel/processor/Throttler.java   |  11 +-
 .../camel/management/ManagedThrottlerTest.java  | 275 ++++++++++++++++++-
 5 files changed, 304 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index a0dc3a0..feec600 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -32,4 +32,7 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
     @ManagedAttribute(description = "Time period in millis")
     void setTimePeriodMillis(long timePeriodMillis);
 
+    @ManagedAttribute(description = "Number of exchanges currently throttled")
+    int getThrottledCount();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index f80dfa1..99a5e95 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -55,4 +55,8 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl
     public void setTimePeriodMillis(long timePeriodMillis) {
         getThrottler().setTimePeriodMillis(timePeriodMillis);
     }
+
+    public int getThrottledCount() {
+        return getThrottler().getDelayedCount();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index ef69759..ff81170 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -43,6 +44,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     private final boolean shutdownExecutorService;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private final AtomicInteger delayedCount = new AtomicInteger(0);
 
     // TODO: Add option to cancel tasks on shutdown so we can stop fast
 
@@ -56,6 +58,9 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
         }
 
         public void run() {
+            // we are running now so decrement the counter
+            delayedCount.decrementAndGet();
+
             log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId());
             if (!isRunAllowed()) {
                 exchange.setException(new RejectedExecutionException("Run is not allowed"));
@@ -123,6 +128,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             }
         } else {
             // asynchronous delay so schedule a process call task
+            // and increment the counter (we decrement the counter when we run the ProcessCall)
+            delayedCount.incrementAndGet();
             ProcessCall call = new ProcessCall(exchange, callback);
             try {
                 log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}",
@@ -131,6 +138,8 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
                 // tell Camel routing engine we continue routing asynchronous
                 return false;
             } catch (RejectedExecutionException e) {
+                // we were not allowed to run the ProcessCall, so need to decrement the counter here
+                delayedCount.decrementAndGet();
                 if (isCallerRunsWhenRejected()) {
                     if (!isRunAllowed()) {
                         exchange.setException(new RejectedExecutionException());
@@ -174,6 +183,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
     protected abstract long calculateDelay(Exchange exchange);
 
     /**
+     * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit)
+     */
+    public int getDelayedCount() {
+        return delayedCount.get();
+    }
+
+    /**
      * Delays the given time before continuing.
      * <p/>
      * This implementation will block while waiting
@@ -191,9 +207,13 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
             return;
         } else {
             try {
+                // keep track on delayer counter while we sleep
+                delayedCount.incrementAndGet();
                 sleep(delay);
             } catch (InterruptedException e) {
                 handleSleepInterruptedException(e, exchange);
+            } finally {
+                delayedCount.decrementAndGet();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 52989a4..ae6bc26 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -17,6 +17,7 @@
 package org.apache.camel.processor;
 
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -40,7 +41,7 @@ import org.apache.camel.util.ObjectHelper;
 public class Throttler extends DelayProcessorSupport implements Traceable {
     private volatile long maximumRequestsPerPeriod;
     private Expression maxRequestsPerPeriodExpression;
-    private long timePeriodMillis = 1000;
+    private AtomicLong timePeriodMillis = new AtomicLong(1000);
     private volatile TimeSlot slot;
 
     public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
@@ -53,7 +54,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
         if (timePeriodMillis <= 0) {
             throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
         }
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     @Override
@@ -81,7 +82,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     }
     
     public long getTimePeriodMillis() {
-        return timePeriodMillis;
+        return timePeriodMillis.get();
     }
 
     /**
@@ -95,7 +96,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
      * Sets the time period during which the maximum number of requests apply
      */
     public void setTimePeriodMillis(long timePeriodMillis) {
-        this.timePeriodMillis = timePeriodMillis;
+        this.timePeriodMillis.set(timePeriodMillis);
     }
 
     // Implementation methods
@@ -151,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable {
     protected class TimeSlot {
         
         private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
-        private final long duration = Throttler.this.timePeriodMillis;
+        private final long duration = Throttler.this.timePeriodMillis.get();
         private final long startTime;
 
         protected TimeSlot() {

http://git-wip-us.apache.org/repos/asf/camel/blob/c698df6e/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 2f90dfe..feb3e1c 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -16,14 +16,23 @@
  */
 package org.apache.camel.management;
 
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * @version 
+ * @version
  */
 public class ManagedThrottlerTest extends ManagementTestSupport {
 
@@ -95,15 +104,273 @@ public class ManagedThrottlerTest extends ManagementTestSupport {
         assertTrue("Should be around 5 sec now: was " + total, total > 3500);
     }
 
+    public void testThrottleVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:end").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCount").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCount", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler3\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route3\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsync").expectedMessageCount(10);
+
+        // we pick '5' because we are right in the middle of the number of messages
+        // that have been and reduces any race conditions to minimal...
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsync").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsync", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(10, completed.longValue());
+
+    }
+
+    public void testThrottleAsyncExceptionVisableViaJmx() throws Exception {
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler4\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route4\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        getMockEndpoint("mock:endAsyncException").expectedMessageCount(10);
+
+        NotifyBuilder notifier = new NotifyBuilder(context).
+                from("seda:throttleCountAsyncException").whenReceived(5).create();
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountAsyncException", "Message " + i);
+        }
+
+        assertTrue(notifier.matches(2, TimeUnit.SECONDS));
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+
+        // we are expecting this to be > 0
+        assertTrue(throttledMessages.intValue() > 0);
+
+        assertMockEndpointsSatisfied();
+
+        // give a sec for exception handling to finish..
+        Thread.sleep(500);
+
+        throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer)0, throttledMessages);
+
+        // since all exchanges ended w/ exception, they are not completed
+        Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted");
+        assertEquals(0, completed.longValue());
+
+    }
+
+    public void testRejectedExecution() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncReject");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(1);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint1");
+        exceptionMock.expectedMessageCount(9);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecution", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+
+    }
+
+    public void testRejectedExecutionCallerRuns() throws Exception {
+        // when delaying async, we can possibly fill up the execution queue
+        //. which would through a RejectedExecutionException.. we need to make
+        // sure that the delayedCount/throttledCount doesn't leak
+
+        // JMX tests dont work well on AIX CI servers (hangs them)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+        // get the object name for the delayer
+        ObjectName throttlerName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=processors,name=\"mythrottler2\"");
+
+        // use route to get the total time
+        ObjectName routeName = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route2\"");
+
+        // reset the counters
+        mbeanServer.invoke(routeName, "reset", null, null);
+
+        MockEndpoint mock = getMockEndpoint("mock:endAsyncRejectCallerRuns");
+        // only one message (the first one) should get through because the rest should get delayed
+        mock.expectedMessageCount(10);
+
+        MockEndpoint exceptionMock = getMockEndpoint("mock:rejectedExceptionEndpoint");
+        exceptionMock.expectedMessageCount(0);
+
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("seda:throttleCountRejectExecutionCallerRuns", "Message " + i);
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // we shouldn't have ane leaked throttler counts
+        Integer throttledMessages = (Integer) mbeanServer.getAttribute(throttlerName, "ThrottledCount");
+        assertEquals("Should not be any throttled messages left, found: " + throttledMessages, (Integer) 0, throttledMessages);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
+        final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) {
+            @Override
+            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+                throw new RejectedExecutionException();
+            }
+        };
+
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .to("log:foo")
-                    .throttle(10).id("mythrottler")
-                    .to("mock:result");
+                        .to("log:foo")
+                        .throttle(10).id("mythrottler")
+                        .to("mock:result");
+
+                from("seda:throttleCount")
+                        .throttle(1).timePeriodMillis(250).id("mythrottler2")
+                        .to("mock:end");
+
+                from("seda:throttleCountAsync")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+                        .to("mock:endAsync");
+
+                from("seda:throttleCountAsyncException")
+                        .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+                        .to("mock:endAsyncException")
+                        .process(new Processor() {
+
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                throw new RuntimeException("Fail me");
+                            }
+                        });
+                from("seda:throttleCountRejectExecutionCallerRuns")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(true)
+                            .id("mythrottler5")
+                        .to("mock:endAsyncRejectCallerRuns");
+
+                from("seda:throttleCountRejectExecution")
+                        .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
+                        .throttle(1)
+                            .timePeriodMillis(250)
+                            .asyncDelayed()
+                            .executorService(badService)
+                            .callerRunsWhenRejected(false)
+                            .id("mythrottler6")
+                        .to("mock:endAsyncReject");
             }
         };
     }


Mime
View raw message