camel-users mailing list archives

From slava <>
Subject Nested aggregation with split
Date Wed, 06 Apr 2016 15:42:23 GMT
Hello,

I need to implement a split/aggregate route like the example in the official
documentation, but with slightly more complex logic:

From the docs:
    from("direct:start")
        .split(body().tokenize("@"), new MyOrderStrategy())
            .to("bean:MyOrderService?method=handleOrder")
            // ONE MORE
While processing the split messages, I need to aggregate some of them into
a single message and receive that message in the aggregation strategy
(MyOrderStrategy here) along with the other, non-aggregated messages.
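
To make the intended behaviour concrete, here is a minimal plain-Java sketch of the desired result. It does not use the Camel API. The class name, the method name, and the "+" join used for the grouped message are all hypothetical and chosen only for illustration. Parts with a non-null correlation key are merged into one message, and the outer collection (standing in for MyOrderStrategy) sees that merged message next to the untouched parts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical model of "split, aggregate a subset, then aggregate everything".
class NestedAggregationSketch {

    /**
     * Splits the input on the delimiter. Parts whose correlation key is
     * non-null are merged into one message per key; parts with a null key
     * pass through unchanged. The returned list is what the outer
     * aggregation step would receive.
     */
    static List<String> splitAndAggregate(String input, String delimiter,
                                          Function<String, String> correlationKey) {
        List<String> outer = new ArrayList<>();
        // Remembers where the merged message for each key sits in the outer list.
        Map<String, Integer> groupSlot = new HashMap<>();

        for (String part : input.split(Pattern.quote(delimiter))) {
            String key = correlationKey.apply(part);
            if (key == null) {
                outer.add(part);                          // not aggregated
            } else if (groupSlot.containsKey(key)) {
                int slot = groupSlot.get(key);
                outer.set(slot, outer.get(slot) + "+" + part); // inner aggregation
            } else {
                groupSlot.put(key, outer.size());
                outer.add(part);                          // first member of a group
            }
        }
        return outer;
    }

    public static void main(String[] args) {
        List<String> result = splitAndAggregate("A@r1@B@r2", "@",
                p -> p.startsWith("r") ? "range" : null);
        System.out.println(result); // prints [A, r1+r2, B]
    }
}
```

In this sketch the inner grouping happens inside the same loop that feeds the outer list, which is the behaviour the question asks for from the route.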

*Is it possible to implement this using Camel?*

If I put an aggregator after bean:MyOrderService, it successfully aggregates
the messages selected by a specific correlation id into one message, as
required, but in the MyOrderStrategy aggregator I still receive the output of
    @Override
    public void configure()
    {
        from("direct:list") // Receives ->
                .process(filtersRequestProcessor) // Converts to  ->
                .split(body().tokenize("\n"), new MyAgg())
                    .bean(FilterTypeMapper.class)
                    .choice()
                        .when(header("filter.type").isEqualTo("single")) // {"search":"USER"}
                            .log("Go to single")
                            .process(new
                        .when(header("filter.type").isEqualTo("daterange")) // {"date-type":{"end_date", "from":49449600,"to":1456531200}}
                            .log("Go to daterange")
                            .process(new DateRangeFilterProcessor()) // Goes to aggregation MyAgg() !!!!
                    .end()
                    .log("Sending to collections: ${body}")
                    .to("direct:collections")
                .end()
                .to("direct:aggregated");

        from("direct:collections")
                .log("Goes to DynamicRouter::routeCollection with header: ${headers.filter.collection}")
                .choice()
                    .to("direct:range")
                    .otherwise(); // Goes to aggregation MyAgg()

        from("direct:range")
                .aggregate(header("filter.collection"), filterCollectionAggregationStrategy)
                .completionSize(exchangeProperty("rangeCount")) // Aggregation completed successfully: Setting bean invocation result on the OUT message:
                .bean(MyService, "processMessage"); // Does NOT go to aggregation MyAgg() !!!!

        /**
         *  Process aggregated query
         */
        from("direct:aggregated").transform(body())
                .bean(MyService, "retrieveAll")
                .to("direct:finish");
    }
