From: Claus Ibsen
Date: Wed, 21 Oct 2009 15:18:13 +0200
Subject: Re: Aggregator message lost
To: users@camel.apache.org

Hi

Do you "lose" a message every time you run the unit test?
Have you tried with a higher batch timeout?

On Tue, Oct 20, 2009 at 4:43 PM, Wilson wrote:
>
> Hi,
>
> I am using an aggregator to concatenate the body of a bunch of messages into
> a single message. The following code illustrates the scenario:
>
>
> package my.package;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.spring.Main;
>
> public class MyRouteBuilder extends RouteBuilder {
>     private int messageIndex = 0;
>
>     public static void main(String... args) {
>         Main.main(args);
>     }
>
>     public void configure() {
>
>         from("timer://foo?period=500")
>                 .process(new Processor() {
>
>                     public void process(Exchange exchange) throws Exception {
>                         exchange.getOut().setBody("[myBody-" + (messageIndex++) + "]");
>
>                         exchange.getOut().setHeader("aggregateGroup", "group1");
>                     }
>                 }).to("direct:step1");
>
>         from("direct:step1").multicast().to("direct:step2", "direct:step3");
>
>         from("direct:step2").to("direct:aggregator");
>         from("direct:step3").to("direct:aggregator");
>
>         from("direct:aggregator").aggregate(header("aggregateGroup"), new AggregationStrategy() {
>
>             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>                 System.out.println("Getting new exchange in aggretator: " + newExchange);
>
>                 if (oldExchange == null) {
>                     return newExchange;
>                 } else {
>                     oldExchange.getOut().setBody((String) newExchange.getIn().getBody() + (String) oldExchange.getIn().getBody());
>                 }
>
>                 return oldExchange;
>             }
>         }).batchSize(10).batchTimeout(2000L).process(new Processor() {
>
>             public void process(Exchange exchange) throws Exception {
>                 System.out.println("Received group: " + exchange.getIn().getBody() + " - " + exchange.getIn().getHeader("aggregateGroup"));
>             }
>         });
>
>     }
> }
>
>
> When running, this route configuration generates the following output:
>
>
> lease use a packageScan element instead.
> [pache.camel.spring.Main.main()] SpringCamelContext             INFO  Starting Apache Camel as property ShouldStartContext is true
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache Camel 2.0.0 (CamelContext:camelContext) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache Camel 2.0.0 (CamelContext:camelContext) started
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Received group: [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0] - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Received group: [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5] - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Received group: [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10] - group1
>
>
> The output shows that all messages pass through the Aggregator, but the
> last one is missing from the resulting Exchange body. For example, the
> first message group is composed of 10 messages:
> {[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
> but the resulting body contains only 9 of them: the last message
> ([myBody-4]) is missing.
>
> Am I doing anything wrong?
>
> Thank you.
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context: http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>

-- 
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
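
For comparison, here is a minimal sketch in plain Java (no Camel involved) of the body one would expect for the first group if all ten exchanges were folded with the same new-before-old concatenation that the quoted strategy uses. The class and method names (ExpectedAggregate, aggregate, foldBatch, firstBatch) are hypothetical and exist only for this illustration. It models the string concatenation alone, not Camel's batching or its In/Out message handling, so it says nothing about why the tenth body goes missing.

```java
import java.util.ArrayList;
import java.util.List;

public class ExpectedAggregate {

    // Same ordering as the quoted strategy: the new body is placed in front
    // of whatever has been accumulated so far. (Hypothetical helper.)
    public static String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : newBody + oldBody;
    }

    // Folds a whole batch of bodies into one string.
    public static String foldBatch(List<String> bodies) {
        String accumulated = null;
        for (String body : bodies) {
            accumulated = aggregate(accumulated, body);
        }
        return accumulated;
    }

    // The first batch from the quoted output: bodies 0..4, each one
    // delivered twice because of the multicast to step2 and step3.
    public static List<String> firstBatch() {
        List<String> bodies = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            String body = "[myBody-" + i + "]";
            bodies.add(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        System.out.println(foldBatch(firstBatch()));
    }
}
```

Comparing this string with the "Received group:" line in the quoted output makes the gap easy to see: the expected body starts with [myBody-4] twice, while the reported one contains it only once.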