camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject git commit: CAMEL-6372: Added support for maxMessagesPerPoll on camel-krati.
Date Fri, 17 May 2013 07:13:26 GMT
Updated Branches:
  refs/heads/master 037a5e654 -> 8abd3bb53


CAMEL-6372: Added support for maxMessagesPerPoll on camel-krati.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8abd3bb5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8abd3bb5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8abd3bb5

Branch: refs/heads/master
Commit: 8abd3bb53253a696699cddacda0798c22a700351
Parents: 037a5e6
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri May 17 09:13:15 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri May 17 09:13:15 2013 +0200

----------------------------------------------------------------------
 .../camel/component/krati/KratiConsumer.java       |   12 ++-
 .../camel/component/krati/KratiEndpoint.java       |   13 +++-
 .../krati/KratiConsumerMaxMessagesPerPollTest.java |   65 +++++++++++++++
 .../camel/component/krati/KratiConsumerTest.java   |    9 +-
 .../camel/component/krati/KratiEndpointTest.java   |    2 -
 .../component/krati/KratiProducerSpringTest.java   |   28 +++----
 .../camel/component/krati/KratiProducerTest.java   |   27 +++----
 7 files changed, 115 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
index 4fea54c..24694c1 100644
--- a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
+++ b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
@@ -30,7 +30,6 @@ import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * The Krati consumer.
  */
@@ -51,18 +50,27 @@ public class KratiConsumer extends ScheduledBatchPollingConsumer {
     protected int poll() throws Exception {
         shutdownRunningTask = null;
         pendingExchanges = 0;
+        int max = getMaxMessagesPerPoll() > 0 ? getMaxMessagesPerPoll() : Integer.MAX_VALUE;
 
         Queue<Exchange> queue = new LinkedList<Exchange>();
 
         Iterator<Object> keyIterator = dataStore.keyIterator();
-        while (keyIterator.hasNext()) {
+        int index = 0;
+        while (keyIterator.hasNext() && index < max) {
             Object key = keyIterator.next();
             Object value = dataStore.get(key);
             Exchange exchange = endpoint.createExchange();
             exchange.setProperty(KratiConstants.KEY, key);
             exchange.getIn().setBody(value);
             queue.add(exchange);
+            index++;
+        }
+
+        // did we cap at max?
+        if (index == max && keyIterator.hasNext()) {
+            log.debug("Limiting to maximum messages to poll {} as there was more messages
in this poll.", max);
         }
+
         return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue));
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
index b61f83c..435086b 100644
--- a/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
+++ b/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiEndpoint.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+
 import krati.core.segment.ChannelSegmentFactory;
 import krati.core.segment.SegmentFactory;
 import krati.io.Serializer;
@@ -30,12 +31,12 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.krati.serializer.KratiDefaultSerializer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollEndpoint;
 
 /**
  * Represents a Krati endpoint.
  */
-public class KratiEndpoint extends DefaultEndpoint {
+public class KratiEndpoint extends ScheduledPollEndpoint {
 
     protected static Map<String, KratiDataStoreRegistration> dataStoreRegistry = new
HashMap<String, KratiDataStoreRegistration>();
 
@@ -53,6 +54,7 @@ public class KratiEndpoint extends DefaultEndpoint {
     protected HashFunction<byte[]> hashFunction = new FnvHashFunction();
 
     protected String path;
+    protected int maxMessagesPerPoll;
 
     public KratiEndpoint(String uri, KratiComponent component) throws URISyntaxException
{
         super(uri, component);
@@ -92,6 +94,7 @@ public class KratiEndpoint extends DefaultEndpoint {
             dataStoreRegistry.put(path, new KratiDataStoreRegistration(dataStore));
         }
         KratiConsumer answer = new KratiConsumer(this, processor, dataStore);
+        answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         configureConsumer(answer);
         return answer;
     }
@@ -179,5 +182,11 @@ public class KratiEndpoint extends DefaultEndpoint {
         return path;
     }
 
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
 
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
new file mode 100644
index 0000000..3baf1af
--- /dev/null
+++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerMaxMessagesPerPollTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.krati;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class KratiConsumerMaxMessagesPerPollTest extends CamelTestSupport {
+
+    @Test
+    public void testPutAndConsume() throws InterruptedException {
+        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
+        // batch-1
+        endpoint.message(0).property(Exchange.BATCH_SIZE).isEqualTo(2);
+        endpoint.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        endpoint.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        endpoint.message(1).property(Exchange.BATCH_SIZE).isEqualTo(2);
+        endpoint.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+        endpoint.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+
+        // batch-2
+        endpoint.message(2).property(Exchange.BATCH_SIZE).isEqualTo(1);
+        endpoint.message(2).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        endpoint.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+
+        endpoint.expectedMessageCount(3);
+
+        template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
+        template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
+        template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
+
+        endpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:put")
+                        .to("krati:target/test/consumertest");
+
+                from("krati:target/test/consumertest?maxMessagesPerPoll=2")
+                        .to("mock:results");
+
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
index 4be1388..011ae90 100644
--- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
+++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiConsumerTest.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.krati;
 
-import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -27,12 +25,13 @@ public class KratiConsumerTest extends CamelTestSupport {
 
     @Test
     public void testPutAndConsume() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
+        endpoint.expectedMessageCount(3);
+
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
-        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
-        endpoint.expectedMessageCount(3);
+
         endpoint.assertIsSatisfied();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
index 6f4e320..94bd86c 100644
--- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
+++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiEndpointTest.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.krati;
 
 import org.junit.Test;
@@ -32,7 +31,6 @@ public class KratiEndpointTest {
         endpoint.start();
         endpoint.stop();
         assertEquals("target/test/endpointtest", endpoint.getPath());
-
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
index 9971bc7..a18e7ec 100644
--- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
+++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerSpringTest.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.krati;
 
-import org.apache.camel.ProducerTemplate;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.spring.CamelSpringTestSupport;
 import org.junit.Test;
@@ -27,26 +25,26 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
 public class KratiProducerSpringTest extends CamelSpringTestSupport {
 
     @Test
-    public void testPut() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPut() throws Exception {
+        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
+        endpoint.expectedMessageCount(3);
+
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
-        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
-        endpoint.expectedMessageCount(3);
+
         endpoint.assertIsSatisfied();
     }
 
-
     @Test
-    public void testPutAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutAndGet() throws Exception {
+        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
+        endpoint.expectedMessageCount(3);
+
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
 
-        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
-        endpoint.expectedMessageCount(3);
         endpoint.assertIsSatisfied();
 
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
"3");
@@ -54,23 +52,23 @@ public class KratiProducerSpringTest extends CamelSpringTestSupport {
     }
 
     @Test
-    public void testPutDeleteAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutDeleteAndGet() throws Exception {
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
         template.requestBodyAndHeader("direct:delete", null, KratiConstants.KEY, "3");
+
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
"3");
         assertEquals(null, result);
     }
 
     @Test
-    public void testPutDeleteAllAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutDeleteAllAndGet() throws Exception {
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
         template.requestBodyAndHeader("direct:deleteall", null, KratiConstants.KEY, "3");
+
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
"1");
         assertEquals(null, result);
         result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "2");

http://git-wip-us.apache.org/repos/asf/camel/blob/8abd3bb5/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
index 2e499f0..ce6e05e 100644
--- a/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
+++ b/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
@@ -14,12 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.krati;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -28,30 +26,29 @@ import org.junit.Test;
 public class KratiProducerTest extends CamelTestSupport {
 
     @Test
-    public void testPut() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPut() throws Exception {
+        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
+        endpoint.expectedMessageCount(3);
+
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY,
new KeyObject("1"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY,
new KeyObject("2"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY,
new KeyObject("3"));
-        MockEndpoint endpoint = context.getEndpoint("mock:results", MockEndpoint.class);
-        endpoint.expectedMessageCount(3);
+
         endpoint.assertIsSatisfied();
     }
 
-
     @Test
-    public void testPutAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutAndGet() throws Exception {
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY,
new KeyObject("1"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY,
new KeyObject("2"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY,
new KeyObject("3"));
+
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
new KeyObject("3"));
         assertEquals(new ValueObject("TEST3"), result);
     }
 
     @Test
-    public void testPutAndGetPreserveHeaders() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutAndGetPreserveHeaders() throws Exception {
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY,
new KeyObject("1"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY,
new KeyObject("2"));
         template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY,
new KeyObject("3"));
@@ -69,23 +66,23 @@ public class KratiProducerTest extends CamelTestSupport {
     }
 
     @Test
-    public void testPutDeleteAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutDeleteAndGet() throws Exception {
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "4");
         template.requestBodyAndHeader("direct:delete", null, KratiConstants.KEY, "4");
+
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
"4");
         assertEquals(null, result);
     }
 
     @Test
-    public void testPutDeleteAllAndGet() throws InterruptedException {
-        ProducerTemplate template = context.createProducerTemplate();
+    public void testPutDeleteAllAndGet() throws Exception {
         template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");
         template.sendBodyAndHeader("direct:put", "TEST2", KratiConstants.KEY, "2");
         template.sendBodyAndHeader("direct:put", "TEST3", KratiConstants.KEY, "3");
         template.requestBodyAndHeader("direct:deleteall", null, KratiConstants.KEY, "3");
+
         Object result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY,
"1");
         assertEquals(null, result);
         result = template.requestBodyAndHeader("direct:get", null, KratiConstants.KEY, "2");


Mime
View raw message