camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [3/7] CAMEL-5828 Added camel-disruptor component with thanks to Riccardo
Date Wed, 22 May 2013 10:48:41 GMT
http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/Readme.txt
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/Readme.txt b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/Readme.txt
new file mode 100644
index 0000000..4baac3d
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/Readme.txt
@@ -0,0 +1,50 @@
+Seda and Disruptor unit tests comparaison
+-----------------------------------------
+
+The Seda Camel component has a lot of similarities with the Disruptor and it 
+just seems fair to copy the unit tests from Seda and apply them to the Disruptor.  
+Here is a list of all the unit tests found in Camel 2.10.1 and their correspondence:
+
+Original Camel Test class						Status			Status extra information
+-------------------------						------			------------------------ 
+CollectionProducerTest.java						*NOT COPIED*	Did not really apply to disruptor
+DirectRequestReplyAndSedaInOnlyTest.java		*COPIED*		DirectRequestReplyAndDisruptorInOnlyTest.java
+FileSedaShutdownCompleteAllTasksTest.java		*COPIED*		FileDisruptorShutdownCompleteAllTasksTest.java
+SedaAsyncProducerTest.java						*NOT COPIED*	Test case doesn't have any relationhsip with disruptor, only applies to old seda implementation
+SedaAsyncRouteTest.java							*COPIED*		DisruptorAsyncRouteTest.java
+SedaBlockWhenFullTest.java						*COPIED*		DisruptorBlockWhenFullTest.java
+SedaComplexInOutTest.java						*COPIED*		DisruptorComplexInOutTest.java
+SedaComponentReferenceEndpointTest.java			*COPIED*		DisruptorComponentReferenceEndpointTest.java
+SedaConcurrentConsumersNPEIssueTest.java		*COPIED*		DisruptorConcurrentConsumersNPEIssueTest.java
+SedaConcurrentConsumersTest.java				*COPIED*		DisruptorConcurrentConsumersTest.java
+SedaConcurrentTest.java							*COPIED*		DisruptorConcurrentTest.java
+SedaConfigureTest.java							*COPIED*		DisruptorConfigureTest.java
+SedaConsumerSuspendResumeTest.java				*COPIED*		DisruptorConsumerSuspendResumeTest.java
+SedaDefaultUnboundedQueueSizeTest.java			*NOT COPIED*	Disruptor doesn't support an upper limit and waits by default when ring buffer is full
+SedaEndpointTest.java							*NOT COPIED*	Test case did not seem fit for disruptor.  Could be revisited if needed
+SedaFromRouteIdTest.java						*COPIED*		DisruptorFromRouteIdTest.java
+SedaInOnlyChainedTest.java						*COPIED*		DisruptorInOnlyChainedTest.java
+SedaInOnlyTest.java								*COPIED*		DisruptorInOnlyTest.java
+SedaInOutBigChainedTest.java					*COPIED*		DisruptorInOutBigChainedTest.java
+SedaInOutChainedTest.java						*COPIED*		DisruptorInOutChainedTest.java
+SedaInOutChainedTimeoutTest.java				*COPIED*		DisruptorInOutChainedTimeoutTest.java
+SedaInOutChainedWithOnCompletionTest.java		*COPIED*		DisruptorInOutChainedWithOnCompletionTest.java
+SedaInOutTest.java								*COPIED*		DisruptorInOutTest.java
+SedaInOutWithErrorDeadLetterChannelTest.java	*COPIED*		DisruptorInOutWithErrorDeadLetterChannelTest.java
+SedaInOutWithErrorTest.java						*COPIED*		DisruptorInOutWithErrorTest.java
+SedaMultipleConsumersTest.java					*COPIED*		DisruptorMultipleConsumersTest.java
+SedaNoConsumerTest.java							*COPIED*		DisruptorNoConsumerTest.java
+SedaQueueTest.java								*COPIED*		DisruptorRingBufferTest.java
+SedaRemoveRouteThenAddAgainTest.java			*COPIED*		DisruptorRemoveRouteThenAddAgainTest.java			
+SedaRouteTest.java								*COPIED*		DisruptorRouteTest.java								
+SedaShouldNotUseSameThreadTest.java				*COPIED*		DisruptorShouldNotUseSameThreadTest.java
+SedaTimeoutDisabledTest.java					*COPIED*		DisruptorTimeoutDisabledTest.java
+SedaTimeoutTest.java							*COPIED*		DisruptorTimeoutTest.java
+SedaUnitOfWorkTest.java							*COPIED*		DisruptorUnitOfWorkTest.java
+SedaWaitForTaskAsPropertyTest.java				*COPIED*		DisruptorWaitForTaskAsPropertyTest.java
+SedaWaitForTaskCompleteOnCompletionTest.java	*COPIED*		DisruptorWaitForTaskCompleteOnCompletionTest.java
+SedaWaitForTaskCompleteTest.java				*COPIED*		DisruptorWaitForTaskCompleteTest.java
+SedaWaitForTaskIfReplyExpectedTest.java			*COPIED*		DisruptorWaitForTaskIfReplyExpectedTest.java
+SedaWaitForTaskNewerOnCompletionTest.java		*COPIED*		DisruptorWaitForTaskNeverOnCompletionTest.java
+SedaWaitForTaskNewerTest.java					*COPIED*		DisruptorWaitForTaskNeverTest.java
+TracingWithDelayTest.java						*NOT COPIED*	No direct association with disruptor

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java
new file mode 100644
index 0000000..6be9eb1
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/SedaDisruptorCompareTest.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.lmax.disruptor.collections.Histogram;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * This class does not perform any functional test, but instead makes a comparison between the performance of the
+ * Disruptor and SEDA component in several use cases.
+ * <p/>
+ * As memory management may have great impact on the results, it is adviced to run this test with a large, fixed heap (e.g. run with -Xmx1024m -Xms1024m JVM parameters)
+ */
+@Ignore
+@RunWith(value = Parameterized.class)
+public class SedaDisruptorCompareTest extends CamelTestSupport {
+    // Use '0' for default value, '1'+ for specific value to be used by both SEDA and DISRUPTOR.
+    private static final int SIZE_PARAMETER_VALUE = 1024;
+    private static final int SPEED_TEST_EXCHANGE_COUNT = 80000;
+    private static final long[] LATENCY_HISTOGRAM_BOUNDS = new long[] {1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000};
+    private static final long[] DISRUPTOR_SIZE_HISTOGRAM_BOUNDS = generateLinearHistogramBounds(
+            SIZE_PARAMETER_VALUE == 0 ? 1024 : SIZE_PARAMETER_VALUE, 8);
+    private static final long[] SEDA_SIZE_HISTOGRAM_BOUNDS = generateLinearHistogramBounds(
+            SIZE_PARAMETER_VALUE == 0 ? SPEED_TEST_EXCHANGE_COUNT : SIZE_PARAMETER_VALUE, 10);
+
+    @Produce
+    protected ProducerTemplate producerTemplate;
+
+    private final ExchangeAwaiter[] exchangeAwaiters;
+    private final String componentName;
+    private final String endpointUri;
+    private final int amountProducers;
+    private final long[] sizeHistogramBounds;
+
+    private final Queue<Integer> endpointSizeQueue = new ConcurrentLinkedQueue<Integer>();
+    
+    public SedaDisruptorCompareTest(final String componentName, final String endpointUri,
+                                    final int amountProducers, final int amountConsumers,
+                                    final int concurrentConsumerThreads, final long[] sizeHistogramBounds) {
+        this.componentName = componentName;
+        this.endpointUri = endpointUri;
+        this.amountProducers = amountProducers;
+        this.sizeHistogramBounds = sizeHistogramBounds;
+        exchangeAwaiters = new ExchangeAwaiter[amountConsumers];
+        for (int i = 0; i < amountConsumers; ++i) {
+            exchangeAwaiters[i] = new ExchangeAwaiter(SPEED_TEST_EXCHANGE_COUNT);
+        }
+    }
+
+    @BeforeClass
+    public static void legend() {
+        System.out.println("-----------------------");
+        System.out.println("- Tests output legend -");
+        System.out.println("-----------------------");
+        System.out.println(
+                "P: Number of concurrent Producer(s) sharing the load for publishing exchanges to the disruptor.");
+        System.out.println(
+                "C: Number of Consumer(s) receiving a copy of each exchange from the disruptor (pub/sub).");
+        System.out.println(
+                "CCT: Number of ConcurrentConsumerThreads sharing the load for consuming exchanges from the disruptor.");
+        System.out.println(
+                "SIZE: Maximum number of elements a SEDA or disruptor endpoint can have in memory before blocking the Producer thread(s).");
+        System.out.println("      0 means default value, so unbounded for SEDA and 1024 for disruptor.");
+        System.out.println("Each test is creating " + SPEED_TEST_EXCHANGE_COUNT + " exchanges.");
+        System.out.println();
+    }
+
+    private static long[] generateLinearHistogramBounds(final int maxValue, final int nbSlots) {
+        final long slotSize = maxValue / nbSlots;
+        final long[] bounds = new long[nbSlots];
+        for (int i = 0; i < nbSlots; i++) {
+            bounds[i] = slotSize * (i + 1);
+        }
+        return bounds;
+    }
+
+    
+
+    private static int singleProducer() {
+        return 1;
+    }
+
+    private static int multipleProducers() {
+        return 4;
+    }
+
+    private static int singleConsumer() {
+        return 1;
+    }
+
+    private static int multipleConsumers() {
+        return 4;
+    }
+
+    private static int singleConcurrentConsumerThread() {
+        return 1;
+    }
+
+    private static int multipleConcurrentConsumerThreads() {
+        return 2;
+    }
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> parameters() {
+        final List<Object[]> parameters = new ArrayList<Object[]>();
+
+        // This parameter set can be compared to the next and shows the impact of a 'long' endpoint name
+        // It defines all parameters to the same values as the default, so the result should be the same as
+        // 'seda:speedtest'. This shows that disruptor has a slight disadvantage as its name is longer than 'seda' :)
+        // The reason why this test takes so long is because Camel has a SLF4J call in ProducerCache:
+        // LOG.debug(">>>> {} {}", endpoint, exchange);
+        // and the DefaultEndpoint.toString() method will use a Matcher to sanitize the URI.  There should be a guard
+        // before the debug() call to only evaluate the args when required: if(LOG.isDebugEnabled())...
+        if (SIZE_PARAMETER_VALUE == 0) {
+            parameters
+                .add(new Object[] {"SEDA LONG {P=1, C=1, CCT=1, SIZE=0}",
+                    "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=false",
+                    singleProducer(), singleConsumer(), singleConcurrentConsumerThread(),
+                    SEDA_SIZE_HISTOGRAM_BOUNDS});
+        } else {
+            parameters
+                .add(new Object[] {"SEDA LONG {P=1, C=1, CCT=1, SIZE=" + SIZE_PARAMETER_VALUE + "}",
+                    "seda:speedtest?concurrentConsumers=1&waitForTaskToComplete=IfReplyExpected&timeout=30000&multipleConsumers=false&limitConcurrentConsumers=true&blockWhenFull=true&size="
+                        + SIZE_PARAMETER_VALUE ,
+                    singleProducer(), singleConsumer(),
+                    singleConcurrentConsumerThread(), SEDA_SIZE_HISTOGRAM_BOUNDS});
+        }
+        addParameterPair(parameters, singleProducer(), singleConsumer(), singleConcurrentConsumerThread());
+        addParameterPair(parameters, singleProducer(), singleConsumer(), multipleConcurrentConsumerThreads());
+        addParameterPair(parameters, singleProducer(), multipleConsumers(), singleConcurrentConsumerThread());
+        addParameterPair(parameters, singleProducer(), multipleConsumers(),
+                multipleConcurrentConsumerThreads());
+        addParameterPair(parameters, multipleProducers(), singleConsumer(), singleConcurrentConsumerThread());
+        addParameterPair(parameters, multipleProducers(), singleConsumer(),
+                multipleConcurrentConsumerThreads());
+        addParameterPair(parameters, multipleProducers(), multipleConsumers(),
+                singleConcurrentConsumerThread());
+        addParameterPair(parameters, multipleProducers(), multipleConsumers(),
+                multipleConcurrentConsumerThreads());
+
+        return parameters;
+    }
+
+    private static void addParameterPair(final List<Object[]> parameters, final int producers,
+                                         final int consumers, final int parallelConsumerThreads) {
+        final String multipleConsumerOption = consumers > 1 ? "multipleConsumers=true" : "";
+        final String concurrentConsumerOptions = parallelConsumerThreads > 1 ? "concurrentConsumers=" + parallelConsumerThreads : "";
+        final String sizeOption = SIZE_PARAMETER_VALUE > 0 ? "size=" + SIZE_PARAMETER_VALUE : "";
+        final String sizeOptionSeda = SIZE_PARAMETER_VALUE > 0 ? "&blockWhenFull=true" : "";
+
+        String options = "";
+        if (!multipleConsumerOption.isEmpty()) {
+            if (!options.isEmpty()) {
+                options += "&";
+            }
+            options += multipleConsumerOption;
+        }
+        if (!concurrentConsumerOptions.isEmpty()) {
+            if (!options.isEmpty()) {
+                options += "&";
+            }
+            options += concurrentConsumerOptions;
+        }
+        if (!sizeOption.isEmpty()) {
+            if (!options.isEmpty()) {
+                options += "&";
+            }
+            options += sizeOption;
+        }
+
+        if (!options.isEmpty()) {
+            options = "?" + options;
+        }
+
+        final String sedaOptions = sizeOptionSeda.isEmpty() ? options : options + sizeOptionSeda;
+        // Using { ... } because there is a bug in JUnit 4.11 and Eclipse: https://bugs.eclipse.org/bugs/show_bug.cgi?id=102512
+        final String testDescription = " { P=" + producers + ", C=" + consumers + ", CCT="
+                + parallelConsumerThreads + ", SIZE=" + SIZE_PARAMETER_VALUE + " }";
+        parameters.add(new Object[] {"SEDA" + testDescription, "seda:speedtest" + sedaOptions, producers,
+            consumers, parallelConsumerThreads, SEDA_SIZE_HISTOGRAM_BOUNDS});
+        parameters.add(new Object[] {"Disruptor" + testDescription, "disruptor:speedtest" + options, producers,
+            consumers, parallelConsumerThreads, DISRUPTOR_SIZE_HISTOGRAM_BOUNDS});
+    }
+
+    @Test
+    public void speedTestDisruptor() throws InterruptedException {
+
+        System.out.println("Warming up for test of: " + componentName);
+
+        performTest(true);
+        System.out.println("Starting real test of: " + componentName);
+
+        forceGC();
+        Thread.sleep(1000);
+
+        performTest(false);
+    }
+
+    private void forceGC() {
+        // unfortunately there is no nice API that forces the Garbage collector to run, but it may consider our request
+        // more seriously if we ask it twice :)
+        System.gc();
+        System.gc();
+    }
+
+    private void resetExchangeAwaiters() {
+        for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) {
+            exchangeAwaiter.reset();
+        }
+    }
+
+    private void awaitExchangeAwaiters() throws InterruptedException {
+        for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) {
+            while (!exchangeAwaiter.awaitMessagesReceived(10, TimeUnit.SECONDS)) {
+                System.err.println(
+                        "Processing takes longer then expected: " + componentName + " " + exchangeAwaiter
+                                .getStatus());
+            }
+        }
+    }
+
+    private void outputExchangeAwaitersResult(final long start) throws InterruptedException {
+        for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) {
+            final long stop = exchangeAwaiter.getCountDownReachedTime();
+            final Histogram histogram = exchangeAwaiter.getLatencyHistogram();
+
+            System.out.printf("%-45s time spent = %5d ms. Latency (ms): %s %n", componentName, stop - start, histogram.toString());
+        }
+    }
+
+    private void performTest(final boolean warmup) throws InterruptedException {
+        resetExchangeAwaiters();
+
+        final ProducerThread[] producerThread = new ProducerThread[amountProducers];
+        for (int i = 0; i < producerThread.length; ++i) {
+            producerThread[i] = new ProducerThread(SPEED_TEST_EXCHANGE_COUNT / amountProducers);
+        }
+
+        ExecutorService monitoring = null;
+        if (!warmup) {
+            monitoring = installSizeMonitoring(context.getEndpoint(endpointUri));
+        }
+        final long start = System.currentTimeMillis();
+
+        for (int i = 0; i < producerThread.length; ++i) {
+            producerThread[i].start();
+        }
+
+        awaitExchangeAwaiters();
+
+        if (!warmup) {
+            outputExchangeAwaitersResult(start);
+            uninstallSizeMonitoring(monitoring);
+        }
+    }
+
+    private ExecutorService installSizeMonitoring(final Endpoint endpoint) {
+        final ScheduledExecutorService service = context.getExecutorServiceManager()
+                .newScheduledThreadPool(this, "SizeMonitoringThread", 1);
+        endpointSizeQueue.clear();
+        final Runnable monitoring = new Runnable() {
+            @Override
+            public void run() {
+                if (endpoint instanceof SedaEndpoint) {
+                    final SedaEndpoint sedaEndpoint = (SedaEndpoint)endpoint;
+                    endpointSizeQueue.offer(sedaEndpoint.getCurrentQueueSize());
+                } else if (endpoint instanceof DisruptorEndpoint) {
+                    final DisruptorEndpoint disruptorEndpoint = (DisruptorEndpoint)endpoint;
+
+                    long remainingCapacity = 0;
+                    try {
+                        remainingCapacity = disruptorEndpoint.getRemainingCapacity();
+                    } catch (DisruptorNotStartedException e) {
+                        //ignore
+                    }
+                    endpointSizeQueue.offer((int)(disruptorEndpoint.getBufferSize() - remainingCapacity));
+                }
+            }
+        };
+        service.scheduleAtFixedRate(monitoring, 0, 100, TimeUnit.MILLISECONDS);
+        return service;
+    }
+
+    private void uninstallSizeMonitoring(final ExecutorService monitoring) {
+        if (monitoring != null) {
+            monitoring.shutdownNow();
+        }
+        final Histogram histogram = new Histogram(sizeHistogramBounds);
+        for (final int observation : endpointSizeQueue) {
+            histogram.addObservation(observation);
+        }
+        System.out.printf("%82s %s%n", "Endpoint size (# exchanges pending):", histogram.toString());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                for (final ExchangeAwaiter exchangeAwaiter : exchangeAwaiters) {
+                    from(endpointUri).process(exchangeAwaiter);
+                }
+            }
+        };
+    }
+
+    private static final class ExchangeAwaiter implements Processor {
+
+        private CountDownLatch latch;
+        private final int count;
+        private long countDownReachedTime;
+
+        private Queue<Long> latencyQueue = new ConcurrentLinkedQueue<Long>();
+
+        public ExchangeAwaiter(final int count) {
+            this.count = count;
+        }
+
+        public void reset() {
+            latencyQueue = new ConcurrentLinkedQueue<Long>();
+            latch = new CountDownLatch(count);
+            countDownReachedTime = 0;
+        }
+
+        public boolean awaitMessagesReceived(final long timeout, final TimeUnit unit) throws InterruptedException {
+            return latch.await(timeout, unit);
+        }
+
+        public String getStatus() {
+            final StringBuilder sb = new StringBuilder(100);
+            sb.append("processed ");
+            sb.append(count - latch.getCount());
+            sb.append('/');
+            sb.append(count);
+            sb.append(" messages");
+
+            return sb.toString();
+        }
+
+        @Override
+        public void process(final Exchange exchange) throws Exception {
+            final long sentTimeNs = exchange.getIn().getBody(Long.class);
+            latencyQueue.offer(Long.valueOf(System.nanoTime() - sentTimeNs));
+
+            countDownReachedTime = System.currentTimeMillis();
+            latch.countDown();
+        }
+
+        public long getCountDownReachedTime() {
+            // Make sure we wait until all exchanges have been processed. Otherwise the time value doesn't make sense.
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                countDownReachedTime = 0;
+            }
+            return countDownReachedTime;
+        }
+
+        public Histogram getLatencyHistogram() {
+            final Histogram histogram = new Histogram(LATENCY_HISTOGRAM_BOUNDS);
+            for (final Long latencyValue : latencyQueue) {
+                histogram.addObservation(latencyValue / 1000000);
+            }
+            return histogram;
+        }
+    }
+
+    private final class ProducerThread extends Thread {
+
+        private final int totalMessageCount;
+        private int producedMessageCount;
+
+        public ProducerThread(final int totalMessageCount) {
+            super("TestDataProducerThread");
+            this.totalMessageCount = totalMessageCount;
+        }
+
+        public void run() {
+            final Endpoint endpoint = context().getEndpoint(endpointUri);
+            while (producedMessageCount++ < totalMessageCount) {
+                producerTemplate.sendBody(endpoint, ExchangePattern.InOnly, System.nanoTime());
+            }
+        }
+
+        public String getStatus() {
+            final StringBuilder sb = new StringBuilder(100);
+            sb.append("produced ");
+            sb.append(producedMessageCount - 1);
+            sb.append('/');
+            sb.append(totalMessageCount);
+            sb.append(" messages");
+
+            return sb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponentReferenceEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponentReferenceEndpointTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponentReferenceEndpointTest.java
new file mode 100644
index 0000000..cd2c7fd
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmComponentReferenceEndpointTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import java.util.Iterator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.disruptor.DisruptorReference;
+
+/**
+ *
+ */
+public class DisruptorVmComponentReferenceEndpointTest extends ContextTestSupport {
+
+    public void testDisruptorVmComponentReference() throws Exception {
+        DisruptorVmComponent vm = context.getComponent("disruptor-vm", DisruptorVmComponent.class);
+
+        String key = DisruptorVmComponent.getDisruptorKey("disruptor-vm://foo");
+        assertEquals(1, vm.getDisruptors().get(key).getEndpointCount());
+        assertEquals(2, numberOfReferences(vm));
+
+        // add a second consumer on the endpoint
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?blockWhenFull=true").routeId("foo2").to("mock:foo2");
+            }
+        });
+
+        assertEquals(2, vm.getDisruptors().get(key).getEndpointCount());
+        assertEquals(3, numberOfReferences(vm));
+
+        // remove the 1st route
+        context.stopRoute("foo");
+        context.removeRoute("foo");
+
+        assertEquals(1, vm.getDisruptors().get(key).getEndpointCount());
+        assertEquals(2, numberOfReferences(vm));
+
+        // remove the 2nd route
+        context.stopRoute("foo2");
+        context.removeRoute("foo2");
+
+        // and there is no longer queues for the foo key
+        assertNull(vm.getDisruptors().get(key));
+
+        // there should still be a bar
+        assertEquals(1, numberOfReferences(vm));
+        key = DisruptorVmComponent.getDisruptorKey("disruptor-vm://bar");
+        assertEquals(1, vm.getDisruptors().get(key).getEndpointCount());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").routeId("foo").to("mock:foo");
+
+                from("disruptor-vm:bar").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+
+    private int numberOfReferences(DisruptorVmComponent vm) {
+        int num = 0;
+        Iterator<DisruptorReference> it = vm.getDisruptors().values().iterator();
+        while (it.hasNext()) {
+            num += it.next().getEndpointCount();
+        }
+        return num;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmConcurrentConsumersTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmConcurrentConsumersTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmConcurrentConsumersTest.java
new file mode 100644
index 0000000..ac5ccea
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmConcurrentConsumersTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmConcurrentConsumersTest extends AbstractVmTestSupport {
+
+    public void testSendToSeda() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template2.sendBody("disruptor-vm:foo?concurrentConsumers=5", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?concurrentConsumers=5").to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmDifferentOptionsOnConsumerAndProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmDifferentOptionsOnConsumerAndProducerTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmDifferentOptionsOnConsumerAndProducerTest.java
new file mode 100644
index 0000000..75db387
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmDifferentOptionsOnConsumerAndProducerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class DisruptorVmDifferentOptionsOnConsumerAndProducerTest extends AbstractVmTestSupport {
+
+    @Test
+    public void testSendToDisruptorVm() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Hello World");
+
+
+        template2.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // check the camel context of the exchange
+        assertEquals("Get a wrong context. ", context, result.getExchanges().get(0).getContext());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?concurrentConsumers=5")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyChainedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyChainedTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyChainedTest.java
new file mode 100644
index 0000000..fd4b320
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyChainedTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOnlyChainedTest extends AbstractVmTestSupport {
+
+    public void testInOnlyDisruptorVmChained() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("start");
+        resolveMandatoryEndpoint(context2, "mock:b", MockEndpoint.class).expectedBodiesReceived("start-a");
+        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
+
+        template.sendBody("disruptor-vm:a", "start");
+
+        assertMockEndpointsSatisfied();
+        MockEndpoint.assertIsSatisfied(context2);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:a").to("mock:a").setBody(simple("${body}-a")).to("disruptor-vm:b");
+
+                from("disruptor-vm:c").to("mock:c").setBody(simple("${body}-c"));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:b").to("mock:b").setBody(simple("${body}-b")).to("disruptor-vm:c");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyTest.java
new file mode 100644
index 0000000..7e8e230
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOnlyTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOnlyTest extends AbstractVmTestSupport {
+
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template2.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTest.java
new file mode 100644
index 0000000..6535b26
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOutChainedTest extends AbstractVmTestSupport {
+
+    public void testInOutDisruptorVmChained() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("start");
+        resolveMandatoryEndpoint(context2, "mock:b", MockEndpoint.class).expectedBodiesReceived("start-a");
+        getMockEndpoint("mock:c").expectedBodiesReceived("start-a-b");
+
+        String reply = template2.requestBody("disruptor-vm:a", "start", String.class);
+        assertEquals("start-a-b-c", reply);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:a").to("mock:a").transform(simple("${body}-a")).to("disruptor-vm:b");
+
+                from("disruptor-vm:c").to("mock:c").transform(simple("${body}-c"));
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:b").to("mock:b").transform(simple("${body}-b")).to("disruptor-vm:c");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTimeoutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTimeoutTest.java
new file mode 100644
index 0000000..aacca83
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutChainedTimeoutTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOutChainedTimeoutTest extends AbstractVmTestSupport {
+
+    public void testDisruptorVmInOutChainedTimeout() throws Exception {
+        StopWatch watch = new StopWatch();
+
+        try {
+            template2.requestBody("disruptor-vm:a?timeout=1000", "Hello World");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            // the chained vm caused the timeout
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class,
+                    e.getCause());
+            assertEquals(200, cause.getTimeout());
+        }
+
+        long delta = watch.stop();
+
+        assertTrue("Should be faster than 1 sec, was: " + delta, delta < 1100);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:b")
+                        .to("mock:b")
+                        .delay(500)
+                        .transform().constant("Bye World");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(noErrorHandler());
+
+                from("disruptor-vm:a")
+                        .to("mock:a")
+                                // this timeout will trigger an exception to occur
+                        .to("disruptor-vm:b?timeout=200")
+                        .to("mock:a2");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutTest.java
new file mode 100644
index 0000000..4e5aa65
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOutTest extends AbstractVmTestSupport {
+
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String out = template2.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").transform(constant("Bye World")).to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutWithErrorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutWithErrorTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutWithErrorTest.java
new file mode 100644
index 0000000..1bd0bcd
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmInOutWithErrorTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmInOutWithErrorTest extends AbstractVmTestSupport {
+
+    public void testInOutWithError() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template2.requestBody("direct:start", "Hello World", String.class);
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Damn I cannot do this", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").transform(constant("Bye World"))
+                        .throwException(new IllegalArgumentException("Damn I cannot do this"))
+                        .to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleConsumersIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleConsumersIssueTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleConsumersIssueTest.java
new file mode 100644
index 0000000..2c8b228
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleConsumersIssueTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class DisruptorVmMultipleConsumersIssueTest extends ContextTestSupport {
+
+    public void testDisruptorVmMultipleConsumersIssue() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:d").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:e").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:done").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:inbox", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:inbox")
+                        .to(ExchangePattern.InOut, "disruptor-vm:foo?timeout=5000")
+                        .to("mock:done");
+
+                from("disruptor-vm:foo?multipleConsumers=true")
+                        .to("log:a")
+                        .to("mock:a");
+
+                from("disruptor-vm:foo?multipleConsumers=true")
+                        .to("log:b")
+                        .to("mock:b");
+
+                from("disruptor-vm:foo?multipleConsumers=true")
+                        .to("log:c")
+                        .to("mock:c");
+
+                from("disruptor-vm:foo?multipleConsumers=true")
+                        .to("log:d")
+                        .to("mock:d");
+
+                from("disruptor-vm:foo?multipleConsumers=true")
+                        .to("log:e")
+                        .to("mock:e");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleContextsStartStopTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleContextsStartStopTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleContextsStartStopTest.java
new file mode 100644
index 0000000..d4a2c16
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmMultipleContextsStartStopTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmMultipleContextsStartStopTest extends AbstractVmTestSupport {
+
+    public void testStartStop() throws Exception {
+        /* Check that contexts are communicated */
+        MockEndpoint mock = context2.getEndpoint("mock:result", MockEndpoint.class);
+        mock.expectedMessageCount(1);
+        template.requestBody("direct:test", "Hello world!");
+        mock.assertIsSatisfied();
+        mock.reset();
+        
+        /* Restart the consumer Camel Context */
+        context2.stop();
+        context2.start();
+        
+        /* Send a message again and assert that it's received */
+        template.requestBody("direct:test", "Hello world!");
+        mock.assertIsSatisfied();
+
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:test").to("disruptor-vm:foo");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").to("mock:result");
+            }
+        };
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmQueueTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmQueueTest.java
new file mode 100644
index 0000000..d36755c
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmQueueTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmQueueTest extends AbstractVmTestSupport {
+
+    public void testQueue() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World", "Goodday World", "Bar");
+
+        template2.sendBody("disruptor-vm:foo", "Hello World");
+        template2.sendBody("disruptor-vm:foo?size=20", "Bye World");
+        template2.sendBody("disruptor-vm:foo?concurrentConsumers=5", "Goodday World");
+        template.sendBody("disruptor-vm:bar", "Bar");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:bar").to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?size=20&concurrentConsumers=2").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmShouldNotUseSameThreadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmShouldNotUseSameThreadTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmShouldNotUseSameThreadTest.java
new file mode 100644
index 0000000..a7bbbf9
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmShouldNotUseSameThreadTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * Unit test to verify continuing using NOT same thread on the consumer side.
+ */
+public class DisruptorVmShouldNotUseSameThreadTest extends AbstractVmTestSupport {
+
+    private static long id;
+    private final ThreadLocal<String> local = new ThreadLocal<String>();
+
+    public void testNotUseSameThread() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template2.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("disruptor-vm:foo").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        assertNull(local.get());
+                        assertNotSame("Thread is should not be same", id, Thread.currentThread().getId());
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        local.set("Hello");
+                        id = Thread.currentThread().getId();
+                    }
+                }).to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmSplitterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmSplitterTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmSplitterTest.java
new file mode 100644
index 0000000..efe6d43
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmSplitterTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.disruptor.vm;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+import org.apache.camel.impl.JndiRegistry;
+
+public class DisruptorVmSplitterTest extends AbstractVmTestSupport {
+
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("splitterBean", new SplitWordsBean());
+        return jndi;
+    }
+
+
+    public void testSplitUsingMethodCall() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedBodiesReceived("Claus", "James", "Willem");
+
+        template2.sendBody("direct:start", "Claus@James@Willem");
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:server").split().method("splitterBean", "splitWords").to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:server");
+            }
+        };
+    }
+
+    public static final class SplitWordsBean {
+        private SplitWordsBean() {
+            // Helper Class
+        }
+
+        public static List<String> splitWords(String body) {
+            // here we split the payload using java code
+            // we have the true power of Java to do the splitting
+            // as we like. As this is based on a unit test we just do it easy
+            return Arrays.asList(body.split("@"));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmTimeoutIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmTimeoutIssueTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmTimeoutIssueTest.java
new file mode 100644
index 0000000..62fdaa3
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmTimeoutIssueTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmTimeoutIssueTest extends AbstractVmTestSupport {
+
+    public void testDisruptorVmTimeoutWithAnotherDisruptorVm() throws Exception {
+        try {
+            template2.requestBody("disruptor-vm:start1?timeout=4000", "Hello");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class,
+                    e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+    }
+
+    public void testDisruptorVmTimeoutWithProcessor() throws Exception {
+        try {
+            template2.requestBody("disruptor-vm:start2?timeout=4000", "Hello");
+            fail("Should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            ExchangeTimedOutException cause = assertIsInstanceOf(ExchangeTimedOutException.class,
+                    e.getCause());
+            assertEquals(2000, cause.getTimeout());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:end")
+                        .delay(3000).transform().constant("Bye World");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(noErrorHandler());
+
+                from("disruptor-vm:start1?timeout=4000")
+                        .to("log:AFTER_START1")
+                        .to("disruptor-vm:end?timeout=2000")
+                        .to("log:AFTER_END");
+
+                from("disruptor-vm:start2?timeout=4000")
+                        .to("log:AFTER_START2")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                // this exception will trigger to stop asap
+                                throw new ExchangeTimedOutException(exchange, 2000);
+                            }
+                        })
+                        .to("log:AFTER_PROCESSOR");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmUseSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmUseSameQueueTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmUseSameQueueTest.java
new file mode 100644
index 0000000..17b9d84
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmUseSameQueueTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmUseSameQueueTest extends AbstractVmTestSupport {
+
+    public void testDisruptorVmUseSameQueue() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(2);
+
+        template2.sendBody("direct:start", "Hello World");
+        template2.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?size=500").to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskCompleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskCompleteTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskCompleteTest.java
new file mode 100644
index 0000000..afec250
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskCompleteTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmWaitForTaskCompleteTest extends AbstractVmTestSupport {
+
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String out = template2.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        // we send an in only but we use Always to wait for it to complete
+        // and since the route changes the payload we can get the response anyway
+        Exchange out = template2.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+        assertEquals("Bye World", out.getIn().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?waitForTaskToComplete=Always").transform(constant("Bye World"))
+                        .to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo?waitForTaskToComplete=Always");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskIfReplyExpectedTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskIfReplyExpectedTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskIfReplyExpectedTest.java
new file mode 100644
index 0000000..9ef6932
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskIfReplyExpectedTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmWaitForTaskIfReplyExpectedTest extends AbstractVmTestSupport {
+
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String out = template2.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        Exchange out = template2.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+        // we do not expecy a reply and thus do no wait so we just get our own input back
+        assertEquals("Hello World", out.getIn().getBody());
+        assertNull(out.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?waitForTaskToComplete=IfReplyExpected")
+                        .transform(constant("Bye World")).to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo?waitForTaskToComplete=IfReplyExpected");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/6d05326f/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskNewerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskNewerTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskNewerTest.java
new file mode 100644
index 0000000..72a112f
--- /dev/null
+++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/vm/DisruptorVmWaitForTaskNewerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.disruptor.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vm.AbstractVmTestSupport;
+
+/**
+ * @version
+ */
+public class DisruptorVmWaitForTaskNewerTest extends AbstractVmTestSupport {
+
+    public void testInOut() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String out = template2.requestBody("direct:start", "Hello World", String.class);
+        // we do not wait for the response so we just get our own input back
+        assertEquals("Hello World", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testInOnly() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        Exchange out = template2.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+        // we do not wait for the response so we just get our own input back
+        assertEquals("Hello World", out.getIn().getBody());
+        assertNull(out.getOut().getBody());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("disruptor-vm:foo?waitForTaskToComplete=Never").transform(constant("Bye World"))
+                        .to("mock:result");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilderForSecondContext() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("disruptor-vm:foo?waitForTaskToComplete=Never");
+            }
+        };
+    }
+}
\ No newline at end of file


Mime
View raw message