camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From almilo <alberto.mija...@fundacionctic.org>
Subject Aggregator strategies (again)
Date Mon, 05 Nov 2007 10:59:10 GMT

Hi all:

First, Camel is a very interesting project. Congrats to the team!!

Now the question...

I´ve seen a post about better support for Aggregator pattern but with no
answer. The testcases seem to be very simple and I think this is a really
relevant pattern for distributed processing. But, being based in "a priori"
batch size and timeout it lacks value for most of the uses I can think of.

Any plans for an Aggregator face-lift? :)

I attach an imaginative testcase :O) with some comments on FIXMEs

Lot of thanks, Alberto Mijares

// ------------ START ---------------//
package org.fundacionctic.taw;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.CamelTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.AggregatorType;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AggregatorTest extends ContextTestSupport {

	private Log log = LogFactory.getLog(this.getClass());

	private static final String SURNAME_HEADER = "surname";

	private static final String TYPE_HEADER = "type";

	private static final String BROTHERS_TYPE = "brothers";

	public void testAggregator() throws Exception {

		String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan
Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";

		List<String> marxBrothers = new ArrayList<String>();
		marxBrothers.add("Harpo");
		marxBrothers.add("Chico");
		marxBrothers.add("Groucho");

		List<String> karamazovBrothers = new ArrayList<String>();
		karamazovBrothers.add("Fiodor");
		karamazovBrothers.add("Ivan");
		karamazovBrothers.add("Alexei");
		karamazovBrothers.add("Dimitri");

		Map<String, List> allBrothers = new HashMap<String, List>();
		allBrothers.put("Marx", marxBrothers);
		allBrothers.put("Karamazov", karamazovBrothers);

		MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
				MockEndpoint.class);
		resultEndpoint.expectedMessageCount(1);
		resultEndpoint.expectedBodiesReceived(allBrothers);

		CamelTemplate template = new CamelTemplate(context);
		template.sendBody("direct:start", allNames);

		resultEndpoint.assertIsSatisfied();

	}

	@Override
	protected RouteBuilder createRouteBuilder() throws Exception {

		return new RouteBuilder() {

			private void debugIn(String stringId, Exchange oldExchange,
					Exchange newExchange) {

				log.debug(stringId + " old headers in: "
						+ oldExchange.getIn().getHeaders());
				log.debug(stringId + " old body in: "
						+ oldExchange.getIn().getBody());
				log.debug(stringId + " new headers in: "
						+ newExchange.getIn().getHeaders());
				log.debug(stringId + " new body in: "
						+ newExchange.getIn().getBody());

			}

			private void debugOut(String stringId, Exchange exchange) {

				log.debug(stringId + " old headers out: "
						+ exchange.getIn().getHeaders());
				log.debug(stringId + " old body out: "
						+ exchange.getIn().getBody());

			}

			AggregationStrategy surnameAggregator = new AggregationStrategy() {

				public Exchange aggregate(Exchange oldExchange,
						Exchange newExchange) {

					debugIn("Surname Aggregator", oldExchange, newExchange);

					Message oldIn = oldExchange.getIn();
					Message newIn = newExchange.getIn();

					List<String> brothers = null;
					if (oldIn.getBody() instanceof List) {

						brothers = oldIn.getBody(List.class);
						brothers.add(newIn.getBody(String.class));

					} else {

						brothers = new ArrayList<String>();
						brothers.add(oldIn.getBody(String.class));
						brothers.add(newIn.getBody(String.class));
						oldExchange.getIn().setBody(brothers);

					} // else

					debugOut("Surname Aggregator", oldExchange);

					return oldExchange;

				}

			};

			AggregationStrategy brothersAggregator = new AggregationStrategy() {

				public Exchange aggregate(Exchange oldExchange,
						Exchange newExchange) {

					debugIn("Brothers Aggregator", oldExchange, newExchange);

					Message oldIn = oldExchange.getIn();
					Message newIn = newExchange.getIn();

					Map<String, List> brothers = null;
					if (oldIn.getBody() instanceof Map) {

						brothers = oldIn.getBody(Map.class);
						brothers.put(newIn.getHeader(SURNAME_HEADER,
								String.class), newIn.getBody(List.class));

					} else {

						brothers = new HashMap<String, List>();
						brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
								oldIn.getBody(List.class));
						brothers.put(newIn.getHeader(SURNAME_HEADER,
								String.class), newIn.getBody(List.class));
						oldExchange.getIn().setBody(brothers);

					} // else

					debugOut("Brothers Aggregator", oldExchange);

					return oldExchange;

				}

			};

			@Override
			public void configure() throws Exception {

				from("direct:start")
						// Separate people
						.splitter(bodyAs(String.class).tokenize(",")).process(

						// Split the name, erase the surname and put it in a
						// header
								new Processor() {

									public void process(Exchange exchange)
											throws Exception {

										String[] parts = exchange.getIn()
												.getBody(String.class).split(
														" ");
										exchange.getIn().setBody(parts[0]);
										exchange.getIn().setHeader(
												SURNAME_HEADER, parts[1]);

									} // process

								}) // Processor

						.to("direct:joinSurnames");

				// FIXME: This aggregator doesn´t usually fail but could also due to
timeout
				// or an incorrect batch size
				// Join in a list by the surname on the header and mark as
				// brothers list
				from("direct:joinSurnames")
				.aggregator(header(SURNAME_HEADER),
						surnameAggregator).setHeader(TYPE_HEADER,
						constant(BROTHERS_TYPE)).to("direct:joinBrothers");

				// Join all brothers lists and remove surname and type headers
				AggregatorType agg =
from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
						brothersAggregator);
				
				// FIXME: If these lines get commented the test fails some times with
different errors
				// due to a timeout or incorrect batch size that must be adjusted by
hand
				// There are two brothers lists to join but we don´t know always the
number "a priori"
				agg.setBatchSize(2);
				agg.setBatchTimeout(10000);
				agg.removeHeader(SURNAME_HEADER)
				.removeHeader(TYPE_HEADER)
				.to("mock:result");

			}

		};

	}

}
// ------------ END ---------------//

-- 
View this message in context: http://www.nabble.com/Aggregator-strategies-%28again%29-tf4750834s22882.html#a13584751
Sent from the Camel - Users mailing list archive at Nabble.com.


Mime
View raw message