camel-commits mailing list archives

From conflue...@apache.org
Subject [CONF] Apache Camel > Idempotent Consumer
Date Mon, 27 Jun 2011 05:12:00 GMT
<html>
<head>
    <base href="https://cwiki.apache.org/confluence">
            <link rel="stylesheet" href="/confluence/s/2042/9/1/_/styles/combined.css?spaceKey=CAMEL&amp;forWysiwyg=true"
type="text/css">
    </head>
<body style="background: white;" bgcolor="white" class="email-body">
<div id="pageContent">
<div id="notificationFormat">
<div class="wiki-content">
<div class="email">
    <h2><a href="https://cwiki.apache.org/confluence/display/CAMEL/Idempotent+Consumer">Idempotent
Consumer</a></h2>
    <h4>Page <b>edited</b> by             <a href="https://cwiki.apache.org/confluence/display/~davsclaus">Claus
Ibsen</a>
    </h4>
        <br/>
                         <h4>Changes (1)</h4>
                                 
    
<div id="page-diffs">
                    <table class="diff" cellpadding="0" cellspacing="0">
    
            <tr><td class="diff-snipped" >...<br></td></tr>
            <tr><td class="diff-unchanged" >- [FileIdempotentRepository|FILE2]
<br>- [HazelcastIdempotentRepository|Hazelcast Component] (*Available as of Camel 2.8*)
<br></td></tr>
            <tr><td class="diff-changed-lines" >- <span class="diff-changed-words">[JdbcMessageIdRepository|SQL<span
class="diff-added-chars" style="background-color: #dfd;"> Component</span>]</span>
(*Available as of Camel 2.7*) <br></td></tr>
            <tr><td class="diff-unchanged" >- [JpaMessageIdRepository|JPA] <br>
<br></td></tr>
            <tr><td class="diff-snipped" >...<br></td></tr>
    
            </table>
    </div>                            <h4>Full Content</h4>
                    <div class="notificationGreySide">
        <h3><a name="IdempotentConsumer-IdempotentConsumer"></a>Idempotent
Consumer</h3>

<p>The <a href="http://www.enterpriseintegrationpatterns.com/IdempotentReceiver.html"
class="external-link" rel="nofollow">Idempotent Consumer</a> from the <a href="/confluence/display/CAMEL/Enterprise+Integration+Patterns"
title="Enterprise Integration Patterns">EIP patterns</a> is used to filter out duplicate
messages. </p>

<p>This pattern is implemented using the <a href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/IdempotentConsumer.html"
class="external-link" rel="nofollow">IdempotentConsumer</a> class. This uses an <a
href="/confluence/display/CAMEL/Expression" title="Expression">Expression</a> to
calculate a unique message ID string for a given message exchange; this ID can then be looked
up in the <a href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/IdempotentRepository.html"
class="external-link" rel="nofollow">IdempotentRepository</a> to see if it has been
seen before. If it has, the message is consumed and not processed further; if it has not, the message is processed and
the ID is added to the repository.</p>
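<p>The lookup described above can be sketched in plain Java. This is a simplified stand-in, not Camel's <tt>IdempotentConsumer</tt> class: the class name <tt>IdempotentFilterSketch</tt> is hypothetical, a <tt>Function</tt> plays the role of the Expression, and a <tt>HashSet</tt> plays the role of the repository.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical illustration of the pattern; not part of Camel. */
final class IdempotentFilterSketch<M> {
    // Stands in for the repository: the message IDs seen so far.
    private final Set<String> seenIds = new HashSet<>();
    // Stands in for the Expression: computes the ID string of a message.
    private final Function<M, String> idExpression;

    IdempotentFilterSketch(Function<M, String> idExpression) {
        this.idExpression = idExpression;
    }

    /** Returns only the messages whose ID has not been seen before. */
    List<M> process(List<M> messages) {
        List<M> accepted = new ArrayList<>();
        for (M message : messages) {
            String id = idExpression.apply(message);
            // Set.add returns false when the ID is already present, i.e. a duplicate.
            if (seenIds.add(id)) {
                accepted.add(message);
            }
        }
        return accepted;
    }
}
```

<p>With an ID function that takes the text before the first colon, the messages <tt>1:a</tt>, <tt>2:b</tt> and <tt>1:c</tt> would yield only <tt>1:a</tt> and <tt>2:b</tt>, because the third message repeats the ID <tt>1</tt>.</p>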

<p>The Idempotent Consumer essentially acts like a <a href="/confluence/display/CAMEL/Message+Filter"
title="Message Filter">Message Filter</a> to filter out duplicates.</p>

<p>Camel adds the message ID to the repository eagerly, so that duplicates are also detected
for Exchanges that are still in progress.<br/>
On completion, Camel removes the message ID from the repository if the Exchange failed;
otherwise it stays there.</p>
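<p>A minimal plain-Java sketch of this eager behaviour follows. It only mirrors the description above and is not Camel's implementation; the class <tt>EagerIdempotentSketch</tt> and its methods are hypothetical, and a failed Exchange is modelled as a <tt>RuntimeException</tt> thrown by the processor.</p>

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical illustration of eager registration; not part of Camel. */
final class EagerIdempotentSketch {
    private final Set<String> repository = ConcurrentHashMap.newKeySet();

    /**
     * Returns true if the message was processed and false if it was dropped
     * as a duplicate. A failure removes the ID again and rethrows the exception.
     */
    boolean handle(String messageId, String body, Consumer<String> processor) {
        // Eager: register the ID before processing, so that a duplicate arriving
        // while this message is still in progress is rejected.
        if (!repository.add(messageId)) {
            return false;
        }
        try {
            processor.accept(body);
            return true; // success: the ID stays in the repository
        } catch (RuntimeException e) {
            // Failure: forget the ID so a redelivery is not treated as a duplicate.
            repository.remove(messageId);
            throw e;
        }
    }

    boolean contains(String messageId) {
        return repository.contains(messageId);
    }
}
```

<p>Removing the ID on failure is what allows a failed message to be redelivered and processed again later.</p>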

<p>Camel provides the following Idempotent Repository implementations:</p>
<ul class="alternate" type="square">
	<li>MemoryIdempotentRepository</li>
	<li><a href="/confluence/display/CAMEL/File2" title="File2">FileIdempotentRepository</a></li>
	<li><a href="/confluence/display/CAMEL/Hazelcast+Component" title="Hazelcast Component">HazelcastIdempotentRepository</a>
(<b>Available as of Camel 2.8</b>)</li>
	<li><a href="/confluence/display/CAMEL/SQL+Component" title="SQL Component">JdbcMessageIdRepository</a>
(<b>Available as of Camel 2.7</b>)</li>
	<li><a href="/confluence/display/CAMEL/JPA" title="JPA">JpaMessageIdRepository</a></li>
</ul>


<h3><a name="IdempotentConsumer-Options"></a>Options</h3>
<p>The Idempotent Consumer has the following options:</p>

<div class='table-wrap'>
<table class='confluenceTable'><tbody>
<tr>
<th class='confluenceTh'> Option </th>
<th class='confluenceTh'> Default </th>
<th class='confluenceTh'> Description </th>
</tr>
<tr>
<td class='confluenceTd'> eager </td>
<td class='confluenceTd'> true </td>
<td class='confluenceTd'> <b>Camel 2.0:</b> Eager controls whether Camel
adds the message to the repository before or after the exchange has been processed. If enabled,
the message is added before processing, so Camel can detect duplicate messages even while messages
are still in progress. If disabled, Camel only detects duplicates once a message has been
processed successfully. </td>
</tr>
<tr>
<td class='confluenceTd'> messageIdRepositoryRef </td>
<td class='confluenceTd'> <tt>null</tt> </td>
<td class='confluenceTd'> A reference to an <tt>IdempotentRepository</tt>
to look up in the registry. This option is mandatory when using XML DSL. </td>
</tr>
<tr>
<td class='confluenceTd'> skipDuplicate </td>
<td class='confluenceTd'> true </td>
<td class='confluenceTd'> <b>Camel 2.8:</b> Sets whether to skip duplicate
messages. If set to <tt>false</tt>, duplicate messages continue to be routed. However,
the <a href="/confluence/display/CAMEL/Exchange" title="Exchange">Exchange</a>
is marked as a duplicate by having the <tt>Exchange.DUPLICATE_MESSAGE</tt>
exchange property set to <tt>Boolean.TRUE</tt>. </td>
</tr>
</tbody></table>
</div>
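<p>To make the effect of <tt>skipDuplicate</tt> concrete, here is a small plain-Java sketch. It is an assumption-laden illustration rather than Camel code: the class <tt>SkipDuplicateSketch</tt> is hypothetical, the Exchange properties are modelled as a plain <tt>Map</tt>, and a skipped message is modelled by returning <tt>null</tt>. The property key <tt>CamelDuplicateMessage</tt> is the one used in the XML example on this page.</p>

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of the skipDuplicate option; not part of Camel. */
final class SkipDuplicateSketch {
    // Property key as it appears in the XML example on this page.
    static final String DUPLICATE_MESSAGE = "CamelDuplicateMessage";

    private final Set<String> repository = new HashSet<>();
    private final boolean skipDuplicate;

    SkipDuplicateSketch(boolean skipDuplicate) {
        this.skipDuplicate = skipDuplicate;
    }

    /**
     * Returns the exchange properties to continue routing with,
     * or null when the message is a duplicate and is skipped.
     */
    Map<String, Object> handle(String messageId) {
        Map<String, Object> properties = new HashMap<>();
        boolean isNew = repository.add(messageId);
        if (isNew) {
            return properties; // new message: routed without any marker
        }
        if (skipDuplicate) {
            return null; // default behaviour: the duplicate is dropped
        }
        // skipDuplicate=false: keep routing, but mark the exchange as a duplicate
        properties.put(DUPLICATE_MESSAGE, Boolean.TRUE);
        return properties;
    }
}
```

<p>With <tt>skipDuplicate</tt> set to <tt>false</tt>, the second message with the same ID is not dropped; it continues with the duplicate marker set, so a later step in the route can decide what to do with it.</p>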


<h3><a name="IdempotentConsumer-UsingtheFluentBuilders"></a><b>Using
the <a href="/confluence/display/CAMEL/Fluent+Builders" title="Fluent Builders">Fluent
Builders</a></b></h3>

<p>The following example uses the header <b>myMessageId</b> to filter
out duplicates:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">RouteBuilder builder = <span class="code-keyword">new</span>
RouteBuilder() {
    <span class="code-keyword">public</span> void configure() {
        errorHandler(deadLetterChannel(<span class="code-quote">"mock:error"</span>));

        from(<span class="code-quote">"seda:a"</span>)
            .idempotentConsumer(header(<span class="code-quote">"myMessageId"</span>),
                    MemoryIdempotentRepository.memoryIdempotentRepository(200))
            .to(<span class="code-quote">"seda:b"</span>);
    }
};
</pre>
</div></div>

<p>The above <a href="https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java"
class="external-link" rel="nofollow">example</a> uses an in-memory <a
href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/MessageIdRepository.html"
class="external-link" rel="nofollow">MessageIdRepository</a>, which can easily run
out of memory and doesn't work in a clustered environment. So you might prefer to use the
JPA-based implementation, which uses a database to store the IDs of the messages that have been processed.</p>
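<p>One common way to keep an in-memory repository from growing without bound is to cap its size and evict the least recently used IDs. The sketch below shows that trade-off in plain Java. It assumes the <tt>200</tt> passed in the example above acts as such a size limit, which this page does not state; the class <tt>BoundedIdRepositorySketch</tt> is hypothetical and is not Camel's <tt>MemoryIdempotentRepository</tt>.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded ID store with LRU eviction; not part of Camel. */
final class BoundedIdRepositorySketch {
    private final Map<String, Boolean> ids;

    BoundedIdRepositorySketch(final int maxSize) {
        // accessOrder = true keeps the least recently used entry first.
        this.ids = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Evict the least recently used ID once the limit is exceeded.
                return size() > maxSize;
            }
        };
    }

    /** Returns true if the ID was new, false if it is still remembered. */
    synchronized boolean add(String id) {
        // get() counts as an access and moves the entry to the most recent position.
        if (ids.get(id) != null) {
            return false;
        }
        ids.put(id, Boolean.TRUE);
        return true;
    }

    synchronized int size() {
        return ids.size();
    }
}
```

<p>The cap bounds memory use, but an ID that has been evicted is forgotten, so a late duplicate of that message would be accepted as new. A database-backed repository avoids that limitation at the cost of a lookup per message.</p>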

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>).idempotentConsumer(
        header(<span class="code-quote">"messageId"</span>),
        jpaMessageIdRepository(lookup(JpaTemplate.class), PROCESSOR_NAME)
).to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>In the above <a href="https://svn.apache.org/repos/asf/camel/trunk/components/camel-jpa/src/test/java/org/apache/camel/processor/jpa/JpaIdempotentConsumerTest.java"
class="external-link" rel="nofollow">example</a> we are using the header <b>messageId</b>
to filter out duplicates and using the collection <b>myProcessorName</b> to indicate
the Message ID Repository to use. This name is important as you could process the same message
by many different processors; so each may require its own logical Message ID Repository.</p>
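<p>The idea of one logical Message ID Repository per processor name can be sketched in plain Java as a map from processor name to a set of IDs. This is only an illustration of the concept, not how the JPA repository stores its data; the class <tt>NamedRepositoriesSketch</tt> and the processor names used with it are hypothetical.</p>

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of per-processor ID repositories; not part of Camel. */
final class NamedRepositoriesSketch {
    // One logical set of message IDs per processor name.
    private final Map<String, Set<String>> idsByProcessor = new ConcurrentHashMap<>();

    /** Returns true if the ID is new for the given processor, false if it is a duplicate there. */
    boolean add(String processorName, String messageId) {
        return idsByProcessor
                .computeIfAbsent(processorName, name -> ConcurrentHashMap.<String>newKeySet())
                .add(messageId);
    }
}
```

<p>Because the sets are kept apart, a message ID already recorded for one processor is still new for another, so each processor filters its own duplicates independently.</p>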

<p>For further examples of this pattern in use, you could look at the <a href="http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java?view=markup"
class="external-link" rel="nofollow">JUnit test case</a>.</p>

<h3><a name="IdempotentConsumer-SpringXMLexample"></a>Spring XML example</h3>

<p>The following example uses the header <b>messageId</b> to filter
out duplicates:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-xml"><span class="code-tag"><span class="code-comment">&lt;!--
repository for the idempotent consumer --&gt;</span></span>
<span class="code-tag">&lt;bean id=<span class="code-quote">"myRepo"</span>
class=<span class="code-quote">"org.apache.camel.processor.idempotent.MemoryIdempotentRepository"</span>/&gt;</span>

<span class="code-tag">&lt;camelContext xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
    <span class="code-tag">&lt;route&gt;</span>
        <span class="code-tag">&lt;from uri=<span class="code-quote">"direct:start"</span>/&gt;</span>
        <span class="code-tag">&lt;idempotentConsumer messageIdRepositoryRef=<span
class="code-quote">"myRepo"</span>&gt;</span>
            <span class="code-tag"><span class="code-comment">&lt;!-- use
the messageId header as key for identifying duplicate messages --&gt;</span></span>
            <span class="code-tag">&lt;header&gt;</span>messageId<span
class="code-tag">&lt;/header&gt;</span>
            <span class="code-tag"><span class="code-comment">&lt;!-- if not
a duplicate send it to this mock endpoint --&gt;</span></span>
            <span class="code-tag">&lt;to uri=<span class="code-quote">"mock:result"</span>/&gt;</span>
        <span class="code-tag">&lt;/idempotentConsumer&gt;</span>
    <span class="code-tag">&lt;/route&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>


<h3><a name="IdempotentConsumer-Howtohandleduplicatemessagesintheroute"></a>How
to handle duplicate messages in the route</h3>
<p><b>Available as of Camel 2.8</b></p>

<p>You can set the <tt>skipDuplicate</tt> option to <tt>false</tt>,
which instructs the idempotent consumer to route duplicate messages as well. Each duplicate
message is marked by having the <tt>Exchange.DUPLICATE_MESSAGE</tt> property on the <a href="/confluence/display/CAMEL/Exchange"
title="Exchange">Exchange</a> set to true. We can leverage this by using a <a
href="/confluence/display/CAMEL/Content+Based+Router" title="Content Based Router">Content
Based Router</a> or <a href="/confluence/display/CAMEL/Message+Filter" title="Message Filter">Message
Filter</a> to detect and handle duplicate messages.</p>

<p>In the following example we use the <a href="/confluence/display/CAMEL/Message+Filter"
title="Message Filter">Message Filter</a> to send duplicate messages to a separate endpoint
and then stop routing them.</p>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Filter duplicate messages</b></div><div
class="codeContent panelContent">
<pre class="code-java">from(<span class="code-quote">"direct:start"</span>)
    <span class="code-comment">// instruct the idempotent consumer not to skip duplicates, as we will filter them ourselves
</span>    .idempotentConsumer(header(<span class="code-quote">"messageId"</span>)).messageIdRepository(repo).skipDuplicate(<span
class="code-keyword">false</span>)
    .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(<span class="code-keyword">true</span>))
        <span class="code-comment">// filter out duplicate messages by sending them to someplace <span class="code-keyword">else</span> and then stop
</span>        .to(<span class="code-quote">"mock:duplicate"</span>)
        .stop()
    .end()
    <span class="code-comment">// and here we process only <span class="code-keyword">new</span> messages (no duplicates)
</span>    .to(<span class="code-quote">"mock:result"</span>);
</pre>
</div></div>

<p>The same example in XML DSL would be:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeHeader panelHeader"
style="border-bottom-width: 1px;"><b>Filter duplicate messages</b></div><div
class="codeContent panelContent">
<pre class="code-xml">
<span class="code-tag"><span class="code-comment">&lt;!-- idempotent repository,
just use a memory based for testing --&gt;</span></span>
<span class="code-tag">&lt;bean id=<span class="code-quote">"myRepo"</span>
class=<span class="code-quote">"org.apache.camel.processor.idempotent.MemoryIdempotentRepository"</span>/&gt;</span>

<span class="code-tag">&lt;camelContext xmlns=<span class="code-quote">"http://camel.apache.org/schema/spring"</span>&gt;</span>
    <span class="code-tag">&lt;route&gt;</span>
        <span class="code-tag">&lt;from uri=<span class="code-quote">"direct:start"</span>/&gt;</span>
        <span class="code-tag"><span class="code-comment">&lt;!-- we do not
want to skip any duplicate messages --&gt;</span></span>
        <span class="code-tag">&lt;idempotentConsumer messageIdRepositoryRef=<span
class="code-quote">"myRepo"</span> skipDuplicate=<span class="code-quote">"false"</span>&gt;</span>
            <span class="code-tag"><span class="code-comment">&lt;!-- use
the messageId header as key for identifying duplicate messages --&gt;</span></span>
            <span class="code-tag">&lt;header&gt;</span>messageId<span
class="code-tag">&lt;/header&gt;</span>
            <span class="code-tag"><span class="code-comment">&lt;!-- we will
handle duplicate messages using a filter --&gt;</span></span>
            <span class="code-tag">&lt;filter&gt;</span>
                <span class="code-tag"><span class="code-comment">&lt;!--
the filter only reacts to duplicate messages, that is, when this property is set on the Exchange
--&gt;</span></span>
                <span class="code-tag">&lt;property&gt;</span>CamelDuplicateMessage<span
class="code-tag">&lt;/property&gt;</span>
                <span class="code-tag"><span class="code-comment">&lt;!--
and send the message to this mock, as it is part of a unit test --&gt;</span></span>
                <span class="code-tag"><span class="code-comment">&lt;!--
but you can of course do anything, as it is part of the route --&gt;</span></span>
                <span class="code-tag">&lt;to uri=<span class="code-quote">"mock:duplicate"</span>/&gt;</span>
                <span class="code-tag"><span class="code-comment">&lt;!--
and then stop --&gt;</span></span>
                <span class="code-tag">&lt;stop/&gt;</span>
            <span class="code-tag">&lt;/filter&gt;</span>
            <span class="code-tag"><span class="code-comment">&lt;!-- here
we route only new messages --&gt;</span></span>
            <span class="code-tag">&lt;to uri=<span class="code-quote">"mock:result"</span>/&gt;</span>
        <span class="code-tag">&lt;/idempotentConsumer&gt;</span>
    <span class="code-tag">&lt;/route&gt;</span>
<span class="code-tag">&lt;/camelContext&gt;</span>
</pre>
</div></div>


<h3><a name="IdempotentConsumer-Howtohandleduplicatemessageinaclusteredenvironmentwithadatagrid"></a>How
to handle duplicate messages in a clustered environment with a data grid</h3>
<p><b>Available as of Camel 2.8</b></p>

<p>If you are running Camel in a clustered environment, an in-memory idempotent repository
doesn't work (see above). You can either set up a central database or use the idempotent repository
implementation based on the <a href="http://www.hazelcast.com/" class="external-link" rel="nofollow">Hazelcast</a>
data grid. Hazelcast discovers the nodes over multicast by default (it can be configured
to use tcp-ip instead) and automatically creates a map-based repository:</p>

<div class="code panel" style="border-width: 1px;"><div class="codeContent panelContent">
<pre class="code-java">
  HazelcastIdempotentRepository idempotentRepo = <span class="code-keyword">new</span>
HazelcastIdempotentRepository(<span class="code-quote">"myrepo"</span>);

  from(<span class="code-quote">"direct:in"</span>).idempotentConsumer(header(<span
class="code-quote">"messageId"</span>), idempotentRepo).to(<span class="code-quote">"mock:out"</span>);
</pre>
</div></div>

<p>You have to define how long the repository should hold each message ID (by default, IDs
are never deleted). To avoid running out of memory, you should create an eviction strategy
based on the <a href="http://www.hazelcast.com/documentation.jsp#MapEviction" class="external-link"
rel="nofollow">Hazelcast configuration</a>. For additional information see <a
href="/confluence/display/CAMEL/Hazelcast+Component" title="Hazelcast Component">camel-hazelcast</a>.</p>
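<p>What a time-based eviction strategy means for duplicate detection can be sketched in plain Java. This does not use or imitate the Hazelcast API, where eviction is set up in the Hazelcast configuration; the class <tt>ExpiringIdRepositorySketch</tt>, its time-to-live parameter and the injected clock are all hypothetical and exist only for illustration.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical ID store whose entries expire after a fixed time; not Hazelcast or Camel code. */
final class ExpiringIdRepositorySketch {
    // Maps each message ID to the time (in milliseconds) at which it expires.
    private final Map<String, Long> expiryById = new HashMap<>();
    private final long timeToLiveMillis;
    private final LongSupplier clock;

    ExpiringIdRepositorySketch(long timeToLiveMillis, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMillis;
        this.clock = clock;
    }

    /** Returns true if the ID was new (or had expired), false if it is still held. */
    synchronized boolean add(String id) {
        final long now = clock.getAsLong();
        // Drop every entry whose time to live has passed.
        expiryById.values().removeIf(expiry -> expiry <= now);
        // An ID that is still held keeps its original expiry time.
        return expiryById.putIfAbsent(id, now + timeToLiveMillis) == null;
    }

    synchronized int size() {
        return expiryById.size();
    }
}
```

<p>The time to live should be longer than the window in which duplicates can realistically arrive; once an ID has expired, a late duplicate is treated as a new message.</p>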

<p>See this <a href="/confluence/display/CAMEL/Hazelcast+Idempotent+Repository+Tutorial"
title="Hazelcast Idempotent Repository Tutorial">little tutorial</a> on how to set up such
an idempotent repository on two cluster nodes using Apache Karaf.</p>

<h4><a name="IdempotentConsumer-UsingThisPattern"></a>Using This Pattern</h4>

<p>If you would like to use this EIP pattern, please read the <a href="/confluence/display/CAMEL/Getting+Started"
title="Getting Started">Getting Started</a> guide. You may also find the <a href="/confluence/display/CAMEL/Architecture"
title="Architecture">Architecture</a> useful, particularly the description of <a
href="/confluence/display/CAMEL/Endpoint" title="Endpoint">Endpoint</a> and <a
href="/confluence/display/CAMEL/URIs" title="URIs">URIs</a>. Then you could try out
some of the <a href="/confluence/display/CAMEL/Examples" title="Examples">Examples</a>
before trying this pattern out.</p>
    </div>
</div>
</div>
</div>
</div>
</body>
</html>
