camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r697770 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/ examples/camel-example-loan-broker/src/main/j...
Date Mon, 22 Sep 2008 10:05:22 GMT
Author: ningjiang
Date: Mon Sep 22 03:05:21 2008
New Revision: 697770

URL: http://svn.apache.org/viewvc?rev=697770&view=rev
Log:
CAMEL-928 added property Exchange.AGGREGTED_COUNT

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
    activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
    activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Sep 22
03:05:21 2008
@@ -32,6 +32,8 @@
 
     String CHARSET_NAME = "org.apache.camel.Exchange.CharsetName";
 
+    String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount";
+
     /**
      * Returns the {@link ExchangePattern} (MEP) of this exchange.
      *

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
Mon Sep 22 03:05:21 2008
@@ -56,12 +56,21 @@
         Exchange oldExchange = map.get(correlationKey);
         Exchange newExchange = exchange;
         if (oldExchange != null) {
+            Integer count = oldExchange.getProperty(Exchange.AGGREGATED_COUNT, Integer.class);
+            if (count == null) {
+                count = 1;
+            }
+            count++;
             newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
+            newExchange.setProperty(Exchange.AGGREGATED_COUNT, count);
         }
 
         // the strategy may just update the old exchange and return it
         if (newExchange != oldExchange) {
             LOG.debug("put exchange:" + newExchange + " for key:"  + correlationKey);
+            if (oldExchange == null) {
+                newExchange.setProperty(Exchange.AGGREGATED_COUNT, new Integer(1));
+            }
             map.put(correlationKey, newExchange);
         }
         onAggregation(correlationKey, newExchange);

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
Mon Sep 22 03:05:21 2008
@@ -29,18 +29,13 @@
         String oldBody = oldExchange.getIn().getBody(String.class);
         String newBody = newIn.getBody(String.class);
         newIn.setBody(oldBody + "+" + newBody);
-        Integer old = (Integer) oldExchange.getProperty("aggregated");
-        if (old == null) {
-            old = 1;
-        }
-        copy.setProperty("aggregated", old + 1);
         return copy;
     }
 
     /**
      * An expression used to determine if the aggregation is complete
      */
-    public boolean isCompleted(@Header(name = "aggregated")
+    public boolean isCompleted(@Header(name = Exchange.AGGREGATED_COUNT)
                                Integer aggregated) {
 
         if (aggregated == null) {

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
Mon Sep 22 03:05:21 2008
@@ -82,7 +82,7 @@
                 from("direct:z").process(new AppendingProcessor("z")).to("direct:aggregator");
 
                 from("direct:aggregator").aggregator(header("cheese"), new BodyInAggregatingStrategy()).
-                completedPredicate(header("aggregated").isEqualTo(3)).to("mock:result");
+                completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3)).to("mock:result");
                 // END SNIPPET: example
             }
         };

Modified: activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
(original)
+++ activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
Mon Sep 22 03:05:21 2008
@@ -31,17 +31,14 @@
         Double oldRate = oldExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
         Double newRate = newExchange.getIn().getHeader(Constants.PROPERTY_RATE, Double.class);
         Exchange result = null;
-        if (old == null) {
-            old = 1;
-        }
+
         if (newRate >= oldRate) {
             result = oldExchange;
         } else {
             result = newExchange;
         }
         LOG.debug("Get the lower rate exchange " + result);
-        // Set the property for the completeness condition
-        result.setProperty("aggregated", old + 1);
+
         return result;
 
     }

Modified: activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=697770&r1=697769&r2=697770&view=diff
==============================================================================
--- activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
(original)
+++ activemq/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
Mon Sep 22 03:05:21 2008
@@ -93,7 +93,7 @@
         // The aggregation will completed when all the three bank responses are received
         from("jms:queue:bankReplyQueue")
             .aggregator(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy())
-            .completedPredicate(header("aggregated").isEqualTo(3))
+            .completedPredicate(header(Exchange.AGGREGATED_COUNT).isEqualTo(3))
 
         // Here we do some translation and put the message back to loanReplyQueue
             .process(new Translator()).to("jms:queue:loanReplyQueue");



Mime
View raw message