camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Aggregator response not being sent consistently
Date Wed, 18 Aug 2010 10:48:46 GMT
Hi

Could you try to create a simpler example? For example without the
webservice bits, concurrency and the likes. To see if you can
reproduce the issue?
That makes it much easier for us to use and help look into your issue.

Also can you log what completion Camel picked for that aggregation
that is not supposed to be? eg the header CamelAggregatedCompletedBy
contains this info. See more at
http://camel.apache.org/aggregator2.html



2010/8/17 Kumar, Vivek <Vivek.Kumar2@qwest.com>:
>
>
> PFA logs (trace level) for more clarity on the problem: This log has been generated from
the same code, for 2 different requests...
>
>
>
> INCORRECT RESPONSE:
>
> [17/08/10 10:17:23:023 MDT] ICL-RnR TRACE processor.DefaultErrorHandler: Is exchangeId:
8ee0190b-a07b-4ea9-88f7-2e1c58b309a3 done? true
>
> [17/08/10 10:17:23:023 MDT] ICL-RnR TRACE processor.MulticastProcessor: ProcessorExchangePair
#0 done: Exchange[Message: <LogicalDevice>
>
>  <commonName>0510422a-4603-4f03-aa49-0736531e840e12820618434590510422a-4603-4f03-aa49-0736531e840e1282061843490</commonName>
>
> </LogicalDevice>]            ***********THIS IS THE EXPECTED RESPONSE ***
>
> [17/08/10 10:17:23:023 MDT] ICL-RnR TRACE processor.MulticastProcessor: Parallel processing
complete for exchange: Exchange[Message: <LogicalDevice>
>
>  <commonName>0510422a-4603-4f03-aa49-0736531e840e12820618434590510422a-4603-4f03-aa49-0736531e840e1282061843490</commonName>
>
> </LogicalDevice>]
>
> [17/08/10 10:17:23:023 MDT] ICL-RnR TRACE converter.DefaultTypeConverter: Converting
null -> org.apache.camel.processor.aggregate.AggregationStrategy with value: null
>
>  [17/08/10 10:17:23:023 MDT] ICL-RnR DEBUG processor.MulticastProcessor: Done parallel
processing 2 exchanges                ************ MULTICAST OVER ***
>
> [17/08/10 10:17:23:023 MDT] ICL-RnR TRACE management.InstrumentationProcessor: multicast:
Recording duration: 469 millis for exchange: Exchange[Message: [<LogicalDevice>
>
>  <commonName>0510422a-4603-4f03-aa49-0736531e840e1282061843459</commonName>
>
> </LogicalDevice>, <LogicalDevice>
>
>  <commonName>0510422a-4603-4f03-aa49-0736531e840e1282061843490</commonName>
>
> </LogicalDevice>]] *********** WHY IS THIS CAPTURED BY THE  InstrumentationProcessor?
It should be the exchange printed above
>
>
>
>
>
> CORRECT RESPONE LOG:
>
> [17/08/10 10:21:25:025 MDT] ICL-RnR TRACE aggregate.AggregateProcessor: Processing aggregated
exchange: Exchange[Message: <LogicalDevice>
>
>  <commonName>049c8229-25ed-43da-a8d8-c9441ff3cc2b1282062085850049c8229-25ed-43da-a8d8-c9441ff3cc2b1282062085865</commonName>
>
> </LogicalDevice>] complete.      ***********THIS IS THE EXPECTED RESPONSE ***
>
> [17/08/10 10:21:25:025 MDT] ICL-RnR TRACE converter.DefaultTypeConverter: Converting
null -> org.apache.camel.processor.aggregate.AggregationStrategy with value: null
>
> [17/08/10 10:21:25:025 MDT] ICL-RnR DEBUG processor.MulticastProcessor: Done parallel
processing 2 exchanges                ************ MULTICAST OVER ***
>
> [17/08/10 10:21:25:025 MDT] ICL-RnR TRACE management.InstrumentationProcessor: multicast:
Recording duration: 263 millis for exchange: Exchange[Message: <LogicalDevice>
>
>  <commonName>049c8229-25ed-43da-a8d8-c9441ff3cc2b1282062085850049c8229-25ed-43da-a8d8-c9441ff3cc2b1282062085865</commonName>
>
> </LogicalDevice>]*********** WITH THE SAME CODE, I AM GETTING A CORRECT RESPONSE
NOW !!!
>
>
>
>
>
> Thanks
>
> Vivek
>
> Ethernet service fulfillment program
>
> Tel  : +1 303 707 7570
>
> Cell: +1 720 231 8031
>
> www.techmahindra.com
>
>
>
> -----Original Message-----
> From: Kumar, Vivek [mailto:Vivek.Kumar2@qwest.com]
> Sent: Tuesday, August 17, 2010 9:34 AM
> To: 'users@camel.apache.org'
> Subject: RE: Aggregator response not being sent consistently
>
>
>
> Yeah I did that, Changed it to a higher value still facing the same issue.
>
>
>
> I can see the aggregator getting executed. When I print the exchange that I am returning
from the aggregator I can see that it contains the aggregated message. But on the client end
the response is not correct.
>
>
>
> What makes it tricky is that for 8 out of 10 requests the client application makes, it
gets valid aggregated responses. It's only for 1-2 requests that it is failing. I tried sending
request in parallel and sequential manner still the results are same (so we can rule multithreading
issues for objects being overwritten etc etc)
>
>
>
> I think the problem is somewhere between the point "aggregator finished its job" -->to-->
"camel returns this exchange as a WS response". Unfortunately I don’t have much understanding
of how that is happening.
>
>
>
> Also is there some blog or documentation about how apache camel will pick up the aggregators
response (i.e. exchange object) and convert it to a SOAP response?
>
>
>
> Thanks
>
> Vivek
>
> -----Original Message-----
>
> From: Willem Jiang [mailto:willem.jiang@gmail.com]
>
> Sent: Tuesday, August 17, 2010 3:01 AM
>
> To: users@camel.apache.org
>
> Subject: Re: Aggregator response not being sent consistently
>
>
>
> Hi,
>
>
>
> You set the aggregation completionTimeout(5000L), maybe you remove it or
>
> set a longer time.
>
>
>
> Wille
>
>
>
> vivek k wrote:
>
>> I have mapped a web service as the start endpoint and am multicasting the
>
>> request to multiple sources, aggregating the response and sending the
>
>> consolidated response back via the WS.
>
>>
>
>> The WS is a sync web service.
>
>>
>
>> The problem is the WS is not always sending the response that is returned by
>
>> the aggregator. It sometimes returns the response as the final exchange that
>
>> was sent to the aggregator.
>
>>
>
>> The route is defined as:
>
>>
>
>> from("cxf:bean:icDataService?dataFormat=POJO")
>
>> .process((SearchResourceRequestProcessor)
>
>> applicationContext.getBean("searchResourceRequestProcessor"))
>
>>                                 .multicast()
>
>>                                         .parallelProcessing()
>
>>
>
>> .executorServiceRef("customRequestThreadPoolExecutor")
>
>>
>
>> .to("direct:iclSystemEndpoint","direct:lisSystemEndpoint");
>
>>
>
>>                 from("direct:iclSystemEndpoint")
>
>>                         .to(propertyMapper.getICL_WS_ENDPOINT())
>
>>                         .to("direct:aggregation-strategy-icl-lis");
>
>>
>
>>                 from("direct:lisSystemEndpoint")
>
>>                         .to(propertyMapper.getLIS_WS_ENDPOINT())
>
>>                         .to("direct:aggregation-strategy-icl-lis");
>
>>
>
>>                 from("direct:aggregation-strategy-icl-lis")
>
>>                         .aggregate(header("CORELATION_ID"),
>
>> (SearchResourceAggregationStrategy)
>
>> applicationContext.getBean("aggregatorStrategy"))
>
>> .completionSize(2)
>
>>
>
>> .executorServiceRef("customRequestThreadPoolExecutor")
>
>>                                 .completionTimeout(5000L)
>
>>                                 .process(new Processor() {
>
>>                                         public void process(Exchange
>
>> exchange) throws Exception {
>
>>                                                 LOG.info("FINAL
RETURN ~~ NO
>
>> ROUTES LEFT -> " + exchange);
>
>>                                         }
>
>>                                 });
>
>>
>
>>
>
>> When I run this with 10-12 WS Requests at one time, I get 8-9 aggregated
>
>> responses and 1-2 responses where the result before aggregation was returned
>
>> (that is the list of multiple responses appended to exchange)
>
>>
>
>> For every request I can see the last process logging “Final Retun…”
>
>> successfully with correct response, somehow the WS Response is the one that
>
>> is not getting populated correctly.
>
>>
>
>> The Aggregator code:
>
>>
>
>> public class SearchResourceAggregationStrategy implements
>
>> AggregationStrategy {
>
>>
>
>>         private static final Log LOG =
>
>> LogFactory.getLog(SearchResourceAggregationStrategy.class);
>
>>         private final int totalResponseExpected = 2;
>
>>         protected JoinICLAndLISResponse joinICLAndLISResponse;
>
>>
>
>>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange){
>
>>
>
>>     List<SearchResourceResponse> responses;
>
>>
>
>>         if (oldExchange == null)
>
>>         responses = new ArrayList<SearchResourceResponse>();
>
>>         else
>
>>         responses = (List<SearchResourceResponse>)
>
>> oldExchange.getIn().getBody();
>
>>
>
>>         LOG.debug("New exchange received " + newExchange);
>
>>
>
>>         responses.add((SearchResourceResponse)
>
>> newExchange.getIn().getBody(Message.class).getBody());
>
>>         newExchange.getIn().setBody(responses);
>
>>
>
>>         if(responses.size()==totalResponseExpected){
>
>>         newExchange = joinICLAndLISResponse.process(newExchange); // will
>
>> joing the responses in one response - This is working fine.
>
>>         }
>
>>
>
>>         return newExchange;
>
>>     }
>
>>
>
>>         public void setJoinICLAndLISResponse(JoinICLAndLISResponse
>
>> joinICLAndLISResponse) {
>
>>                 this.joinICLAndLISResponse = joinICLAndLISResponse;
>
>>         }
>
>>
>
>> }
>
>>
>
>> Logged response from the JUnit test:
>
>>
>
>> [16/08/10 10:42:55:055 MDT] ICL-RnR  INFO
>
>> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
>
>> <LogicalDevice>
>
>>
>
>> <commonName>552987dc-19d1-4a49-94b3-1da3ed207b991281976975442552987dc-19d1-4a49-94b3-1da3ed207b991281976975442</commonName>
>
>> </LogicalDevice> THIS WORKD FINE - RETURNED AGGREGATED RESPONSE
>
>>
>
>> [16/08/10 10:42:55:055 MDT] ICL-RnR  INFO
>
>> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
>
>> <LogicalDevice>
>
>>
>
>> <commonName>b8b17a83-eeef-42b2-a01a-c8523f69db551281976975458b8b17a83-eeef-42b2-a01a-c8523f69db551281976975474</commonName>
>
>> </LogicalDevice> THIS WORKD FINE - RETURNED AGGREGATED RESPONSE
>
>>
>
>> [16/08/10 10:42:55:055 MDT] ICL-RnR  INFO
>
>> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
>
>> [<LogicalDevice>
>
>>   <commonName>7f70aa4d-8c08-4385-ad22-5f3b6e577b051281976975505</commonName>
>
>> </LogicalDevice>, <LogicalDevice>
>
>>   <commonName>7f70aa4d-8c08-4385-ad22-5f3b6e577b051281976975505</commonName>
>
>> </LogicalDevice>] THIS DIDNT WORK - THE RESPONSE HERE IS THE INPUT TO
>
>> AGGREGATOR THEN THE AGGREGATOR AGGREGATED IT TO ONE RESPONSE (I CAN SEE IT
>
>> IN THE LOGS) STILL THIS WAS RETURNED !!!
>
>>
>
>> Any idea why am i seeing this unpredicted behavior ?
>
>> Thanks.
>
>> Vivek
>
>>
>
>
>
>
>
> This communication is the property of Qwest and may contain confidential or
>
> privileged information. Unauthorized use of this communication is strictly
>
> prohibited and may be unlawful.  If you have received this communication
>
> in error, please immediately notify the sender by reply e-mail and destroy
>
> all copies of the communication and any attachments.
>
> ________________________________
> This communication is the property of Qwest and may contain confidential or
> privileged information. Unauthorized use of this communication is strictly
> prohibited and may be unlawful. If you have received this communication
> in error, please immediately notify the sender by reply e-mail and destroy
> all copies of the communication and any attachments.
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Mime
View raw message