camel-users mailing list archives

From Simpor <simon.rydb...@transrail.se>
Subject Sending a message between two ActiveMQ using mina
Date Fri, 29 May 2009 13:35:09 GMT

OK, so I want to route a message from an ActiveMQ queue through mina:tcp and
into another ActiveMQ queue.

Using this code on one side (A):
        RouteBuilder builder = new RouteBuilder() {

            public void configure() {
                from("mina:tcp://localhost:5500?textline=true").to("activemq:queue:q4");
                from("activemq:queue:q1").to("mina:tcp://localhost:5501?textline=true");
            }
        };
        CamelContext myCamelContext = new DefaultCamelContext();

        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setBrokerURL("tcp://localhost:61616?jms.useAsyncSend=true");
        myCamelContext.addComponent("activemq", activeMQComponent);

        myCamelContext.addRoutes(builder);
        myCamelContext.start();

And this on the other side (B):
        RouteBuilder builder = new RouteBuilder() {

            public void configure() {
                from("mina:tcp://localhost:5501?textline=true").to("activemq:queue:q2");
                from("activemq:queue:q3").to("mina:tcp://localhost:5500?textline=true");
            }
        };
        CamelContext myCamelContext = new DefaultCamelContext();

        ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setBrokerURL("tcp://localhost:61616?jms.useAsyncSend=true");
        myCamelContext.addComponent("activemq", activeMQComponent);

        myCamelContext.addRoutes(builder);
        myCamelContext.start();

Using this code I get the following error on side (B):
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis on the exchange:
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:233)
	at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:83)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
	at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:195)
	at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:130)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
	at org.apache.camel.processor.interceptor.StreamCachingInterceptor.proceed(StreamCachingInterceptor.java:88)
	at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:83)
	at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:52)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:65)
	at org.apache.camel.component.mina.MinaConsumer$ReceiveHandler.messageReceived(MinaConsumer.java:110)
	at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:570)
	at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:299)
	at org.apache.mina.common.support.AbstractIoFilterChain.access$1100(AbstractIoFilterChain.java:53)
	at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:648)
	at org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput.flush(SimpleProtocolDecoderOutput.java:58)
	at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:180)
	at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:299)
	at org.apache.mina.common.support.AbstractIoFilterChain.access$1100(AbstractIoFilterChain.java:53)
	at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:648)
	at org.apache.mina.filter.executor.ExecutorFilter.processEvent(ExecutorFilter.java:220)
	at org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run(ExecutorFilter.java:264)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:51)
	at java.lang.Thread.run(Thread.java:619)

And on side (A):
org.apache.camel.CamelExchangeException: Response Handler had an exception on the exchange:
	at org.apache.camel.component.mina.MinaProducer.process(MinaProducer.java:112)
	at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:83)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
	at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:195)
	at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:130)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)
	at org.apache.camel.processor.interceptor.StreamCachingInterceptor.proceed(StreamCachingInterceptor.java:88)
	at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:83)
	at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:52)
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:65)
	at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:72)
	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:531)
	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:466)
	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:435)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:322)
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:260)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:944)
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:875)
	at java.lang.Thread.run(Thread.java:619)
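
For what it's worth, the side (B) trace shows JmsProducer giving up after waiting 20000 millis for an OUT message, which looks like a request-reply send with nobody replying. Below is a minimal plain-Java model of that difference between a one-way send and a send that waits for a reply. It is only a sketch: the class ReplyTimeoutSketch, its two queues and its method names are all hypothetical and are not Camel, JMS or MINA APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model only: in-memory queues stand in for a destination
// queue and a reply queue. Nothing here is Camel's implementation.
public class ReplyTimeoutSketch {
    final BlockingQueue<String> destination = new LinkedBlockingQueue<>();
    final BlockingQueue<String> replies = new LinkedBlockingQueue<>();

    // One-way send: enqueue the body and return immediately.
    void sendOneWay(String body) {
        destination.add(body);
    }

    // Request-reply send: enqueue the body, then block until a reply
    // shows up or the timeout elapses. Returns null on timeout.
    String requestReply(String body, long timeoutMillis) {
        destination.add(body);
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        ReplyTimeoutSketch sketch = new ReplyTimeoutSketch();
        sketch.sendOneWay("fire and forget");   // returns at once
        // Nobody ever writes to 'replies', so this waits and then gives up.
        String reply = sketch.requestReply("needs a reply", 200);
        System.out.println(reply == null ? "timed out waiting for reply" : reply);
    }
}
```

If the exchange coming out of the mina endpoint really is request-reply, the option that decides this on the mina endpoint may be worth checking; that is a guess from the trace, not something verified here.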



OK, and this is because mina expects a reply, so I have tried using a
processor:
from("mina:tcp://localhost:5501?textline=true").process(new Processor() {
                    public void process(Exchange e) {
                        System.out.println("Camel was here: " + e.getIn().getBody());
                        // Set the reply that goes back over the TCP connection
                        e.getOut().setBody("tcp reply");
                    }
});

That takes care of the errors, but my routed message gets stuck in the
processor. Adding a .to() after the processor:
from("mina:tcp://localhost:5501?textline=true").process(new Processor() {
                    public void process(Exchange e) {
                        System.out.println("Camel was here: " + e.getIn().getBody());
                        e.getOut().setBody("tcp reply");
                    }
}).to("activemq:queue:q2");

gives an error...

What should I do?



-- 
View this message in context: http://www.nabble.com/Sending-a-message-between-two-ActiveMQ-using-mina-tp23780161p23780161.html
Sent from the Camel - Users mailing list archive at Nabble.com.

