camel-users mailing list archives

From Babak Vahdat <babak.vah...@swissonline.ch>
Subject Re: Splitter + aggregator + dynamic timeout
Date Wed, 15 Aug 2012 11:18:42 GMT
Hi Alex,

Regarding the "open question" in your mail below, please find a slightly
modified version of your route which now passes. See also my "XXX"
comments. Hope this helps.

Babak

// Imports assume Camel 2.x with camel-test and JUnit 4 on the classpath.
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        MockEndpoint split = getMockEndpoint("mock:split");
        split.expectedBodiesReceivedInAnyOrder("1", "2", "3");

        MockEndpoint result = getMockEndpoint("mock:result2");
        result.expectedBodiesReceived("1+2+3");

        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                    // .parallelProcessing() XXX: no parallelProcessing, as otherwise
                    // there is no guarantee for the expected order (we want the
                    // data flow "1", then "2" and at last "3")
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                    .aggregate(header("myId"), new MyAggregationStrategy())
                        .completionSize(3)
                        .log("aggregated ${body}")
                        .log("completed by ${property.CamelAggregatedCompletedBy}")
                    .log("test body: ${body}")
                    .to("direct:result2");

                // XXX: a third route where we get the final result of the whole process
                from("direct:result2")
                    // XXX: any possible final processing goes here
                    // ...
                    .to("mock:result2");
            }
        };
    }

    public class Responder {

        // Sets the same correlation id on every split part and maps A/B/C to 1/2/3
        public String translate(Exchange ex, String key) {
            ex.getIn().setHeader("myId", "correlation id 1");
            if ("A".equals(key)) {
                return "1";
            } else if ("B".equals(key)) {
                return "2";
            } else {
                return "3";
            }
        }
    }

    public class MyAggregationStrategy implements AggregationStrategy {

        // Joins the bodies with "+" in the order in which the exchanges arrive
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String body = newExchange.getIn().getBody(String.class);
            String existing = oldExchange.getIn().getBody(String.class);

            oldExchange.getIn().setBody(existing + "+" + body);
            return oldExchange;
        }
    }
}
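
To illustrate why the arrival order matters for the expected "1+2+3", here is a minimal plain-Java sketch of the fold that MyAggregationStrategy performs. It does not use Camel at all; the class name AggregationFoldSketch and the method fold are made up for this illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Camel): AggregationFoldSketch and fold are
// hypothetical names used only for this illustration.
public class AggregationFoldSketch {

    // Mirrors aggregate(oldExchange, newExchange): the first body is kept
    // as is, every later body is appended with a "+" separator.
    public static String aggregate(String existing, String body) {
        if (existing == null) {
            return body;
        }
        return existing + "+" + body;
    }

    // Feeds the bodies to aggregate() one by one, in arrival order.
    public static String fold(List<String> bodiesInArrivalOrder) {
        String result = null;
        for (String body : bodiesInArrivalOrder) {
            result = aggregate(result, body);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sequential split: parts arrive as 1, 2, 3
        System.out.println(fold(Arrays.asList("1", "2", "3"))); // prints 1+2+3
        // A different arrival order gives a different aggregated body
        System.out.println(fold(Arrays.asList("2", "1", "3"))); // prints 2+1+3
    }
}
```

Since the strategy simply appends in arrival order, any reordering of the split parts changes the aggregated body, which is why the test only passes reliably without parallelProcessing.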



Aleksander Pena wrote
> 
> Henryk,
> 
> thanks for another way of meeting my requirements. It works perfectly
> well, but an important requirement is missing: I need to set the timeout
> dynamically, so the flow could look like the following:
> 
> .when(body().isEqualTo("foo"))
>      .enrich("direct:emulateLegacyHttp").timeout(property(TIMEOUT))
> .otherwise()
>      .enrich("direct:emulateNonLegacyHttp").timeout(property(TIMEOUT))
> .end()
> 
> Unfortunately I cannot find a timeout that takes an argument of type Expression :(
> 
> It would be best if I could specify the timeout per flow in a route,
> as in the excerpt below:
> 
> from("direct:start")
>     .timeout(property(TIMEOUT))
>     .process(new SomeProcessingHere())
>     .to("direct:anotherEndpoint");
> 
> But it is probably difficult to implement such behaviour in Camel.
> 
> Anyway, there is still the open question of why my original example doesn't work.
> Is it really a bug in Camel, as Babak is suggesting?
> 
> Thanks for help,
> Alek
> 
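
As a rough illustration of the "timeout per flow" idea from the quoted mail, the following plain-Java sketch applies a timeout value that is supplied per call instead of being fixed up front. This is not a Camel API: DynamicTimeoutSketch, callWithTimeout and the fallback argument are hypothetical names, and the timeoutMillis parameter merely stands in for a value that would be read from each exchange.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java sketch (no Camel): all names here are hypothetical.
public class DynamicTimeoutSketch {

    // Runs the task and waits at most timeoutMillis for its reply.
    // The timeout is an ordinary argument, so it can differ per call.
    public static String callWithTimeout(Callable<String> task,
                                         long timeoutMillis,
                                         String fallback) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(task);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Give up on the slow task and return the fallback instead
                future.cancel(true);
                return fallback;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A fast task with a generous timeout returns its own reply
        System.out.println(callWithTimeout(() -> "reply", 1000, "timed out"));
        // A slow task with a short timeout returns the fallback
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(2000);
            return "late";
        }, 100, "timed out"));
    }
}
```

Whether and how such a per-message value can be wired into enrich or a whole route depends on the Camel version in use, so this only shows the general shape of the behaviour being asked for.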


