camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r827950 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/issues/ test/java/org/apache/camel/component/jms/tx/
Date Wed, 21 Oct 2009 10:07:28 GMT
Author: davsclaus
Date: Wed Oct 21 10:07:27 2009
New Revision: 827950

URL: http://svn.apache.org/viewvc?rev=827950&view=rev
Log:
CAMEL-2089: Fixed replyTo option now also working for JMS consumers.

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEagerLoadingPropertiesTest.java
      - copied, changed from r827881, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRemoveHeaderTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
      - copied, changed from r827881, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToEndpointUsingInOutTest.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/NonTransactedInOutForJmsWithTxnMgrTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=827950&r1=827949&r2=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Wed Oct 21 10:07:27 2009
@@ -51,7 +51,7 @@
     private Processor processor;
     private JmsBinding binding;
     private boolean eagerLoadingOfProperties;
-    private Destination replyToDestination;
+    private Object replyToDestination;
     private JmsOperations template;
     private boolean disableReplyTo;
 
@@ -70,7 +70,7 @@
 
         RuntimeCamelException rce = null;
         try {
-            Destination replyDestination = getReplyToDestination(message);
+            Object replyDestination = getReplyToDestination(message);
             final Exchange exchange = createExchange(message, replyDestination);
             if (eagerLoadingOfProperties) {
                 exchange.getIn().getHeaders();
@@ -116,7 +116,11 @@
             // send the reply if we got a response and the exchange is out capable
             if (rce == null && sendReply && !disableReplyTo && exchange.getPattern().isOutCapable())
{
                 LOG.trace("onMessage.sendReply START");
-                sendReply(replyDestination, message, exchange, body, cause);
+                if (replyDestination instanceof Destination) {
+                    sendReply((Destination)replyDestination, message, exchange, body, cause);
+                } else {
+                    sendReply((String)replyDestination, message, exchange, body, cause);
+                }
                 LOG.trace("onMessage.sendReply END");
             }
 
@@ -135,7 +139,7 @@
         LOG.trace("onMessage END");
     }
 
-    public Exchange createExchange(Message message, Destination replyDestination) {
+    public Exchange createExchange(Message message, Object replyDestination) {
         Exchange exchange = new DefaultExchange(endpoint, endpoint.getExchangePattern());
         JmsBinding binding = getBinding();
         exchange.setProperty(Exchange.BINDING, binding);
@@ -211,7 +215,7 @@
         this.disableReplyTo = disableReplyTo;
     }
 
-    public Destination getReplyToDestination() {
+    public Object getReplyToDestination() {
         return replyToDestination;
     }
 
@@ -220,8 +224,9 @@
      * any incoming value of {@link Message#getJMSReplyTo()}
      *
      * @param replyToDestination the destination that should be used to send replies to
+     * as either a String or {@link javax.jms.Destination} type.
      */
-    public void setReplyToDestination(Destination replyToDestination) {
+    public void setReplyToDestination(Object replyToDestination) {
         this.replyToDestination = replyToDestination;
     }
 
@@ -258,9 +263,39 @@
         });
     }
 
-    protected Destination getReplyToDestination(Message message) throws JMSException {
+    protected void sendReply(String replyDestination, final Message message, final Exchange
exchange,
+                             final JmsMessage out, final Exception cause) {
+        if (replyDestination == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cannot send reply message as there is no replyDestination for:
" + out);
+            }
+            return;
+        }
+        getTemplate().send(replyDestination, new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session,
cause);
+
+                if (endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
+                    String messageID = exchange.getIn().getHeader("JMSMessageID", String.class);
+                    reply.setJMSCorrelationID(messageID);
+                } else {
+                    String correlationID = message.getJMSCorrelationID();
+                    if (correlationID != null) {
+                        reply.setJMSCorrelationID(correlationID);
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(endpoint + " sending reply JMS message: " + reply);
+                }
+                return reply;
+            }
+        });
+    }
+
+    protected Object getReplyToDestination(Message message) throws JMSException {
         // lets send a response back if we can
-        Destination destination = replyToDestination;
+        Object destination = replyToDestination;
         if (destination == null) {
             destination = message.getJMSReplyTo();
         }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=827950&r1=827949&r2=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Wed Oct 21 10:07:27 2009
@@ -1091,6 +1091,10 @@
         if (isEagerLoadingOfProperties()) {
             listener.setEagerLoadingOfProperties(true);
         }
+        if (getReplyTo() != null) {
+            listener.setReplyToDestination(getReplyTo());
+        }
+
         // TODO: REVISIT: We really ought to change the model and let JmsProducer
         // and JmsConsumer have their own JmsConfiguration instance
         // This way producer's and consumer's QoS can differ and be
@@ -1244,8 +1248,7 @@
             throw new IllegalArgumentException("ReplyTo destination value has to be of type
queue; "
                     + "e.g: \"queue:replyQueue\"");
         }
-        this.replyToDestination =
-                removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()),
'/');
+        this.replyToDestination = removeStartingCharacters(replyToDestination.substring(QUEUE_PREFIX.length()),
'/');
     }
 
     public String getReplyToDestinationSelectorName() {

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEagerLoadingPropertiesTest.java
(from r827881, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRemoveHeaderTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEagerLoadingPropertiesTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEagerLoadingPropertiesTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRemoveHeaderTest.java&r1=827881&r2=827950&rev=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRemoveHeaderTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEagerLoadingPropertiesTest.java
Wed Oct 21 10:07:27 2009
@@ -16,12 +16,12 @@
  */
 package org.apache.camel.component.jms;
 
-import java.util.HashMap;
-import java.util.Map;
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -31,22 +31,15 @@
 /**
  * @version $Revision$
  */
-public class JmsRemoveHeaderTest extends CamelTestSupport {
+public class JmsEagerLoadingPropertiesTest extends CamelTestSupport {
 
     @Test
-    public void testRemoveHeader() throws Exception {
+    public void testJmsEagerLoadingPropertiesTest() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        // should not receive the foo header
-        mock.expectedHeaderReceived("foo", null);
-        // but only the bar header
-        mock.expectedHeaderReceived("bar", 123);
-
-        Map headers = new HashMap();
-        headers.put("foo", "cheese");
-        headers.put("bar", 123);
+        mock.expectedHeaderReceived("name", "Claus");
 
-        template.sendBodyAndHeaders("activemq:queue:foo", "Hello World", headers);
+        template.sendBodyAndHeader("activemq:queue:foo", "Hello World", "name", "Claus");
 
         assertMockEndpointsSatisfied();
     }
@@ -65,10 +58,13 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("activemq:queue:foo").removeHeader("foo").to("activemq:queue:bar");
-
-                from("activemq:queue:bar").to("mock:result");
+                from("activemq:queue:foo?eagerLoadingOfProperties=true").process(new Processor()
{
+                    public void process(Exchange exchange) throws Exception {
+                        String name = exchange.getIn().getHeader("name", String.class);
+                        assertEquals("Claus", name);
+                    }
+                }).to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java?rev=827950&r1=827949&r2=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteRequestReplyTest.java
Wed Oct 21 10:07:27 2009
@@ -99,7 +99,7 @@
 
     public static class MultiNodeReplyToRouteBuilder extends RouteBuilder {
         public void configure() throws Exception {
-            from(endpoingtReplyToUriA).to(endpoingtReplyToUriB);
+            from(endpoingUriA).to(endpoingtReplyToUriB);
             from(endpointUriB).process(new Processor() {
                 public void process(Exchange e) {
                     Message in = e.getIn();

Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
(from r827881, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToEndpointUsingInOutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToEndpointUsingInOutTest.java&r1=827881&r2=827950&rev=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToEndpointUsingInOutTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsJMSReplyToConsumerEndpointUsingInOutTest.java
Wed Oct 21 10:07:27 2009
@@ -16,98 +16,39 @@
  */
 package org.apache.camel.component.jms.issues;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
 import org.apache.activemq.camel.component.ActiveMQComponent;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-
 import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
+
 /**
  * Unit test using a fixed replyTo specified on the JMS endpoint
- * 
+ *
  * @version $Revision$
  */
-public class JmsJMSReplyToEndpointUsingInOutTest extends ContextTestSupport {
+public class JmsJMSReplyToConsumerEndpointUsingInOutTest extends ContextTestSupport {
     private static final String MQURI = "vm://localhost?broker.persistent=false&broker.useJmx=false";
     private ActiveMQComponent amq;
-    
 
     public void testCustomJMSReplyToInOut() throws Exception {
+        template.sendBody("activemq:queue:hello", "What is your name?");
 
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("My name is Arnio");
-
-        // do not use Camel to send and receive to simulate a non Camel client
-
-        // use another thread to listen and send the reply
-        Executors.newFixedThreadPool(1).submit(new Callable<Object>() {
-            public Object call() throws Exception {
-                JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
-
-                final TextMessage msg = (TextMessage) jms.receive("nameRequestor");
-                assertEquals("What's your name", msg.getText());
-
-                // there should be a JMSReplyTo so we know where to send the reply
-                final Destination replyTo = msg.getJMSReplyTo();
-
-                // send reply
-                jms.send(replyTo, new MessageCreator() {
-                    public Message createMessage(Session session) throws JMSException {
-                        TextMessage replyMsg = session.createTextMessage();
-                        replyMsg.setText("My name is Arnio");
-                        replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
-                        return replyMsg;
-                    }
-                });
-
-                return null;
-            }
-        });
-
-
-        // now get started and send the first message that gets the ball rolling
-        JmsTemplate jms = new JmsTemplate(amq.getConfiguration().getConnectionFactory());
-
-        jms.send("hello", new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                TextMessage msg = session.createTextMessage();
-                msg.setText("Hello, I'm here");
-                return msg;
-            }
-        });
-
-        assertMockEndpointsSatisfied();
+        String reply = consumer.receiveBody("activemq:queue:namedReplyQueue", 5000, String.class);
+        assertEquals("My name is Camel", reply);
     }
 
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
-
             public void configure() throws Exception {
-                from("activemq:queue:hello")
+                from("activemq:queue:hello?replyTo=queue:namedReplyQueue")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
-                            exchange.getOut().setBody("What's your name");
+                            exchange.getOut().setBody("My name is Camel");
                         }
-                    })
-                    // use in out to get a reply as well
-                    .to(ExchangePattern.InOut, "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue")
-                    // and send the reply to our mock for validation
-                    .to("mock:result");
+                    });
             }
         };
     }
@@ -119,5 +60,4 @@
         return camelContext;
     }
 
-
 }
\ No newline at end of file

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/NonTransactedInOutForJmsWithTxnMgrTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/NonTransactedInOutForJmsWithTxnMgrTest.java?rev=827950&r1=827949&r2=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/NonTransactedInOutForJmsWithTxnMgrTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/NonTransactedInOutForJmsWithTxnMgrTest.java
Wed Oct 21 10:07:27 2009
@@ -54,10 +54,11 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new SpringRouteBuilder() {
             public void configure() throws Exception {
-                from("activemq:queue:mainStage?replyTo=queue:mainStage.reply").to("activemq:queue:request?replyTo=queue:request.reply");
+                from("activemq:queue:mainStage").to("activemq:queue:request?replyTo=queue:request.reply");
             }
         };
     }
+
     @Override 
     @Before
     public void setUp() throws Exception {
@@ -67,6 +68,7 @@
         Thread thread = new Thread(consumer);
         thread.start();
     }
+
     @Override
     @After
     public void tearDown() throws Exception {

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java?rev=827950&r1=827949&r2=827950&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
(original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/QueueToQueueRequestReplyTransactionTest.java
Wed Oct 21 10:07:27 2009
@@ -19,11 +19,9 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.jms.JmsComponent;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spring.SpringRouteBuilder;
 import org.apache.camel.spring.spi.SpringTransactionPolicy;
-import org.apache.log4j.Logger;
 import org.junit.Test;
 
 /**
@@ -38,23 +36,20 @@
  */
 public class QueueToQueueRequestReplyTransactionTest extends AbstractTransactionTest {
 
-    private Logger log = Logger.getLogger(getClass());
-    
     protected int getExpectedRouteCount() {
         return 0;
     }
 
     @Test
     public void testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
throws Exception {
-
-        JmsComponent c = (JmsComponent)context.getComponent("activemq");
-        JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
         final ConditionalExceptionProcessor cp = new ConditionalExceptionProcessor(10);
         context.addRoutes(new SpringRouteBuilder() {
             @Override
             public void configure() throws Exception {
                 Policy required = lookup("PROPAGATION_REQUIRED_POLICY", SpringTransactionPolicy.class);
-                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(cp).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+
+                from("activemq:queue:foo").policy(required).process(cp).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
+
                 from("activemq-1:queue:bar").process(new Processor() {
                     public void process(Exchange e) {
                         String request = e.getIn().getBody(String.class);
@@ -75,51 +70,5 @@
             assertTrue(cp.getErrorMessage(), cp.getErrorMessage() == null);
         }
     }
-/*
- * This is a working test but is commented out because there is bug in that ConditionalExceptionProcessor

- * gets somehow reused among different tests, which it should not and then the second test
always get its request 
- * flow rolled back
- * 
- * I didn't split this test into two separate tests as I think this will be a good reminder
of the problem that
- * needs fixing
- * 
- * The bellow log crearly shows the same processor reused between tests
- *  testRollbackUsingXmlQueueToQueueRequestReplyUsingDynamicMessageSelector()
- *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() =
1
- *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() =
2
- *       
- *  testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer()
- *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() =
3
- *  org.apache.camel.component.jms.tx.ConditionalExceptionProcessor@63a721; getCount() =
4
-*/
-    /*
-    public void testRollbackUsingXmlQueueToQueueRequestReplyUsingMessageSelectorPerProducer()
throws Exception {
-
-        JmsComponent c = (JmsComponent)context.getComponent("activemq");
-        c.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
-        JmsComponent c1 = (JmsComponent)context.getComponent("activemq-1");
-        c1.getConfiguration().setReplyToDestinationSelectorName("camelProvider");
-        
-        context.addRoutes(new SpringRouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED_POLICY");
-                from("activemq:queue:foo?replyTo=queue:foo.reply").policy(required).process(new
ConditionalExceptionProcessor()).to("activemq-1:queue:bar?replyTo=queue:bar.reply");
-                from("activemq-1:queue:bar").process(new Processor() {
-                    public void process(Exchange e) {
-                        String request = e.getIn().getBody(String.class);
-                        Message out = e.getOut();
-                        String selectorValue = e.getIn().getHeader("camelProvider", String.class);
-                        out.setHeader("camelProvider", selectorValue);
-                        out.setBody("Re: " + request);
-                    }
-                });
-            }
-        });
-
-        Object reply = template.requestBody("activemq:queue:foo", "blah");
-        assertTrue("Received unexpeced reply", reply.equals("Re: blah"));
-    }
-    */
 
 }



Mime
View raw message