camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [4/7] CAMEL-5828 Added camel-disruptor component with thanks to Riccardo
Date Wed, 22 May 2013 10:48:42 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
new file mode 100644
index 0000000..f2d3908
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutChainedWithOnCompletionTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sends a request through three chained disruptor routes (a, b, c) and checks the reply and the message order at mock:c.
+ */
+public class DisruptorInOutChainedWithOnCompletionTest extends CamelTestSupport {
+    @Test
+    public void testInOutDisruptorChainedWithCustomOnCompletion() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("start");
+        getMockEndpoint("mock:b").expectedBodiesReceived("start-a");
+        // the "onCustomCompletion" message should be sent very last (as it will be handed over)
+        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b", "onCustomCompletion");
+
+        final String reply = template.requestBody("disruptor:a", "start", String.class);
+        assertEquals("start-a-b-c", reply);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:a").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        // this on-completion callback should come in last
+                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                            @Override
+                            public void onDone(final Exchange exchange) {
+                                template.sendBody("mock:c", "onCustomCompletion");
+                            }
+                        });
+                    }
+                }).to("mock:a").transform(simple("${body}-a")).to("disruptor:b");
+
+                from("disruptor:b").to("mock:b").transform(simple("${body}-b")).to("disruptor:c");
+
+                from("disruptor:c").to("mock:c").transform(simple("${body}-c"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutTest.java
new file mode 100644
index 0000000..080eef4
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that a request sent via direct:start to disruptor:foo returns the "Bye World" body set by the consuming route.
+ */
+public class DisruptorInOutTest extends CamelTestSupport {
+    @Test
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo");
+
+                from("disruptor:foo").transform(constant("Bye World")).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorDeadLetterChannelTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorDeadLetterChannelTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorDeadLetterChannelTest.java
new file mode 100644
index 0000000..089726a
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorDeadLetterChannelTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that an exception thrown in the disruptor:foo route sends the message to mock:dead and not to mock:result.
+ */
+public class DisruptorInOutWithErrorDeadLetterChannelTest extends CamelTestSupport {
+    @Test
+    public void testInOutWithErrorUsingDLC() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.requestBody("direct:start", "Hello World", String.class);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).redeliveryDelay(0));
+
+                from("direct:start").to("disruptor:foo");
+
+                from("disruptor:foo").transform(constant("Bye World"))
+                        .throwException(new IllegalArgumentException("Damn I cannot do this"))
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorTest.java
new file mode 100644
index 0000000..ecb72d5
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorInOutWithErrorTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that an exception thrown in the disruptor:foo route reaches the caller as the cause of a CamelExecutionException.
+ */
+public class DisruptorInOutWithErrorTest extends CamelTestSupport {
+    @Test
+    public void testInOutWithError() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.requestBody("direct:start", "Hello World", String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Damn I cannot do this", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo");
+
+                from("disruptor:foo").transform(constant("Bye World"))
+                        .throwException(new IllegalArgumentException("Damn I cannot do this"))
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorMultipleConsumersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorMultipleConsumersTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorMultipleConsumersTest.java
new file mode 100644
index 0000000..cbf1bae
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorMultipleConsumersTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that each route consuming disruptor:foo with multipleConsumers=true gets every message, also for a route added later.
+ */
+public class DisruptorMultipleConsumersTest extends CamelTestSupport {
+    @Test
+    public void testDisruptorMultipleConsumers() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+
+        template.sendBody("disruptor:foo", "Hello World");
+        template.sendBody("disruptor:bar", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDisruptorMultipleConsumersNewAdded() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:b").expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        template.sendBody("disruptor:foo", "Hello World");
+        template.sendBody("disruptor:bar", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo?multipleConsumers=true").id("testRoute").to("mock:c");
+
+            }
+
+        });
+        resetMocks();
+
+        getMockEndpoint("mock:a").expectedMessageCount(20);
+        getMockEndpoint("mock:b").expectedMessageCount(20);
+        getMockEndpoint("mock:c").expectedMessageCount(20);
+
+        for (int i = 0; i < 10; i++) { // 10 iterations x 2 sends = the 20 messages expected above
+            template.sendBody("disruptor:foo", "Hello World");
+            template.sendBody("disruptor:bar", "Bye World");
+        }
+        assertMockEndpointsSatisfied();
+        resetMocks();
+
+        context.suspendRoute("testRoute"); // testRoute is the route added above that feeds mock:c
+        getMockEndpoint("mock:a").expectedMessageCount(20);
+        getMockEndpoint("mock:b").expectedMessageCount(20);
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        for (int i = 0; i < 10; i++) {
+            template.sendBody("disruptor:foo", "Hello World");
+            template.sendBody("disruptor:bar", "Bye World");
+        }
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo?multipleConsumers=true").to("mock:a");
+
+                from("disruptor:foo?multipleConsumers=true").to("mock:b");
+
+                from("disruptor:bar").to("disruptor:foo?multipleConsumers=true");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorNoConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorNoConsumerTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorNoConsumerTest.java
new file mode 100644
index 0000000..66fa8cc
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorNoConsumerTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sends to disruptor:foo?timeout=1000 with no consuming route: a one-way send succeeds, a request fails with a timeout.
+ */
+public class DisruptorNoConsumerTest extends CamelTestSupport {
+    @Test
+    public void testInOnly() throws Exception {
+        // no problem for in only as we do not expect a reply
+        template.sendBody("direct:start", "Hello World");
+    }
+
+    @Test
+    public void testInOut() throws Exception {
+        try {
+            template.requestBody("direct:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo?timeout=1000");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java
new file mode 100644
index 0000000..d1e1c88
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorReconfigureWithBlockingProducer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Adds a second consumer route to disruptor:foo while a producer thread is still sending and checks it returns within 2 s.
+ */
+public class DisruptorReconfigureWithBlockingProducer extends CamelTestSupport {
+
+    @Test
+    public void testDisruptorReconfigureWithBlockingProducer() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(20);
+        getMockEndpoint("mock:b").expectedMinimumMessageCount(10);
+
+        long beforeStart = System.currentTimeMillis();
+        ProducerThread producerThread = new ProducerThread();
+        producerThread.start();
+
+        // synchronize with the producer to the point that the buffer is full
+        assertTrue(producerThread.awaitFullBufferProduced());
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo?multipleConsumers=true&size=8").id("testRoute").to("mock:b");
+            }
+        });
+
+        // Adding the consumer may take place after the current buffer is flushed,
+        // which will take approximately 8*200=1600 ms because of the delay on the route.
+        // If the reconfigure does not correctly hold back the producer thread on this request,
+        // it will take approximately 20*200=4000 ms.
+        // Be on the safe side and check that it was at least faster than 2 seconds.
+        assertTrue("Reconfigure of Disruptor blocked", (System.currentTimeMillis() - beforeStart) < 2000);
+
+        // Wait and check that the producer has produced all messages without throwing an exception
+        assertTrue(producerThread.checkResult());
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo?multipleConsumers=true&size=8").delay(200).to("mock:a");
+            }
+        };
+    }
+
+    private class ProducerThread extends Thread {
+        private final CountDownLatch startedLatch = new CountDownLatch(1);
+        private final CountDownLatch resultLatch = new CountDownLatch(1);
+        private Exception exception; // set by run() if the second batch of sends fails
+
+        @Override
+        public void run() {
+            for (int i = 0; i < 8; i++) { // 8 sends, matching size=8 on the disruptor:foo routes
+                template.sendBody("disruptor:foo", "Message");
+            }
+
+            startedLatch.countDown();
+
+            try {
+                for (int i = 0; i < 12; i++) { // remaining 12 of the 20 messages expected at mock:a
+                    template.sendBody("disruptor:foo", "Message");
+                }
+            } catch (Exception e) {
+                exception = e;
+            }
+
+            resultLatch.countDown();
+        }
+
+        public boolean awaitFullBufferProduced() throws InterruptedException {
+            return startedLatch.await(5, TimeUnit.SECONDS);
+        }
+
+        public boolean checkResult() throws Exception {
+            if (exception != null) {
+                throw exception;
+            }
+            boolean result = resultLatch.await(5, TimeUnit.SECONDS);
+            if (exception != null) {
+                throw exception;
+            }
+
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRemoveRouteThenAddAgainTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRemoveRouteThenAddAgainTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRemoveRouteThenAddAgainTest.java
new file mode 100644
index 0000000..fc04262
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRemoveRouteThenAddAgainTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that the disruptor:in route still delivers to mock:out after it has been stopped, removed and added again.
+ */
+public class DisruptorRemoveRouteThenAddAgainTest extends CamelTestSupport {
+    @Test
+    public void testRemoveRouteAndThenAddAgain() throws Exception {
+        final MockEndpoint out = getMockEndpoint("mock:out");
+        out.expectedMessageCount(1);
+        out.expectedBodiesReceived("before removing the route");
+
+        template.sendBody("disruptor:in", "before removing the route");
+
+        out.assertIsSatisfied();
+
+        out.reset();
+
+        // now stop & remove the route
+        context.stopRoute("disruptorToMock");
+        context.removeRoute("disruptorToMock");
+
+        // and then add it back again, using the same builder that created it initially
+        context.addRoutes(createRouteBuilder());
+
+        out.expectedMessageCount(1);
+        out.expectedBodiesReceived("after removing the route");
+
+        template.sendBody("disruptor:in", "after removing the route");
+
+        out.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:in").routeId("disruptorToMock").to("mock:out");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRingBufferTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRingBufferTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRingBufferTest.java
new file mode 100644
index 0000000..83d333b
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRingBufferTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks that messages sent to differently parameterised disruptor:foo URIs and to disruptor:bar all arrive at mock:result.
+ */
+public class DisruptorRingBufferTest extends CamelTestSupport {
+    @Test
+    public void testQueue() throws Exception {
+        final MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World", "Goodday World", "Bar");
+
+        // The 3 disruptor:foo calls should all reference the same Disruptor ring buffer.
+        template.sendBody("disruptor:foo", "Hello World");
+        template.sendBody("disruptor:foo?size=1024", "Bye World");
+        template.sendBody("disruptor:foo?concurrentConsumers=5", "Goodday World");
+        template.sendBody("disruptor:bar", "Bar");
+        assertMockEndpointsSatisfied(); // verify the expectations; without this the test could never fail
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo?concurrentConsumers=2").to("mock:result");
+
+                from("disruptor:bar").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRouteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRouteTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRouteTest.java
new file mode 100644
index 0000000..607a7e8
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorRouteTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.TestSupport;
+import org.junit.Test;
+
+/**
+ * Sends one exchange through two chained disruptor routes on a CamelContext that the test starts and stops itself.
+ */
+public class DisruptorRouteTest extends TestSupport {
+    @Test
+    public void testDisruptorQueue() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CamelContext context = new DefaultCamelContext();
+
+        // test.a forwards to test.b, whose processor releases the latch
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("disruptor:test.a").to("disruptor:test.b");
+                from("disruptor:test.b").process(new Processor() {
+                    @Override
+                    public void process(final Exchange e) {
+                        log.debug("Received exchange: " + e.getIn());
+                        latch.countDown();
+                    }
+                });
+            }
+        });
+
+        context.start();
+        try {
+            // now lets fire in a message
+            final Endpoint endpoint = context.getEndpoint("disruptor:test.a");
+            final Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setHeader("cheese", 123);
+            final Producer producer = endpoint.createProducer();
+            producer.process(exchange);
+
+            // wait (at most 5 seconds) for the second route to see the exchange
+            assertTrue(latch.await(5, TimeUnit.SECONDS));
+        } finally {
+            // stop the context even when the assertion fails, so a failed run does not leave it running
+            context.stop();
+        }
+    }
+
+    @Test
+    public void testThatShowsEndpointResolutionIsNotConsistent() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final CamelContext context = new DefaultCamelContext();
+
+        // test.a forwards to test.b, whose processor releases the latch
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("disruptor:test.a").to("disruptor:test.b");
+                from("disruptor:test.b").process(new Processor() {
+                    @Override
+                    public void process(final Exchange e) {
+                        log.debug("Received exchange: " + e.getIn());
+                        latch.countDown();
+                    }
+                });
+            }
+        });
+
+        context.start();
+        try {
+            // now lets fire in a message
+            final Endpoint endpoint = context.getEndpoint("disruptor:test.a");
+            final Exchange exchange = endpoint.createExchange();
+            exchange.getIn().setHeader("cheese", 123);
+            final Producer producer = endpoint.createProducer();
+            producer.process(exchange);
+
+            // wait (at most 5 seconds) for the second route to see the exchange
+            assertTrue(latch.await(5, TimeUnit.SECONDS));
+        } finally {
+            // stop the context even when the assertion fails, so a failed run does not leave it running
+            context.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorShouldNotUseSameThreadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorShouldNotUseSameThreadTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorShouldNotUseSameThreadTest.java
new file mode 100644
index 0000000..addcd6c
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorShouldNotUseSameThreadTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Unit test to verify that the disruptor consumer does NOT continue on the thread that produced the exchange.
+ */
+public class DisruptorShouldNotUseSameThreadTest extends CamelTestSupport {
+    // Id of the thread that executed the direct:start processor.
+    private static long id;
+
+    @Test
+    public void testNotUseSameThread() throws Exception {
+        final MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                final ThreadLocal<String> local = new ThreadLocal<String>();
+                // Producer side: mark the current thread via the ThreadLocal and remember its id.
+                from("direct:start").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        local.set("Hello");
+                        id = Thread.currentThread().getId();
+                    }
+                }).to("disruptor:foo");
+                // Compare the primitive ids: assertNotSame would compare the autoboxed Long references instead.
+                from("disruptor:foo").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        assertNull(local.get());
+                        assertTrue("Thread ids should not be same", id != Thread.currentThread().getId());
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutDisabledTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutDisabledTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutDisabledTest.java
new file mode 100644
index 0000000..6b58845
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutDisabledTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Requests a reply through "disruptor:foo?timeout=0" and expects the delayed route's answer to arrive.
+ */
+public class DisruptorTimeoutDisabledTest extends CamelTestSupport {
+    @Test
+    public void testDisruptorNoTimeout() throws Exception {
+        final Future<String> reply = template.asyncRequestBody("disruptor:foo?timeout=0", "World", String.class);
+        // bound the wait to 5 seconds so a hang fails the test instead of blocking it
+        final String answer = reply.get(5, TimeUnit.SECONDS);
+        assertEquals("Bye World", answer);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo").to("mock:before").delay(500)
+                        .transform(body().prepend("Bye ")).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutTest.java
new file mode 100644
index 0000000..93eeef6
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorTimeoutTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Covers request/reply over "disruptor:foo" both without a timeout option and with one that is expected to expire.
+ */
+public class DisruptorTimeoutTest extends CamelTestSupport {
+    private int timeout = 100; // value appended to the "timeout" URI option in testDisruptorTimeout()
+
+    @Test
+    public void testDisruptorNoTimeout() throws Exception {
+        final MockEndpoint resultMock = getMockEndpoint("mock:result");
+        resultMock.setExpectedMessageCount(1);
+        final Future<String> reply = template.asyncRequestBody("disruptor:foo", "World", String.class);
+        assertEquals("Bye World", reply.get());
+        resultMock.await(1, TimeUnit.SECONDS);
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testDisruptorTimeout() throws Exception {
+        final MockEndpoint resultMock = getMockEndpoint("mock:result");
+        resultMock.setExpectedMessageCount(0);
+
+        final String uri = "disruptor:foo?timeout=" + timeout;
+        final Future<String> reply = template.asyncRequestBody(uri, "World", String.class);
+        try {
+            reply.get();
+            fail("Should have thrown an exception");
+        } catch (ExecutionException e) {
+            final Throwable cause = e.getCause();
+            assertIsInstanceOf(CamelExecutionException.class, cause);
+            assertIsInstanceOf(ExchangeTimedOutException.class, cause.getCause());
+            final DisruptorEndpoint routeEndpoint = (DisruptorEndpoint)context.getRoute("disruptor").getEndpoint();
+            assertNotNull("Consumer endpoint cannot be null", routeEndpoint);
+            // an exchange cannot be removed from a Disruptor once it is published, but it should never reach
+            // the mock:result endpoint because it should be filtered out by the DisruptorConsumer
+            resultMock.await(1, TimeUnit.SECONDS);
+            assertMockEndpointsSatisfied();
+        }
+    }
+
+    @Test
+    public void testDisruptorTimeoutWithStoppedRoute() throws Exception {
+        timeout = 500; // read by testDisruptorTimeout() when it builds the endpoint URI
+        context.stopRoute("disruptor");
+        testDisruptorTimeout();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:foo").routeId("disruptor")
+                        .to("mock:before").delay(250).transform(body().prepend("Bye ")).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
new file mode 100644
index 0000000..90fdf00
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Unit test to verify unit of work with disruptor: a synchronization registered before the disruptor hop is expected
+ * to be called back only after the consumer route has finished.
+ */
+public class DisruptorUnitOfWorkTest extends CamelTestSupport {
+
+    private static volatile String sync;
+
+    private static volatile String lastOne;
+
+    @Test
+    public void testDisruptorUOW() throws Exception {
+        // NOTE(review): this notifier is created but never awaited; the sleep below is what waits for onComplete
+        final NotifyBuilder notify = new NotifyBuilder(context).from("disruptor:foo").whenDone(1).create();
+
+        final MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+        // need to sleep a while to wait for the calling of onComplete
+        Thread.sleep(200);
+
+        // the callback registered by processor "A" must have written both fields last
+        assertEquals("onCompleteA", sync);
+        assertEquals("onCompleteA", lastOne);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setTracing(true);
+                // Register the synchronization on the unit of work, then hand the exchange to the disruptor.
+                from("direct:start").process(new MyUOWProcessor("A")).to("disruptor:foo");
+                // While the consumer route is still running, the synchronization must not have fired yet.
+                from("disruptor:foo").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        assertNull(sync);
+                    }
+                }).process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        lastOne = "processor";
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+
+    private static final class MyUOWProcessor implements Processor {
+        // Adds a Synchronization that records "onComplete"/"onFailure" + id in the test's static fields.
+        private final String id;
+
+        private MyUOWProcessor(final String id) {
+            this.id = id;
+        }
+
+        @Override
+        public void process(final Exchange exchange) throws Exception {
+            exchange.getUnitOfWork().addSynchronization(new Synchronization() {
+                @Override
+                public void onComplete(final Exchange exchange) {
+                    sync = "onComplete" + id;
+                    lastOne = sync;
+                }
+
+                @Override
+                public void onFailure(final Exchange exchange) {
+                    sync = "onFailure" + id;
+                    lastOne = sync;
+                }
+            });
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitClaimStrategyComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitClaimStrategyComponentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitClaimStrategyComponentTest.java
new file mode 100644
index 0000000..1042e9a
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitClaimStrategyComponentTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs one send/receive check for every combination of DisruptorWaitStrategy and DisruptorProducerType values.
+ */
+@RunWith(Parameterized.class)
+public class DisruptorWaitClaimStrategyComponentTest extends CamelTestSupport {
+    private static final Integer VALUE = Integer.valueOf(42);
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce
+    protected ProducerTemplate template;
+
+    // Parameters of the current run; disruptorUri is derived from them in createRouteBuilder().
+    private final String producerType;
+    private final String waitStrategy;
+    private String disruptorUri;
+
+    public DisruptorWaitClaimStrategyComponentTest(final String waitStrategy, final String producerType) {
+        // argument order matches the String[] entries produced by strategies()
+        this.producerType = producerType;
+        this.waitStrategy = waitStrategy;
+    }
+
+    @Parameters
+    public static Collection<String[]> strategies() {
+        final List<String[]> combinations = new ArrayList<String[]>();
+        // each entry is {wait strategy name, producer type name}
+        for (final DisruptorWaitStrategy strategy : DisruptorWaitStrategy.values()) {
+            for (final DisruptorProducerType type : DisruptorProducerType.values()) {
+                combinations.add(new String[] {strategy.name(), type.name()});
+            }
+        }
+
+        return combinations;
+    }
+
+    /** Sends VALUE asynchronously and waits up to 5 seconds for it to reach mock:result. */
+    @Test
+    public void testProduce() throws InterruptedException {
+        resultEndpoint.expectedBodiesReceived(VALUE);
+        resultEndpoint.setExpectedMessageCount(1);
+
+        template.asyncSendBody(disruptorUri, VALUE);
+
+        resultEndpoint.await(5, TimeUnit.SECONDS);
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    /** Builds the endpoint URI from this run's parameters and routes it to mock:result. */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+
+        disruptorUri = "disruptor:test?waitStrategy=" + waitStrategy + "&producerType=" + producerType;
+
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(disruptorUri).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskAsPropertyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskAsPropertyTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskAsPropertyTest.java
new file mode 100644
index 0000000..b8fae6b
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskAsPropertyTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.WaitForTaskToComplete;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sets the ASYNC_WAIT exchange property to IfReplyExpected and checks the result for InOut and InOnly exchanges.
+ */
+public class DisruptorWaitForTaskAsPropertyTest extends CamelTestSupport {
+    @Test
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final Exchange reply = sendHelloWorld(ExchangePattern.InOut);
+        assertEquals("Bye World", reply.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final Exchange reply = sendHelloWorld(ExchangePattern.InOnly);
+        // we do not expect a reply and thus do not wait, so we just get our own input back
+        assertEquals("Hello World", reply.getIn().getBody());
+        assertNull(reply.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /**
+     * Sends "Hello World" to direct:start with the given pattern and ASYNC_WAIT set to IfReplyExpected.
+     */
+    private Exchange sendHelloWorld(final ExchangePattern pattern) {
+        return template.send("direct:start", new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(pattern);
+                exchange.setProperty(Exchange.ASYNC_WAIT, WaitForTaskToComplete.IfReplyExpected);
+            }
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo");
+                // the consumer route replaces the body, which is what mock:result is expected to receive
+                from("disruptor:foo").transform(constant("Bye World")).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
new file mode 100644
index 0000000..80199de
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteOnCompletionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class DisruptorWaitForTaskCompleteOnCompletionTest extends CamelTestSupport {
+
+    private static String done = "";
+
+    @Test
+    public void testAlways() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // 3 + 1 C and A should be last
+        assertEquals("CCCCA", done);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                            @Override
+                            public void onDone(final Exchange exchange) {
+                                done += "A";
+                            }
+                        });
+                    }
+                }).to("disruptor:foo?waitForTaskToComplete=Always").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        done += "B";
+                    }
+                }).to("mock:result");
+
+                from("disruptor:foo").errorHandler(noErrorHandler()).process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        done = done + "C";
+                    }
+                }).throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteTest.java
new file mode 100644
index 0000000..83481ea
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskCompleteTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorWaitForTaskCompleteTest extends CamelTestSupport {
+    @Test
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // we send an InOnly exchange, but we use Always to wait for it to complete,
+        // and since the route changes the payload we can get the response anyway
+        final Exchange out = template.send("direct:start", new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+        assertEquals("Bye World", out.getIn().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo?waitForTaskToComplete=Always");
+
+                from("disruptor:foo?waitForTaskToComplete=Always").transform(constant("Bye World"))
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskIfReplyExpectedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskIfReplyExpectedTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskIfReplyExpectedTest.java
new file mode 100644
index 0000000..a66de10
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskIfReplyExpectedTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+
+public class DisruptorWaitForTaskIfReplyExpectedTest extends CamelTestSupport {
+
+    @Test
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        final String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Test
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        final Exchange out = template.send("direct:start", new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+
+        // we do not expect a reply and thus do not wait, so we just get our own
+        // input back
+        assertEquals("Hello World", out.getIn().getBody());
+        assertEquals(null, out.getOut().getBody());
+        assertMockEndpointsSatisfied();
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo?waitForTaskToComplete=IfReplyExpected");
+                from("disruptor:foo").transform(constant("Bye World")).to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
new file mode 100644
index 0000000..78fe8c1
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverOnCompletionTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorWaitForTaskNeverOnCompletionTest extends CamelTestSupport {
+
+    private static String done = "";
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Test
+    public void testNever() throws Exception {
+        getMockEndpoint("mock:dead").expectedMessageCount(0);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        // B should be first because we do not wait
+        assertEquals("BCA", done);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        exchange.addOnCompletion(new SynchronizationAdapter() {
+                            @Override
+                            public void onDone(final Exchange exchange) {
+                                done = done + "A";
+                                latch.countDown();
+                            }
+                        });
+                    }
+                }).to("disruptor:foo?waitForTaskToComplete=Never").process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        done = done + "B";
+                    }
+                }).to("mock:result");
+
+                from("disruptor:foo").errorHandler(noErrorHandler()).delay(1000).process(new Processor() {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        done = done + "C";
+                    }
+                }).throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverTest.java
new file mode 100644
index 0000000..4019066
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitForTaskNeverTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorWaitForTaskNeverTest extends CamelTestSupport {
+    @Test
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final String out = template.requestBody("direct:start", "Hello World", String.class);
+        // we do not wait for the response so we just get our own input back
+        assertEquals("Hello World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        final Exchange out = template.send("direct:start", new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+        // we do not wait for the response so we just get our own input back
+        assertEquals("Hello World", out.getIn().getBody());
+        assertEquals(null, out.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor:foo?waitForTaskToComplete=Never");
+
+                from("disruptor:foo?waitForTaskToComplete=Never").transform(constant("Bye World"))
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitStrategyCreationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitStrategyCreationTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitStrategyCreationTest.java
new file mode 100644
index 0000000..7f29dae
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorWaitStrategyCreationTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import com.lmax.disruptor.WaitStrategy;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests correct creation of all supposedly possible wait strategies.
+ */
+public class DisruptorWaitStrategyCreationTest {
+    @Test
+    public void testCreateWaitStrategyInstance() throws Exception {
+        for (final DisruptorWaitStrategy strategy : DisruptorWaitStrategy.values()) {
+            final WaitStrategy waitStrategyInstance = strategy.createWaitStrategyInstance();
+
+            assertNotNull(waitStrategyInstance);
+            assertTrue(waitStrategyInstance instanceof WaitStrategy);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/FileDisruptorShutdownCompleteAllTasksTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/FileDisruptorShutdownCompleteAllTasksTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/FileDisruptorShutdownCompleteAllTasksTest.java
new file mode 100644
index 0000000..7cbf088
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/FileDisruptorShutdownCompleteAllTasksTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class FileDisruptorShutdownCompleteAllTasksTest extends CamelTestSupport {
+
+    @Override
+    public void setUp() throws Exception {
+        deleteDirectory("target/disruptor");
+        super.setUp();
+    }
+
+    @Test
+    public void testShutdownCompleteAllTasks() throws Exception {
+        final String url = "file:target/disruptor";
+        template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt");
+        template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt");
+        template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt");
+        template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt");
+        template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt");
+
+        // give it ample time to shutdown (NOTE: the value is 20 * 100000, not 20 -- confirm the intended timeout)
+        context.getShutdownStrategy().setTimeout(20 * 100000);
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(url).routeId("route1")
+                        // let it complete all tasks during shutdown
+                        .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks).to("log:delay").delay(1000)
+                        .to("disruptor:foo?size=8");
+
+                from("disruptor:foo?size=8").routeId("route2").to("log:bar").to("mock:bar");
+            }
+        });
+        context.start();
+
+        final MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMinimumMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        // shutdown during processing
+        context.stop();
+
+        // should route all 5
+        assertEquals("Should complete all messages", 5, bar.getReceivedCounter());
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/MulticastDisruptorComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/MulticastDisruptorComponentTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/MulticastDisruptorComponentTest.java
new file mode 100644
index 0000000..c264244
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/MulticastDisruptorComponentTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests that multicast functionality works correctly
+ */
+public class MulticastDisruptorComponentTest extends CamelTestSupport {
+    private static final String MULTIPLE_CONSUMERS_ENDPOINT_URI = "disruptor:test?multipleConsumers=true";
+    private static final Integer VALUE = Integer.valueOf(42);
+
+    @EndpointInject(uri = "mock:result1")
+    protected MockEndpoint resultEndpoint1;
+
+    @EndpointInject(uri = "mock:result2")
+    protected MockEndpoint resultEndpoint2;
+
+    @Produce(uri = "disruptor:test")
+    protected ProducerTemplate template;
+
+//    private ThreadCounter threadCounter = new ThreadCounter();
+
+    @Test
+    public void testMulticastProduce() throws InterruptedException {
+        resultEndpoint1.expectedBodiesReceived(VALUE);
+        resultEndpoint1.setExpectedMessageCount(1);
+
+        resultEndpoint2.expectedBodiesReceived(VALUE);
+        resultEndpoint2.setExpectedMessageCount(1);
+
+        template.asyncSendBody(MULTIPLE_CONSUMERS_ENDPOINT_URI, VALUE);
+
+        resultEndpoint1.await(5, TimeUnit.SECONDS);
+        resultEndpoint1.assertIsSatisfied(1);
+        resultEndpoint2.await(5, TimeUnit.SECONDS);
+        resultEndpoint2.assertIsSatisfied(1);
+    }
+
+//
+//    @Test
+//    public void testAsynchronous() throws InterruptedException {
+//        threadCounter.reset();
+//
+//        int messagesSent = 1000;
+//
+//        resultEndpoint.setExpectedMessageCount(messagesSent);
+//
+//        long currentThreadId = Thread.currentThread().getId();
+//
+//        for (int i = 0; i < messagesSent; ++i) {
+//            template.asyncSendBody("disruptor:testAsynchronous", VALUE);
+//        }
+//
+//        resultEndpoint.await(20, TimeUnit.SECONDS);
+//        resultEndpoint.assertIsSatisfied();
+//
+//        Assert.assertTrue(threadCounter.getThreadIdCount() > 0);
+//        Assert.assertFalse(threadCounter.getThreadIds().contains(currentThreadId));
+//    }
+//
+//    @Test
+//    public void testMultipleConsumers() throws InterruptedException {
+//        threadCounter.reset();
+//
+//        int messagesSent = 1000;
+//
+//        resultEndpoint.setExpectedMessageCount(messagesSent);
+//
+//        for (int i = 0; i < messagesSent; ++i) {
+//            template.asyncSendBody("disruptor:testMultipleConsumers?concurrentConsumers=4", VALUE);
+//        }
+//
+//        resultEndpoint.await(20, TimeUnit.SECONDS);
+//
+//        //sleep for another second to check for duplicate messages in transit
+//        Thread.sleep(1000);
+//
+//        System.out.println("count = " + resultEndpoint.getReceivedCounter());
+//        resultEndpoint.assertIsSatisfied();
+//
+//        Assert.assertEquals(4, threadCounter.getThreadIdCount());
+//    }
+//
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor:test?multipleConsumers=true").to("mock:result1")
+                        .setShutdownRoute(ShutdownRoute.Defer);
+                from("disruptor:test?multipleConsumers=true").to("mock:result2")
+                        .setShutdownRoute(ShutdownRoute.Defer);
+//                from("disruptor:testAsynchronous").process(threadCounter).to("mock:result");
+//                from("disruptor:testMultipleConsumers?concurrentConsumers=4").process(threadCounter).to("mock:result");
+            }
+        };
+    }
+
+//    private static final class ThreadCounter implements Processor {
+//
+//        private Set<Long> threadIds = new HashSet<Long>();
+//
+//        public void reset() {
+//            threadIds.clear();
+//        }
+//
+//        @Override
+//        public void process(Exchange exchange) throws Exception {
+//            threadIds.add(Thread.currentThread().getId());
+//        }
+//
+//        public Set<Long> getThreadIds() {
+//            return Collections.unmodifiableSet(threadIds);
+//        }
+//
+//        public int getThreadIdCount() {
+//            return threadIds.size();
+//        }
+//    }
+}


Mime
View raw message