camel-users mailing list archives

From Willem Jiang <willem.ji...@gmail.com>
Subject Re: Parallel processing route
Date Fri, 14 Mar 2008 15:10:07 GMT
Yes, multicast does not do what you want.
Here is the MulticastProcessor's process code.
The processors are the processors of the "to" endpoints, and MulticastProcessor
calls them one by one. So we get no benefit from forking multiple threads:
the "to" endpoints are still not called in parallel.
    public void process(Exchange exchange) throws Exception {
        Exchange result = null;
        for (Processor producer : processors) {
            Exchange copy = copyExchangeStrategy(producer, exchange);
            producer.process(copy);
            if (aggregationStrategy != null) {
                if (result == null) {
                    result = copy;
                } else {
                    result = aggregationStrategy.aggregate(result, copy);
                }
            }
        }
        if (result != null) {
            ExchangeHelper.copyResults(exchange, result);
        }
    }

Maybe we could define a new ParallelizedMulticastProcessor that calls each
"to" endpoint in a different thread, and a barrier class could help us
wait for the calling threads and aggregate their results.
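
To make the idea concrete, here is a rough sketch in plain Java, without any
Camel classes. Everything in it is hypothetical: the class name, the Step
interface (a stand-in for a "to" endpoint's processor) and the
multicastInParallel method do not exist in Camel. It uses
ExecutorService.invokeAll as the barrier and assumes Java 8 or later for the
lambdas.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMulticastSketch {

    /** Hypothetical stand-in for the processor of one "to" endpoint. */
    interface Step {
        String process(String body) throws Exception;
    }

    /**
     * Calls every step with the same body, each on its own thread, and
     * returns the results in the order of the steps. The list of steps
     * must not be empty, because the pool size is taken from it.
     */
    static List<String> multicastInParallel(List<Step> steps, String body) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(steps.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Step step : steps) {
                tasks.add(() -> step.process(body));
            }
            // invokeAll is the barrier: it returns only when all tasks are done.
            List<String> results = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                // get() rethrows a failure from the step as an ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates a slow endpoint, similar to the sleeping test processor.
        Step slow = body -> {
            Thread.sleep(500);
            return body + " handled by " + Thread.currentThread().getName();
        };
        long start = System.nanoTime();
        List<String> results = multicastInParallel(Arrays.asList(slow, slow, slow), "msg");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " results in about " + elapsedMs + " ms");
    }
}
```

In a real processor the collected results would be folded together with the
aggregationStrategy, the same way the sequential loop above does it, instead
of being returned as a list.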

BTW, you can look at ThreadProcessor for the implementation of the thread() syntax.

Any thoughts?

Willem

aswin.nair wrote:
> willem.jiang wrote:
>   
>> For aswin's case, how about this?
>>
>> from("cxf:bean:soapMessageEndpoint").multicast(new
>> BodyOutAggregatingStrategy()).thread(3).to("direct:webservice1",
>> "direct:webservice2", "direct:webService3");
>> from("direct:webservice1").to("cxf://webservice1");
>> from("direct:webservice2").to("cxf://webservice2");
>> from("direct:webservice3").to("cxf://webservice3");
>>
>>     
>
> I tried the same route with a slight modification, so that I can test it
> rapidly using different DSLs, and I have the following example:
>
>     // a simple processor that just prints something and waits for 5 seconds 
> 	Processor simpleProcessor = new Processor() {		
> 		public void process(Exchange exchange) throws Exception {
> 			System.out.println("Simple Processor processing stuff for 5 seconds.");
> 			Thread.sleep(1000 * 5);
> 		}		
> 	}; 
>
>     // routes from Willem's example slightly modified
>     from("direct:start")
>         .multicast().thread(3)
>         .to("direct:webservice1", "direct:webservice2", "direct:webService3");
>
>     from("direct:webservice1").process(simpleProcessor);
>     from("direct:webservice2").process(simpleProcessor);
>     from("direct:webservice3").process(simpleProcessor);
>
>     // invoke using camel template with a basic default exchange
>     template.send("direct:start", exchange);
>
> Output
> Simple Processor processing stuff for 5 seconds.
> [delay for 5 sec]
> Simple Processor processing stuff for 5 seconds.
>
> So basically the routes from("direct:webservice1") and from("direct:webservice3") are
> running sequentially and in the same thread.
> I also tried the following route:
>     from("direct:start")
>     .multicast()
>     .thread(3).process(simpleProcessor).end()
>     .thread(3).process(simpleProcessor).end()
>     .thread(3).process(simpleProcessor).end();
>
> Output
> Thread[0] Simple Processor processing stuff for 5 seconds.
> [delay for 5 sec]
> Thread[1] Simple Processor processing stuff for 5 seconds.
> [delay for 5 sec]
> Thread[2] Simple Processor processing stuff for 5 seconds.
>
>
> I tried Chirno's suggestion also, but had no luck.
>
> Digging into the code revealed that the splitter, MulticastProcessor, etc. process
> messages synchronously (as far as I understand it). Anyway, if there is
> anything I should be doing to get this working, please let me know, as I am
> stuck on this part.
>
>   

