Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D721A109B8 for ; Mon, 3 Feb 2014 13:55:08 +0000 (UTC) Received: (qmail 78611 invoked by uid 500); 3 Feb 2014 13:55:08 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 78529 invoked by uid 500); 3 Feb 2014 13:55:06 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 78506 invoked by uid 99); 3 Feb 2014 13:55:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Feb 2014 13:55:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5421D8C1C98; Mon, 3 Feb 2014 13:55:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Mon, 03 Feb 2014 13:55:06 -0000 Message-Id: In-Reply-To: <9445eebaa20c43599e0f6621bacf03c4@git.apache.org> References: <9445eebaa20c43599e0f6621bacf03c4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] git commit: CAMEL-7160: Fixed throttler with dynamic header changing rate not as expected. Thanks to Michael Pisula for the patch. CAMEL-7160: Fixed throttler with dynamic header changing rate not as expected. Thanks to Michael Pisula for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/04f0d8d7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/04f0d8d7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/04f0d8d7 Branch: refs/heads/camel-2.11.x Commit: 04f0d8d7f5c8a1494eba1b311d4fbf45b125d16e Parents: 9fe42d1 Author: Claus Ibsen Authored: Mon Feb 3 14:55:23 2014 +0100 Committer: Claus Ibsen Committed: Mon Feb 3 14:55:55 2014 +0100 ---------------------------------------------------------------------- .../org/apache/camel/processor/Throttler.java | 28 +++++++----- .../apache/camel/processor/ThrottlerTest.java | 46 ++++++++++++++++++-- 2 files changed, 61 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/main/java/org/apache/camel/processor/Throttler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 52989a4..3ecc24f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -34,8 +34,8 @@ import org.apache.camel.util.ObjectHelper; * as only allowing 100 requests per second; or if huge load can cause a * particular system to malfunction or to reduce its throughput you might want * to introduce some throttling. - * - * @version + * + * @version */ public class Throttler extends DelayProcessorSupport implements Traceable { private volatile long maximumRequestsPerPeriod; @@ -79,7 +79,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { public Expression getMaximumRequestsPerPeriodExpression() { return maxRequestsPerPeriodExpression; } - + public long getTimePeriodMillis() { return timePeriodMillis; } @@ -115,6 +115,9 @@ public class Throttler extends DelayProcessorSupport implements Traceable { if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) { log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue); } + if (maximumRequestsPerPeriod > longValue) { + slot.capacity = 0; + } maximumRequestsPerPeriod = longValue; } @@ -130,7 +133,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { return 0; } } - + /* * Determine what the next available time slot is for handling an Exchange */ @@ -138,7 +141,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { if (slot == null) { slot = new TimeSlot(); } - if (slot.isFull() || !slot.isActive()) { + if (slot.isFull() || !slot.isPast()) { slot = slot.next(); } slot.assign(); @@ -149,7 +152,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { * A time slot is capable of handling a number of exchanges within a certain period of time. */ protected class TimeSlot { - + private volatile long capacity = Throttler.this.maximumRequestsPerPeriod; private final long duration = Throttler.this.timePeriodMillis; private final long startTime; @@ -165,7 +168,7 @@ public class Throttler extends DelayProcessorSupport implements Traceable { protected void assign() { capacity--; } - + /* * Start the next time slot either now or in the future * (no time slots are being created in the past) @@ -173,15 +176,20 @@ public class Throttler extends DelayProcessorSupport implements Traceable { protected TimeSlot next() { return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration)); } - + + protected boolean isPast() { + long current = System.currentTimeMillis(); + return current < (startTime + duration); + } + protected boolean isActive() { long current = System.currentTimeMillis(); return startTime <= current && current < (startTime + duration); } - + protected boolean isFull() { return capacity <= 0; - } + } } TimeSlot getSlot() { http://git-wip-us.apache.org/repos/asf/camel/blob/04f0d8d7/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java index 01e3469..29fdc45 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java @@ -28,7 +28,7 @@ import org.apache.camel.processor.Throttler.TimeSlot; import static org.apache.camel.builder.Builder.constant; /** - * @version + * @version */ public class ThrottlerTest extends ContextTestSupport { private static final int INTERVAL = 500; @@ -47,7 +47,7 @@ public class ThrottlerTest extends ContextTestSupport { // to check that the throttle really does kick in resultEndpoint.assertIsSatisfied(); } - + public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); resultEndpoint.expectedMessageCount(messageCount); @@ -85,7 +85,7 @@ public class ThrottlerTest extends ContextTestSupport { assertSame(slot, throttler.nextSlot()); assertTrue(slot.isFull()); assertTrue(slot.isActive()); - + TimeSlot next = throttler.nextSlot(); // now we should have a new slot that starts somewhere in the future assertNotSame(slot, next); @@ -159,6 +159,46 @@ public class ThrottlerTest extends ContextTestSupport { executor.shutdownNow(); } + public void testConfigurationWithChangingHeaderExpression() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(messageCount); + MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); + sendMessagesWithHeaderExpression(executor, resultEndpoint, 1); + + resultEndpoint.reset(); + sendMessagesWithHeaderExpression(executor, resultEndpoint, 10); + + resultEndpoint.reset(); + sendMessagesWithHeaderExpression(executor, resultEndpoint, 1); + + executor.shutdownNow(); + } + + private void sendMessagesWithHeaderExpression(ExecutorService executor, MockEndpoint resultEndpoint, final int + throttle) throws InterruptedException { + resultEndpoint.expectedMessageCount(messageCount); + + long start = System.currentTimeMillis(); + for (int i = 0; i < messageCount; i++) { + executor.execute(new Runnable() { + public void run() { + template.sendBodyAndHeader("direct:expressionHeader", "payload", "throttleValue", + throttle); + } + }); + } + + // let's wait for the exchanges to arrive + resultEndpoint.assertIsSatisfied(); + + // now assert that they have actually been throttled + long minimumTime = (messageCount - 1) * INTERVAL / throttle; + // add a little slack + long delta = System.currentTimeMillis() - start + 200; + assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime); + long maxTime = (messageCount - 1) * INTERVAL / throttle * 3; + assertTrue("Should take at most " + maxTime + "ms, was: " + delta, delta <= maxTime); + } + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() {