camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shing Hing Man <mat...@yahoo.com>
Subject How to use a new tcp connection for each message in Mina ?
Date Mon, 16 Dec 2013 10:46:49 GMT
Hi,

I am using the Mina component of Camel 2.11.1 to send TCP messages to a remote load balancer.
The load balancer forwards the TCP messages to three workers in a round-robin fashion. By default
(please correct me if I am wrong), a TCP connection created by Mina is kept alive until it times out.
Consequently, the TCP messages from Camel Mina are always forwarded to the same
worker by the load balancer.

Is it possible to configure Camel Mina to send each message over a new connection? If so,
the load balancer would distribute the messages to the three workers in a round-robin
fashion.

I have tried setting "disconnect=true" on the producer endpoint (please see TCPEchoServer.java
and TCPProducerAsyn.java below), but I get the following error when more than 3 messages
are sent.


[                          main] DefaultErrorHandler           
ERROR Failed delivery for (MessageId: ID-gauss-site-36756-1387189162872-0-5 on ExchangeId:
ID-gauss-site-36756-1387189162872-0-6). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExchangeException:
Cannot write body. Exchange[Message: Hello3]
org.apache.camel.CamelExchangeException: Cannot write body. Exchange[Message: Hello3]
    at org.apache.camel.component.mina.MinaHelper.writeBody(MinaHelper.java:55)[camel-mina-2.11.1.jar:2.11.1]
    at org.apache.camel.component.mina.MinaProducer.doProcess(MinaProducer.java:127)[camel-mina-2.11.1.jar:2.11.1]
    at org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:77)[camel-mina-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:63)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:366)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:337)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:233)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:337)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:175)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:111)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:124)[camel-core-2.11.1.jar:2.11.1]
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:131)[camel-core-2.11.1.jar:2.11.1]
    at net.sf.camel.tcp.TCPProducerAsyn.main(TCPProducerAsyn.java:40)[file:/home/shing/work/workspaces/wsa/testcamel/target/classes/:]
Exception in thread "main" org.apache.camel.CamelExecutionException: Exception occurred during
execution on the exchange: Exchange[Message: Hello3]
    at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1358)
    at org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:619)
    at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:454)
    at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:450)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:126)
    at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:131)
    at net.sf.camel.tcp.TCPProducerAsyn.main(TCPProducerAsyn.java:40)
Caused by: org.apache.camel.CamelExchangeException: Cannot write body. Exchange[Message: Hello3]
    at org.apache.camel.component.mina.MinaHelper.writeBody(MinaHelper.java:55)
    at org.apache.camel.component.mina.MinaProducer.doProcess(MinaProducer.java:127)
    at org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:77)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:

----------------------

public class TCPEchoServer {
    public static void main(String[] args) throws Exception {
        // create CamelContext
        CamelContext context = new DefaultCamelContext();

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from(
                        "mina://tcp://localhost:6200?textline=true&sync=true&decoderMaxLineLength=10000")
                        .to("stream:out");

            }

        });
        // start the route and let it do its work
        context.start();

        Thread.sleep(600000);

        // stop the CamelContext
        context.stop();

    }
}
-----------------

public class TCPProducerAsyn {

    public static void main(String[] args) throws Exception {
        // create CamelContext
        CamelContext context = new DefaultCamelContext();

        Component direct = new DirectComponent();

        context.addComponent("start", direct);
        
        final String  directEndPoint =  "direct:start";

        // add our route to the CamelContext
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                // Input from console. Replace "," by ";"
                from(directEndPoint)
                        .to("mina://tcp://localhost:6200?textline=true&sync=false&disconnect=true");

            }

        });
        // start the route and let it do its work
        context.start();
        ProducerTemplate template  =context.createProducerTemplate();
        
        // Send message asynchronously.
        
        for (int i=1; i< 10; ++i){
          template.sendBody(directEndPoint, "Hello" + i);
        }
        
        Thread.sleep(60000);

        // stop the CamelContext
        context.stop();

    }

}
-------------------------

Thanks in advance for your assistance !

Shing
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message