Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5C362C662 for ; Tue, 25 Jun 2013 07:15:42 +0000 (UTC) Received: (qmail 73504 invoked by uid 500); 25 Jun 2013 07:15:42 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 73373 invoked by uid 500); 25 Jun 2013 07:15:41 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 73272 invoked by uid 99); 25 Jun 2013 07:15:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jun 2013 07:15:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 85D7C82FC21; Tue, 25 Jun 2013 07:15:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Tue, 25 Jun 2013 07:15:41 -0000 Message-Id: <0d6a5cd1d8b04840a864b42cefab6c52@git.apache.org> In-Reply-To: <32da6305f3194f4883571f4d14908e43@git.apache.org> References: <32da6305f3194f4883571f4d14908e43@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: CAMEL-6465: Added greedy option to scheduled poll consumer. Thanks to John Liptak for the patch. CAMEL-6465: Added greedy option to scheduled poll consumer. Thanks to John Liptak for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b33dee5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b33dee5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b33dee5 Branch: refs/heads/camel-2.11.x Commit: 1b33dee5ce11ec116076d9c2cbb06025c47c7f4b Parents: a49076e Author: Claus Ibsen Authored: Tue Jun 25 09:12:25 2013 +0200 Committer: Claus Ibsen Committed: Tue Jun 25 09:13:19 2013 +0200 ---------------------------------------------------------------------- .../camel/impl/ScheduledPollConsumer.java | 18 ++++ .../camel/impl/ScheduledPollEndpoint.java | 6 +- .../impl/Mock321ScheduledPollConsumer.java | 41 +++++++++ .../impl/ScheduledPollConsumerGreedyTest.java | 92 ++++++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1b33dee5/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java index 972f6d6..bcccea0 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java @@ -54,6 +54,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy(); private LoggingLevel runLoggingLevel = LoggingLevel.TRACE; private boolean sendEmptyMessageWhenIdle; + private boolean greedy; private volatile boolean polling; public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { @@ -146,6 +147,12 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R } pollStrategy.commit(this, getEndpoint(), polledMessages); + + if (polledMessages > 0 && isGreedy()) { + done = false; + retryCounter = -1; + LOG.trace("Greedy polling after processing {} messages", polledMessages); + } } else { LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy); } @@ -295,6 +302,17 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer implements R return sendEmptyMessageWhenIdle; } + public boolean isGreedy() { + return greedy; + } + + /** + * If greedy then a poll is executed immediate after a previous poll that polled 1 or more messages. + */ + public void setGreedy(boolean greedy) { + this.greedy = greedy; + } + public ScheduledExecutorService getScheduledExecutorService() { return scheduledExecutorService; } http://git-wip-us.apache.org/repos/asf/camel/blob/1b33dee5/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java index 1d4aa85..b3a0ed6 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java @@ -62,6 +62,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { Object pollStrategy = options.remove("pollStrategy"); Object runLoggingLevel = options.remove("runLoggingLevel"); Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle"); + Object greedy = options.remove("greedy"); Object scheduledExecutorService = options.remove("scheduledExecutorService"); boolean setConsumerProperties = false; @@ -69,7 +70,7 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) { setConsumerProperties = true; } - if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) { + if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || greedy != null || scheduledExecutorService != null) { setConsumerProperties = true; } @@ -102,6 +103,9 @@ public abstract class ScheduledPollEndpoint extends DefaultEndpoint { if (sendEmptyMessageWhenIdle != null) { consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle); } + if (greedy != null) { + consumerProperties.put("greedy", greedy); + } if (scheduledExecutorService != null) { consumerProperties.put("scheduledExecutorService", scheduledExecutorService); } http://git-wip-us.apache.org/repos/asf/camel/blob/1b33dee5/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java b/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java new file mode 100644 index 0000000..acb0f29 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/Mock321ScheduledPollConsumer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Processor; + +public class Mock321ScheduledPollConsumer extends MockScheduledPollConsumer { + + private volatile int counter = 4; + + public Mock321ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected int poll() throws Exception { + if (counter > 0) { + counter = counter - 1; + } + return counter; + } + + @Override + public String toString() { + return "Mock321Scheduled"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1b33dee5/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java new file mode 100644 index 0000000..6a81082 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerGreedyTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.Consumer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.spi.PollingConsumerPollStrategy; + +public class ScheduledPollConsumerGreedyTest extends ContextTestSupport { + + private final AtomicInteger polled = new AtomicInteger(); + + public void test321Greedy() throws Exception { + polled.set(0); + + MockScheduledPollConsumer consumer = new Mock321ScheduledPollConsumer(getMockEndpoint("mock:foo"), null); + consumer.setGreedy(true); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + polled.addAndGet(polledMessages); + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + return false; + } + }); + + consumer.start(); + consumer.run(); + + assertEquals(6, polled.get()); + + consumer.stop(); + } + + public void test321NotGreedy() throws Exception { + polled.set(0); + + MockScheduledPollConsumer consumer = new Mock321ScheduledPollConsumer(getMockEndpoint("mock:foo"), null); + consumer.setGreedy(false); + + consumer.setPollStrategy(new PollingConsumerPollStrategy() { + public boolean begin(Consumer consumer, Endpoint endpoint) { + return true; + } + + public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) { + polled.addAndGet(polledMessages); + } + + public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { + return false; + } + }); + + consumer.start(); + + consumer.run(); + assertEquals(3, polled.get()); + consumer.run(); + assertEquals(5, polled.get()); + consumer.run(); + assertEquals(6, polled.get()); + consumer.run(); + assertEquals(6, polled.get()); + + consumer.stop(); + } + +}