camel-users mailing list archives

From Andrew Chandler <a...@riftware.com>
Subject Split, custom threadpools, aggregationstrategy
Date Thu, 25 Feb 2010 22:36:37 GMT
I'm hoping someone can help me out. I'm relatively new to Camel,
though I've googled and tried reading the documentation. What I'm
ultimately trying to do is take a message, send it through a splitter
(a custom one in the end), send the parts to processors in parallel,
and then combine the results with a custom aggregation strategy.
There will be multiple instances of the route in question running at
the same time.


What I've done is set up a proof-of-concept prototype. What has me
stumped is that I had planned on passing in a ThreadPoolExecutor so
that I could control the core thread count, max thread count and
timeout values. In Camel 2.2 this doesn't seem to be an option, in
spite of what the documentation says. As a sample, here is the
beginning of my route (no aggregation done yet):


            context.addRoutes(new RouteBuilder() {

                public void configure() {

//                    ThreadPoolExecutor threadPoolExecutor =
//                            new ThreadPoolExecutor(8, 150, 30L,
//                                    TimeUnit.SECONDS, new LinkedBlockingQueue());

                    from("activemq:queue:sping-in.queue")
                            .split().method(mySplitterBean, "splitBody")
                            .parallelProcessing().threads(150)
                            .process(new Processor() {
                                public void process(Exchange arg0) throws Exception {
                                    System.out.println("Exchange :" + arg0.toString());
                                    PingerBean pBean = new PingerBean();
                                    pBean.ping((String) arg0.getIn().getBody());
                                }
                            });

                }
            });


When you look at http://camel.apache.org/splitter.html and search for
"Specifying a custom ThreadPoolExecutor" you will see a Java snippet
that says you should be able to do this:

    from("activemq:my.queue").split(xPathBuilder, true, threadPoolExecutor).to("activemq:my.parts");

However, code completion in my IDE only offers these choices on .split:
split()  ExpressionClause <SplitDefinition>
split(Expression)
split(Expression, AggregationStrategy)

My working example uses .threads(150), but that doesn't let me control
the min, max and timeout values of the thread pool. Also, it's unclear
to me whether .threads(150) means a TOTAL of 150 threads for ALL
instances of this route running at the same time, or a pool for just
this route (in which case I don't need 150; I need more like 5).
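
For reference, the kind of pool described above can be sketched with
plain java.util.concurrent, independent of Camel. This is only an
illustration: the class name PoolSketch, the helper newPool and the
numbers (2 core threads, 5 max, 30 second idle timeout, queue capacity
of 10) are hypothetical, and how to hand such a pool to the splitter
in Camel 2.2 is exactly the open question, so no Camel call is shown.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSketch {

    // Hypothetical helper: builds a pool with an explicit core size,
    // max size and idle timeout, backed by a bounded work queue.
    static ThreadPoolExecutor newPool(int core, int max,
                                      long idleSeconds, int queueCapacity) {
        return new ThreadPoolExecutor(
                core, max, idleSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity));
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed values for illustration only.
        ThreadPoolExecutor pool = newPool(2, 5, 30L, 10);

        // Submit a few dummy tasks standing in for the split parts.
        for (int i = 0; i < 8; i++) {
            final int part = i;
            pool.execute(() -> System.out.println(
                    "part " + part + " on " + Thread.currentThread().getName()));
        }

        // Stop accepting work and wait for the queued tasks to finish.
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

One detail worth keeping in mind with this class: a ThreadPoolExecutor
only grows beyond its core size when its work queue is full. With an
unbounded queue such as new LinkedBlockingQueue(), as in the
commented-out pool in the route above, the pool would stay at 8
threads and the max of 150 would have no effect. The sketch uses a
bounded queue for that reason.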


Any help is appreciated.


