camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r1452197 - in /camel/trunk/components/camel-rx/src: main/java/org/apache/camel/rx/ test/java/org/apache/camel/rx/
Date Mon, 04 Mar 2013 07:06:12 GMT
Author: jstrachan
Date: Mon Mar  4 07:06:12 2013
New Revision: 1452197

URL: http://svn.apache.org/r1452197
Log:
Added an overloaded version of toObservable() that takes the body type as an argument, so that
the body of the message can be extracted and processed as an Observable directly in cases
where the user knows the type of the payload in a message.

Added:
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
      - copied, changed from r1452192, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java   (with props)
Modified:
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java

Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1452197&r1=1452196&r2=1452197&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java (original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java Mon Mar  4 07:06:12 2013
@@ -49,6 +49,17 @@ public class ReactiveCamel {
     }
 
     /**
+     * Returns an {@link rx.Observable <T>} for the messages with their payload converted to the given type
+     * to allow the messages sent on the endpoint
+     * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
+     */
+    public <T> Observable<T> toObservable(String uri, final Class<T> bodyType) {
+        return toObservable(camelContext.getEndpoint(uri), bodyType);
+    }
+
+
+
+    /**
      * Returns an {@link rx.Observable < org.apache.camel.Message >} to allow the messages sent on the endpoint
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
@@ -61,6 +72,21 @@ public class ReactiveCamel {
         });
     }
 
+    /**
+     * Returns an {@link rx.Observable <T>} for the messages with their payload converted to the given type
+     * to allow the messages sent on the endpoint
+     * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
+     */
+    public <T> Observable<T> toObservable(Endpoint endpoint, final Class<T> bodyType) {
+        return createEndpointObservable(endpoint, new Func1<Exchange, T>() {
+            @Override
+            public T call(Exchange exchange) {
+                Message in = exchange.getIn();
+                return in.getBody(bodyType);
+            }
+        });
+    }
+
     public CamelContext getCamelContext() {
         return camelContext;
     }

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (from r1452192, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1452192&r2=1452197&rev=1452197&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java Mon Mar  4 07:06:12 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.camel.rx;
 
-import org.apache.camel.Message;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -25,27 +24,48 @@ import org.slf4j.LoggerFactory;
 
 import rx.Observable;
 import rx.util.functions.Action1;
+import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableMessageTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageTest.class);
+public class ObservableBodyTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableBodyTest.class);
 
     @Test
     public void testConsume() throws Exception {
         final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class);
-        mockEndpoint.expectedMessageCount(4);
+        mockEndpoint.expectedBodiesReceived("b", "d");
 
-        Observable<Message> observable = reactiveCamel.toObservable("timer://foo?fixedRate=true&period=100");
-        observable.take(4).subscribe(new Action1<Message>() {
+        // lets consume, filter and map events
+        Observable<Order> observable = reactiveCamel.toObservable("seda:orders", Order.class);
+        Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() {
+            public Boolean call(Order order) {
+                return order.getAmount() > 100.0;
+            }
+        }).map(new Func1<Order, String>() {
+            public String call(Order order) {
+                return order.getId();
+            }
+        });
+
+
+        // lets route the largeOrderIds to the mock endpoint for testing
+        largeOrderIds.take(2).subscribe(new Action1<String>() {
             @Override
-            public void call(Message message) {
-                String body = "Processing message headers " + message.getHeaders();
-                LOG.info(body);
+            public void call(String body) {
+                LOG.info("Processing  " + body);
                 producerTemplate.sendBody(mockEndpoint, body);
             }
         });
 
+
+        // now lets send some orders in
+        Order[] orders = {new Order("a", 49.95), new Order("b", 125.50), new Order("c", 22.95),
+                new Order("d", 259.95), new Order("e", 1.25)};
+        for (Order order : orders) {
+            producerTemplate.sendBody("seda:orders", order);
+        }
+
         mockEndpoint.assertIsSatisfied();
     }
 }

Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java?rev=1452197&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java (added)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java Mon Mar  4 07:06:12 2013
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+/**
+ */
+public class Order {
+    private String id;
+    private double amount;
+
+    public Order(String id, double amount) {
+        this.amount = amount;
+        this.id = id;
+    }
+
+    public String toString() {
+        return "Order[id " + id + ", amount " + amount + "]";
+    }
+
+    public double getAmount() {
+        return amount;
+    }
+
+    public String getId() {
+        return id;
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/Order.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java?rev=1452197&r1=1452196&r2=1452197&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java (original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/RxTestSupport.java Mon Mar  4 07:06:12 2013
@@ -36,10 +36,12 @@ public abstract class RxTestSupport {
         reactiveCamel = new ReactiveCamel(camelContext);
         producerTemplate = camelContext.createProducerTemplate();
         camelContext.start();
+        producerTemplate.start();
     }
 
     @After
     public void destroy() throws Exception {
+        producerTemplate.stop();
         camelContext.stop();
     }
 }



Mime
View raw message