camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Camel > SEDA
Date Sun, 25 Nov 2012 15:02:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/1/_/styles/combined.css?spaceKey=CAMEL&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA">SEDA</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://cwiki.apache.org/confluence/display/~davsclaus">Claus
Ibsen</a>
    </h4>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >{div:class=confluenceTableSmall} <br>||
Name || Since || Default || Description || <br></td></tr>
            <tr><td class="diff-changed-lines" >| {{size}} | |  | The maximum
capacity of the SEDA queue (i.e., the number of messages it can hold). The default value in
Camel 2.2 or older is {{1000}}. From Camel 2.3 onwards, the size is unbounded by default.
<span class="diff-added-words"style="background-color: #dfd;">*Notice:* Mind if you
use this option, then its the first endpoint being created with the queue name, that determines
the size. To make sure all endpoints use same size, then configure the size option on all
of them, or the first endpoint being created. From *Camel 2.11* onwards, a validation is taken
place to ensure if using mixed queue sizes for the same queue name, Camel would detect this
and fail creating the endpoint.</span> | <br></td></tr>
            <tr><td class="diff-unchanged" >| {{concurrentConsumers}} | | {{1}}
| Number of concurrent threads processing exchanges. | <br>| {{waitForTaskToComplete}}
| | {{IfReplyExpected}} | Option to specify whether the caller should wait for the async task
to complete or not before continuing. The following three options are supported: {{Always}},
{{Never}} or {{IfReplyExpected}}. The first two values are self-explanatory. The last value,
{{IfReplyExpected}}, will only wait if the message is [Request Reply] based. The default option
is {{IfReplyExpected}}. See more information about [Async] messaging. | <br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <h2><a name="SEDA-SEDAComponent"></a>SEDA Component</h2>

<p>The <b>seda:</b> component provides asynchronous <a href="http://www.eecs.harvard.edu/~mdw/proj/seda/"
class="external-link" rel="nofollow">SEDA</a> behavior, so that messages are exchanged
on a <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html"
class="external-link" rel="nofollow">BlockingQueue</a> and consumers are invoked
in a separate thread from the producer.</p>

<p>Note that queues are only visible within a <em>single</em> <a href="/confluence/display/CAMEL/CamelContext"
title="CamelContext">CamelContext</a>. If you want to communicate across <tt>CamelContext</tt>
instances (for example, communicating between Web applications), see the <a href="/confluence/display/CAMEL/VM"
title="VM">VM</a> component.</p>

<p>This component does not implement any kind of persistence or recovery, if the VM
terminates while messages are yet to be processed. If you need persistence, reliability or
distributed SEDA, try using either <a href="/confluence/display/CAMEL/JMS" title="JMS">JMS</a>
or <a href="/confluence/display/CAMEL/ActiveMQ" title="ActiveMQ">ActiveMQ</a>.</p>

<div class='panelMacro'><table class='tipMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/check.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td><b>Synchronous</b><br
/>The <a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a>
component provides synchronous invocation of any consumers when a producer sends a message
exchange.</td></tr></table></div>

<h3><a name="SEDA-URIformat"></a>URI format</h3>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
seda:someName[?options]
</pre>
</div></div>

<p>Where <b>someName</b> can be any string that uniquely identifies the
endpoint within the current <a href="/confluence/display/CAMEL/CamelContext" title="CamelContext">CamelContext</a>.</p>

<p>You can append query options to the URI in the following format: <tt>?option=value&amp;option=value&amp;&hellip;</tt></p>

<h3><a name="SEDA-Options"></a>Options</h3>
<div class="confluenceTableSmall"><div class='table-wrap'>
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Name </th>
<th class='confluenceTh'> Since </th>
<th class='confluenceTh'> Default </th>
<th class='confluenceTh'> Description </th>
</tr>
<tr>
<td class='confluenceTd'> <tt>size</tt> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> The maximum capacity of the SEDA queue (i.e., the number of
messages it can hold). The default value in Camel 2.2 or older is <tt>1000</tt>.
From Camel 2.3 onwards, the size is unbounded by default. <b>Notice:</b> Mind
if you use this option, then its the first endpoint being created with the queue name, that
determines the size. To make sure all endpoints use same size, then configure the size option
on all of them, or the first endpoint being created. From <b>Camel 2.11</b> onwards,
a validation is taken place to ensure if using mixed queue sizes for the same queue name,
Camel would detect this and fail creating the endpoint. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>concurrentConsumers</tt> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> <tt>1</tt> </td>
<td class='confluenceTd'> Number of concurrent threads processing exchanges. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>waitForTaskToComplete</tt> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> <tt>IfReplyExpected</tt> </td>
<td class='confluenceTd'> Option to specify whether the caller should wait for the async
task to complete or not before continuing. The following three options are supported: <tt>Always</tt>,
<tt>Never</tt> or <tt>IfReplyExpected</tt>. The first two values are
self-explanatory. The last value, <tt>IfReplyExpected</tt>, will only wait if
the message is <a href="/confluence/display/CAMEL/Request+Reply" title="Request Reply">Request
Reply</a> based. The default option is <tt>IfReplyExpected</tt>. See more
information about <a href="/confluence/display/CAMEL/Async" title="Async">Async</a>
messaging. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>timeout</tt> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> <tt>30000</tt> </td>
<td class='confluenceTd'> Timeout (in milliseconds) before a SEDA producer will stop
waiting for an asynchronous task to complete. See <tt>waitForTaskToComplete</tt>
and <a href="/confluence/display/CAMEL/Async" title="Async">Async</a> for more
details. In <b>Camel 2.2</b> you can now disable timeout by using 0 or a negative
value. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>multipleConsumers</tt> </td>
<td class='confluenceTd'> <b>2.2</b> </td>
<td class='confluenceTd'> <tt>false</tt> </td>
<td class='confluenceTd'> Specifies whether multiple consumers are allowed. If enabled,
you can use <a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a> for
<a href="http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern" class="external-link"
rel="nofollow">Publish-Subscribe</a> messaging. That is, you can send a message to
the SEDA queue and have each consumer receive a copy of the message. When enabled, this option
should be specified on every consumer endpoint. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>limitConcurrentConsumers</tt> </td>
<td class='confluenceTd'> <b>2.3</b> </td>
<td class='confluenceTd'> <tt>true</tt> </td>
<td class='confluenceTd'> Whether to limit the number of <tt>concurrentConsumers</tt>
to the maximum of <tt>500</tt>. By default, an exception will be thrown if a SEDA
endpoint is configured with a greater number. You can disable that check by turning this option
off. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>blockWhenFull</tt> </td>
<td class='confluenceTd'> <b>2.9</b> </td>
<td class='confluenceTd'> <tt>false</tt> </td>
<td class='confluenceTd'> Whether a thread that sends messages to a full SEDA queue
will block until the queue's capacity is no longer exhausted.  By default, an exception will
be thrown stating that the queue is full. By enabling this option, the calling thread will
instead block and wait until the message can be accepted. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>queueSize</tt> </td>
<td class='confluenceTd'> <b>2.9</b> </td>
<td class='confluenceTd'>&nbsp;</td>
<td class='confluenceTd'> <b>Component only:</b> The maximum default size
(capacity of the number of messages it can hold) of the SEDA queue. This option is used if
<tt>size</tt> is not in use. </td>
</tr>
<tr>
<td class='confluenceTd'> <tt>pollTimeout</tt> </td>
<td class='confluenceTd'> <b>2.9.3</b> </td>
<td class='confluenceTd'> <tt>1000</tt> </td>
<td class='confluenceTd'> <em>Consumer only</em> &#8211; The timeout
used when polling. When a timeout occurs, the consumer can check whether it is allowed to
continue running. Setting a lower value allows the consumer to react more quickly upon shutdown.
</td>
</tr>
</tbody></table>
</div>
</div>

<h3><a name="SEDA-UseofRequestReply"></a>Use of Request Reply</h3>
<p>The <a href="/confluence/display/CAMEL/SEDA" title="SEDA">SEDA</a> component
supports using <a href="/confluence/display/CAMEL/Request+Reply" title="Request Reply">Request
Reply</a>, where the caller will wait for the <a href="/confluence/display/CAMEL/Async"
title="Async">Async</a> route to complete. For instance:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"mina:tcp:<span class="code-comment">//0.0.0.0:9876?textline=<span
class="code-keyword">true</span>&amp;sync=<span class="code-keyword">true</span>"</span>).to(<span
class="code-quote">"seda:input"</span>);
</span>
from(<span class="code-quote">"seda:input"</span>).to(<span class="code-quote">"bean:processInput"</span>).to(<span
class="code-quote">"bean:createResponse"</span>);
</pre>
</div></div>
<p>In the route above, we have a TCP listener on port 9876 that accepts incoming requests.
The request is routed to the <tt>seda:input</tt> queue. As it is a <a href="/confluence/display/CAMEL/Request+Reply"
title="Request Reply">Request Reply</a> message, we wait for the response. When the
consumer on the <tt>seda:input</tt> queue is complete, it copies the response
to the original message response.</p>

<div class='panelMacro'><table class='noteMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/warning.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td><b>until 2.2: Works only
with 2 endpoints</b><br />Using <a href="/confluence/display/CAMEL/Request+Reply"
title="Request Reply">Request Reply</a> over <a href="/confluence/display/CAMEL/SEDA"
title="SEDA">SEDA</a> or <a href="/confluence/display/CAMEL/VM" title="VM">VM</a>
only works with 2 endpoints. You <b>cannot</b> chain endpoints by sending to A
-&gt; B -&gt; C etc. Only between A -&gt; B. The reason is the implementation
logic is fairly simple. To support 3+ endpoints makes the logic much more complex to handle
ordering and notification between the waiting threads properly. 

<p>This has been improved in <b>Camel 2.3</b> onwards, which allows you
to chain as many endpoints as you like.</p></td></tr></table></div>

<h3><a name="SEDA-Concurrentconsumers"></a>Concurrent consumers</h3>
<p>By default, the SEDA endpoint uses a single consumer thread, but you can configure
it to use concurrent consumer threads. So instead of thread pools you can use:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"seda:stageName?concurrentConsumers=5"</span>).process(...)
</pre>
</div></div>

<p>As for the difference between the two, note a <em>thread pool</em> can
increase/shrink dynamically at runtime depending on load, whereas the number of concurrent
consumers is always fixed.</p>

<h3><a name="SEDA-Threadpools"></a>Thread pools</h3>
<p>Be aware that adding a thread pool to a SEDA endpoint by doing something like:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"seda:stageName"</span>).thread(5).process(...)
</pre>
</div></div>
<p>Can wind up with two <tt>BlockQueues</tt>: one from the SEDA endpoint,
and one from the workqueue of the thread pool, which may not be what you want. Instead, you
might wish to configure a <a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a>
endpoint with a thread pool, which can process messages both synchronously and asynchronously.
For example:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:stageName"</span>).thread(5).process(...)
</pre>
</div></div>
<p>You can also directly configure number of threads that process messages on a SEDA
endpoint using the <tt>concurrentConsumers</tt> option.</p>

<h3><a name="SEDA-Sample"></a>Sample</h3>
<p>In the route below we use the SEDA queue to send the request to this async queue
to be able to send a fire-and-forget message for further processing in another thread, and
return a constant reply in this thread to the original caller. </p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-keyword">public</span> void configure()
<span class="code-keyword">throws</span> Exception {
    from(<span class="code-quote">"direct:start"</span>)
        <span class="code-comment">// send it to the seda queue that is async
</span>        .to(<span class="code-quote">"seda:next"</span>)
        <span class="code-comment">// <span class="code-keyword">return</span>
a constant response
</span>        .transform(constant(<span class="code-quote">"OK"</span>));

    from(<span class="code-quote">"seda:next"</span>).to(<span class="code-quote">"mock:result"</span>);
}
</pre>
</div></div>

<p>Here we send a Hello World message and expects the reply to be OK.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-object">Object</span> out = template.requestBody(<span
class="code-quote">"direct:start"</span>, <span class="code-quote">"Hello World"</span>);
assertEquals(<span class="code-quote">"OK"</span>, out);
</pre>
</div></div>

<p>The "Hello World" message will be consumed from the SEDA queue from another thread
for further processing. Since this is from a unit test, it will be sent to a <tt>mock</tt>
endpoint where we can do assertions in the unit test.</p>

<h3><a name="SEDA-UsingmultipleConsumers"></a>Using multipleConsumers</h3>
<p><b>Available as of Camel 2.2</b></p>

<p>In this example we have defined two consumers and registered them as spring beans.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-xml"><span class="code-tag"><span class="code-comment">&lt;!--
define the consumers as spring beans --&gt;</span></span>
<span class="code-tag">&lt;bean id=<span class="code-quote">"consumer1"</span>
class=<span class="code-quote">"org.apache.camel.spring.example.FooEventConsumer"</span>/&gt;</span>

<span class="code-tag">&lt;bean id=<span class="code-quote">"consumer2"</span>
class=<span class="code-quote">"org.apache.camel.spring.example.AnotherFooEventConsumer"</span>/&gt;</span>

<span class="code-tag">&lt;camelContext xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
    <span class="code-tag"><span class="code-comment">&lt;!-- define a shared
endpoint which the consumers can refer to instead of using url --&gt;</span></span>
    <span class="code-tag">&lt;endpoint id=<span class="code-quote">"foo"</span>
uri=<span class="code-quote">"seda:foo?multipleConsumers=true"</span>/&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>

<p>Since we have specified <b>multipleConsumers=true</b> on the seda foo
endpoint we can have those two consumers receive their own copy of the message as a kind of
pub-sub style messaging.</p>

<p>As the beans are part of an unit test they simply send the message to a mock endpoint,
but notice how we can use @Consume to consume from the seda queue.</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-keyword">public</span> class FooEventConsumer
{

    @EndpointInject(uri = <span class="code-quote">"mock:result"</span>)
    <span class="code-keyword">private</span> ProducerTemplate destination;

    @Consume(ref = <span class="code-quote">"foo"</span>)
    <span class="code-keyword">public</span> void doSomething(<span class="code-object">String</span>
body) {
        destination.sendBody(<span class="code-quote">"foo"</span> + body);
    }

}
</pre>
</div></div>

<h3><a name="SEDA-Extractingqueueinformation."></a>Extracting queue information.</h3>
<p>If needed, information such as queue size, etc. can be obtained without using JMX
in this fashion:</p>
<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
SedaEndpoint seda = context.getEndpoint(<span class="code-quote">"seda:xxxx"</span>);
<span class="code-object">int</span> size = seda.getExchanges().size();
</pre>
</div></div>
<h3><a name="SEDA-SeeAlso"></a>See Also</h3>
<ul>
	<li><a href="/confluence/display/CAMEL/Configuring+Camel" title="Configuring Camel">Configuring
Camel</a></li>
	<li><a href="/confluence/display/CAMEL/Component" title="Component">Component</a></li>
	<li><a href="/confluence/display/CAMEL/Endpoint" title="Endpoint">Endpoint</a></li>
	<li><a href="/confluence/display/CAMEL/Getting+Started" title="Getting Started">Getting
Started</a></li>
</ul>

<ul class="alternate" type="square">
	<li><a href="/confluence/display/CAMEL/VM" title="VM">VM</a></li>
	<li><a href="/confluence/display/CAMEL/Direct" title="Direct">Direct</a></li>
	<li><a href="/confluence/display/CAMEL/Async" title="Async">Async</a></li>
</ul>

    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;">
            <a href="https://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
        </div>
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA">View Online</a>
        |
        <a href="https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=61225&revisedVersion=43&originalVersion=42">View
Changes</a>
                |
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/SEDA?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

Mime
View raw message