camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Camel > SEDA
Date Thu, 25 Aug 2011 05:24:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/1/_/styles/combined.css?spaceKey=CAMEL&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA">SEDA</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://cwiki.apache.org/confluence/display/~davsclaus">Claus
Ibsen</a>
    </h4>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >| {{multipleConsumers}} | {{false}}
| *Camel 2.2:* Specifies whether multiple consumers is allowed or not. If enabled you can
use [SEDA] for a pubsub kinda style messaging. Send a message to a seda queue and have multiple
consumers receive a copy of the message. | <br>| {{limitConcurrentConsumers}} | {{true}}
| *Camel 2.3:* Whether to limit the concurrentConsumers to maximum 500. If its configured
with a higher number an exception will be thrown. You can disable this check by turning this
option off. | <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">|
{{blockWhenFull}} | {{false}} | *Camel 2.9:* Whether to block the current thread when sending
a message to a SEDA endpoint, and the SEDA queue is full (capacity hit). By default an exception
will be thrown stating the queue is full. By setting this option to {{true}} the caller thread
will instead block and wait until the message can be delivered to the SEDA queue. | <br></td></tr>
            <tr><td class="diff-unchanged" >{div} <br> <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <h2><a name="SEDA-SEDAComponent"></a>SEDA Component</h2>

<p>The <b>seda:</b> component provides asynchronous <a href="http://www.eecs.harvard.edu/~mdw/proj/seda/"
class="external-link" rel="nofollow">SEDA</a> behavior, so that messages are exchanged
on a <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html"
class="external-link" rel="nofollow">BlockingQueue</a> and consumers are invoked
in a separate thread from the producer.</p>

<p>Note that queues are only visible within a <em>single</em> <a href="/confluence/display/CAMEL/CamelContext"
title="CamelContext">CamelContext</a>. If you want to communicate across <tt>CamelContext</tt>
instances (for example, communicating between Web applications), see the <a href="/confluence/display/CAMEL/VM"
title="VM">VM</a> component.</p>

<p>This component does not implement any kind of persistence or recovery, if the VM
terminates while messages are yet to be processed. If you need persistence, reliability or
distributed SEDA, try using either <a href="/confluence/display/CAMEL/JMS" title="JMS">JMS</a>
or <a href="/confluence/display/CAMEL/ActiveMQ" title="ActiveMQ">ActiveMQ</a>.</p>

<div class='panelMacro'><table class='tipMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/check.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td><b>Synchronous</b><br
/>The <a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a>
component provides synchronous invocation of any consumers when a producer sends a message
exchange.</td></tr></table></div>

<h3><a name="SEDA-URIformat"></a>URI format</h3>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
seda:someName[?options]
</pre>
</div></div>

<p>Where <b>someName</b> can be any string that uniquely identifies the
endpoint within the current <a href="/confluence/display/CAMEL/CamelContext" title="CamelContext">CamelContext</a>.</p>

<p>You can append query options to the URI in the following format, <tt>?option=value&amp;option=value&amp;...</tt></p>

<div class='panelMacro'><table class='infoMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/information.gif" width="16"
height="16" align="absmiddle" alt="" border="0"></td><td><b>Camel 1.x
- Same URI must be used for both producer and consumer</b><br />An exactly identical
<a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a> endpoint URI <b>must</b>
be used for both the producer endpoint and the consumer endpoint. Otherwise Camel will create
a second <a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a> endpoint,
even thought the <tt>someName</tt> portion of the URI is identical. For example:
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:foo"</span>).to(<span class="code-quote">"seda:bar?concurrentConsumers=5"</span>);

from(<span class="code-quote">"seda:bar?concurrentConsumers=5"</span>).to(<span
class="code-quote">"file:<span class="code-comment">//output"</span>);</span>
</pre>
</div></div>
<p>Notice that we have to use the full URI including options in both the producer and
consumer.</p>

<p>In Camel 2.x this has been fixed so its the queue name that must match, eg in this
example we are using bar as the queue name.</p></td></tr></table></div>

<h3><a name="SEDA-Options"></a>Options</h3>
<div class="confluenceTableSmall"><div class='table-wrap'>
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Name </th>
<th class='confluenceTh'> Default </th>
<th class='confluenceTh'> Description </th>
</tr>
<tr>
<td class='confluenceTd'> <tt>size</tt> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> The maximum size (= capacity of the number of messages it
can max hold) of the SEDA queue. The default value in Camel 2.2 or older is <tt>1000</tt>.
From Camel 2.3 onwards the size is unbounded by default. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>concurrentConsumers</tt> </td>
<td class='confluenceTd'> <tt>1</tt> </td>
<td class='confluenceTd'> <b>Camel 1.6.1/2.0</b>: Number of concurrent threads
processing exchanges. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>waitForTaskToComplete</tt> </td>
<td class='confluenceTd'> <tt>IfReplyExpected</tt> </td>
<td class='confluenceTd'> <b>Camel 2.0</b>: Option to specify whether the
caller should wait for the async task to complete or not before continuing. The following
three options are supported: <tt>Always</tt>, <tt>Never</tt> or <tt>IfReplyExpected</tt>.
The first two values are self-explanatory. The last value, <tt>IfReplyExpected</tt>,
will only wait if the message is <a href="/confluence/display/CAMEL/Request+Reply" title="Request
Reply">Request Reply</a> based. The default option is <tt>IfReplyExpected</tt>.
See more information about <a href="/confluence/display/CAMEL/Async" title="Async">Async</a>
messaging. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>timeout</tt> </td>
<td class='confluenceTd'> <tt>30000</tt> </td>
<td class='confluenceTd'> <b>Camel 2.0:</b> Timeout in millis a seda producer
will at most waiting for an async task to complete. See <tt>waitForTaskToComplete</tt>
and <a href="/confluence/display/CAMEL/Async" title="Async">Async</a> for more
details. In <b>Camel 2.2</b> you can now disable timeout by using 0 or a negative
value. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>multipleConsumers</tt> </td>
<td class='confluenceTd'> <tt>false</tt> </td>
<td class='confluenceTd'> <b>Camel 2.2:</b> Specifies whether multiple consumers
is allowed or not. If enabled you can use <a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a>
for a pubsub kinda style messaging. Send a message to a seda queue and have multiple consumers
receive a copy of the message. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>limitConcurrentConsumers</tt> </td>
<td class='confluenceTd'> <tt>true</tt> </td>
<td class='confluenceTd'> <b>Camel 2.3:</b> Whether to limit the concurrentConsumers
to maximum 500. If its configured with a higher number an exception will be thrown. You can
disable this check by turning this option off. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>blockWhenFull</tt> </td>
<td class='confluenceTd'> <tt>false</tt> </td>
<td class='confluenceTd'> <b>Camel 2.9:</b> Whether to block the current
thread when sending a message to a SEDA endpoint, and the SEDA queue is full (capacity hit).
By default an exception will be thrown stating the queue is full. By setting this option to
<tt>true</tt> the caller thread will instead block and wait until the message
can be delivered to the SEDA queue. </td>
</tr>
</tbody></table>
</div>
</div>

<h3><a name="SEDA-ChangesinCamel2.0"></a>Changes in Camel 2.0</h3>
<p>In Camel 2.0 the <a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a>
component supports using <a href="/confluence/display/CAMEL/Request+Reply" title="Request
Reply">Request Reply</a>, where the caller will wait for the <a href="/confluence/display/CAMEL/Async"
title="Async">Async</a> route to complete. For instance:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"mina:tcp:<span class="code-comment">//0.0.0.0:9876?textline=<span
class="code-keyword">true</span>&amp;sync=<span class="code-keyword">true</span>"</span>).to(<span
class="code-quote">"seda:input"</span>);
</span>
from(<span class="code-quote">"seda:input"</span>).to(<span class="code-quote">"bean:processInput"</span>).to(<span
class="code-quote">"bean:createResponse"</span>);
</pre>
</div></div>
<p>In the route above, we have a TCP listener on port 9876 that accepts incoming requests.
The request is routed to the <tt>seda:input</tt> queue. As it is a <a href="/confluence/display/CAMEL/Request+Reply"
title="Request Reply">Request Reply</a> message, we wait for the response. When the
consumer on the <tt>seda:input</tt> queue is complete, it copies the response
to the original message response.</p>

<p>Camel 1.x does <b>not</b> have this feature implemented, the <a href="/confluence/display/CAMEL/SEDA"
title="SEDA">SEDA</a> queues in Camel 1.x will never wait.</p>

<div class='panelMacro'><table class='noteMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/warning.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td><b>Camel 2.0 - 2.2: Works
only with 2 endpoints</b><br />Using <a href="/confluence/display/CAMEL/Request+Reply"
title="Request Reply">Request Reply</a> over <a href="/confluence/display/CAMEL/SEDA"
title="SEDA">SEDA</a> or <a href="/confluence/display/CAMEL/VM" title="VM">VM</a>
only works with 2 endpoints. You <b>cannot</b> chain endpoints by sending to A
-&gt; B -&gt; C etc. Only between A -&gt; B. The reason is the implementation
logic is fairly simple. To support 3+ endpoints makes the logic much more complex to handle
ordering and notification between the waiting threads properly. 

<p>This has been improved in <b>Camel 2.3</b> onwards, which allows you
to chain as many endpoints as you like.</p></td></tr></table></div>

<h3><a name="SEDA-Concurrentconsumers"></a>Concurrent consumers</h3>
<p>By default, the SEDA endpoint uses a single consumer thread, but you can configure
it to use concurrent consumer threads. So instead of thread pools you can use:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"seda:stageName?concurrentConsumers=5"</span>).process(...)
</pre>
</div></div>

<h4><a name="SEDA-Differencebetweenthreadpoolsandconcurrentconsumers"></a>Difference
between thread pools and concurrent consumers</h4>
<p>The <em>thread pool</em> is a pool that can increase/shrink dynamically
at runtime depending on load, whereas the concurrent consumers are always fixed.</p>

<h3><a name="SEDA-Threadpools"></a>Thread pools</h3>
<p>Be aware that adding a thread pool to a SEDA endpoint by doing something like:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"seda:stageName"</span>).thread(5).process(...)
</pre>
</div></div>
<p>Can wind up with two <tt>BlockQueues</tt>: one from the SEDA endpoint,
and one from the workqueue of the thread pool, which may not be what you want. Instead, you
might want to consider configuring a <a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a>
endpoint with a thread pool, which can process messages both synchronously and asynchronously.
For example:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:stageName"</span>).thread(5).process(...)
</pre>
</div></div>
<p>You can also directly configure number of threads that process messages on a SEDA
endpoint using the <tt>concurrentConsumers</tt> option.</p>

<h3><a name="SEDA-Sample"></a>Sample</h3>
<p>In the route below we use the SEDA queue to send the request to this async queue
to be able to send a fire-and-forget message for further processing in another thread, and
return a constant reply in this thread to the original caller. </p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-keyword">public</span> void configure()
<span class="code-keyword">throws</span> Exception {
    from(<span class="code-quote">"direct:start"</span>)
        <span class="code-comment">// send it to the seda queue that is async
</span>        .to(<span class="code-quote">"seda:next"</span>)
        <span class="code-comment">// <span class="code-keyword">return</span>
a constant response
</span>        .transform(constant(<span class="code-quote">"OK"</span>));

    from(<span class="code-quote">"seda:next"</span>).to(<span class="code-quote">"mock:result"</span>);
}
</pre>
</div></div>

<p>Here we send a Hello World message and expects the reply to be OK.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-object">Object</span> out = template.requestBody(<span
class="code-quote">"direct:start"</span>, <span class="code-quote">"Hello World"</span>);
assertEquals(<span class="code-quote">"OK"</span>, out);
</pre>
</div></div>

<p>The "Hello World" message will be consumed from the SEDA queue from another thread
for further processing. Since this is from a unit test, it will be sent to a <tt>mock</tt>
endpoint where we can do assertions in the unit test.</p>

<h3><a name="SEDA-UsingmultipleConsumers"></a>Using multipleConsumers</h3>
<p><b>Available as of Camel 2.2</b></p>

<p>In this example we have defined two consumers and registered them as spring beans.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-xml"><span class="code-tag"><span class="code-comment">&lt;!--
define the consumers as spring beans --&gt;</span></span>
<span class="code-tag">&lt;bean id=<span class="code-quote">"consumer1"</span>
class=<span class="code-quote">"org.apache.camel.spring.example.FooEventConsumer"</span>/&gt;</span>

<span class="code-tag">&lt;bean id=<span class="code-quote">"consumer2"</span>
class=<span class="code-quote">"org.apache.camel.spring.example.AnotherFooEventConsumer"</span>/&gt;</span>

<span class="code-tag">&lt;camelContext xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
    <span class="code-tag"><span class="code-comment">&lt;!-- define a shared
endpoint which the consumers can refer to instead of using url --&gt;</span></span>
    <span class="code-tag">&lt;endpoint id=<span class="code-quote">"foo"</span>
uri=<span class="code-quote">"seda:foo?multipleConsumers=true"</span>/&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>

<p>Since we have specified <b>multipleConsumers=true</b> on the seda foo
endpoint we can have those two consumers receive their own copy of the message as a kind of
pub-sub style messaging.</p>

<p>As the beans are part of an unit test they simply send the message to a mock endpoint,
but notice how we can use @Consume to consume from the seda queue.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-keyword">public</span> class FooEventConsumer
{

    @EndpointInject(uri = <span class="code-quote">"mock:result"</span>)
    <span class="code-keyword">private</span> ProducerTemplate destination;

    @Consume(ref = <span class="code-quote">"foo"</span>)
    <span class="code-keyword">public</span> void doSomething(<span class="code-object">String</span>
body) {
        destination.sendBody(<span class="code-quote">"foo"</span> + body);
    }

}
</pre>
</div></div>

<h3><a name="SEDA-Extractingqueueinformation."></a>Extracting queue information.</h3>
<p>If you need it, you can also get information like queue size etc without using JMX
like this:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
SedaEndpoint seda = context.getEndpoint(<span class="code-quote">"seda:xxxx"</span>);
<span class="code-object">int</span> size = seda.getExchanges().size()
</pre>
</div></div>
<h3><a name="SEDA-SeeAlso"></a>See Also</h3>
<ul>
	<li><a href="/confluence/display/CAMEL/Configuring+Camel" title="Configuring Camel">Configuring
Camel</a></li>
	<li><a href="/confluence/display/CAMEL/Component" title="Component">Component</a></li>
	<li><a href="/confluence/display/CAMEL/Endpoint" title="Endpoint">Endpoint</a></li>
	<li><a href="/confluence/display/CAMEL/Getting+Started" title="Getting Started">Getting
Started</a></li>
</ul>

<ul class="alternate" type="square">
	<li><a href="/confluence/display/CAMEL/VM" title="VM">VM</a></li>
	<li><a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a></li>
	<li><a href="/confluence/display/CAMEL/Async" title="Async">Async</a></li>
</ul>

    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;">
            <a href="https://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
        </div>
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA">View Online</a>
        |
        <a href="https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=61225&revisedVersion=33&originalVersion=32">View
Changes</a>
                |
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

Mime
View raw message