camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r1453234 - in /camel/trunk/components/camel-rx: ./ src/main/java/org/apache/camel/rx/ src/main/java/org/apache/camel/rx/support/ src/test/java/org/apache/camel/rx/
Date Wed, 06 Mar 2013 09:26:49 GMT
Author: jstrachan
Date: Wed Mar  6 09:26:48 2013
New Revision: 1453234

URL: http://svn.apache.org/r1453234
Log:
Added the ObservableMessage and ObservableBody helper classes, which are Processors that make
it easy to embed some RX processing code to handle messages / bodies inside an existing Camel
route, e.g. it's handy if you want to do filtering, marshalling or transforming before hitting
the RX code. Also refactored some useful functions and classes into the support package and renamed
the test cases to be more descriptive.

Added:
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
  (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
  (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
  (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
  (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
  (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
  (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
  (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
  (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
  (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
  (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
  (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java
  (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
Removed:
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java
Modified:
    camel/trunk/components/camel-rx/pom.xml
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java

Modified: camel/trunk/components/camel-rx/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/pom.xml?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/pom.xml (original)
+++ camel/trunk/components/camel-rx/pom.xml Wed Mar  6 09:26:48 2013
@@ -54,7 +54,7 @@
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.camel</groupId>
-      <artifactId>camel-test-spring</artifactId>      
+      <artifactId>camel-test</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.rx.support.ExchangeToBodyFunc1;
+import org.apache.camel.rx.support.ObservableProcessor;
+
+/**
+ * A base class for a {@link Processor} which allows you to process
+ * messages using an {@link Observable< org.apache.camel.Message>} by implementing the
+ * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable} method.
+ */
+public abstract class ObservableBody<T> extends ObservableProcessor<T> {
+    private final Class<T> bodyType;
+
+    public ObservableBody(Class<T> bodyType) {
+        super(new ExchangeToBodyFunc1(bodyType));
+        this.bodyType = bodyType;
+    }
+
+    public String toString() {
+        return "ObservableBody[" + bodyType.getName() + "]";
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Message;
+import org.apache.camel.rx.support.ExchangeToMessageFunc1;
+import org.apache.camel.rx.support.ObservableProcessor;
+
+/**
+ * A base class for a {@link Processor} which allows you to process
+ * messages using an {@link Observable<Message>} by implementing the
+ * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable} method.
+ */
+public abstract class ObservableMessage extends ObservableProcessor<Message> {
+    public ObservableMessage() {
+        super(ExchangeToMessageFunc1.getInstance());
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java (original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java Wed
Mar  6 09:26:48 2013
@@ -23,7 +23,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
 import org.apache.camel.rx.support.EndpointSubscription;
-import org.apache.camel.rx.support.ProducerObserver;
+import org.apache.camel.rx.support.ExchangeToBodyFunc1;
+import org.apache.camel.rx.support.ExchangeToMessageFunc1;
+import org.apache.camel.rx.support.ObserverSender;
 import org.apache.camel.util.CamelContextHelper;
 
 import rx.Observable;
@@ -66,12 +68,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
     public Observable<Message> toObservable(Endpoint endpoint) {
-        return createEndpointObservable(endpoint, new Func1<Exchange, Message>() {
-            @Override
-            public Message call(Exchange exchange) {
-                return exchange.getIn();
-            }
-        });
+        return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance());
     }
 
     /**
@@ -80,13 +77,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
     public <T> Observable<T> toObservable(Endpoint endpoint, final Class<T> bodyType) {
-        return createEndpointObservable(endpoint, new Func1<Exchange, T>() {
-            @Override
-            public T call(Exchange exchange) {
-                Message in = exchange.getIn();
-                return in.getBody(bodyType);
-            }
-        });
+        return createEndpointObservable(endpoint, new ExchangeToBodyFunc1<T>(bodyType));
     }
 
     /**
@@ -100,7 +91,7 @@ public class ReactiveCamel {
      */
     public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
         try {
-            ProducerObserver observer = new ProducerObserver(endpoint);
+            ObserverSender observer = new ObserverSender(endpoint);
             observable.subscribe(observer);
         } catch (Exception e) {
             throw new RuntimeCamelRxException(e);

Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
(original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
Wed Mar  6 09:26:48 2013
@@ -40,13 +40,7 @@ public class EndpointSubscription<T> imp
         this.observer = observer;
 
         // lets create the consumer
-        Processor processor = new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                T value = func.call(exchange);
-                observer.onNext(value);
-            }
-        };
+        Processor processor = new ProcessorToObserver<T>(func, observer);
         try {
             this.consumer = endpoint.createConsumer(processor);
             this.consumer.start();
@@ -81,4 +75,5 @@ public class EndpointSubscription<T> imp
     public Observer<T> getObserver() {
         return observer;
     }
+
 }

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+import rx.util.functions.Func1;
+
+/**
+ * A simple {@link Func1} to convert an {@link Exchange} to the given type using the
+ * IN {@link Message}'s body
+ */
+public class ExchangeToBodyFunc1<T> implements Func1<Exchange, T> {
+    private final Class<T> bodyType;
+
+    public ExchangeToBodyFunc1(Class<T> bodyType) {
+        this.bodyType = bodyType;
+    }
+
+    @Override
+    public T call(Exchange exchange) {
+        Message in = exchange.getIn();
+        return in.getBody(bodyType);
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+import rx.util.functions.Func1;
+
+/**
+ * A simple {@link Func1} to convert an {@link Exchange} to its IN {@link Message}
+ */
+public class ExchangeToMessageFunc1 implements Func1<Exchange, Message> {
+    private static ExchangeToMessageFunc1 instance = new ExchangeToMessageFunc1();
+
+    public static ExchangeToMessageFunc1 getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Message call(Exchange exchange) {
+        return exchange.getIn();
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.support.ServiceSupport;
+
+import rx.Observable;
+import rx.subjects.Subject;
+import rx.util.functions.Func1;
+
+/**
+ * A base class for implementing a {@link Processor} which provides access to an {@link Observable}
+ * so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
+ */
+public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor {
+    private final Subject<T> observable = Subject.create();
+    private final ProcessorToObserver processor;
+
+    protected ObservableProcessor(Func1<Exchange, T> func) {
+        this.processor = new ProcessorToObserver(func, observable);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        processor.process(exchange);
+    }
+
+    /**
+     * Returns the {@link Observable} for this {@link Processor} so that the messages that are received
+     * can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
+     */
+    public Observable<T> getObservable() {
+        return observable;
+    }
+
+    /**
+     * Provides the configuration hook so that derived classes can process the observable
+     * to use whatever RX methods they wish to process the incoming events
+     * @param observable
+     */
+    protected abstract void configure(Observable<T> observable);
+
+    protected void doStart() throws Exception {
+        configure(getObservable());
+    }
+
+
+    protected void doStop() throws Exception {
+        observable.onCompleted();
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
(from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java?p2=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java&p1=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
(original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
Wed Mar  6 09:26:48 2013
@@ -26,12 +26,13 @@ import org.apache.camel.rx.RuntimeCamelR
 import rx.Observer;
 
 /**
+ * An {@link Observer} which sends events to a given {@link Endpoint}
  */
-public class ProducerObserver implements Observer {
+public class ObserverSender implements Observer {
     private Endpoint endpoint;
     private Producer producer;
 
-    public ProducerObserver(Endpoint endpoint) throws Exception {
+    public ObserverSender(Endpoint endpoint) throws Exception {
         this.endpoint = endpoint;
         this.producer = endpoint.createProducer();
         this.producer.start();

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
(added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import rx.Observer;
+import rx.util.functions.Func1;
+
+/**
+ * A {@link Processor} which invokes an underlying {@link Observer} as messages
+ * arrive using the given function to convert the {@link Exchange} to the required
+ * object
+ */
+public class ProcessorToObserver<T> implements Processor {
+    private final Func1<Exchange, T> func;
+    private final Observer<T> observer;
+
+    public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) {
+        this.func = func;
+        this.observer = observer;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Exception exception = null;
+        if (exchange.isFailed()) {
+            exception = exchange.getException();
+        }
+        if (exception != null) {
+            observer.onError(exception);
+        } else {
+            T value = func.call(exchange);
+            observer.onNext(value);
+        }
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
(added)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+public class ObservableBodyTest extends CamelTestSupport {
+    protected MyObservableBody observableBody = new MyObservableBody();
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Test
+    public void testUseObservableInRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus");
+
+        template.sendBody("James");
+        template.sendBody("Claus");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyObservableBody extends ObservableBody<String> {
+        public MyObservableBody() {
+            super(String.class);
+        }
+
+        protected void configure(Observable<String> observable) {
+            // lets process the messages using the RX API
+            observable.map(new Func1<String, String>() {
+                public String call(String body) {
+                    return "Hello " + body;
+                }
+            }).subscribe(new Action1<String>() {
+                public void call(String body) {
+                    template.sendBody(resultEndpoint, body);
+                }
+            });
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").process(observableBody);
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
(added)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
Wed Mar  6 09:26:48 2013
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Message;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+public class ObservableMessageTest extends CamelTestSupport {
+    protected MyObservableMessage observableMessage = new MyObservableMessage();
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Test
+    public void testUseObservableInRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus");
+
+        template.sendBody("James");
+        template.sendBody("Claus");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyObservableMessage extends ObservableMessage {
+        protected void configure(Observable<Message> observable) {
+            // lets process the messages using the RX API
+            observable.map(new Func1<Message, String>() {
+                public String call(Message message) {
+                    return "Hello " + message.getBody(String.class);
+                }
+            }).subscribe(new Action1<String>() {
+                public void call(String body) {
+                    template.sendBody(resultEndpoint, body);
+                }
+            });
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").process(observableMessage);
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
(from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java
(original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
Wed Mar  6 09:26:48 2013
@@ -29,8 +29,8 @@ import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableMessageMapTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageMapTest.class);
+public class ToObservableAndMapTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableAndMapTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
(from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
(original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
Wed Mar  6 09:26:48 2013
@@ -28,8 +28,8 @@ import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableBodyTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableBodyTest.class);
+public class ToObservableBodyTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableBodyTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java
(from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
(original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java
Wed Mar  6 09:26:48 2013
@@ -28,8 +28,8 @@ import rx.util.functions.Action1;
 
 /**
  */
-public class ObservableMessageTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageTest.class);
+public class ToObservableTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message