Return-Path: Delivered-To: apmail-activemq-camel-user-archive@locus.apache.org Received: (qmail 1344 invoked from network); 7 Sep 2007 22:23:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 7 Sep 2007 22:23:56 -0000 Received: (qmail 30347 invoked by uid 500); 7 Sep 2007 22:23:51 -0000 Delivered-To: apmail-activemq-camel-user-archive@activemq.apache.org Received: (qmail 30331 invoked by uid 500); 7 Sep 2007 22:23:50 -0000 Mailing-List: contact camel-user-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: camel-user@activemq.apache.org Delivered-To: mailing list camel-user@activemq.apache.org Received: (qmail 30322 invoked by uid 99); 7 Sep 2007 22:23:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Sep 2007 15:23:50 -0700 X-ASF-Spam-Status: No, hits=3.7 required=10.0 tests=DNS_FROM_OPENWHOIS,FORGED_HOTMAIL_RCVD2,SPF_HELO_PASS,SPF_PASS,WHOIS_MYPRIVREG X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of lists@nabble.com designates 216.139.236.158 as permitted sender) Received: from [216.139.236.158] (HELO kuber.nabble.com) (216.139.236.158) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Sep 2007 22:25:09 +0000 Received: from isper.nabble.com ([192.168.236.156]) by kuber.nabble.com with esmtp (Exim 4.63) (envelope-from ) id 1ITmEW-0005rf-Bq for camel-user@activemq.apache.org; Fri, 07 Sep 2007 15:23:24 -0700 Message-ID: <12564277.post@talk.nabble.com> Date: Fri, 7 Sep 2007 15:23:24 -0700 (PDT) From: Neil Thorne To: camel-user@activemq.apache.org Subject: Better Aggregator support MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Nabble-From: mr_neil_thorne@hotmail.com X-Virus-Checked: Checked by ClamAV on apache.org Hi, I'm looking at the current aggregator support which seems to be based around 
throttling, but I need a full blown stateful aggregator. I implemented my own Aggregator albeit in memory like this which uses the messageId as we can get multiple payment messages for the same orderId, which is dependent on my Splitter. Please find my test case with the expressions that I am using below.

My questions are

* how can I implement an aggregator whose batch size will change based on an external event (in my case I can route the original order to the aggregator first to give it a heads up on how many payments to expect) - I need an aggregator per message batch.
* can we make this stateful easily (by using some generic JPA mechanism?) for system crash recovery?

thanks,

Neil

package examples;

import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.activemq.ActiveMQComponent;
import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.Aggregator;

import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;

import junit.framework.TestCase;

/**
 * Split-then-aggregate round trip: every {@link Order} sent to
 * {@code activemq:queue:bus.in} is split into its {@link Payment}s, which are
 * forwarded to {@code activemq:queue:splitThings} and aggregated back into an
 * {@link Order} that is delivered to {@code mock:result}.
 *
 * @author Neil Thorne
 */
public class MyTest extends TestCase {

    private MockEndpoint resultEndpoint;

    /**
     * Sends orders "1" and "2" through the split/aggregate routes and expects
     * the mock endpoint to receive bodies equal to both orders.
     *
     * @throws Exception if the context cannot be started or stopped, a send
     *                   fails, or the thread is interrupted while waiting
     */
    public void testSplitAndAggregate() throws Exception {
        Order order1 = createOrder("1");
        Order order2 = createOrder("2");

        final CamelContext camelContext = new DefaultCamelContext();
        resultEndpoint = (MockEndpoint) camelContext.getEndpoint("mock:result");
        resultEndpoint.expectedBodiesReceived(order1, order2);

        camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=true"));
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() throws Exception {
                // Splits an Order into its payments. Before splitting, the incoming
                // JMS message's own id is copied into its correlation id.
                Expression splitterExpression = new Expression() {
                    public Object evaluate(Exchange exchange) {
                        Order order = (Order) exchange.getIn().getBody();
                        JmsExchange jmsExchange = (JmsExchange) exchange;
                        javax.jms.Message jmsMessage = jmsExchange.getIn().getJmsMessage();
                        try {
                            String jmsMessageID = jmsMessage.getJMSMessageID();
                            jmsMessage.setJMSCorrelationID(jmsMessageID);
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        return order.getPayments();
                    }
                };

                // Correlation key for the aggregator: JMS correlation id followed by
                // the payment's order id.
                Expression aggregatorExpression = new Expression() {
                    public Object evaluate(Exchange exchange) {
                        Payment payment = (Payment) exchange.getIn().getBody();
                        JmsMessage in = (JmsMessage) exchange.getIn();
                        String jmsCorrelationID;
                        try {
                            jmsCorrelationID = in.getJmsMessage().getJMSCorrelationID();
                        } catch (JMSException e) {
                            throw new RuntimeException(e);
                        }
                        return jmsCorrelationID + payment.getOrderId();
                    }
                };

                // Folds payments into an Order. The first pair of exchanges carries
                // two Payments; afterwards the old exchange carries the Order built
                // so far.
                AggregationStrategy aggregationStrategy = new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        Object oldBody = oldExchange.getIn().getBody();
                        if (oldBody instanceof Payment) {
                            Payment oldPayment = (Payment) oldBody;
                            Order order = new Order(oldPayment.getOrderId());
                            order.getPayments().add(oldPayment);
                            Object newBody = newExchange.getIn().getBody();
                            if (newBody instanceof Payment) {
                                Payment newPayment = (Payment) newBody;
                                order.getPayments().add(newPayment);
                            }
                            newExchange.getIn().setBody(order);
                            return newExchange;
                        } else if (oldBody instanceof Order) {
                            Order order = (Order) oldBody;
                            Object newBody = newExchange.getIn().getBody();
                            // When the new body is not a Payment the new exchange is
                            // returned with its body left untouched.
                            if (newBody instanceof Payment) {
                                Payment newPayment = (Payment) newBody;
                                order.getPayments().add(newPayment);
                                newExchange.getIn().setBody(order);
                            }
                            return newExchange;
                        }
                        // Null-safe: a null old body is reported as an illegal state
                        // instead of failing with a NullPointerException here.
                        throw new IllegalStateException("OldBody should be type Payment or Order but was "
                                + (oldBody == null ? null : oldBody.getClass()));
                    }
                };

                from("activemq:queue:bus.in").splitter(splitterExpression).to("activemq:queue:splitThings");
                from("activemq:queue:splitThings").aggregator(aggregatorExpression, aggregationStrategy).to("mock:result");
            }
        });

        camelContext.start();
        try {
            configureAggregators(camelContext);

            CamelTemplate camelTemplate = new CamelTemplate(camelContext);
            sendOrder(camelTemplate, "1");
            sendOrder(camelTemplate, "2");

            Thread.sleep(1000);
            resultEndpoint.assertIsSatisfied();
        } finally {
            // Always stop the context, even when a send or the assertion fails.
            camelContext.stop();
        }
    }

    /**
     * Sets batch size 4 and batch timeout 10000 on every {@link Aggregator}
     * found among the services of the context's routes.
     *
     * @param camelContext the started context whose routes are inspected
     */
    private void configureAggregators(CamelContext camelContext) {
        List<Route> routeList = camelContext.getRoutes();
        for (Route route : routeList) {
            List<Service> servicesForRoute = route.getServicesForRoute();
            for (Service service : servicesForRoute) {
                if (service instanceof Aggregator) {
                    Aggregator aggregator = (Aggregator) service;
                    aggregator.setBatchSize(4);
                    aggregator.setBatchTimeout(10000);
                }
            }
        }
    }

    /**
     * Sends a freshly created order with the given id to
     * {@code activemq:queue:bus.in}.
     *
     * @param camelTemplate the template used for sending
     * @param orderId       id of the order to create and send
     */
    private void sendOrder(CamelTemplate camelTemplate, final String orderId) {
        camelTemplate.send("activemq:queue:bus.in", new Processor() {
            public void process(Exchange exchange) throws Exception {
                Order order = createOrder(orderId);
                exchange.getIn().setBody(order);
            }
        });
    }

    /**
     * Builds an order holding four payments whose ids are the order id
     * followed by "1" through "4".
     *
     * @param orderId id of the order and of each of its payments' orders
     * @return the populated order
     */
    private Order createOrder(String orderId) {
        final Order order = new Order(orderId);
        List<Payment> orderPayments = new ArrayList<Payment>();
        orderPayments.add(new Payment(orderId, orderId + "1"));
        orderPayments.add(new Payment(orderId, orderId + "2"));
        orderPayments.add(new Payment(orderId, orderId + "3"));
        orderPayments.add(new Payment(orderId, orderId + "4"));
        order.setPayments(orderPayments);
        return order;
    }
}

package examples;

import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;

/**
 * An order identified by an order id and carrying a list of payments.
 * Equality and hash code are based on both the order id and the payments.
 *
 * @author Neil Thorne
 */
public class Order implements Serializable {

    private String orderId;
    private List<Payment> payments = new ArrayList<Payment>();

    /**
     * Creates an order with the given id and an empty payment list.
     *
     * @param orderId the order id
     */
    public Order(String orderId) {
        this.orderId = orderId;
    }

    /** @return the order id */
    public String getOrderId() {
        return orderId;
    }

    /** @param orderId the new order id */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the payment list held by this order (not a copy) */
    public List<Payment> getPayments() {
        return payments;
    }

    /** @param payments the list to use as this order's payments */
    public void setPayments(List<Payment> payments) {
        this.payments = payments;
    }

    /** Two orders are equal when both their order ids and payment lists are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Order)) return false;

        final Order order = (Order) o;

        if (orderId != null ? !orderId.equals(order.orderId) : order.orderId != null) return false;
        if (payments != null ? !payments.equals(order.payments) : order.payments != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result;
        result = (orderId != null ? orderId.hashCode() : 0);
        result = 29 * result + (payments != null ? payments.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "Order{orderId=" + orderId + ",payments=" + payments + "}";
    }
}

package examples;

import java.io.Serializable;
import java.util.List;

/**
 * A payment identified by a payment id and tied to an order by its order id.
 * Equality and hash code are based on both ids.
 *
 * @author Neil Thorne
 */
public class Payment implements Serializable {

    private String orderId;
    private String paymentId;

    /**
     * @param orderId   id of the order this payment belongs to
     * @param paymentId id of this payment
     */
    public Payment(String orderId, String paymentId) {
        this.orderId = orderId;
        this.paymentId = paymentId;
    }

    /** @return the order id */
    public String getOrderId() {
        return orderId;
    }

    /** @param orderId the new order id */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the payment id */
    public String getPaymentId() {
        return paymentId;
    }

    /** @param paymentId the new payment id */
    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    /** Two payments are equal when both their order ids and payment ids are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Payment)) return false;

        final Payment payment = (Payment) o;

        if (orderId != null ? !orderId.equals(payment.orderId) : payment.orderId != null) return false;
        if (paymentId != null ? !paymentId.equals(payment.paymentId) : payment.paymentId != null) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result;
        result = (orderId != null ? orderId.hashCode() : 0);
        result = 29 * result + (paymentId != null ? paymentId.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "Payment{orderId=" + orderId + ",paymentId=" + paymentId + "}";
    }
}

--
View this message in context: http://www.nabble.com/Better-Aggregator-support-tf4404085s22882.html#a12564277
Sent from the Camel - Users mailing list archive at Nabble.com.