camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r672260 - /activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
Date Fri, 27 Jun 2008 12:48:40 GMT
Author: ningjiang
Date: Fri Jun 27 05:48:40 2008
New Revision: 672260

URL: http://svn.apache.org/viewvc?rev=672260&view=rev
Log:
Added a test case to show the error in the queue version of the loan broker

Modified:
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java?rev=672260&r1=672259&r2=672260&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java Fri Jun 27 05:48:40 2008
@@ -23,6 +23,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.BatchProcessor;
@@ -34,7 +35,8 @@
 public class AggregratedJmsRouteTest extends ContextTestSupport {
 
     private static final transient Log LOG = LogFactory.getLog(AggregratedJmsRouteTest.class);
-    private String startEndpointUri = "jms:queue:test.a";
+    private String timeOutEndpointUri = "jms:queue:test.a";
+    private String multicastEndpointUri = "jms:queue:mutilcast";
 
     /*
      * negative recieve wait timeout for jms is blocking so timeout during processing does not hang
@@ -45,14 +47,29 @@
         resultEndpoint.expectedMessageCount(1);
         for (int i = 1; i <= 2; i++) {
             String body = "message:" + i;
-            sendExchange(body);
+            sendExchange(timeOutEndpointUri, body);
         }
 
         resultEndpoint.assertIsSatisfied();
     }
 
-    protected void sendExchange(final Object expectedBody) {
-        template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123);
+
+    public void xtestJmsMulticastAndAggregration() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:reply", MockEndpoint.class);
+
+        resultEndpoint.expectedMessageCount(2);
+        for (int i = 1; i <= 6; i++) {
+            String body = "message:" + i;
+            sendExchange("jms:queue:point1", body);
+        }
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+
+
+    protected void sendExchange(String uri, final Object expectedBody) {
+        template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
     }
 
 
@@ -68,7 +85,7 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from(startEndpointUri).to("jms:queue:test.b");
+                from(timeOutEndpointUri).to("jms:queue:test.b");
                 from("jms:queue:test.b").aggregator(header("cheese"), new AggregationStrategy() {
                     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                         try {
@@ -80,7 +97,32 @@
                         return newExchange;
                     }
                 }).to("mock:result");
+
+                from(multicastEndpointUri).to("jms:queue:point1", "jms:queue:point2", "jms:queue:point3");
+                from("jms:queue:point1").process(new MyProcessor()).to("jms:queue:reply");
+                from("jms:queue:point2").process(new MyProcessor()).to("jms:queue:reply");
+                from("jms:queue:point3").process(new MyProcessor()).to("jms:queue:reply");
+                from("jms:queue:reply").aggregator(header("cheese"), new AggregationStrategy() {
+                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                        LOG.info("try to aggregating the message ");
+                        Integer old = (Integer) oldExchange.getProperty("aggregated");
+                        if (old == null) {
+                            old = 1;
+                        }
+                        Exchange result = newExchange;
+                        result.setProperty("aggregated", old + 1);
+                        return result;
+                    }
+                }).completedPredicate(header("aggregated").isEqualTo(3))
+                .to("mock:reply");
             }
         };
     }
+    private class MyProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            LOG.info("get the exchange here " + exchange);
+        }
+
+    }
 }



Mime
View raw message