camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: Aggregator strategies (again)
Date Mon, 05 Nov 2007 14:49:17 GMT
On 11/5/07, James Strachan <james.strachan@gmail.com> wrote:
> BTW Alberto I've added your test case to the distro so folks can
> noodle on it and see if we can improve things some. See
> AlbertoAggregatorTest in the camel-core module.
>
> I commented out the use of the set batch size; as we don't really need
> to worry about that in this case, as we can just assume if the timeout
> fires after a few seconds we've got to the end of any possible batch.
>
> The tricky thing is knowing when you're at the end of the batch
> really. One simple solution would be to add some kinda predicate to
> detect batch-completion. For example  when we split messages we could
> record how many split messages there are and each messages' counter so
> that we know when we've aggregated them all together again?

I wonder if the simplest way to implement the described split and
aggregate pattern could be to just combine the splitter and aggregator
into 1 delegate processor.  For example it would take it's input,
split it and forward each part to the delegate for processing.  Then
it would wait for the result of processing and aggregate those
results.  The delegate processor would know how many things it split
up so it would know how many things to aggregate.

Regards,
Hiram


>
>
> On 05/11/2007, almilo <alberto.mijares@fundacionctic.org> wrote:
> >
> > Hi all:
> >
> > First, Camel is a very interesting project. Congrats to the team!!
> >
> > Now the question...
> >
> > I´ve seen a post about better support for Aggregator pattern but with no
> > answer. The testcases seem to be very simple and I think this is a really
> > relevant pattern for distributed processing. But, being based in "a priori"
> > batch size and timeout it lacks value for most of the uses I can think of.
> >
> > Any plans for an Aggregator face-lift? :)
> >
> > I attach an imaginative testcase :O) with some comments on FIXMEs
> >
> > Lot of thanks, Alberto Mijares
> >
> > // ------------ START ---------------//
> > package org.fundacionctic.taw;
> >
> > import java.util.ArrayList;
> > import java.util.HashMap;
> > import java.util.List;
> > import java.util.Map;
> >
> > import org.apache.camel.CamelTemplate;
> > import org.apache.camel.ContextTestSupport;
> > import org.apache.camel.Exchange;
> > import org.apache.camel.Message;
> > import org.apache.camel.Processor;
> > import org.apache.camel.builder.RouteBuilder;
> > import org.apache.camel.component.mock.MockEndpoint;
> > import org.apache.camel.model.AggregatorType;
> > import org.apache.camel.processor.aggregate.AggregationStrategy;
> > import org.apache.commons.logging.Log;
> > import org.apache.commons.logging.LogFactory;
> >
> > public class AggregatorTest extends ContextTestSupport {
> >
> >         private Log log = LogFactory.getLog(this.getClass());
> >
> >         private static final String SURNAME_HEADER = "surname";
> >
> >         private static final String TYPE_HEADER = "type";
> >
> >         private static final String BROTHERS_TYPE = "brothers";
> >
> >         public void testAggregator() throws Exception {
> >
> >                 String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
> > Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";
> >
> >                 List<String> marxBrothers = new ArrayList<String>();
> >                 marxBrothers.add("Harpo");
> >                 marxBrothers.add("Chico");
> >                 marxBrothers.add("Groucho");
> >
> >                 List<String> karamazovBrothers = new ArrayList<String>();
> >                 karamazovBrothers.add("Fiodor");
> >                 karamazovBrothers.add("Ivan");
> >                 karamazovBrothers.add("Alexei");
> >                 karamazovBrothers.add("Dimitri");
> >
> >                 Map<String, List> allBrothers = new HashMap<String, List>();
> >                 allBrothers.put("Marx", marxBrothers);
> >                 allBrothers.put("Karamazov", karamazovBrothers);
> >
> >                 MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
> >                                 MockEndpoint.class);
> >                 resultEndpoint.expectedMessageCount(1);
> >                 resultEndpoint.expectedBodiesReceived(allBrothers);
> >
> >                 CamelTemplate template = new CamelTemplate(context);
> >                 template.sendBody("direct:start", allNames);
> >
> >                 resultEndpoint.assertIsSatisfied();
> >
> >         }
> >
> >         @Override
> >         protected RouteBuilder createRouteBuilder() throws Exception {
> >
> >                 return new RouteBuilder() {
> >
> >                         private void debugIn(String stringId, Exchange oldExchange,
> >                                         Exchange newExchange) {
> >
> >                                 log.debug(stringId + " old headers in: "
> >                                                 + oldExchange.getIn().getHeaders());
> >                                 log.debug(stringId + " old body in: "
> >                                                 + oldExchange.getIn().getBody());
> >                                 log.debug(stringId + " new headers in: "
> >                                                 + newExchange.getIn().getHeaders());
> >                                 log.debug(stringId + " new body in: "
> >                                                 + newExchange.getIn().getBody());
> >
> >                         }
> >
> >                         private void debugOut(String stringId, Exchange exchange)
{
> >
> >                                 log.debug(stringId + " old headers out: "
> >                                                 + exchange.getIn().getHeaders());
> >                                 log.debug(stringId + " old body out: "
> >                                                 + exchange.getIn().getBody());
> >
> >                         }
> >
> >                         AggregationStrategy surnameAggregator = new AggregationStrategy()
{
> >
> >                                 public Exchange aggregate(Exchange oldExchange,
> >                                                 Exchange newExchange) {
> >
> >                                         debugIn("Surname Aggregator", oldExchange,
newExchange);
> >
> >                                         Message oldIn = oldExchange.getIn();
> >                                         Message newIn = newExchange.getIn();
> >
> >                                         List<String> brothers = null;
> >                                         if (oldIn.getBody() instanceof List) {
> >
> >                                                 brothers = oldIn.getBody(List.class);
> >                                                 brothers.add(newIn.getBody(String.class));
> >
> >                                         } else {
> >
> >                                                 brothers = new ArrayList<String>();
> >                                                 brothers.add(oldIn.getBody(String.class));
> >                                                 brothers.add(newIn.getBody(String.class));
> >                                                 oldExchange.getIn().setBody(brothers);
> >
> >                                         } // else
> >
> >                                         debugOut("Surname Aggregator", oldExchange);
> >
> >                                         return oldExchange;
> >
> >                                 }
> >
> >                         };
> >
> >                         AggregationStrategy brothersAggregator = new AggregationStrategy()
{
> >
> >                                 public Exchange aggregate(Exchange oldExchange,
> >                                                 Exchange newExchange) {
> >
> >                                         debugIn("Brothers Aggregator", oldExchange,
newExchange);
> >
> >                                         Message oldIn = oldExchange.getIn();
> >                                         Message newIn = newExchange.getIn();
> >
> >                                         Map<String, List> brothers = null;
> >                                         if (oldIn.getBody() instanceof Map) {
> >
> >                                                 brothers = oldIn.getBody(Map.class);
> >                                                 brothers.put(newIn.getHeader(SURNAME_HEADER,
> >                                                                 String.class), newIn.getBody(List.class));
> >
> >                                         } else {
> >
> >                                                 brothers = new HashMap<String,
List>();
> >                                                 brothers.put(oldIn.getHeader(SURNAME_HEADER,
String.class),
> >                                                                 oldIn.getBody(List.class));
> >                                                 brothers.put(newIn.getHeader(SURNAME_HEADER,
> >                                                                 String.class), newIn.getBody(List.class));
> >                                                 oldExchange.getIn().setBody(brothers);
> >
> >                                         } // else
> >
> >                                         debugOut("Brothers Aggregator", oldExchange);
> >
> >                                         return oldExchange;
> >
> >                                 }
> >
> >                         };
> >
> >                         @Override
> >                         public void configure() throws Exception {
> >
> >                                 from("direct:start")
> >                                                 // Separate people
> >                                                 .splitter(bodyAs(String.class).tokenize(",")).process(
> >
> >                                                 // Split the name, erase the surname
and put it in a
> >                                                 // header
> >                                                                 new Processor()
{
> >
> >                                                                         public void
process(Exchange exchange)
> >                                                                                
        throws Exception {
> >
> >                                                                                
String[] parts = exchange.getIn()
> >                                                                                
                .getBody(String.class).split(
> >                                                                                
                                " ");
> >                                                                                
exchange.getIn().setBody(parts[0]);
> >                                                                                
exchange.getIn().setHeader(
> >                                                                                
                SURNAME_HEADER, parts[1]);
> >
> >                                                                         } // process
> >
> >                                                                 }) // Processor
> >
> >                                                 .to("direct:joinSurnames");
> >
> >                                 // FIXME: This aggregator doesn´t usually fail
but could also due to
> > timeout
> >                                 // or an incorrect batch size
> >                                 // Join in a list by the surname on the header and
mark as
> >                                 // brothers list
> >                                 from("direct:joinSurnames")
> >                                 .aggregator(header(SURNAME_HEADER),
> >                                                 surnameAggregator).setHeader(TYPE_HEADER,
> >                                                 constant(BROTHERS_TYPE)).to("direct:joinBrothers");
> >
> >                                 // Join all brothers lists and remove surname and
type headers
> >                                 AggregatorType agg =
> > from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
> >                                                 brothersAggregator);
> >
> >                                 // FIXME: If these lines get commented the test
fails some times with
> > different errors
> >                                 // due to a timeout or incorrect batch size that
must be adjusted by
> > hand
> >                                 // There are two brothers lists to join but we don´t
know always the
> > number "a priori"
> >                                 agg.setBatchSize(2);
> >                                 agg.setBatchTimeout(10000);
> >                                 agg.removeHeader(SURNAME_HEADER)
> >                                 .removeHeader(TYPE_HEADER)
> >                                 .to("mock:result");
> >
> >                         }
> >
> >                 };
> >
> >         }
> >
> > }
> > // ------------ END ---------------//
> >
> >
> > --
> > View this message in context: http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
> > Sent from the Camel - Users mailing list archive at Nabble.com.
> >
> >
>
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source SOA
> http://open.iona.com
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Mime
View raw message