camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From conflue...@apache.org
Subject [CONF] Apache Camel > Resequencer
Date Tue, 17 Jul 2012 05:55:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/1/_/styles/combined.css?spaceKey=CAMEL&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/CAMEL/Resequencer">Resequencer</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://cwiki.apache.org/confluence/display/~boday">Ben
O&#39;Day</a>
    </h4>
        <div id="versionComment">
        <b>Comment:</b>
        per CAMEL-4327 added "rejectOld" option to the Resequencer EIP<br />
    </div>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >This option is available for both
batch and stream resequencer. <br> <br></td></tr>
            <tr><td class="diff-added-lines" style="background-color: #dfd;">h4.
Reject Old Exchanges <br>*Available as of Camel 2.11* <br> <br>This option
can be used to preserve ordering regardless of the event that delivered messages downstream
(capacity, timeout, etc).  If enabled using {{rejectOld()}}, the [Resequencer] will throw
a {{MessageRejectedException}} when an incoming [Exchange] is &quot;older&quot; (based
on the Comparator) than the last delivered message.  This provides an extra level of control
with regards to delayed message ordering.  <br> <br>{code} <br>from(&quot;direct:start&quot;)
<br>    .onException(MessageRejectedException.class).handled(true).to(&quot;mock:error&quot;).end()
<br>    .resequence(header(&quot;seqno&quot;)).stream().timeout(1000).rejectOld()
<br>    .to(&quot;mock:result&quot;); <br>{code} <br> <br>This
option is available for the stream resequencer only. <br> <br></td></tr>
            <tr><td class="diff-unchanged" >h3. Stream Resequencing <br>
<br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <h3><a name="Resequencer-Resequencer"></a>Resequencer</h3>

<p>The <a href="http://www.enterpriseintegrationpatterns.com/Resequencer.html" class="external-link"
rel="nofollow">Resequencer</a> from the <a href="/confluence/display/CAMEL/Enterprise+Integration+Patterns"
title="Enterprise Integration Patterns">EIP patterns</a> allows you to reorganise
messages based on some comparator. By default in Camel we use an <a href="/confluence/display/CAMEL/Expression"
title="Expression">Expression</a> to create the comparator; so that you can compare
by a message header or the body or a piece of a message etc.</p>

<p><span class="image-wrap" style=""><img src="http://www.enterpriseintegrationpatterns.com/img/Resequencer.gif"
style="border: 0px solid black" /></span></p>

<div class='panelMacro'><table class='tipMacro'><colgroup><col width='24'><col></colgroup><tr><td
valign='top'><img src="/confluence/images/icons/emoticons/check.gif" width="16" height="16"
align="absmiddle" alt="" border="0"></td><td><b>Change in Camel 2.7</b><br
/>The <tt>&lt;batch-config&gt;</tt> and <tt>&lt;stream-config&gt;</tt>
tags in XML DSL in the Resequencer EIP must now be configured in the top, and not in the bottom.
So if you use those, then move them up just below the <tt>&lt;resequence&gt;</tt>
EIP starts in the XML. If you are using Camel older than 2.7, then those configs should be
at the bottom.</td></tr></table></div>

<p>Camel supports two resequencing algorithms:</p>

<ul>
	<li><b>Batch resequencing</b> collects messages into a batch, sorts the
messages and sends them to their output.</li>
	<li><b>Stream resequencing</b> re-orders (continuous) message streams based
on the detection of gaps between messages.</li>
</ul>


<p>By default the <a href="/confluence/display/CAMEL/Resequencer" title="Resequencer">Resequencer</a>
does not support duplicate messages and will only keep the last message, in case a message
arrives with the same message expression. However in the batch mode you can enable it to allow
duplicates.</p>

<h3><a name="Resequencer-BatchResequencing"></a>Batch Resequencing</h3>

<p>The following example shows how to use the batch-processing resequencer so that messages
are sorted in order of the <b>body()</b> expression. That is messages are collected
into a batch (either by a maximum number of messages per batch or using a timeout) then they
are sorted in order and then sent out to their output.</p>

<p><b>Using the <a href="/confluence/display/CAMEL/Fluent+Builders" title="Fluent
Builders">Fluent Builders</a></b></p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>)
    .resequence().body()
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>This is equvalent to </p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .resequence(body()).batch()
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>The batch-processing resequencer can be further configured via the <tt>size()</tt>
and <tt>timeout()</tt> methods.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .resequence(body()).batch().size(300).timeout(4000L)
    .to(<span class="code-quote">"mock:result"</span>)
</pre>
</div></div>

<p>This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the
batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration
object.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .resequence(body()).batch(<span class="code-keyword">new</span> BatchResequencerConfig(300,
4000L))
    .to(<span class="code-quote">"mock:result"</span>)
</pre>
</div></div>


<p>So the above example will reorder messages from endpoint <b>direct:a</b>
in order of their bodies, to the endpoint <b>mock:result</b>. <br/>
Typically you'd use a header rather than the body to order things; or maybe a part of the
body. So you could replace this expression with</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
resequencer(header(<span class="code-quote">"mySeqNo"</span>))
</pre>
</div></div>

<p>for example to reorder messages using a custom sequence number in the header <tt>mySeqNo</tt>.</p>

<p>You can of course use many different <a href="/confluence/display/CAMEL/Expression"
title="Expression">Expression</a> languages such as <a href="/confluence/display/CAMEL/XPath"
title="XPath">XPath</a>, <a href="/confluence/display/CAMEL/XQuery" title="XQuery">XQuery</a>,
<a href="/confluence/display/CAMEL/SQL" title="SQL">SQL</a> or various <a href="/confluence/display/CAMEL/Scripting+Languages"
title="Scripting Languages">Scripting Languages</a>.</p>

<p><b>Using the <a href="/confluence/display/CAMEL/Spring+XML+Extensions" title="Spring
XML Extensions">Spring XML Extensions</a></b></p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-xml">
<span class="code-tag">&lt;camelContext id=<span class="code-quote">"camel"</span>
xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
  <span class="code-tag">&lt;route&gt;</span>
    <span class="code-tag">&lt;from uri=<span class="code-quote">"direct:start"</span>
/&gt;</span>
    <span class="code-tag">&lt;resequence&gt;</span>
      <span class="code-tag">&lt;simple&gt;</span>body<span class="code-tag">&lt;/simple&gt;</span>
      <span class="code-tag">&lt;to uri=<span class="code-quote">"mock:result"</span>
/&gt;</span>
      &lt;!-- 
        batch-config can be ommitted for default (batch) resequencer settings
      --&gt;
      <span class="code-tag">&lt;batch-config batchSize=<span class="code-quote">"300"</span>
batchTimeout=<span class="code-quote">"4000"</span> /&gt;</span>
    <span class="code-tag">&lt;/resequence&gt;</span>
  <span class="code-tag">&lt;/route&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>

<h4><a name="Resequencer-AllowDuplicates"></a>Allow Duplicates</h4>
<p><b>Available as of Camel 2.4</b></p>

<p>In the <tt>batch</tt> mode, you can now allow duplicates. In Java DSL
there is a <tt>allowDuplicates()</tt> method and in Spring XML there is an <tt>allowDuplicates=true</tt>
attribute on the <tt>&lt;batch-config/&gt;</tt> you can use to enable
it.</p>

<h4><a name="Resequencer-Reverse"></a>Reverse</h4>
<p><b>Available as of Camel 2.4</b></p>

<p>In the <tt>batch</tt> mode, you can now reverse the expression ordering.
By default the order is based on 0..9,A..Z, which would let messages with low numbers be ordered
first, and thus also also outgoing first. In some cases you want to reverse order, which is
now possible.</p>

<p>In Java DSL there is a <tt>reverse()</tt> method and in Spring XML there
is an <tt>reverse=true</tt> attribute on the <tt>&lt;batch-config/&gt;</tt>
you can use to enable it.</p>

<h4><a name="Resequencer-ResequenceJMSmessagesbasedonJMSPriority"></a>Resequence
JMS messages based on JMSPriority</h4>
<p><b>Available as of Camel 2.4</b></p>

<p>It's now much easier to use the <a href="/confluence/display/CAMEL/Resequencer"
title="Resequencer">Resequencer</a> to resequence messages from <a href="/confluence/display/CAMEL/JMS"
title="JMS">JMS</a> queues based on <tt>JMSPriority</tt>. For that to
work you need to use the two new options <tt>allowDuplicates</tt> and <tt>reverse</tt>.
</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"jms:queue:foo"</span>)
    <span class="code-comment">// sort by JMSPriority by allowing duplicates (message
can have same JMSPriority)
</span>    <span class="code-comment">// and use reverse ordering so 9 is first
output (most important), and 0 is last
</span>    <span class="code-comment">// use batch mode and fire every 3th second
</span>    .resequence(header(<span class="code-quote">"JMSPriority"</span>)).batch().timeout(3000).allowDuplicates().reverse()
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>Notice this is <b>only</b> possible in the <tt>batch</tt>
mode of the <a href="/confluence/display/CAMEL/Resequencer" title="Resequencer">Resequencer</a>.</p>

<h4><a name="Resequencer-Ignoreinvalidexchanges"></a>Ignore invalid exchanges</h4>
<p><b>Available as of Camel 2.9</b></p>

<p>The <a href="/confluence/display/CAMEL/Resequencer" title="Resequencer">Resequencer</a>
EIP will from Camel 2.9 onwards throw a <tt>CamelExchangeException</tt> if the
incoming <a href="/confluence/display/CAMEL/Exchange" title="Exchange">Exchange</a>
is not valid for the resequencer - ie. the expression cannot be evaluated, such as a missing
header. You can use the option <tt>ignoreInvalidExchanges</tt> to ignore these
exceptions which means the <a href="/confluence/display/CAMEL/Resequencer" title="Resequencer">Resequencer</a>
will then skip the invalid <a href="/confluence/display/CAMEL/Exchange" title="Exchange">Exchange</a>.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>)
    .resequence(header(<span class="code-quote">"seqno"</span>)).batch().timeout(1000)
        <span class="code-comment">// ignore invalid exchanges (they are discarded)
</span>        .ignoreInvalidExchanges()
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>This option is available for both batch and stream resequencer.</p>

<h4><a name="Resequencer-RejectOldExchanges"></a>Reject Old Exchanges</h4>
<p><b>Available as of Camel 2.11</b></p>

<p>This option can be used to preserve ordering regardless of the event that delivered
messages downstream (capacity, timeout, etc).  If enabled using <tt>rejectOld()</tt>,
the <a href="/confluence/display/CAMEL/Resequencer" title="Resequencer">Resequencer</a>
will throw a <tt>MessageRejectedException</tt> when an incoming <a href="/confluence/display/CAMEL/Exchange"
title="Exchange">Exchange</a> is "older" (based on the Comparator) than the last
delivered message.  This provides an extra level of control with regards to delayed message
ordering. </p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .onException(MessageRejectedException.class).handled(<span class="code-keyword">true</span>).to(<span
class="code-quote">"mock:error"</span>).end()
    .resequence(header(<span class="code-quote">"seqno"</span>)).stream().timeout(1000).rejectOld()
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>This option is available for the stream resequencer only.</p>

<h3><a name="Resequencer-StreamResequencing"></a>Stream Resequencing</h3>

<p>The next example shows how to use the stream-processing resequencer. Messages are
re-ordered based on their sequence numbers given by a <tt>seqnum</tt> header using
gap detection and timeouts on the level of individual messages. </p>

<p><b>Using the <a href="/confluence/display/CAMEL/Fluent+Builders" title="Fluent
Builders">Fluent Builders</a></b></p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>).resequence(header(<span
class="code-quote">"seqnum"</span>)).stream().to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>The stream-processing resequencer can be further configured via the <tt>capacity()</tt>
and <tt>timeout()</tt> methods.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .resequence(header(<span class="code-quote">"seqnum"</span>)).stream().capacity(5000).timeout(4000L)
    .to(<span class="code-quote">"mock:result"</span>)
</pre>
</div></div>

<p>This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default,
the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration
object.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
from(<span class="code-quote">"direct:start"</span>)
    .resequence(header(<span class="code-quote">"seqnum"</span>)).stream(<span
class="code-keyword">new</span> StreamResequencerConfig(5000, 4000L))
    .to(<span class="code-quote">"mock:result"</span>)
</pre>
</div></div>

<p>The stream-processing resequencer algorithm is based on the detection of gaps in
a message stream rather than on a fixed batch size. Gap detection in combination with timeouts
removes the constraint of having to know the number of messages of a sequence (i.e. the batch
size) in advance. Messages must contain a unique sequence number for which a predecessor and
a successor is known. For example a message with the sequence number 3 has a predecessor message
with the sequence number 2 and a successor message with the sequence number 4. The message
sequence 2,3,5 has a gap because the sucessor of 3 is missing. The resequencer therefore has
to retain message 5 until message 4 arrives (or a timeout occurs). </p>

<p>If the maximum time difference between messages (with successor/predecessor relationship
with respect to the sequence number) in a message stream is known, then the resequencer's
timeout parameter should be set to this value. In this case it is guaranteed that all messages
of a stream are delivered in correct order to the next processor. The lower the timeout value
is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence
messages delivered by this resequencer. Large timeout values should be supported by sufficiently
high capacity values. The capacity parameter is used to prevent the resequencer from running
out of memory. </p>

<p>By default, the stream resequencer expects <tt>long</tt> sequence numbers
but other sequence numbers types can be supported as well by providing a  custom expression.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java"><span class="code-keyword">public</span> class MyFileNameExpression
<span class="code-keyword">implements</span> Expression {
    
    <span class="code-keyword">public</span> <span class="code-object">String</span>
getFileName(Exchange exchange) {
        <span class="code-keyword">return</span> exchange.getIn().getBody(<span
class="code-object">String</span>.class);
    }
    
    <span class="code-keyword">public</span> <span class="code-object">Object</span>
evaluate(Exchange exchange) {
        <span class="code-comment">// parser the file name with YYYYMMDD-DNNN pattern
</span>        <span class="code-object">String</span> fileName = getFileName(exchange);
        <span class="code-object">String</span>[] files = fileName.split(<span
class="code-quote">"-D"</span>);
        <span class="code-object">Long</span> answer = <span class="code-object">Long</span>.parseLong(files[0])
* 1000 + <span class="code-object">Long</span>.parseLong(files[1]);
        <span class="code-keyword">return</span> answer;
    }
    

    <span class="code-keyword">public</span> &lt;T&gt; T evaluate(Exchange
exchange, <span class="code-object">Class</span>&lt;T&gt; type) {
        <span class="code-object">Object</span> result = evaluate(exchange);
        <span class="code-keyword">return</span> exchange.getContext().getTypeConverter().convertTo(type,
result);
    }

}
</pre>
</div></div>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>).resequence(<span
class="code-keyword">new</span> MyFileNameExpression()).stream().timeout(100).to(<span
class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>or custom comparator via the <tt>comparator()</tt> method</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
ExpressionResultComparator&lt;Exchange&gt; comparator = <span class="code-keyword">new</span>
MyComparator();
from(<span class="code-quote">"direct:start"</span>)
    .resequence(header(<span class="code-quote">"seqnum"</span>)).stream().comparator(comparator)
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>or via a <tt>StreamResequencerConfig</tt> object.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
ExpressionResultComparator&lt;Exchange&gt; comparator = <span class="code-keyword">new</span>
MyComparator();
StreamResequencerConfig config = <span class="code-keyword">new</span> StreamResequencerConfig(100,
1000L, comparator);

from(<span class="code-quote">"direct:start"</span>)
    .resequence(header(<span class="code-quote">"seqnum"</span>)).stream(config)
    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>


<p><b>Using the <a href="/confluence/display/CAMEL/Spring+XML+Extensions" title="Spring
XML Extensions">Spring XML Extensions</a></b></p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-xml">
<span class="code-tag">&lt;camelContext id=<span class="code-quote">"camel"</span>
xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
  <span class="code-tag">&lt;route&gt;</span>
    <span class="code-tag">&lt;from uri=<span class="code-quote">"direct:start"</span>/&gt;</span>
    <span class="code-tag">&lt;resequence&gt;</span>
      <span class="code-tag">&lt;simple&gt;</span>in.header.seqnum<span
class="code-tag">&lt;/simple&gt;</span>
      <span class="code-tag">&lt;to uri=<span class="code-quote">"mock:result"</span>
/&gt;</span>
      <span class="code-tag">&lt;stream-config capacity=<span class="code-quote">"5000"</span>
timeout=<span class="code-quote">"4000"</span>/&gt;</span>
    <span class="code-tag">&lt;/resequence&gt;</span>
  <span class="code-tag">&lt;/route&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>

<h3><a name="Resequencer-FurtherExamples"></a>Further Examples</h3>

<p>For further examples of this pattern in use you could look at the <a href="http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java?view=markup"
class="external-link" rel="nofollow">batch-processing resequencer junit test case</a>
and the <a href="http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?view=markup"
class="external-link" rel="nofollow">stream-processing resequencer junit test case</a></p>

<h4><a name="Resequencer-UsingThisPattern"></a>Using This Pattern</h4>

<p>If you would like to use this EIP Pattern then please read the <a href="/confluence/display/CAMEL/Getting+Started"
title="Getting Started">Getting Started</a>, you may also find the <a href="/confluence/display/CAMEL/Architecture"
title="Architecture">Architecture</a> useful particularly the description of <a
href="/confluence/display/CAMEL/Endpoint" title="Endpoint">Endpoint</a> and <a
href="/confluence/display/CAMEL/URIs" title="URIs">URIs</a>. Then you could try out
some of the <a href="/confluence/display/CAMEL/Examples" title="Examples">Examples</a>
first before trying this pattern out.</p>

    </div>
        <div id="commentsSection" class="wiki-content pageSection">
        <div style="float: right;">
            <a href="https://cwiki.apache.org/confluence/users/viewnotifications.action"
class="grey">Change Notification Preferences</a>
        </div>
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/Resequencer">View
Online</a>
        |
        <a href="https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=55179&revisedVersion=22&originalVersion=21">View
Changes</a>
                |
        <a href="https://cwiki.apache.org/confluence/display/CAMEL/Resequencer?showComments=true&amp;showCommentArea=true#addcomment">Add
Comment</a>
            </div>
</div>
</div>
</div>
</div>
</body>
</html>

Mime
View raw message