camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Camel > RX
Date Wed, 06 Mar 2013 09:54:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/1/_/styles/combined.css?spaceKey=CAMEL&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/CAMEL/RX">RX</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://cwiki.apache.org/confluence/display/~jstrachan">James
Strachan</a>
    </h4>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >rx.sendTo(observable, &quot;activemq:MyQueue&quot;);
<br>{code}  <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">
<br>h2. Embedding some RxJava processing inside a Camel route <br> <br>Sometimes
you may wish to use a Camel route to consume messages, perform content based routing, transformation,
deal with data format marshalling and so forth and then within the route invoke some typesafe
RxJava event processing. <br> <br>One approach is to just send messages from inside
the Camel route to an endpoint; then use the *toObservable()* method to bind the endpoint
to an Observable&lt;T&gt;. <br> <br>However if you prefer to embed the
RxJava processing of messages inside your route there are 2 helper classes which can be used
to wrap up the RxJava processing as a Camel Processor that can be easily embed into a Camel
route. <br> <br>You can use the *ObservableMessage* or *ObservableBody* classes
which both have an abstract *configure()* method like RouteBuilder. In the configure method
you can then process the Observable&lt;T&gt; for the Camel Message or the message
body. <br> <br>e.g.  <br> <br>{code:java} <br>    public class
MyObservableBody extends ObservableBody&lt;String&gt; { <br>        public MyObservableBody()
{ <br>            super(String.class); <br>        } <br> <br>   
    protected void configure(Observable&lt;String&gt; observable) { <br>   
        // lets process the messages using the RX API <br>            observable.map(new
Func1&lt;String, String&gt;() { <br>                public String call(String
body) { <br>                    return &quot;Hello &quot; + body; <br>
               } <br>            }).subscribe(new Action1&lt;String&gt;() {
<br>                public void call(String body) { <br>                    template.sendBody(resultEndpoint,
body); <br>                } <br>            }); <br>        } <br>
   } <br>    ... <br>    // now lets use this inside a route... <br>   
from(&quot;seda:foo&quot;).process(new MyObservableBody()); <br>{code} <br>
<br>Another approach, if you are consuming directly from Camel using the [Bean Integration]
is to just use the RxJava Subject directly: <br> <br>{code:java} <br>import
rx.subjects.Subject; <br> <br>public class MyThing { <br>    private final
Subject&lt;String&gt; observable = Subject.create(); <br> <br>    public
MyThing() { <br>         // now process the observable somehow.... <br>    } <br>
<br>    @Consume(uri=&quot;activemq:myqueue&quot;) <br>    public void
onMessageBody(String body) { <br>      subject.onNext(body); <br>    } <br>}
<br>{code} <br> <br>Though using the *toObservable* on *ReactiveCamel* is
maybe a little simpler. <br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <h1><a name="RX-CamelRX"></a>Camel RX</h1>
<p><b>Available as of Camel 2.11</b></p>

<p>The camel-rx library provides Camel support for the <a href="https://rx.codeplex.com/"
class="external-link" rel="nofollow">Reactive Extensions</a>&nbsp;(RX) using
the <a href="https://github.com/Netflix/RxJava/wiki" class="external-link" rel="nofollow">RxJava</a>
library so that:</p>

<ul>
	<li>Camel users can use the <a href="http://netflix.github.com/RxJava/javadoc/"
class="external-link" rel="nofollow">RxJava API</a> for processing messages on endpoints
using a typesafe composable API</li>
	<li><a href="https://github.com/Netflix/RxJava/wiki" class="external-link" rel="nofollow">RxJava</a>
users get to use all of the <a href="/confluence/display/CAMEL/Components" title="Components">Camel
transports and protocols</a> from within the <a href="http://netflix.github.com/RxJava/javadoc/"
class="external-link" rel="nofollow">RxJava API</a></li>
</ul>


<h2><a name="RX-BackgroundonRX"></a>Background on RX</h2>

<p>For a more in depth background on RX check out <a href="https://github.com/Netflix/RxJava/wiki/Observable"
class="external-link" rel="nofollow">the RxJava wiki on Observable and the Reactive pattern</a>
or the <a href="https://rx.codeplex.com/" class="external-link" rel="nofollow">Microsoft
RX documentation</a>.</p>

<p>You can think of RX as providing an API similar to Java 8 / Groovy / Scala collections
(methods like filter, forEach, map, reduce, zip etc) - but which operates on an asynchronous
stream of events rather than a collection. So you could think of RX as like working with asynchronous
push based collections (rather than the traditional synchronous pull based collections).</p>

<p>In RX you work with an <a href="http://netflix.github.com/RxJava/javadoc/rx/Observable.html"
class="external-link" rel="nofollow">Observable&lt;T&gt;</a> which behaves
quite like a Collection&lt;T&gt; in Java 8 so you can filter/map/concat and so forth.
The Observable&lt;T&gt; then acts as a typesafe composable API for working with asynchronous
events in a collection-like way.</p>

<p>Once you have an <a href="http://netflix.github.com/RxJava/javadoc/rx/Observable.html"
class="external-link" rel="nofollow">Observable&lt;T&gt;</a> you can then
</p>

<ul>
	<li><a href="https://github.com/Netflix/RxJava/wiki/Filtering-Operators" class="external-link"
rel="nofollow">filter events</a></li>
	<li><a href="https://github.com/Netflix/RxJava/wiki/Transformative-Operators" class="external-link"
rel="nofollow">transform events</a></li>
	<li><a href="https://github.com/Netflix/RxJava/wiki/Combinatorial-Operators" class="external-link"
rel="nofollow">combine event streams</a></li>
	<li><a href="https://github.com/Netflix/RxJava/wiki/Utility-Operators" class="external-link"
rel="nofollow">other utility methods</a></li>
</ul>


<h2><a name="RX-ObservingeventsonCamelendpoints"></a>Observing events on
Camel endpoints </h2>

<p>You can create an Observable&lt;Message&gt; from any endpoint using the ReactiveCamel
helper class and the <b>toObservable()</b> method.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">import</span> org.apache.camel.rx.*;

ReactiveCamel rx = <span class="code-keyword">new</span> ReactiveCamel(camelContext);
Observable&lt;Message&gt; observable = rx.toObservable(<span class="code-quote">"activemq:MyMessages"</span>);

<span class="code-comment">// we can now call filter/map/concat etc
</span>filtered = observable.filter(m -&gt; m.getHeader(<span class="code-quote">"foo"</span>)
!= <span class="code-keyword">null</span>).map(m -&gt; <span class="code-quote">"Hello
"</span> + m.getBody()); 
</pre>
</div></div>

<p>If you know the type of the message payload (its body), you can use an overloaded
version of toObservable() to pass in the class and get a typesafe Observable&lt;T&gt;
back:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">import</span> org.apache.camel.rx.*;

ReactiveCamel rx = <span class="code-keyword">new</span> ReactiveCamel(camelContext);
Observable&lt;Order&gt; observable = rx.toObservable(<span class="code-quote">"seda:orders"</span>,
Order.class);

<span class="code-comment">// now lets filter and map using Java 7
</span>Observable&lt;<span class="code-object">String</span>&gt;
largeOrderIds = observable.filter(<span class="code-keyword">new</span> Func1&lt;Order,
<span class="code-object">Boolean</span>&gt;() {
    <span class="code-keyword">public</span> <span class="code-object">Boolean</span>
call(Order order) {
        <span class="code-keyword">return</span> order.getAmount() &gt; 100.0;
    }
}).map(<span class="code-keyword">new</span> Func1&lt;Order, <span class="code-object">String</span>&gt;()
{
    <span class="code-keyword">public</span> <span class="code-object">String</span>
call(Order order) {
        <span class="code-keyword">return</span> order.getId();
    }
});
</pre>
</div></div>

<h2><a name="RX-SendingObservable%3CT%3EeventstoCamelendpoints"></a>Sending
Observable&lt;T&gt; events to Camel endpoints</h2>

<p>If you have an <a href="http://netflix.github.com/RxJava/javadoc/rx/Observable.html"
class="external-link" rel="nofollow">Observable&lt;T&gt;</a> from some other
library; or have created one from a <a href="http://netflix.github.com/RxJava/javadoc/rx/Observable.html#toObservable(java.util.concurrent.Future)"
class="external-link" rel="nofollow">Future&lt;T&gt; using RxJava</a> and
you wish to send the events on the observable to a Camel endpoint you can use the <b>sendTo()</b>
method on ReactiveCamel:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">import</span> org.apache.camel.rx.*;

<span class="code-comment">// take some observable from somewhere
</span>Observable&lt;T&gt; observable = ...;
ReactiveCamel rx = <span class="code-keyword">new</span> ReactiveCamel(camelContext);

<span class="code-comment">// lets send the events to a message queue
</span>rx.sendTo(observable, <span class="code-quote">"activemq:MyQueue"</span>);
</pre>
</div></div> 

<h2><a name="RX-EmbeddingsomeRxJavaprocessinginsideaCamelroute"></a>Embedding
some RxJava processing inside a Camel route</h2>

<p>Sometimes you may wish to use a Camel route to consume messages, perform content
based routing, transformation, deal with data format marshalling and so forth and then within
the route invoke some typesafe RxJava event processing.</p>

<p>One approach is to just send messages from inside the Camel route to an endpoint;
then use the <b>toObservable()</b> method to bind the endpoint to an Observable&lt;T&gt;.</p>

<p>However if you prefer to embed the RxJava processing of messages inside your route
there are 2 helper classes which can be used to wrap up the RxJava processing as a Camel Processor
that can be easily embed into a Camel route.</p>

<p>You can use the <b>ObservableMessage</b> or <b>ObservableBody</b>
classes which both have an abstract <b>configure()</b> method like RouteBuilder.
In the configure method you can then process the Observable&lt;T&gt; for the Camel
Message or the message body.</p>

<p>e.g. </p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
    <span class="code-keyword">public</span> class MyObservableBody <span class="code-keyword">extends</span>
ObservableBody&lt;<span class="code-object">String</span>&gt; {
        <span class="code-keyword">public</span> MyObservableBody() {
            <span class="code-keyword">super</span>(<span class="code-object">String</span>.class);
        }

        <span class="code-keyword">protected</span> void configure(Observable&lt;<span
class="code-object">String</span>&gt; observable) {
            <span class="code-comment">// lets process the messages using the RX API
</span>            observable.map(<span class="code-keyword">new</span>
Func1&lt;<span class="code-object">String</span>, <span class="code-object">String</span>&gt;()
{
                <span class="code-keyword">public</span> <span class="code-object">String</span>
call(<span class="code-object">String</span> body) {
                    <span class="code-keyword">return</span> <span class="code-quote">"Hello
"</span> + body;
                }
            }).subscribe(<span class="code-keyword">new</span> Action1&lt;<span
class="code-object">String</span>&gt;() {
                <span class="code-keyword">public</span> void call(<span class="code-object">String</span>
body) {
                    template.sendBody(resultEndpoint, body);
                }
            });
        }
    }
    ...
    <span class="code-comment">// now lets use <span class="code-keyword">this</span>
inside a route...
</span>    from(<span class="code-quote">"seda:foo"</span>).process(<span
class="code-keyword">new</span> MyObservableBody());
</pre>
</div></div>

<p>Another approach, if you are consuming directly from Camel using the <a href="/confluence/display/CAMEL/Bean+Integration"
title="Bean Integration">Bean Integration</a> is to just use the RxJava Subject directly:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
<span class="code-keyword">import</span> rx.subjects.Subject;

<span class="code-keyword">public</span> class MyThing {
    <span class="code-keyword">private</span> <span class="code-keyword">final</span>
Subject&lt;<span class="code-object">String</span>&gt; observable = Subject.create();
  
    <span class="code-keyword">public</span> MyThing() {
         <span class="code-comment">// now process the observable somehow....
</span>    }

    @Consume(uri=<span class="code-quote">"activemq:myqueue"</span>)
    <span class="code-keyword">public</span> void onMessageBody(<span class="code-object">String</span>
body) {
      subject.onNext(body);
    }
}
</pre>
</div></div>

<p>Though using the <b>toObservable</b> on <b>ReactiveCamel</b>
is maybe a little simpler.</p>
    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;">
            <a href="https://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
        </div>
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/RX">View Online</a>
        |
        <a href="https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=30756029&revisedVersion=18&originalVersion=17">View
Changes</a>
                |
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/RX?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

Mime
View raw message