camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r981473 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/mbean/ camel-core/src/test/java/org/apache/camel/processor/async/ com...
Date Mon, 02 Aug 2010 12:10:07 GMT
Author: davsclaus
Date: Mon Aug  2 12:10:06 2010
New Revision: 981473

URL: http://svn.apache.org/viewvc?rev=981473&view=rev
Log:
CAMEL-2971: Added option synchronous to endpoints to control if async processing can be used by producers.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronousDelegateProducer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousFalseTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousTest.java
      - copied, changed from r981382, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousFalseTest.java
    camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousTest.java
      - copied, changed from r981382, camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerRouterTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousFalseTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousTest.java
      - copied, changed from r981382, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousFalseTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousTest.java
      - copied, changed from r981382, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Mon Aug  2 12:10:06 2010
@@ -27,7 +27,7 @@ package org.apache.camel;
  * <b>Important:<b/> Use the {@link org.apache.camel.util.AsyncProcessorHelper#process(AsyncProcessor, Exchange, AsyncCallback)}
  * method to invoke the process method, which ensure Camel have a chance to interweave and invoke it in a reliable manner.
  * For example when using transactions all the invocations has to occur in synchronous manner to ensure the transaction
- * work is done in the same thread, which is required by Spring TransactionMananger.
+ * work is done in the same thread, which is required by Spring TransactionManager.
  *
  * @version $Revision$
  */

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Aug  2 12:10:06 2010
@@ -30,7 +30,12 @@ import org.apache.camel.PollingConsumer;
 import org.apache.camel.util.ObjectHelper;
 
 /**
- * A default endpoint useful for implementation inheritance
+ * A default endpoint useful for implementation inheritance.
+ * <p/>
+ * Components which leverage the <a href="http://camel.apache.org/asynchronous-routing-engine.html">asynchronous processing model</a>
+ * should check {@link #isSynchronous()} to determine if asynchronous processing is allowed.
+ * The <tt>synchronous</tt> option on the endpoint allows Camel end users to dictate whether they want the asynchronous model or not.
+ * The option defaults to <tt>false</tt>, which means asynchronous processing is allowed.
  *
  * @version $Revision$
  */
@@ -39,6 +44,8 @@ public abstract class DefaultEndpoint im
     private CamelContext camelContext;
     private Component component;
     private ExchangePattern exchangePattern = ExchangePattern.InOnly;
+    // option to allow end user to dictate whether async processing should be used or not (if possible)
+    private boolean synchronous;
 
     protected DefaultEndpoint(String endpointUri, Component component) {
         this(endpointUri, component.getCamelContext());
@@ -163,6 +170,23 @@ public abstract class DefaultEndpoint im
         this.exchangePattern = exchangePattern;
     }
 
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    /**
+     * Sets whether synchronous processing should be strictly used, or Camel is allowed to use
+     * asynchronous processing (if supported).
+     * <p/>
+     * The default value is <tt>false</tt> which means asynchronous processing
+     * is allowed (if supported by the component).
+     *
+     * @param synchronous <tt>true</tt> to enforce synchronous processing
+     */
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public void configureProperties(Map<String, Object> options) {
         // do nothing by default
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronousDelegateProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronousDelegateProducer.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronousDelegateProducer.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SynchronousDelegateProducer.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
+
+/**
+ * To process the delegated producer in synchronous mode.
+ * <p/>
+ * This is used to force asynchronous producers to run in synchronous mode
+ * when the endpoint has been configured to do so.
+ * <p/>
+ * This delegate allows component developers to easily make their
+ * existing asynchronous producer behave synchronously by wrapping the
+ * producer in this synchronous delegate.
+ *
+ * @version $Revision$
+ */
+public class SynchronousDelegateProducer implements Producer {
+
+    private final Producer producer;
+
+    public SynchronousDelegateProducer(Producer producer) {
+        this.producer = producer;
+    }
+
+    public Endpoint getEndpoint() {
+        return producer.getEndpoint();
+    }
+
+    public Exchange createExchange() {
+        return producer.createExchange();
+    }
+
+    public Exchange createExchange(ExchangePattern pattern) {
+        return producer.createExchange(pattern);
+    }
+
+    public Exchange createExchange(Exchange exchange) {
+        return producer.createExchange(exchange);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        producer.process(exchange);
+    }
+
+    public void start() throws Exception {
+        producer.start();
+    }
+
+    public void stop() throws Exception {
+        producer.stop();
+    }
+
+    public boolean isSingleton() {
+        return producer.isSingleton();
+    }
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java Mon Aug  2 12:10:06 2010
@@ -60,7 +60,7 @@ public class ManagedCamelContext {
     @ManagedAttribute(description = "Camel State")
     public String getState() {
         // must use String type to be sure remote JMX can read the attribute without requiring Camel classes.
-        ServiceStatus status = (context).getStatus();
+        ServiceStatus status = context.getStatus();
         // if no status exists then its stopped
         if (status == null) {
             status = ServiceStatus.Stopped;

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousFalseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousFalseTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousFalseTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousFalseTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision: 955480 $
+ */
+public class AsyncEndpointSynchronousFalseTest extends ContextTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    public void testAsyncEndpoint() throws Exception {
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+        assertEquals("Bye Camel", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start")
+                        .to("mock:before")
+                        .to("log:before")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                beforeThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("async:Bye Camel?synchronous=false")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                afterThreadName = Thread.currentThread().getName();
+                            }
+                        })
+                        .to("log:after")
+                        .to("mock:after")
+                        .to("mock:result");
+            }
+        };
+    }
+
+}

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousTest.java (from r981382, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=981382&r2=981473&rev=981473&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSynchronousTest.java Mon Aug  2 12:10:06 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointSynchronousTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -39,7 +39,7 @@ public class AsyncEndpointTest extends C
 
         assertMockEndpointsSatisfied();
 
-        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -57,7 +57,7 @@ public class AsyncEndpointTest extends C
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .to("async:Bye Camel?synchronous=true")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java Mon Aug  2 12:10:06 2010
@@ -21,6 +21,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 
 /**
  * @version $Revision$
@@ -36,7 +37,13 @@ public class MyAsyncEndpoint extends Def
     }
 
     public Producer createProducer() throws Exception {
-        return new MyAsyncProducer(this);
+        Producer answer = new MyAsyncProducer(this);
+        if (isSynchronous()) {
+            // force it to be synchronous
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Mon Aug  2 12:10:06 2010
@@ -35,6 +35,7 @@ import org.apache.camel.component.cxf.fe
 import org.apache.camel.component.cxf.feature.PayLoadDataFormatFeature;
 import org.apache.camel.component.cxf.util.CxfEndpointUtils;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spring.SpringCamelContext;
@@ -112,7 +113,12 @@ public class CxfEndpoint extends Default
     }
 
     public Producer createProducer() throws Exception {
-        return new CxfProducer(this);
+        Producer answer = new CxfProducer(this);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {

Added: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousFalseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousFalseTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousFalseTest.java (added)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousFalseTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cxf;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.cxf.bus.CXFBusFactory;
+import org.apache.cxf.frontend.ServerFactoryBean;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CxfProducerSynchronousFalseTest extends CamelTestSupport {
+
+    private static final String SIMPLE_SERVER_ADDRESS = "http://localhost:28081/test";
+    private static final String REQUEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+            + "<soap:Body><ns1:echo xmlns:ns1=\"http://cxf.component.camel.apache.org/\">"
+            + "<arg0 xmlns=\"http://cxf.component.camel.apache.org/\">Hello World!</arg0>"
+            + "</ns1:echo></soap:Body></soap:Envelope>";
+
+    private static final String TEST_MESSAGE = "Hello World!";
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    private String url = "cxf://" + SIMPLE_SERVER_ADDRESS
+                + "?serviceClass=org.apache.camel.component.cxf.HelloService&dataFormat=MESSAGE&synchronous=false";
+
+    @BeforeClass
+    public static void startServer() throws Exception {        
+        // start a simple front service
+        ServerFactoryBean svrBean = new ServerFactoryBean();
+        svrBean.setAddress(SIMPLE_SERVER_ADDRESS);
+        svrBean.setServiceClass(HelloService.class);
+        svrBean.setServiceBean(new HelloServiceImpl());
+        svrBean.setBus(CXFBusFactory.getDefaultBus());
+        svrBean.create();
+    }
+
+    @Test
+    public void testSynchronous() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        String response = template.requestBody("direct:start", REQUEST_MESSAGE, String.class);
+        assertTrue("It should has the echo message", response.indexOf("echo " + TEST_MESSAGE) > 0);
+        assertTrue("It should has the echoResponse tag", response.indexOf("echoResponse") > 0);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

Copied: camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousTest.java (from r981382, camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerRouterTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousTest.java?p2=camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousTest.java&p1=camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerRouterTest.java&r1=981382&r2=981473&rev=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerRouterTest.java (original)
+++ camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfProducerSynchronousTest.java Mon Aug  2 12:10:06 2010
@@ -16,38 +16,29 @@
  */
 package org.apache.camel.component.cxf;
 
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.camel.util.URISupport;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.cxf.bus.CXFBusFactory;
-import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.frontend.ServerFactoryBean;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.message.MessageContentsList;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class CxfProducerRouterTest extends CamelTestSupport {
-    private static final transient Log LOG = LogFactory.getLog(CxfProducerRouterTest.class);
+public class CxfProducerSynchronousTest extends CamelTestSupport {
+
     private static final String SIMPLE_SERVER_ADDRESS = "http://localhost:28080/test";
     private static final String REQUEST_MESSAGE = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
             + "<soap:Body><ns1:echo xmlns:ns1=\"http://cxf.component.camel.apache.org/\">"
             + "<arg0 xmlns=\"http://cxf.component.camel.apache.org/\">Hello World!</arg0>"
             + "</ns1:echo></soap:Body></soap:Envelope>";
 
-    private static final String ECHO_OPERATION = "echo";
     private static final String TEST_MESSAGE = "Hello World!";
-    
+    private static String beforeThreadName;
+    private static String afterThreadName;
+
+    private String url = "cxf://" + SIMPLE_SERVER_ADDRESS
+                + "?serviceClass=org.apache.camel.component.cxf.HelloService&dataFormat=MESSAGE&synchronous=true";
 
     @BeforeClass
     public static void startServer() throws Exception {        
@@ -60,66 +51,39 @@ public class CxfProducerRouterTest exten
         svrBean.create();
     }
 
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            public void configure() {
-                from("direct:EndpointA").to(getSimpleEndpointUri());
-                from("direct:EndpointB").to(getSimpleEndpointUri() + "&dataFormat=MESSAGE");
-            }
-        };
-    }
-    
-    @Test
-    public void testCxfEndpointUris() throws URISyntaxException {
-        CxfEndpoint endpoint = context.getEndpoint(getSimpleEndpointUri(), CxfEndpoint.class);
-        assertEquals("Get a wrong endpoint uri", getSimpleEndpointUri(), endpoint.getEndpointUri());
-        
-        endpoint = context.getEndpoint(getSimpleEndpointUri() + "&dataFormat=MESSAGE", CxfEndpoint.class);
-        assertEquals("Get a wrong endpoint uri", URISupport.normalizeUri(getSimpleEndpointUri() + "&dataFormat=MESSAGE"), endpoint.getEndpointUri());
-
-    }
-
-    @Test
-    public void testInvokingSimpleServerWithParams() throws Exception {
-     // START SNIPPET: sending
-        Exchange senderExchange = new DefaultExchange(context, ExchangePattern.InOut);
-        final List<String> params = new ArrayList<String>();
-        // Prepare the request message for the camel-cxf procedure
-        params.add(TEST_MESSAGE);
-        senderExchange.getIn().setBody(params);
-        senderExchange.getIn().setHeader(CxfConstants.OPERATION_NAME, ECHO_OPERATION);
-
-        Exchange exchange = template.send("direct:EndpointA", senderExchange);
-
-        org.apache.camel.Message out = exchange.getOut();
-        // The response message's body is an MessageContentsList which first element is the return value of the operation,
-        // If there are some holder parameters, the holder parameter will be filled in the reset of List.
-        // The result will be extract from the MessageContentsList with the String class type
-        MessageContentsList result = (MessageContentsList)out.getBody();
-        LOG.info("Received output text: " + result.get(0));
-        Map<String, Object> responseContext = CastUtils.cast((Map)out.getHeader(Client.RESPONSE_CONTEXT));
-        assertNotNull(responseContext);
-        assertEquals("We should get the response context here", "UTF-8", responseContext.get(org.apache.cxf.message.Message.ENCODING));
-        assertEquals("Reply body on Camel is wrong", "echo " + TEST_MESSAGE, result.get(0));
-     // END SNIPPET: sending
-    }
-
     @Test
-    public void testInvokingSimpleServerWithMessageDataFormat() throws Exception {
-        Exchange senderExchange = new DefaultExchange(context, ExchangePattern.InOut);
-        senderExchange.getIn().setBody(REQUEST_MESSAGE);
-        Exchange exchange = template.send("direct:EndpointB", senderExchange);
+    public void testSynchronous() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        org.apache.camel.Message out = exchange.getOut();
-        String response = out.getBody(String.class);
+        String response = template.requestBody("direct:start", REQUEST_MESSAGE, String.class);
         assertTrue("It should has the echo message", response.indexOf("echo " + TEST_MESSAGE) > 0);
         assertTrue("It should has the echoResponse tag", response.indexOf("echoResponse") > 0);
 
+        assertMockEndpointsSatisfied();
+
+        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
-    private String getSimpleEndpointUri() {
-        return "cxf://" + SIMPLE_SERVER_ADDRESS
-            + "?serviceClass=org.apache.camel.component.cxf.HelloService";
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+            }
+        };
     }
 
 }

Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java (original)
+++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java Mon Aug  2 12:10:06 2010
@@ -25,6 +25,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.http.HttpConsumer;
 import org.apache.camel.component.http.HttpEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Handler;
 
@@ -52,7 +53,11 @@ public class JettyHttpEndpoint extends H
     public Producer createProducer() throws Exception {
         JettyHttpProducer answer = new JettyHttpProducer(this, getClient());
         answer.setBinding(getJettyBinding());
-        return answer;
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
     }
 
     @Override

Added: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousFalseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousFalseTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousFalseTest.java (added)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousFalseTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty.jettyproducer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision: 957588 $
+ */
+public class JettyHttpProducerSynchronousFalseTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "jetty://http://0.0.0.0:9123/sync?synchronous=false";
+
+    @Test
+    public void testSynchronous() throws Exception {
+        // these tests do not run well on Windows
+        if (isPlatform("windows")) {
+            return;
+        }
+
+        // give Jetty time to startup properly
+        Thread.sleep(1000);
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", null);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+
+                from(url).transform(constant("Bye World"));
+            }
+        };
+    }
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousTest.java (from r981382, camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousTest.java?p2=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousTest.java&p1=camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java&r1=981382&r2=981473&rev=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsyncTimeoutTest.java (original)
+++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerSynchronousTest.java Mon Aug  2 12:10:06 2010
@@ -16,21 +16,23 @@
  */
 package org.apache.camel.component.jetty.jettyproducer;
 
-import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * @version $Revision$
  */
-public class JettyHttpProducerAsyncTimeoutTest extends CamelTestSupport {
+public class JettyHttpProducerSynchronousTest extends CamelTestSupport {
 
-    private String url = "jetty://http://0.0.0.0:9123/timeout?httpClient.timeout=2000";
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "jetty://http://0.0.0.0:9123/sync?synchronous=true";
 
     @Test
-    public void testTimeout() throws Exception {
+    public void testSynchronous() throws Exception {
         // these tests does not run well on Windows
         if (isPlatform("windows")) {
             return;
@@ -39,13 +41,13 @@ public class JettyHttpProducerAsyncTimeo
         // give Jetty time to startup properly
         Thread.sleep(1000);
 
-        getMockEndpoint("mock:result").expectedMessageCount(0);
-        getMockEndpoint("mock:error").expectedMessageCount(0);
-        getMockEndpoint("mock:timeout").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(1);
 
         template.sendBody("direct:start", null);
 
         assertMockEndpointsSatisfied();
+
+        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -53,12 +55,23 @@ public class JettyHttpProducerAsyncTimeo
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                onException(Exception.class).handled(true).to("mock:error");
-                onException(ExchangeTimedOutException.class).handled(true).to("mock:timeout");
-
-                from("direct:start").to(url).to("mock:result");
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
 
-                from(url).delay(5000).transform(constant("Bye World"));
+                from(url).transform(constant("Bye World"));
             }
         };
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Aug  2 12:10:06 2010
@@ -18,8 +18,6 @@ package org.apache.camel.component.jms;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -36,13 +34,14 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.Service;
 import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
-import org.apache.camel.component.jms.reply.ReplyHolder;
 import org.apache.camel.component.jms.reply.ReplyManager;
 import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.spi.ManagementAware;
@@ -138,8 +137,13 @@ public class JmsEndpoint extends Default
         }
     }
 
-    public JmsProducer createProducer() throws Exception {
-        return new JmsProducer(this);
+    public Producer createProducer() throws Exception {
+        Producer answer = new JmsProducer(this);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
     }
 
     public JmsConsumer createConsumer(Processor processor) throws Exception {

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Mon Aug  2 12:10:06 2010
@@ -25,6 +25,7 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.processor.Logger;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -131,7 +132,7 @@ public class JmsEndpointConfigurationTes
         JmsEndpoint endpoint = (JmsEndpoint) resolveMandatoryEndpoint("jms:queue:Foo");
         endpoint.getConfiguration().setDeliveryPersistent(true);
         endpoint.getConfiguration().setReplyToDeliveryPersistent(false);
-        JmsProducer producer = endpoint.createProducer();
+        Producer producer = endpoint.createProducer();
         assertNotNull("The producer should not be null", producer);
         JmsConsumer consumer = endpoint.createConsumer(dummyProcessor);
         JmsOperations operations = consumer.getEndpointMessageListener().getTemplate();

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousFalseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousFalseTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousFalseTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousFalseTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+
+/**
+ * @version $Revision: 978995 $
+ */
+public class JmsInOutSynchronousFalseTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "activemq:queue:in?synchronous=false";
+
+    @Test
+    public void testSynchronous() throws Exception {
+        String reply = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", reply);
+
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+        camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false&broker.useJmx=false"));
+        return camelContext;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+
+                from("activemq:queue:in").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        exchange.getOut().setBody("Bye World");
+                    }
+                });
+            }
+        };
+    }
+
+}

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousTest.java (from r981382, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java&r1=981382&r2=981473&rev=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutIssueTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutSynchronousTest.java Mon Aug  2 12:10:06 2010
@@ -14,13 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.jms.issues;
-
-import java.util.concurrent.Future;
+package org.apache.camel.component.jms;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -31,50 +28,18 @@ import static org.apache.activemq.camel.
 /**
  * @version $Revision$
  */
-public class JmsInOutIssueTest extends CamelTestSupport {
+public class JmsInOutSynchronousTest extends CamelTestSupport {
 
-    @Test
-    public void testInOutWithRequestBody() throws Exception {
-        String reply = template.requestBody("activemq:queue:in", "Hello World", String.class);
-        assertEquals("Bye World", reply);
-    }
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "activemq:queue:in?synchronous=true";
 
     @Test
-    public void testInOutTwoTimes() throws Exception {
-        String reply = template.requestBody("activemq:queue:in", "Hello World", String.class);
-        assertEquals("Bye World", reply);
-
-        reply = template.requestBody("activemq:queue:in", "Hello Camel", String.class);
+    public void testSynchronous() throws Exception {
+        String reply = template.requestBody("direct:start", "Hello World", String.class);
         assertEquals("Bye World", reply);
-    }
 
-    @Test
-    public void testInOutWithAsyncRequestBody() throws Exception {
-        Future<String> reply = template.asyncRequestBody("activemq:queue:in", "Hello World", String.class);
-        assertEquals("Bye World", reply.get());
-    }
-
-    @Test
-    public void testInOutWithSendExchange() throws Exception {
-        Exchange out = template.send("activemq:queue:in", ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.getIn().setBody("Hello World");
-            }
-        });
-
-        assertEquals("Bye World", out.getOut().getBody());
-    }
-
-    @Test
-    public void testInOutWithAsyncSendExchange() throws Exception {
-        Future<Exchange> out = template.asyncSend("activemq:queue:in", new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                exchange.setPattern(ExchangePattern.InOut);
-                exchange.getIn().setBody("Hello World");
-            }
-        });
-
-        assertEquals("Bye World", out.get().getOut().getBody());
+        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     protected CamelContext createCamelContext() throws Exception {
@@ -86,6 +51,22 @@ public class JmsInOutIssueTest extends C
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+
                 from("activemq:queue:in").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         exchange.getOut().setBody("Bye World");

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=981473&r1=981472&r2=981473&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Mon Aug  2 12:10:06 2010
@@ -16,16 +16,15 @@
  */
 package org.apache.camel.component.netty;
 
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timer;
 
 public class NettyEndpoint extends DefaultEndpoint {
@@ -42,7 +41,12 @@ public class NettyEndpoint extends Defau
     }
 
     public Producer createProducer() throws Exception {
-        return new NettyProducer(this, configuration);
+        Producer answer = new NettyProducer(this, configuration);
+        if (isSynchronous()) {
+            return new SynchronousDelegateProducer(answer);
+        } else {
+            return answer;
+        }
     }
 
     public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) {

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTextlineInOutSynchronousFalseTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "netty:tcp://localhost:5148?textline=true&sync=true&synchronous=false";
+
+    @Test
+    public void testSynchronous() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String reply = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertFalse("Should not same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+
+                from(url)
+                    // body should be a String when using textline codec
+                    .validate(body().isInstanceOf(String.class))
+                    .transform(body().regexReplaceAll("Hello", "Bye"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousFalseTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java?rev=981473&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java Mon Aug  2 12:10:06 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTextlineInOutSynchronousTest extends CamelTestSupport {
+
+    private static String beforeThreadName;
+    private static String afterThreadName;
+    private String url = "netty:tcp://localhost:5148?textline=true&sync=true&synchronous=true";
+
+    @Test
+    public void testSynchronous() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        String reply = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye World", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:before")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            beforeThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to(url)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            afterThreadName = Thread.currentThread().getName();
+                        }
+                    })
+                    .to("log:after")
+                    .to("mock:result");
+
+                from(url)
+                    // body should be a String when using textline codec
+                    .validate(body().isInstanceOf(String.class))
+                    .transform(body().regexReplaceAll("Hello", "Bye"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutSynchronousTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message