camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jim Newsham <jnews...@referentia.com>
Subject slow reply for jms component when url contains replyTo
Date Fri, 08 Jul 2011 01:18:56 GMT

I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I 
specify a custom replyTo destination on the endpoint url, the time it 
takes for the producer to receive a reply increases drastically.  The 
curious thing is that the time to receive a reply is almost exactly 1 
second.  When I remove the replyTo from the url, everything's fast again.

I created a very simple, stand-alone test to demonstrate what I'm 
seeing.  There is a server class [4] which runs an embedded instance of 
ActiveMQ and simply replies to messages as they arrive; and a client [3] 
class which simply sends messages to the server, and prints the elapsed 
time.  The USE_REPLY_TO symbolic constant in the client determines 
whether a replyTo value is added to the url or not.

The client output when USE_REPLY_TO is false is shown as [1].  The 
client output when USE_REPLY_TO is true is shown as [2].  The code is 
pretty trivial.  Am I doing something wrong, or is this a Camel and/or 
ActiveMQ issue?

Thanks!
Jim


[1] USE_REPLY_TO = false

received reply in: 0.476 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
...


[2] USE_REPLY_TO = true

received reply in: 1.524 s
received reply in: 1.002 s
received reply in: 1.003 s
received reply in: 1.003 s
received reply in: 1.002 s
...


[3] TestReplyToClient.java

package test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToClient {

   private static final boolean USE_REPLY_TO = false;

   public static void main(String... args) throws Exception {
     // create camel context; configure activemq component for 
tcp://localhost:7001
     CamelContext context = new DefaultCamelContext();
     ActiveMQComponent activemqComponent = 
ActiveMQComponent.activeMQComponent();
     activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
       null, null, "tcp://localhost:7001"));
     context.addComponent("activemq", activemqComponent);
     context.start();

     // define url to send requests to
     String sendUrl = "activemq:queue:dest";
     if (USE_REPLY_TO) {
       sendUrl += "?replyTo=replyQueue";
     }
     System.err.println("sending to url: " + sendUrl);

     // repeatedly send requests; measure elapsed time
     ProducerTemplate template = context.createProducerTemplate();
     while (true) {
       long startNanos = System.nanoTime();
       template.requestBody(sendUrl, "abc");
       long elapsedNanos = System.nanoTime() - startNanos;
       System.err.println(String.format("received reply in: %.3f s", 
elapsedNanos / 1000000000.0));
     }
   }

}


[4] TestReplyToServer.java

package test;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

/**
 * Stand-alone server side of the reply-time test: runs an embedded ActiveMQ
 * broker and a Camel route that consumes from "activemq:queue:dest" and sets
 * the out message body to "reply" for every request it receives.
 */
public class TestReplyToServer {

   private static final String BROKER_NAME = "thebroker";

   /** Endpoint the route consumes requests from. */
   private static final String RECEIVE_URL = "activemq:queue:dest";

   /**
    * Starts the broker, then the Camel route, then parks the main thread.
    *
    * @param args ignored
    * @throws Exception if the broker or the Camel context fails to start
    */
   public static void main(String... args) throws Exception {
     startBroker();
     startCamel();
     Thread.sleep(Long.MAX_VALUE);
   }

   /** Configures and starts the embedded broker listening on tcp://0.0.0.0:7001. */
   private static void startBroker() throws Exception {
     BrokerService broker = new BrokerService();
     broker.setBrokerName(BROKER_NAME);
     broker.setSchedulerSupport(false);
     broker.setPersistent(false);
     broker.addConnector("tcp://0.0.0.0:7001");
     broker.start();
     // block until the broker reports it has started
     broker.waitUntilStarted();
   }

   /** Starts a Camel context connected to the embedded broker over the vm transport. */
   private static void startCamel() throws Exception {
     String brokerUrl = String.format("vm://%s?create=false", BROKER_NAME);
     ActiveMQComponent jms = ActiveMQComponent.activeMQComponent();
     jms.setBrokerURL(brokerUrl);

     CamelContext camel = new DefaultCamelContext();
     camel.addComponent("activemq", jms);
     camel.addRoutes(new ReplyRoute());

     camel.start();
     System.err.println("listening on url: " + RECEIVE_URL);
   }

   /** Route that hands every message arriving on {@link #RECEIVE_URL} to {@link ReplyProcessor}. */
   private static final class ReplyRoute extends RouteBuilder {
     @Override
     public void configure() throws Exception {
       from(RECEIVE_URL).process(new ReplyProcessor());
     }
   }

   /** Logs the incoming request and sets the out message body to "reply". */
   private static final class ReplyProcessor implements Processor {
     @Override
     public void process(Exchange exchange) throws Exception {
       System.err.println("received request");
       exchange.getOut().setBody("reply");
     }
   }

}





Mime
View raw message