Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB74ADAAF for ; Fri, 17 May 2013 07:13:28 +0000 (UTC) Received: (qmail 92398 invoked by uid 500); 17 May 2013 07:13:28 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 92291 invoked by uid 500); 17 May 2013 07:13:27 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 92255 invoked by uid 99); 17 May 2013 07:13:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 07:13:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1485322C8C; Fri, 17 May 2013 07:13:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Message-Id: <456ee6c44b014a57917e67395f065526@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CAMEL-6372: Added support for maxMessagesPerPoll on camel-krati. Date: Fri, 17 May 2013 07:13:26 +0000 (UTC) Updated Branches: refs/heads/master 037a5e654 -> 8abd3bb53 CAMEL-6372: Added support for maxMessagesPerPoll on camel-krati. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8abd3bb5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8abd3bb5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8abd3bb5 Branch: refs/heads/master Commit: 8abd3bb53253a696699cddacda0798c22a700351 Parents: 037a5e6 Author: Claus Ibsen Authored: Fri May 17 09:13:15 2013 +0200 Committer: Claus Ibsen Committed: Fri May 17 09:13:15 2013 +0200 ---------------------------------------------------------------------- .../camel/component/krati/KratiConsumer.java | 12 ++- .../camel/component/krati/KratiEndpoint.java | 13 +++- .../krati/KratiConsumerMaxMessagesPerPollTest.java | 65 +++++++++++++++ .../camel/component/krati/KratiConsumerTest.java | 9 +- .../camel/component/krati/KratiEndpointTest.java | 2 - .../component/krati/KratiProducerSpringTest.java | 28 +++---- .../camel/component/krati/KratiProducerTest.java | 27 +++---- 7 files changed, 115 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java index 4fea54c..24694c1 100644 --- a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java +++ b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java @@ -30,7 +30,6 @@ import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * The Krati consumer. */ @@ -51,18 +50,27 @@ public class KratiConsumer extends ScheduledBatchPollingConsumer { protected int poll() throws Exception { shutdownRunningTask = null; pendingExchanges = 0; + int max = getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : Integer.MAX_VALUE; Queue queue = new LinkedList(); Iterator keyIterator = dataStore.keyIterator(); - while (keyIterator.hasNext()) { + int index = 0; + while (keyIterator.hasNext() && index < max) { Object key = keyIterator.next(); Object value = dataStore.get(key); Exchange exchange = endpoint.createExchange(); exchange.setProperty(KratiConstants.KEY, key); exchange.getIn().setBody(value); queue.add(exchange); + index++; + } + + // did we cap at max? + if (index == max && keyIterator.hasNext()) { + log.debug("Limiting to maximum messages to poll {} as there was more messages in this poll.", max); } + return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue)); } http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java index b61f83c..435086b 100644 --- a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java +++ b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java @@ -20,6 +20,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; + import krati.core.segment.ChannelSegmentFactory; import krati.core.segment.SegmentFactory; import krati.io.Serializer; @@ -30,12 +31,12 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.krati.serializer.KratiDefaultSerializer; -import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.ScheduledPollEndpoint; /** * Represents a Krati endpoint. */ -public class KratiEndpoint extends DefaultEndpoint { +public class KratiEndpoint extends ScheduledPollEndpoint { protected static Map dataStoreRegistry = new HashMap(); @@ -53,6 +54,7 @@ public class KratiEndpoint extends DefaultEndpoint { protected HashFunction hashFunction = new FnvHashFunction(); protected String path; + protected int maxMessagesPerPoll; public KratiEndpoint(String uri, KratiComponent component) throws URISyntaxException { super(uri, component); @@ -92,6 +94,7 @@ public class KratiEndpoint extends DefaultEndpoint { dataStoreRegistry.put(path, new KratiDataStoreRegistration(dataStore)); } KratiConsumer answer = new KratiConsumer(this, processor, dataStore); + answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll()); configureConsumer(answer); return answer; } @@ -179,5 +182,11 @@ public class KratiEndpoint extends DefaultEndpoint { return path; } + public int getMaxMessagesPerPoll() { + return maxMessagesPerPoll; + } + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java new file mode 100644 index 0000000..3baf1af --- /dev/null +++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.krati; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class KratiConsumerMaxMessagesPerPollTest extends CamelTestSupport { + + @Test + public void testPutAndConsume() throws InterruptedException { + MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); + // batch-1 + endpoint.message(0).property(Exchange.BATCH_SIZE).isEqualTo(2); + endpoint.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0); + endpoint.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false); + endpoint.message(1).property(Exchange.BATCH_SIZE).isEqualTo(2); + endpoint.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1); + endpoint.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true); + + // batch-2 + endpoint.message(2).property(Exchange.BATCH_SIZE).isEqualTo(1); + endpoint.message(2).property(Exchange.BATCH_INDEX).isEqualTo(0); + endpoint.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(true); + + endpoint.expectedMessageCount(3); + + template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); + template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); + template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); + + endpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from("direct:put") + .to("krati:target/test/consumertest"); + + from("krati:target/test/consumertest?maxMessagesPerPoll=2") + .to("mock:results"); + + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java index 4be1388..011ae90 100644 --- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java +++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java @@ -14,10 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati; -import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.CamelTestSupport; @@ -27,12 +25,13 @@ public class KratiConsumerTest extends CamelTestSupport { @Test public void testPutAndConsume() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); + endpoint.expectedMessageCount(3); + template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); - MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); - endpoint.expectedMessageCount(3); + endpoint.assertIsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java index 6f4e320..94bd86c 100644 --- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java +++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati; import org.junit.Test; @@ -32,7 +31,6 @@ public class KratiEndpointTest { endpoint.start(); endpoint.stop(); assertEquals("target/test/endpointtest", endpoint.getPath()); - } @Test http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java index 9971bc7..a18e7ec 100644 --- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java +++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java @@ -14,10 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati; -import org.apache.camel.ProducerTemplate; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.spring.CamelSpringTestSupport; import org.junit.Test; @@ -27,26 +25,26 @@ import org.springframework.context.support.ClassPathXmlApplicationContext; public class KratiProducerSpringTest extends CamelSpringTestSupport { @Test - public void testPut() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPut() throws Exception { + MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); + endpoint.expectedMessageCount(3); + template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); - MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); - endpoint.expectedMessageCount(3); + endpoint.assertIsSatisfied(); } - @Test - public void testPutAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutAndGet() throws Exception { + MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); + endpoint.expectedMessageCount(3); + template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); - MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); - endpoint.expectedMessageCount(3); endpoint.assertIsSatisfied(); Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "3"); @@ -54,23 +52,23 @@ public class KratiProducerSpringTest extends CamelSpringTestSupport { } @Test - public void testPutDeleteAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutDeleteAndGet() throws Exception { template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); template.requestBodyAndHeader("direct:delete", null, KratiConstants.KEY, "3"); + Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "3"); assertEquals(null, result); } @Test - public void testPutDeleteAllAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutDeleteAllAndGet() throws Exception { template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); template.requestBodyAndHeader("direct:deleteall", null, KratiConstants.KEY, "3"); + Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "1"); assertEquals(null, result); result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "2"); http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java index 2e499f0..ce6e05e 100644 --- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java +++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java @@ -14,12 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit4.CamelTestSupport; @@ -28,30 +26,29 @@ import org.junit.Test; public class KratiProducerTest extends CamelTestSupport { @Test - public void testPut() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPut() throws Exception { + MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); + endpoint.expectedMessageCount(3); + template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY, new KeyObject("1")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY, new KeyObject("2")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY, new KeyObject("3")); - MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class); - endpoint.expectedMessageCount(3); + endpoint.assertIsSatisfied(); } - @Test - public void testPutAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutAndGet() throws Exception { template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY, new KeyObject("1")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY, new KeyObject("2")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY, new KeyObject("3")); + Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, new KeyObject("3")); assertEquals(new ValueObject("TEST3"), result); } @Test - public void testPutAndGetPreserveHeaders() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutAndGetPreserveHeaders() throws Exception { template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY, new KeyObject("1")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY, new KeyObject("2")); template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY, new KeyObject("3")); @@ -69,23 +66,23 @@ public class KratiProducerTest extends CamelTestSupport { } @Test - public void testPutDeleteAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutDeleteAndGet() throws Exception { template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "4"); template.requestBodyAndHeader("direct:delete", null, KratiConstants.KEY, "4"); + Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "4"); assertEquals(null, result); } @Test - public void testPutDeleteAllAndGet() throws InterruptedException { - ProducerTemplate template = context.createProducerTemplate(); + public void testPutDeleteAllAndGet() throws Exception { template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1"); template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2"); template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3"); template.requestBodyAndHeader("direct:deleteall", null, KratiConstants.KEY, "3"); + Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "1"); assertEquals(null, result); result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "2");