Subject: svn commit: r697770 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/ examples/camel-example-loan-broker/src/main/j...
Date: Mon, 22 Sep 2008 10:05:22 -0000
To: camel-commits@activemq.apache.org
From: ningjiang@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20080922100522.64F3423889FC@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: ningjiang
Date: Mon Sep 22 03:05:21 2008
New Revision: 697770

URL: http://svn.apache.org/viewvc?rev=697770&view=rev
Log:
CAMEL-928 added property Exchange.AGGREGATED_COUNT

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
    activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
    activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Sep 22 03:05:21 2008
@@ -32,6 +32,8 @@
 
     String CHARSET_NAME = "org.apache.camel.Exchange.CharsetName";
 
+    String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";
+
     /**
      * Returns the {@link ExchangePattern} (MEP) of this exchange.
      *

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Mon Sep 22 03:05:21 2008
@@ -56,12 +56,21 @@
         Exchange oldExchange = map.get(correlationKey);
         Exchange newExchange = exchange;
         if (oldExchange != null) {
+            Integer count = oldExchange.getProperty(Exchange.AGGREGATED_COUNT, Integer.class);
+            if (count == null) {
+                count = 1;
+            }
+            count++;
             newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
+            newExchange.setProperty(Exchange.AGGREGATED_COUNT, count);
         }
 
         // the strategy may just update the old exchange and return it
         if (newExchange != oldExchange) {
             LOG.debug("put exchange:" + newExchange + " for key:" + correlationKey);
+            if (oldExchange == null) {
+                newExchange.setProperty(Exchange.AGGREGATED_COUNT, new Integer(1));
+            }
             map.put(correlationKey, newExchange);
         }
         onAggregation(correlationKey, newExchange);

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Mon Sep 22 03:05:21 2008
@@ -29,18 +29,13 @@
         String oldBody = oldExchange.getIn().getBody(String.class);
         String newBody = newIn.getBody(String.class);
         newIn.setBody(oldBody + "+" + newBody);
-        Integer old = (Integer) oldExchange.getProperty("aggregated");
-        if (old == null) {
-            old = 1;
-        }
-        copy.setProperty("aggregated", old + 1);
         return copy;
     }
 
     /**
      * An expression used to determine if the aggregation is complete
      */
-    public boolean isCompleted(@Header(name = "aggregated")
+    public boolean isCompleted(@Header(name = Exchange.AGGREGATED_COUNT)
                                Integer aggregated) {
         if (aggregated == null) {

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java Mon Sep 22 03:05:21 2008
@@ -82,7 +82,7 @@
                 from("direct:z").process(new AppendingProcessor("z")).to("direct:aggregator");
 
                 from("direct:aggregator").aggregator(header("cheese"), new BodyInAggregatingStrategy()).
-                        completedPredicate(header("aggregated").isEqualTo(3)).to("mock:result");
+                        completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3)).to("mock:result");
                 // END SNIPPET: example
             }
         };

Modified: activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java (original)
+++ activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java Mon Sep 22 03:05:21 2008
@@ -31,17 +31,14 @@
         Double oldRate = oldExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
         Double newRate = newExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
         Exchange result = null;
-        if (old == null) {
-            old = 1;
-        }
+
         if (newRate >= oldRate) {
             result = oldExchange;
         } else {
             result = newExchange;
         }
         LOG.debug("Get the lower rate exchange " + result);
-        // Set the property for the completeness condition
-        result.setProperty("aggregated", old + 1);
+
         return result;
     }
 

Modified: activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java (original)
+++ activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java Mon Sep 22 03:05:21 2008
@@ -93,7 +93,7 @@
         // The aggregation will completed when all the three bank responses are received
         from("jms:queue:bankReplyQueue")
             .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy())
-            .completedPredicate(header("aggregated").isEqualTo(3))
+            .completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3))
             // Here we do some translation and put the message back to loanReplyQueue
             .process(new Translator()).to("jms:queue:loanReplyQueue");
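
Reviewer note: the counting that this commit moves out of the individual strategies and into AggregationCollection can be sketched without Camel on the classpath. The snippet below is only an illustration under stated assumptions. `FakeExchange`, `AggregatedCountSketch`, `add` and `countOf` are hypothetical names, not Camel API. The string-concatenating merge stands in for whatever an AggregationStrategy would do. Unlike the real code, the sketch always stores a fresh aggregate rather than handling a strategy that returns the old exchange.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Camel-free sketch of the per-key counting shown in the diff above.
final class AggregatedCountSketch {
    // Same key string the commit adds to Exchange.java.
    static final String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";

    // Stand-in for an exchange: a body plus a property map (not a Camel class).
    static final class FakeExchange {
        final String body;
        final Map<String, Object> properties = new HashMap<>();

        FakeExchange(String body) {
            this.body = body;
        }
    }

    private final Map<String, FakeExchange> byKey = new HashMap<>();

    // Adds an exchange under its correlation key and returns the stored aggregate.
    FakeExchange add(String correlationKey, FakeExchange incoming) {
        FakeExchange old = byKey.get(correlationKey);
        FakeExchange result = incoming;
        if (old != null) {
            // Read the running count from the previous aggregate, defaulting to 1.
            Integer count = (Integer) old.properties.get(AGGREGATED_COUNT);
            if (count == null) {
                count = 1;
            }
            count++;
            // Assumed merge step: concatenate bodies, as the test strategy does.
            result = new FakeExchange(old.body + "+" + incoming.body);
            result.properties.put(AGGREGATED_COUNT, count);
        } else {
            // First exchange seen for this key starts the count at 1.
            result.properties.put(AGGREGATED_COUNT, 1);
        }
        byKey.put(correlationKey, result);
        return result;
    }

    static int countOf(FakeExchange exchange) {
        return (Integer) exchange.properties.get(AGGREGATED_COUNT);
    }

    public static void main(String[] args) {
        AggregatedCountSketch collection = new AggregatedCountSketch();
        collection.add("k", new FakeExchange("x"));
        collection.add("k", new FakeExchange("y"));
        FakeExchange last = collection.add("k", new FakeExchange("z"));
        System.out.println(last.body + " count=" + countOf(last)); // prints "x+y+z count=3"
    }
}
```

With three exchanges on the same key the count reaches 3, which is the value the `completedPredicate(...isEqualTo(3))` routes in the diff wait for.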