camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1055799 [2/3] - in /camel/trunk: apache-camel/ apache-camel/src/main/descriptors/ components/ components/camel-jdbc-aggregator/ components/camel-jdbc-aggregator/src/ components/camel-jdbc-aggregator/src/main/ components/camel-jdbc-aggregat...
Date Thu, 06 Jan 2011 09:24:15 GMT
Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class JdbcAggregateNotLostTest extends CamelTestSupport {
+
+    private JdbcAggregationRepository repo;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateNotLost() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("ABCDE");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        Thread.sleep(1000);
+
+        String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+
+        // the exchange should be in the completed repo where we should be able to find it
+        Exchange completed = repo.recover(context, exchangeId);
+        // assert the exchange was not lost and we got all the information still
+        assertNotNull(completed);
+        // should retain the exchange id
+        assertEquals(exchangeId, completed.getExchangeId());
+        assertEquals("ABCDE", completed.getIn().getBody());
+        assertEquals(123, completed.getIn().getHeader("id"));
+        assertEquals("size", completed.getProperty(Exchange.AGGREGATED_COMPLETED_BY));
+        assertEquals(5, completed.getProperty(Exchange.AGGREGATED_SIZE));
+        // will store correlation keys as String
+        assertEquals("123", completed.getProperty(Exchange.AGGREGATED_CORRELATION_KEY));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                                // throw an exception to fail, which would then make us lose this message
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                        .end();
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateNotLostTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class JdbcAggregateRecoverDeadLetterChannelFailedTest extends CamelTestSupport {
+
+    private JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        // enable recovery
+        repo.setUseRecovery(true);
+        // exhaust after at most 3 attempts
+        repo.setMaximumRedeliveries(3);
+        // and move to this dead letter channel
+        repo.setDeadLetterUri("direct:dead");
+        // check faster
+        repo.setRecoveryInterval(1000, TimeUnit.MILLISECONDS);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateRecoverDeadLetterChannelFailed() throws Exception {
+        // should fail all times
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        // it should keep sending to DLC if it failed, so test for min 3 attempts
+        getMockEndpoint("mock:dead").expectedMinimumMessageCount(3);
+        // every dead-lettered message should look the same: marked as redelivered, with a redelivery counter of 3
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(1).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+        getMockEndpoint("mock:dead").message(2).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                        .end();
+
+                from("direct:dead")
+                        .to("mock:dead")
+                        .throwException(new IllegalArgumentException("We are dead"));
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelFailedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class JdbcAggregateRecoverDeadLetterChannelTest extends CamelTestSupport {
+
+    private JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        // enable recovery
+        repo.setUseRecovery(true);
+        // exhaust after at most 3 attempts
+        repo.setMaximumRedeliveries(3);
+        // and move to this dead letter channel
+        repo.setDeadLetterUri("mock:dead");
+        // check faster
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateRecoverDeadLetterChannel() throws Exception {
+        // should fail all times
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:dead").expectedBodiesReceived("ABCDE");
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        getMockEndpoint("mock:dead").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .throwException(new IllegalArgumentException("Damn"))
+                        .to("mock:result")
+                        .end();
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverDeadLetterChannelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class JdbcAggregateRecoverTest extends CamelTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+    private JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        // enable recovery
+        repo.setUseRecovery(true);
+        // check faster
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateRecover() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // we succeed on the 2nd redelivery attempt
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .delay(1000)
+                                // simulate errors the first two times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result")
+                        .end();
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class JdbcAggregateRecoverWithRedeliveryPolicyTest extends CamelTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+    private JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        // enable recovery
+        repo.setUseRecovery(true);
+        // check faster
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateRecover() throws Exception {
+        getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+        getMockEndpoint("mock:result").setResultWaitTime(20000);
+
+        // should fail the first 3 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // we succeed on the 3rd redelivery attempt
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        // this is the output from the aggregator
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                                // simulate errors the first three times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 3) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result")
+                        .end();
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Checks that a completed aggregate handed to SEDA is recovered and redelivered after failures. */
+public class JdbcAggregateRecoverWithSedaTest extends CamelTestSupport {
+
+    private static final AtomicInteger COUNTER = new AtomicInteger(0);
+    private JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // restart the failure count so the first two deliveries fail on every run in this JVM
+        COUNTER.set(0);
+        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        // enable recovery with a short interval so the test does not have to wait long
+        repo.setUseRecovery(true);
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregateRecoverWithSeda() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // on the 2nd redelivery attempt we succeed
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with ${body}")
+                        .to("mock:aggregated")
+                        .to("seda:foo")
+                        .end();
+
+                // should be able to recover when we send over SEDA as it's an OnCompletion
+                // which confirms the exchange when it's complete.
+                from("seda:foo")
+                        .delay(1000)
+                        // simulate errors the first two times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                if (COUNTER.incrementAndGet() <= 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+
+    /** Appends the String body of each new exchange to the body already aggregated. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateRecoverWithSedaTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Aggregates five messages with the same "id" header through the JDBC aggregation repository. */
+public class JdbcAggregateTest extends CamelTestSupport {
+
+    JdbcAggregationRepository repo;
+
+    @Override
+    public void setUp() throws Exception {
+        // look the repository up first: the route from createRouteBuilder() refers to it
+        ApplicationContext springContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = springContext.getBean("repo1", JdbcAggregationRepository.class);
+        super.setUp();
+    }
+
+    @Test
+    public void testJdbcAggregate() throws Exception {
+        MockEndpoint aggregated = getMockEndpoint("mock:aggregated");
+        aggregated.expectedBodiesReceived("ABCDE");
+
+        // five messages with the same id complete one aggregate (completionSize is 5)
+        for (String body : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", body, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+
+        // from endpoint should be preserved on the aggregated exchange
+        Exchange first = aggregated.getReceivedExchanges().get(0);
+        assertEquals("direct://start", first.getFromEndpoint().getEndpointUri());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // correlate on the "id" header and use our jdbc repo as the aggregation repository
+                from("direct:start")
+                        .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionSize(5).aggregationRepository(repo)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+
+    /** Appends the String body of each new exchange to the body already aggregated. */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String soFar = oldExchange.getIn().getBody(String.class);
+                String incoming = newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(soFar + incoming);
+                return oldExchange;
+            }
+            // first exchange of the group: there is no earlier body to append to
+            return newExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Adds many exchanges to the JDBC aggregation repository and checks what is read back per key.
+ */
+public class JdbcAggregationRepositoryAlotDataTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    @Test
+    public void testWithAlotOfDataSameKey() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        for (int n = 0; n < 100; n++) {
+            addCounter(repo, "foo", n);
+        }
+
+        // all adds used the same key, so the last body is expected back
+        Exchange stored = repo.get(context, "foo");
+        assertEquals("counter:99", stored.getIn().getBody());
+    }
+
+    @Test
+    public void testWithAlotOfDataTwoKeys() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        for (int n = 0; n < 100; n++) {
+            // even counters are stored under "foo", odd ones under "bar"
+            addCounter(repo, n % 2 == 0 ? "foo" : "bar", n);
+        }
+
+        // each key is expected to return the last counter added under it
+        Exchange evenKey = repo.get(context, "foo");
+        assertEquals("counter:98", evenKey.getIn().getBody());
+        Exchange oddKey = repo.get(context, "bar");
+        assertEquals("counter:99", oddKey.getIn().getBody());
+    }
+
+    @Test
+    public void testWithAlotOfDataWithDifferentKeys() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        for (int n = 0; n < 100; n++) {
+            addCounter(repo, "key" + n, n);
+        }
+
+        // every key was used once, so each one is expected to return its own counter
+        for (int n = 0; n < 100; n++) {
+            Exchange stored = repo.get(context, "key" + n);
+            assertEquals("counter:" + n, stored.getIn().getBody());
+        }
+    }
+
+    /** Adds a new exchange with body {@code "counter:" + value} to the repository under the key. */
+    private void addCounter(JdbcAggregationRepository repo, String key, int value) {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("counter:" + value);
+        repo.add(context, key, exchange);
+    }
+}

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryAlotDataTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Checks that the "repo1" and "repo2" repository beans keep their exchanges apart from each other. */
+public class JdbcAggregationRepositoryMultipleRepoTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    @Test
+    public void testMultipeRepo() {
+        JdbcAggregationRepository repo1 = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        repo1.setReturnOldExchange(true);
+
+        JdbcAggregationRepository repo2 = applicationContext.getBean("repo2", JdbcAggregationRepository.class);
+        repo2.setReturnOldExchange(true);
+
+        // Can't get something we have not put in...
+        Exchange actual = repo1.get(context, "missing");
+        assertNull(actual);
+
+        actual = repo2.get(context, "missing");
+        assertNull(actual);
+
+        // Store it in repo1; there is no previous exchange to return
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:1");
+        actual = repo1.add(context, "foo", exchange1);
+        assertNull(actual);
+
+        // Get it back from repo1; repo2 must not see it
+        actual = repo1.get(context, "foo");
+        assertEquals("counter:1", actual.getIn().getBody());
+        assertNull(repo2.get(context, "foo"));
+
+        // Change it..
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("counter:2");
+        actual = repo1.add(context, "foo", exchange2);
+        // with returnOldExchange enabled, add() is expected to return the replaced exchange
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // add to repo2; repo1 must not see it
+        Exchange exchange3 = new DefaultExchange(context);
+        exchange3.getIn().setBody("Hello World");
+        actual = repo2.add(context, "bar", exchange3);
+        assertNull(actual);
+        assertNull(repo1.get(context, "bar"));
+
+        // Get both back, each only from the repository it was added to
+        actual = repo1.get(context, "foo");
+        assertEquals("counter:2", actual.getIn().getBody());
+        assertNull(repo2.get(context, "foo"));
+
+        actual = repo2.get(context, "bar");
+        assertEquals("Hello World", actual.getIn().getBody());
+        assertNull(repo1.get(context, "bar"));
+    }
+
+    @Test
+    public void testMultipeRepoSameKeyDifferentContent() {
+        JdbcAggregationRepository repo1 = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        JdbcAggregationRepository repo2 = applicationContext.getBean("repo2", JdbcAggregationRepository.class);
+
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("Hello World");
+        repo1.add(context, "foo", exchange1);
+
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("Bye World");
+        repo2.add(context, "foo", exchange2);
+
+        Exchange actual = repo1.get(context, "foo");
+        assertEquals("Hello World", actual.getIn().getBody());
+        actual = repo2.get(context, "foo");
+        assertEquals("Bye World", actual.getIn().getBody());
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryMultipleRepoTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Checks that an exchange removed before a repository stop/start can still be recovered by its id. */
+public class JdbcAggregationRepositoryRecoverExistingTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    @Test
+    public void testExisting() throws Exception {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        repo.setReturnOldExchange(true);
+        repo.setUseRecovery(true);
+        repo.start();
+
+        // Store it; there is no previous exchange to return
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:1");
+        Exchange actual = repo.add(context, "foo", exchange1);
+        assertNull(actual);
+
+        // Remove it, which puts it in the pre-confirm stage
+        repo.remove(context, "foo", exchange1);
+
+        String id = exchange1.getExchangeId();
+
+        // stop the repo
+        repo.stop();
+
+        Thread.sleep(1000);
+
+        // load the repo again
+        repo.start();
+
+        // The removed exchange is no longer available under its key..
+        actual = repo.get(context, "foo");
+        assertNull(actual);
+
+        // ..but it can be recovered by its exchange id
+        actual = repo.recover(context, id);
+        assertNotNull(actual);
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        repo.stop();
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryRecoverExistingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/** Exercises get, add and remove on the "repo1" JDBC aggregation repository with a single key. */
+public class JdbcAggregationRepositoryTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    @Test
+    public void testOperations() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+        repo.setReturnOldExchange(true);
+
+        // Can't get something we have not put in...
+        Exchange actual = repo.get(context, "missing");
+        assertNull(actual);
+
+        // Store it; there is no previous exchange to return
+        Exchange exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:1");
+        actual = repo.add(context, "foo", exchange1);
+        assertNull(actual);
+
+        // Get it back..
+        actual = repo.get(context, "foo");
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // Change it..
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("counter:2");
+        actual = repo.add(context, "foo", exchange2);
+        // with returnOldExchange enabled, add() is expected to return the replaced exchange
+        assertEquals("counter:1", actual.getIn().getBody());
+
+        // Get it back..
+        actual = repo.get(context, "foo");
+        assertEquals("counter:2", actual.getIn().getBody());
+
+        // now remove it; the key must no longer resolve
+        repo.remove(context, "foo", actual);
+        actual = repo.get(context, "foo");
+        assertNull(actual);
+
+        // add it again; after the remove there is no previous exchange to return
+        exchange1 = new DefaultExchange(context);
+        exchange1.getIn().setBody("counter:3");
+        actual = repo.add(context, "foo", exchange1);
+        assertNull(actual);
+
+        // Get it back..
+        actual = repo.get(context, "foo");
+        assertEquals("counter:3", actual.getIn().getBody());
+    }
+}

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcAggregationRepositoryTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.Date;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Verifies which parts of an {@link Exchange} survive a round trip through the
+ * {@link JdbcAggregationRepository}: the message body and headers are read back,
+ * whereas exchange properties are not.
+ */
+public class JdbcExchangeSerializationTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    /**
+     * Loads the Spring context that provides the <tt>repo1</tt> repository bean
+     * and then starts the regular Camel test setup.
+     */
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    /**
+     * Runs the regular Camel test teardown and then closes the Spring context
+     * created in {@link #setUp()}, so its beans are released after every test
+     * instead of piling up for the lifetime of the JVM.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // instanceof is null-safe, which also covers a setUp() that failed before the context was created
+            if (applicationContext instanceof ClassPathXmlApplicationContext) {
+                ((ClassPathXmlApplicationContext) applicationContext).close();
+            }
+            applicationContext = null;
+        }
+    }
+
+    /**
+     * Stores an exchange, reads it back and compares body, headers and properties;
+     * then modifies the exchange, stores it again under the same key and checks
+     * that the repository returns the modified state.
+     */
+    @Test
+    public void testExchangeSerialization() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello World");
+        exchange.getIn().setHeader("name", "Olivier");
+        exchange.getIn().setHeader("number", 123);
+        exchange.setProperty("quote", "Camel rocks");
+
+        Date now = new Date();
+        exchange.getIn().setHeader("date", now);
+
+        repo.add(context, "foo", exchange);
+
+        Exchange actual = repo.get(context, "foo");
+        assertEquals("Hello World", actual.getIn().getBody());
+        assertEquals("Olivier", actual.getIn().getHeader("name"));
+        assertEquals(123, actual.getIn().getHeader("number"));
+        Date date = actual.getIn().getHeader("date", Date.class);
+        assertNotNull(date);
+        assertEquals(now.getTime(), date.getTime());
+        // exchange properties are deliberately not serialized, to avoid storing all kinds of unneeded information
+        assertNull(actual.getProperty("quote"));
+        // the exchange read back must be bound to the CamelContext passed to the repository
+        assertSame(context, actual.getContext());
+
+        // change something
+        exchange.getIn().setBody("Bye World");
+        exchange.getIn().setHeader("name", "Thomas");
+        exchange.getIn().removeHeader("date");
+
+        // adding with the same key must replace what is returned for that key
+        repo.add(context, "foo", exchange);
+
+        actual = repo.get(context, "foo");
+        assertEquals("Bye World", actual.getIn().getBody());
+        assertEquals("Thomas", actual.getIn().getHeader("name"));
+        assertEquals(123, actual.getIn().getHeader("number"));
+        // the removed header must be gone after the second round trip
+        date = actual.getIn().getHeader("date", Date.class);
+        assertNull(date);
+        assertSame(context, actual.getContext());
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcExchangeSerializationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Checks that {@link JdbcAggregationRepository#get} returns <tt>null</tt> for
+ * a key that has never been added, both on an untouched repository and after
+ * a different key has been stored.
+ */
+public class JdbcGetNotFoundTest extends CamelTestSupport {
+
+    ApplicationContext applicationContext;
+
+    /**
+     * Loads the Spring context that provides the <tt>repo1</tt> repository bean
+     * and then starts the regular Camel test setup.
+     */
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+
+        super.setUp();
+    }
+
+    /**
+     * Runs the regular Camel test teardown and then closes the Spring context
+     * created in {@link #setUp()}, so its beans are released after every test
+     * instead of piling up for the lifetime of the JVM.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // instanceof is null-safe, which also covers a setUp() that failed before the context was created
+            if (applicationContext instanceof ClassPathXmlApplicationContext) {
+                ((ClassPathXmlApplicationContext) applicationContext).close();
+            }
+            applicationContext = null;
+        }
+    }
+
+    /**
+     * Looks up the id of an exchange that was never added and expects no result.
+     */
+    @Test
+    public void testGetNotFound() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello World");
+
+        Exchange out = repo.get(context, exchange.getExchangeId());
+        assertNull("Should not find exchange", out);
+    }
+
+    /**
+     * Adds one exchange under its own id, verifies it can be found, and then
+     * verifies that the id of a second, never-added exchange is still not found.
+     */
+    @Test
+    public void testPutAndGetNotFound() {
+        JdbcAggregationRepository repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello World");
+        log.info("Created " + exchange.getExchangeId());
+
+        repo.add(context, exchange.getExchangeId(), exchange);
+        Exchange out = repo.get(context, exchange.getExchangeId());
+        assertNotNull("Should find exchange", out);
+
+        // this exchange is only created, never added to the repository
+        Exchange exchange2 = new DefaultExchange(context);
+        exchange2.getIn().setBody("Bye World");
+        log.info("Created " + exchange2.getExchangeId());
+
+        Exchange out2 = repo.get(context, exchange2.getExchangeId());
+        assertNull("Should not find exchange", out2);
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGetNotFoundTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Stores an exchange with a {@link #SIZE}-character body {@link #SIZE} times
+ * under the same key and checks that the exchange read back afterwards still
+ * carries the complete body.
+ */
+public class JdbcGrowIssueTest extends CamelTestSupport {
+
+    /** Used both as the body length in characters and as the number of updates. */
+    private static final int SIZE = 1024;
+    private ApplicationContext applicationContext;
+    private JdbcAggregationRepository repo;
+
+    /**
+     * Loads the Spring context, looks up the <tt>repo1</tt> repository bean
+     * and then starts the regular Camel test setup.
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        applicationContext = new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringDataSource.xml");
+        repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class);
+
+        super.setUp();
+    }
+
+    /**
+     * Runs the regular Camel test teardown and then closes the Spring context
+     * created in {@link #setUp()}, so its beans are released after the test.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // instanceof is null-safe, which also covers a setUp() that failed before the context was created
+            if (applicationContext instanceof ClassPathXmlApplicationContext) {
+                ((ClassPathXmlApplicationContext) applicationContext).close();
+            }
+            applicationContext = null;
+            repo = null;
+        }
+    }
+
+    @Test
+    public void testGrowIssue() throws Exception {
+        // a 1kb string for testing
+        StringBuilder sb = new StringBuilder(SIZE);
+        for (int i = 0; i < SIZE; i++) {
+            sb.append("X");
+        }
+        Exchange item = new DefaultExchange(context);
+        item.getIn().setBody(sb.toString(), String.class);
+
+        // the key
+        final String key = "foo";
+
+        // we keep updating the same key, so every add replaces the previous
+        // exchange and the data read back afterwards must be exactly one body
+        for (int i = 0; i < SIZE; i++) {
+            log.debug("Updating " + i);
+            repo.add(context, key, item);
+        }
+
+        // get the last
+        Exchange data = repo.get(context, key);
+        log.info(data);
+
+        assertTrue("Should start with 'XXX'", data.getIn().getBody(String.class).startsWith("XXX"));
+        int length = data.getIn().getBody(String.class).length();
+        // compare against the constant the body was built with instead of a repeated literal
+        assertEquals("Length should be " + SIZE + ", was " + length, SIZE, length);
+    }
+
+}

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcGrowIssueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Sends five messages that are aggregated into a single <tt>ABCDE</tt> body
+ * and checks that the aggregated exchange, whose processing fails three times
+ * in {@link MyFailProcessor}, still arrives at <tt>mock:result</tt> marked as
+ * redelivered. The routes themselves are defined in the Spring XML file loaded
+ * by {@link #createApplicationContext()}.
+ */
+public class JdbcSpringAggregateRecoverWithRedeliveryPolicyTest extends CamelSpringTestSupport {
+
+    /** Number of times {@link MyFailProcessor} has been invoked; shared because the processor is a static class. */
+    private static final AtomicInteger COUNTER = new AtomicInteger(0);
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.xml");
+    }
+
+    @Test
+    public void testJdbcAggregateRecover() throws Exception {
+        getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+        getMockEndpoint("mock:result").setResultWaitTime(20000);
+
+        // should fail the first 3 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // the original delivery and the first two redeliveries fail, so we succeed on the 3rd redelivery attempt
+        getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Processor that throws an {@link IllegalArgumentException} on its first
+     * three invocations and does nothing on every later one.
+     */
+    public static class MyFailProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            int count = COUNTER.incrementAndGet();
+            if (count <= 3) {
+                throw new IllegalArgumentException("Damn");
+            }
+        }
+    }
+
+    /**
+     * Aggregation strategy that concatenates the String bodies of the incoming
+     * exchanges in arrival order.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // first message of a group: there is nothing to append to yet
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java Thu Jan  6 09:24:11 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc.aggregationrepository;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Sends the bodies <tt>A</tt> to <tt>E</tt>, all carrying the header
+ * <tt>id=123</tt>, to <tt>direct:start</tt> and expects a single
+ * <tt>ABCDE</tt> body at <tt>mock:aggregated</tt>. The route is defined in the
+ * Spring XML file loaded by {@link #createApplicationContext()} (presumably it
+ * correlates on the <tt>id</tt> header - the XML is not part of this class).
+ */
+public class JdbcSpringAggregateTest extends CamelSpringTestSupport {
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        String location = "org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.xml";
+        return new ClassPathXmlApplicationContext(location);
+    }
+
+    @Test
+    public void testJdbcAggregate() throws Exception {
+        MockEndpoint aggregated = getMockEndpoint("mock:aggregated");
+        aggregated.expectedBodiesReceived("ABCDE");
+
+        // send the five parts one after the other, all with the same id header
+        for (String part : new String[] {"A", "B", "C", "D", "E"}) {
+            template.sendBodyAndHeader("direct:start", part, "id", 123);
+        }
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Aggregation strategy that appends the String body of each new exchange
+     * to the body of the exchange aggregated so far.
+     */
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange != null) {
+                String merged = oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(merged);
+                return oldExchange;
+            }
+            // first exchange of the group becomes the aggregate as-is
+            return newExchange;
+        }
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/java/org/apache/camel/component/jdbc/aggregationrepository/JdbcSpringAggregateTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties Thu Jan  6 09:24:11 2011
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+java.naming.factory.initial = org.apache.camel.util.jndi.CamelInitialContextFactory

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/jndi.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties?rev=1055799&view=auto
==============================================================================
--- camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties (added)
+++ camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties Thu Jan  6 09:24:11 2011
@@ -0,0 +1,40 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for testing. By default we log at INFO level to a file; switch the root logger to the "out" appender to see the output on the console.
+#
+log4j.rootLogger=INFO, file
+
+# uncomment the following to enable camel debugging
+#log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component.jdbc.aggregationrepository=DEBUG
+log4j.logger.org.apache.camel.impl.converter=WARN
+log4j.logger.org.apache.camel.management=WARN
+log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-50.50t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-jdbc-aggregationrepository-test.log
\ No newline at end of file

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-jdbc-aggregator/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message