camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slava <mail...@gmail.com>
Subject Nested aggregation with split
Date Wed, 06 Apr 2016 15:42:23 GMT
Hello,I need to implement a split/aggregate route same as in official
documentation example but with little bit more complex logic:

Form docs: http://camel.apache.org/composed-message-processor.html
from("direct:start")    .split(body().tokenize("@"), new MyOrderStrategy())       
.to("bean:MyOrderService?method=handleOrder")                // ONE MORE
AGGREGATOR WITH SPECIFIC CORRELATION ID.    .end()   
.to("bean:MyOrderService?method=buildCombinedResponse")
While processing the splitted messages I need to aggregate part of them to
one message and receive this message on aggregation strategy
(MyOrderStrategy here) along with other not aggregated messages.

*Is it possible to implement using Camel?*

If I put aggregator after bean:MyOrderService it successfully aggregate
selected by specific correlation id  messages to one message as required but
on  MyOrderStrategy aggregator I still receive output of
bean:MyOrderService:
    @Override    public void configure()    {        from("direct:list") //
Receives ->
{"search":"USER","date-type":{"end_date","from":49449600,"to":1456531200}}               
.process(filtersRequestProcessor)// Converts to  ->
{"search":"USER"}\n{"date-type":{"end_date",
"from":49449600,"to":1456531200}}               
.split(body().tokenize("\n"), new MyAgg())                   
.bean(FilterTypeMapper.class)                    .choice()                       
.when(header("filter.type").isEqualTo("single")) // {"search":"USER"}                    
      
.log("Go to single")                            .process(new
SingleFilterProcessor())                       
.when(header("filter.type").isEqualTo("daterange")) //
{"date-type":{"end_date", "from":49449600,"to":1456531200}}                           
.log("Go to daterange")                            .process(new
DateRangeFilterProcessor()) // Goes to aggregation MyAgg() !!!!                   
.end()                    .log("Sending to collections: ${body}")                   
.to("direct:collections")                .end()               
.to("direct:aggregated");        from("direct:collections")               
.log("Goes to DynamicRouter::routeCollection with header:
${headers.filter.collection}")                .choice()                   
.when(header("filter.collection").isEqualTo("range"))                       
.to("direct:range")                    .otherwise(); // Goes to aggregation
MyAgg()        from("direct:range")               
.aggregate(header("filter.collection"), filterCollectionAggregationStrategy)             
 
.completionSize(exchangeProperty("rangeCount")) // Aggregation completed
successfully: Setting bean invocation result on the OUT message:
org.springframework.data.mongodb.core.query.Criteria@e143c6a3               
.bean(MyService, "processMessage"); // Does NOT go to aggregation MyAgg()
!!!!        /**         *  Process aggregated query         */       
from("direct:aggregated").transform(body())                .bean(MyService,
"retrieveAll")                .to("direct:finish");    }




--
View this message in context: http://camel.465427.n5.nabble.com/Nested-aggregation-with-split-tp5780607.html
Sent from the Camel - Users mailing list archive at Nabble.com.
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message