camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Babak Vahdat <babak.vah...@swissonline.ch>
Subject Re: WireTap doesn't process the messages in the order they arrived for each message
Date Fri, 03 May 2013 15:16:18 GMT
Just for the record, Claus is ABSOLUTELY right with his solution as I simply
overlooked the typo (threadPoolProfile instead of threadPool tag) though the
proposed "solution" of mine would also do the trick :-)

Babak


Christian Mueller wrote
> Thank you guys for the response!
> 
> Best,
> Christian
> 
> 
> On Fri, May 3, 2013 at 3:38 PM, Babak Vahdat &lt;

> babak.vahdat@

> &gt;wrote:
> 
>> Yeah that was the cause of the behaviour threadPoolProfile instead of
>> threadProfile, sorry!
>>
>> Babak
>>
>> Am 03.05.2013 um 15:23 schrieb Claus Ibsen &lt;

> claus.ibsen@

> &gt;:
>>
>> > Babak well spotted. Though its not a bug.
>> >
>> > 
> <threadPoolProfile>
>  is a profile, (aka like a skeleton/template).
>> > Which you use to define options, when creating new thread pools.
>> >
>> > Use
>> > 
> <threadPool>
>> >
>> > if you want a single thread pool and share it between the 2 wire taps,
>> > then yeah Christian should use a 
> <threadPool>
>  instead.
>> >
>> > Mind that you can still define a 
> <threadPoolProfile>
>  and then a
>> > 
> <threadPool>
>  which can use the profile to define its basic options.
>> >
>> > See more details at
>> > http://camel.apache.org/threading-model.html
>> >
>> >
>> >
>> > On Fri, May 3, 2013 at 2:52 PM, Babak Vahdat
>> > &lt;

> babak.vahdat@

> &gt; wrote:
>> >> Hi,
>> >>
>> >> The reason of the behavior you're observing is simply because of a
>> current
>> >> bug in Camel itself as given your 2 routes below, the current logic
>> creates
>> >> TWO instances of ExecutorService and not as you would expect ONE
>> >> ExecutorService object (each being passed to each of the 2
>> WireTapProcessor
>> >> in your route). Looking at
>> >> ProcessorDefinitionHelper#lookupExecutorServiceRef() the current logic
>> looks
>> >> up ONLY inside the registry causing 2 ExecutorService being returned
>> by
>> each
>> >> invocation of this method.
>> >>
>> >> One simple current workaround to get that lovely green lines inside
>> your IDE
>> >> for your test would be to bind "executorService" into the Camel
>> Registry, in
>> >> your case ApplicationContextRegistry, that's:
>> >>
>> >>  
> <bean id="executorService" class="java.util.concurrent.Executors"
>>
>  >> factory-method="newSingleThreadExecutor" destroy-method="shutdownNow"
> />
>> >>
>> >> Would you mind to raise a ticket for this?
>> >>
>> >> Babak
>> >>
>> >>
>> >> Christian Mueller wrote
>> >>> The following test fails randomly fails at message 6 up to 961 (on my
>> >>> machine). I consider this as an bug:
>> >>>
>> >>> public class WireTapTest extends CamelSpringTestSupport {
>> >>>
>> >>>    private int counter = 10000;
>> >>>
>> >>>    @Test
>> >>>    public void test() throws InterruptedException {
>> >>>        getMockEndpoint("mock:result").expectedMessageCount(counter *
>> 2);
>> >>>
>> >>>        template.setDefaultEndpointUri("direct:start");
>> >>>        for (int i = 0; i < counter; i++) {
>> >>>            template.requestBody("Camel");
>> >>>        }
>> >>>
>> >>>        assertMockEndpointsSatisfied();
>> >>>
>> >>>        List
>> >>> 
> <Exchange>
>> >>> receivedExchanges =
>> >>> getMockEndpoint("mock:result").getReceivedExchanges();
>> >>>        for (int i = 0; i < receivedExchanges.size(); i++) {
>> >>>            System.out.println("check exchange number " + i);
>> >>>
>> >>>            String body =
>> >>> receivedExchanges.get(i).getIn().getBody(String.class);
>> >>>            if (i % 2 == 0) {
>> >>>                assertEquals("REQUEST", body);
>> >>>            } else {
>> >>>                assertEquals("RESPONSE", body);
>> >>>            }
>> >>>        }
>> >>>    }
>> >>>
>> >>>    @Override
>> >>>    protected AbstractApplicationContext createApplicationContext() {
>> >>>        return new
>> ClassPathXmlApplicationContext("bundle-context.xml");
>> >>>    }
>> >>> }
>> >>>
>> >>> which is using the following route:
>> >>> public class WireTapRouteBuilder extends RouteBuilder {
>> >>>
>> >>>    @Override
>> >>>    public void configure() throws Exception {
>> >>>        from("direct:start")
>> >>>
>>  .wireTap("seda:wireTap").executorServiceRef("executorService")
>> >>>            .setHeader("TYPE", constant("RESPONSE"))
>> >>>
>> >>> .wireTap("seda:wireTap").executorServiceRef("executorService");
>> >>>
>> >>>        from("seda:wireTap")
>> >>>            .choice()
>> >>>                .when(header("TYPE").isNull())
>> >>>                    .setBody(constant("REQUEST"))
>> >>>                    .to("mock:result")
>> >>>                .otherwise()
>> >>>                    .setBody(constant("RESPONSE"))
>> >>>                    .to("mock:result")
>> >>>            .end();
>> >>>    }
>> >>> }
>> >>>
>> >>> and the following configuration:
>> >>> 
> <beans xmlns="http://www.springframework.org/schema/beans"
>>
>  >>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> >>>    xmlns:camel="http://camel.apache.org/schema/spring"
>> >>>    xsi:schemaLocation="
>> >>>        http://www.springframework.org/schema/beans
>> >>> http://www.springframework.org/schema/beans/spring-beans.xsd
>> >>>        http://camel.apache.org/schema/spring
>> >>> http://camel.apache.org/schema/spring/camel-spring.xsd">
>> >>>
>> >>> 
> <camel:camelContext id="com.xxx.xxxx.wireTapTest">
>> >>>
>> >>> 
> <camel:routeBuilder ref="wireTapRouteBuilder" />
>> >>>
>> >>> 
> <camel:threadPoolProfile id="executorService" defaultProfile="false"
>>
>  >>> poolSize="1" maxPoolSize="1" maxQueueSize="10000"
>> >>> rejectedPolicy="Discard"/>
>> >>>
>> >>> 
> </camel:camelContext>
>> >>>
>> >>> 
> <bean id="wireTapRouteBuilder"
>>
>  >>> class="com.xxx.xxxx.wiretap.WireTapRouteBuilder" />
>> >>> 
> </beans>
>> >>> We use the wiretap to send messages to an endpoint which persist some
>> key
>> >>> information about the execution in a database. It's important for us
>> to
>> >>> persist the messages into the database in the same order as they
>> arrive in
>> >>> the seda endpoint. Because of this, we configured the
>> threadPoolProfile to
>> >>> only use one thread. But some times the messages receive in the
>> reverse
>> >>> order in our database/mock endpoint.
>> >>>
>> >>> Any suggestions what is wrong here?
>> >>>
>> >>> Best,
>> >>> Christian
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731969.html
>> >> Sent from the Camel - Users mailing list archive at Nabble.com.
>> >
>> >
>> >
>> > --
>> > Claus Ibsen
>> > -----------------
>> > Red Hat, Inc.
>> > FuseSource is now part of Red Hat
>> > Email: 

> cibsen@

>> > Web: http://fusesource.com
>> > Twitter: davsclaus
>> > Blog: http://davsclaus.com
>> > Author of Camel in Action: http://www.manning.com/ibsen
>>





--
View this message in context: http://camel.465427.n5.nabble.com/WireTap-doesn-t-process-the-messages-in-the-order-they-arrived-for-each-message-tp5731733p5731986.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message