camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From build...@apache.org
Subject svn commit: r1007888 [4/4] - in /websites/production/camel/content: book-in-one-page.html book-pattern-appendix.html cache/main.pageCache idempotent-consumer.html kafka.html
Date Tue, 07 Mar 2017 15:20:59 GMT
Modified: websites/production/camel/content/idempotent-consumer.html
==============================================================================
--- websites/production/camel/content/idempotent-consumer.html (original)
+++ websites/production/camel/content/idempotent-consumer.html Tue Mar  7 15:20:59 2017
@@ -86,7 +86,7 @@
 	<tbody>
         <tr>
         <td valign="top" width="100%">
-<div class="wiki-content maincontent"><h3 id="IdempotentConsumer-IdempotentConsumer">Idempotent Consumer</h3><p>The <a shape="rect" class="external-link" href="http://www.enterpriseintegrationpatterns.com/IdempotentReceiver.html" rel="nofollow">Idempotent Consumer</a> from the <a shape="rect" href="enterprise-integration-patterns.html">EIP patterns</a> is used to filter out duplicate messages.</p><p>This pattern is implemented using the <a shape="rect" class="external-link" href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/IdempotentConsumer.html">IdempotentConsumer</a> class. This uses an <a shape="rect" href="expression.html">Expression</a> to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the <a shape="rect" class="external-link" href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/IdempotentRepository.html">IdempotentRepository</a> to see if it h
 as been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.</p><p>The Idempotent Consumer essentially acts like a <a shape="rect" href="message-filter.html">Message Filter</a> to filter out duplicates.</p><p>Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.<br clear="none"> On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.</p><p>Camel provides the following Idempotent Consumer implementations:</p><ul class="alternate"><li>MemoryIdempotentRepository</li><li><a shape="rect" href="file2.html">FileIdempotentRepository</a></li><li><a shape="rect" href="hazelcast-component.html">HazelcastIdempotentRepository</a> (<strong>Available as of Camel 2.8</strong>)</li><li><a shape="rect" href="sql-component.html">JdbcMessageIdRepository</a> (<strong>Available as of Camel 2.7</strong>)</l
 i><li><a shape="rect" href="jpa.html">JpaMessageIdRepository</a></li><li><p><a shape="rect" href="infinispan.html">InfinispanIdempotentRepository</a> (<strong>Available as of Camel 2.13.0)</strong></p></li><li><p><a shape="rect" href="jcache.html">JCacheIdempotentRepository</a><strong>&#160;(<strong>Available as of Camel 2.17.0)</strong></strong></p></li><li><p><a shape="rect" href="spring.html">SpringCacheIdempotentRepository</a>&#160;<strong>(<strong>Available as of Camel 2.17.1)</strong></strong><strong><strong><br clear="none"></strong></strong></p></li><li><p><strong><strong><a shape="rect" href="ehcache.html">EhcacheIdempotentRepository</a>&#160;<strong>(<strong>Available as of Camel 2.18.0)</strong></strong><br clear="none"></strong></strong></p></li></ul><h3 id="IdempotentConsumer-Options">Options</h3><p>The Idempotent Consumer has the following options:</p><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>O
 ption</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>eager</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Eager controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled before then Camel will be able to detect duplicate messages even when messages are currently in progress. By disabling Camel will only detect duplicates when a message has successfully been processed.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>messageIdRepositoryRef</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>null</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>A reference to a <code>IdempotentRepository</code> to lookup in the registry. This option is man
 datory when using XML DSL.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>skipDuplicate</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.8:</strong> Sets whether to skip duplicate messages. If set to <code>false</code> then the message will be continued. However the <a shape="rect" href="exchange.html">Exchange</a> has been marked as a duplicate by having the <code>Exchange.DUPLICATE_MESSAG</code> exchange property set to a <code>Boolean.TRUE</code> value.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>removeOnFailure</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Sets whether to remove the id of an Exchange that failed.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">completionEager</td><td colspan="1" rowspan="1" class="confl
 uenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.16:</strong> Sets whether to complete the idempotent consumer eager or when the exchange is done.</p><p>If this option is true to complete eager, then the idempotent consumer will trigger its completion when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange is continued routed after the block ends, then whatever happens there does not affect the state.</p><p>If this option is false (default) to not complete eager, then the idempotent consumer will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends, then whatever happens there also affect the state. For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.</p></td></tr></tbody></table></div><h3 id="IdempotentConsumer-Usingthe"><strong>Using the <a shape="rect" href="fluent-builders.html
 ">Fluent Builders</a></strong></h3><p>The following example will use the header <strong>myMessageId</strong> to filter out duplicates</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<div class="wiki-content maincontent"><h3 id="IdempotentConsumer-IdempotentConsumer">Idempotent Consumer</h3><p>The <a shape="rect" class="external-link" href="http://www.enterpriseintegrationpatterns.com/IdempotentReceiver.html" rel="nofollow">Idempotent Consumer</a> from the <a shape="rect" href="enterprise-integration-patterns.html">EIP patterns</a> is used to filter out duplicate messages.</p><p>This pattern is implemented using the <a shape="rect" class="external-link" href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/idempotent/IdempotentConsumer.html">IdempotentConsumer</a> class. This uses an <a shape="rect" href="expression.html">Expression</a> to calculate a unique message ID string for a given message exchange; this ID can then be looked up in the <a shape="rect" class="external-link" href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/spi/IdempotentRepository.html">IdempotentRepository</a> to see if it h
 as been seen before; if it has the message is consumed; if its not then the message is processed and the ID is added to the repository.</p><p>The Idempotent Consumer essentially acts like a <a shape="rect" href="message-filter.html">Message Filter</a> to filter out duplicates.</p><p>Camel will add the message id eagerly to the repository to detect duplication also for Exchanges currently in progress.<br clear="none"> On completion Camel will remove the message id from the repository if the Exchange failed, otherwise it stays there.</p><p>Camel provides the following Idempotent Consumer implementations:</p><ul class="alternate"><li>MemoryIdempotentRepository</li><li><a shape="rect" href="file2.html">FileIdempotentRepository</a></li><li><a shape="rect" href="hazelcast-component.html">HazelcastIdempotentRepository</a> (<strong>Available as of Camel 2.8</strong>)</li><li><a shape="rect" href="sql-component.html">JdbcMessageIdRepository</a> (<strong>Available as of Camel 2.7</strong>)</l
 i><li><a shape="rect" href="jpa.html">JpaMessageIdRepository</a></li><li><p><a shape="rect" href="infinispan.html">InfinispanIdempotentRepository</a> (<strong>Available as of Camel 2.13.0)</strong></p></li><li><p><a shape="rect" href="jcache.html">JCacheIdempotentRepository</a><strong>&#160;(<strong>Available as of Camel 2.17.0)</strong></strong></p></li><li><p><a shape="rect" href="spring.html">SpringCacheIdempotentRepository</a>&#160;<strong>(<strong>Available as of Camel 2.17.1)</strong></strong><strong><strong><br clear="none"></strong></strong></p></li><li><p><a shape="rect" href="ehcache.html">EhcacheIdempotentRepository</a><strong><strong>&#160;<strong>(<strong>Available as of Camel 2.18.0)</strong></strong></strong></strong></p></li><li><a shape="rect" href="kafka.html">KafkaIdempotentRepository</a> (<strong>Available as of Camel 2.19.0)</strong></li></ul><h3 id="IdempotentConsumer-Options">Options</h3><p>The Idempotent Consumer has the following options:</p><div class="tabl
 e-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>eager</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Eager controls whether Camel adds the message to the repository before or after the exchange has been processed. If enabled before then Camel will be able to detect duplicate messages even when messages are currently in progress. By disabling Camel will only detect duplicates when a message has successfully been processed.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>messageIdRepositoryRef</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>null</code></p></td><td colspan="1" rowspan="1" class="confluence
 Td"><p>A reference to a <code>IdempotentRepository</code> to lookup in the registry. This option is mandatory when using XML DSL.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>skipDuplicate</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.8:</strong> Sets whether to skip duplicate messages. If set to <code>false</code> then the message will be continued. However the <a shape="rect" href="exchange.html">Exchange</a> has been marked as a duplicate by having the <code>Exchange.DUPLICATE_MESSAG</code> exchange property set to a <code>Boolean.TRUE</code> value.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>removeOnFailure</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Sets whether to remove the id of an Exchange that failed.</p></td></tr><tr><td co
 lspan="1" rowspan="1" class="confluenceTd">completionEager</td><td colspan="1" rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.16:</strong> Sets whether to complete the idempotent consumer eager or when the exchange is done.</p><p>If this option is true to complete eager, then the idempotent consumer will trigger its completion when the exchange reached the end of the block of the idempotent consumer pattern. So if the exchange is continued routed after the block ends, then whatever happens there does not affect the state.</p><p>If this option is false (default) to not complete eager, then the idempotent consumer will complete when the exchange is done being routed. So if the exchange is continued routed after the block ends, then whatever happens there also affect the state. For example if the exchange failed due to an exception, then the state of the idempotent consumer will be a rollback.</p></td></tr></tbody></table>
 </div><h3 id="IdempotentConsumer-Usingthe"><strong>Using the <a shape="rect" href="fluent-builders.html">Fluent Builders</a></strong></h3><p>The following example will use the header <strong>myMessageId</strong> to filter out duplicates</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[
 RouteBuilder builder = new RouteBuilder() {
     public void configure() {

Modified: websites/production/camel/content/kafka.html
==============================================================================
--- websites/production/camel/content/kafka.html (original)
+++ websites/production/camel/content/kafka.html Tue Mar  7 15:20:59 2017
@@ -102,13 +102,16 @@
 </div></div><h3 id="Kafka-URIformat">URI format</h3><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[kafka:server:port[?options]
 ]]></script>
-</div></div><p>&#160;</p><h3 id="Kafka-Options(Camel2.16orolder)">Options (Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperHost</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The zookeeper host to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperPort</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>2181</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The zookeeper port to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">zookeeperConnect</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd"><strong>Camel 2.13.3/2.14.1:</strong> If in use, then zookeeperHost/zookeeperPort is no
 t used.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>topic</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The topic to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>groupId</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitioner</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerStreams</p></td><td colspan="1" rowspan="1" class="confluenceTd">10</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperSessionTimeoutMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperConnectionTimeoutMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperSyncTimeMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">consumersCount</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">1</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0:</strong> The numbe
 r of consumers that connect to kafka server</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">batchSize</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">100</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0: </strong>The batchSize that the BatchingConsumerTask processes once, deprecated since <strong>2.17.1</strong>, removed<strong><br clear="none"></strong>since <strong>2.18.0</strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">barrierAwaitTimeoutMs</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">10000</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0: </strong>If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for <span style="color: rgb(51,51,51);">barr
 ierAwaitTimeoutMs</span>, deprecated since <strong>2.17.1</strong>, removed since&#160;<strong>2.18.0</strong><strong>.<br clear="none"></strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">bridgeEndpoint</td><td colspan="1" rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd">Camel 2.16.0: If bridgeEndpoint is true, the producer will ignore the topic header setting of the message.</td></tr></tbody></table>
-</div></div><p>You can append query options to the URI in the following format, <code>?option=value&amp;option=value&amp;...</code></p><h3 id="Kafka-ProducerOptions(Camel2.16orolder)">Producer Options&#160;(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>producerType</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync (Taken from native KafkaProducer class)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync - send message/batch immediately, and wait until response is received</p><p>async - queue the message/batch to send. There is a thread per broker (Kafka node) which polls from this queue upon <span>queueBufferingMaxMs or <span>batchNumMessages</span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">compressionCodec</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1"
  class="confluenceTd">compressedTopics</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">messageSendMaxRetries</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">retryBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">topicMetadataRefreshIntervalMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">sendBufferBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="
 1" rowspan="1" class="confluenceTd">requestRequiredAcks</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">requestTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueBufferingMaxMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueBufferingMaxMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueEnqueueTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr
 ><tr><td colspan="1" rowspan="1" class="confluenceTd">batchNumMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">serializerClass</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">keySerializerClass</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr></tbody></table>
-</div></div><h3 id="Kafka-ConsumerOptions(Camel2.16orolder)">Consumer Options&#160;(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap">
- <table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerId</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">socketTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">socketReceiveBufferBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchMessageMaxBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&
 #160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoCommitEnable</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoCommitIntervalMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queuedMaxMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">rebalanceMaxRetries</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchMinBytes</td><td colspan="1" rowspan="1" class="confluenceTd
 ">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchWaitMaxMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">rebalanceBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">refreshLeaderBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoOffsetReset</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">consumerTimeoutMs</td><td colspan="1" rowspan="1" class="confluen
 ceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr></tbody></table>
-</div></div><h3 id="Kafka-Options(Camel2.17ornewer)">Options (Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default</th><th colspan="1" rowspan="1" class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">topic</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">Topic to use. From the <strong>consumer</strong> side you can specify also a comma separated list of topics.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">groupId</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">con
 sumerStreams</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">10</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">clientId</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">consumersCount</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">1</td><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">The number of consumers that connect to kafka server</span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">batchSize</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">100</td><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">Commit Size if auto commit is false</span></p><
 /td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">bridgeEndpoint</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd"><span>If the bridgeEndpoint is true, the producer will ignore the topic header setting of the message.</span></td></tr></tbody></table></div><p>&#160;</p><h3 id="Kafka-ProducerOptions(Camel2.17ornewer)">Producer Options&#160;(Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default &amp; Description Reference</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>serializerClass</p></td><td colspan="1" rowspan="46" class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a shape="rect" class="external-link" href="http://kafka.apache.org/documentation.html#producerconfigs">http://
 kafka.apache.org/documentation.html#producerconfigs</a></span></span></p><p class="p1"><span class="s1"><span class="nolink">serializerClass : <span class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></p><p class="p1"><span class="s1"><span class="nolink">keySerializerClass : <span class="s1"><span class="nolink"><span class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></span></span></p><p>partitioner : <span class="pl-s">org.apache.kafka.clients.producer.internals.DefaultPartitioner<span class="pl-pds">&#160;</span></span></p><p>&#160;</p><p>&#160;</p><p class="p1"><span class="s1"><span class="nolink"><br clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>keySerializerClass</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>requestRequiredAcks&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>bufferMemorySize&#160;</p></
 td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>compressionCodec&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retries&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>producerBatchSize&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>lingerMs&#160;</p></td></tr><tr><td colspan="1" 
 rowspan="1" class="confluenceTd"><p>maxBlockMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxRequestSize&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitioner&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>requestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslMechanism <strong>(from Camel 2.18)</strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreType&#160;</p></td
 ></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxInFlightRequest&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr>
 <tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><h3 id="Kafka-ConsumerOptions(Camel2.17ornewer)">Consumer Options&#160;(Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default &amp; Description Re
 ference</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="45" class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a shape="rect" class="external-link" href="http://kafka.apache.org/documentation.html#newconsumerconfigs">http://kafka.apache.org/documentation.html#newconsumerconfigs</a></span></span></p><p class="p1"><span class="s1"><span class="nolink">keyDeserializer : <span class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></p><p class="p1"><span class="s1"><span class="nolink">valueDeserializer : <span class="s1"><span class="nolink"><span class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></span></span></p><p>partitionAssignor : <span class="pl-s">org.apache.kafka.clients.consumer.RangeAssignor</span></p><p>&#160;</p><p class="p1"><span class="s1"><span class="nolink"><span class="s1"><span class="nolink"><span class="pl-s"><
 br clear="none"></span></span></span></span></span></p><p class="p1"><span class="s1"><span class="nolink"><br clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>keyDeserializer</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>valueDeserializer</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>fetchMinBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>groupId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>heartbeatIntervalMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxPartitionFetchBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sessionTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluence
 Td"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoOffsetReset&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoCommitEnable&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitionAssignor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerRequestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span>saslMechanism </span><strong>(from Camel 2.18)</strong>
 </p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoCommitIntervalMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>checkCrcs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>fetchWaitMaxMs&#160;</p></td></tr><tr><td colspan=
 "1" rowspan="1" class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites
 </p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><p>&#160;</p><h3 id="Kafka-Samples">Samples</h3><h4 id="Kafka-Camel2.16orolder.1">Camel 2.16 or older</h4><p>Consuming messages:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+</div></div><p>&#160;</p><h3 id="Kafka-Options(Camel2.16orolder)">Options (Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperHost</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The zookeeper host to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperPort</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>2181</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The zookeeper port to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">zookeeperConnect</td><td colspan="1" rowspan="1" class="conf
 luenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd"><strong>Camel 2.13.3/2.14.1:</strong> If in use, then zookeeperHost/zookeeperPort is not used.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>topic</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The topic to use</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>groupId</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitioner</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerStreams</p></td><td colspan="1" rowspan="1" class="confluenceTd">10</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><
 td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperSessionTimeoutMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperConnectionTimeoutMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>zookeeperSyncTimeMs</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">consumersCount</span></p></td><td colspan="1" rowspan="1" class="confl
 uenceTd"><p><span style="color: rgb(51,51,51);">1</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0:</strong> The number of consumers that connect to kafka server</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">batchSize</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">100</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.15.0: </strong>The batchSize that the BatchingConsumerTask processes once, deprecated since <strong>2.17.1</strong>, removed<strong><br clear="none"></strong>since <strong>2.18.0</strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">barrierAwaitTimeoutMs</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><span style="color: rgb(51,51,51);">10000</span></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>
 <strong>Camel 2.15.0: </strong>If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for <span style="color: rgb(51,51,51);">barrierAwaitTimeoutMs</span>, deprecated since <strong>2.17.1</strong>, removed since&#160;<strong>2.18.0</strong><strong>.<br clear="none"></strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">bridgeEndpoint</td><td colspan="1" rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd">Camel 2.16.0: If bridgeEndpoint is true, the producer will ignore the topic header setting of the message.</td></tr></tbody></table></div></div>
+
+
+<p>You can append query options to the URI in the following format, <code>?option=value&amp;option=value&amp;...</code></p><h3 id="Kafka-ProducerOptions(Camel2.16orolder)">Producer Options&#160;(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>producerType</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync (Taken from native KafkaProducer class)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>sync - send message/batch immediately, and wait until response is received</p><p>async - queue the message/batch to send. There is a thread per broker (Kafka node) which polls from this queue upon <span>queueBufferingMaxMs or <sp
 an>batchNumMessages</span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">compressionCodec</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">compressedTopics</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">messageSendMaxRetries</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">retryBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">topicMetadataRefreshIntervalMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td
  colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">sendBufferBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">requestRequiredAcks</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">requestTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueBufferingMaxMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueBufferingMaxMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#
 160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queueEnqueueTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">batchNumMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">serializerClass</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">keySerializerClass</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr></tbody></table></div></div>
+
+
+<h3 id="Kafka-ConsumerOptions(Camel2.16orolder)">Consumer Options&#160;(Camel 2.16 or older)</h3><div class="confluenceTableSmall"><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Property</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerId</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">socketTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">socketReceiveBufferBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="conf
 luenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchMessageMaxBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoCommitEnable</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoCommitIntervalMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">queuedMaxMessages</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">rebalanceMaxRetries</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchMinBytes</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">fetchWaitMaxMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">rebalanceBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">refreshLeaderBackoffMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">autoOffsetReset</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" c
 lass="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd">consumerTimeoutMs</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr></tbody></table></div></div>
+
+
+<h3 id="Kafka-Options(Camel2.17ornewer)">Options (Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default</th><th colspan="1" rowspan="1" class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">topic</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">Topic to use. From the <strong>consumer</strong> side you can specify also a comma separated list of topics.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">groupId</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">consumerStreams
 </span></p></td><td colspan="1" rowspan="1" class="confluenceTd">10</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">clientId</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td><td colspan="1" rowspan="1" class="confluenceTd">&#160;</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">consumersCount</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">1</td><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">The number of consumers that connect to kafka server</span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">batchSize</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">100</td><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">Commit Size if auto commit is false</span></p></td></tr><tr
 ><td colspan="1" rowspan="1" class="confluenceTd"><p class="p1"><span class="s1">bridgeEndpoint</span></p></td><td colspan="1" rowspan="1" class="confluenceTd">false</td><td colspan="1" rowspan="1" class="confluenceTd"><span>If the bridgeEndpoint is true, the producer will ignore the topic header setting of the message.</span></td></tr></tbody></table></div><p>&#160;</p><h3 id="Kafka-ProducerOptions(Camel2.17ornewer)">Producer Options&#160;(Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default &amp; Description Reference</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>serializerClass</p></td><td colspan="1" rowspan="46" class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a shape="rect" class="external-link" href="http://kafka.apache.org/documentation.html#producerconfigs">http://kafka.apache
 .org/documentation.html#producerconfigs</a></span></span></p><p class="p1"><span class="s1"><span class="nolink">serializerClass : <span class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></p><p class="p1"><span class="s1"><span class="nolink">keySerializerClass : <span class="s1"><span class="nolink"><span class="pl-s">org.apache.kafka.common.serialization.StringSerializer</span></span></span></span></span></p><p>partitioner : <span class="pl-s">org.apache.kafka.clients.producer.internals.DefaultPartitioner<span class="pl-pds">&#160;</span></span></p><p>&#160;</p><p>&#160;</p><p class="p1"><span class="s1"><span class="nolink"><br clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>keySerializerClass</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>requestRequiredAcks&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>bufferMemorySize&#160;</p></td></tr><tr>
 <td colspan="1" rowspan="1" class="confluenceTd"><p>compressionCodec&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retries&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>producerBatchSize&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>lingerMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" 
 class="confluenceTd"><p>maxBlockMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxRequestSize&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitioner&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>requestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslMechanism <strong>(from Camel 2.18)</strong></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><t
 d colspan="1" rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxInFlightRequest&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td cols
 pan="1" rowspan="1" class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><h3 id="Kafka-ConsumerOptions(Camel2.17ornewer)">Consumer Options&#160;(Camel 2.17 or newer)</h3><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Default &amp; Description Reference</th>
 </tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="45" class="confluenceTd"><p class="p1"><span class="s1"><span class="nolink"><a shape="rect" class="external-link" href="http://kafka.apache.org/documentation.html#newconsumerconfigs">http://kafka.apache.org/documentation.html#newconsumerconfigs</a></span></span></p><p class="p1"><span class="s1"><span class="nolink">keyDeserializer : <span class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></p><p class="p1"><span class="s1"><span class="nolink">valueDeserializer : <span class="s1"><span class="nolink"><span class="pl-s">org.apache.kafka.common.serialization.StringDeserializer</span></span></span></span></span></p><p>partitionAssignor : <span class="pl-s">org.apache.kafka.clients.consumer.RangeAssignor</span></p><p>&#160;</p><p class="p1"><span class="s1"><span class="nolink"><span class="s1"><span class="nolink"><span class="pl-s"><br clear="no
 ne"></span></span></span></span></span></p><p class="p1"><span class="s1"><span class="nolink"><br clear="none"></span></span></p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>keyDeserializer</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>valueDeserializer</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>fetchMinBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>groupId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>heartbeatIntervalMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>maxPartitionFetchBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sessionTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeyPassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKe
 ystorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreLocation</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststorePassword</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoOffsetReset&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>connectionMaxIdleMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoCommitEnable&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>partitionAssignor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>receiveBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>consumerRequestTimeoutMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>saslKerberosServiceName</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><span>saslMechanism </span><strong>(from Camel 2.18)</strong></p></td></t
 r><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>securityProtocol</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sendBufferBytes&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEnabledProtocols&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeystoreType&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProtocol&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslProvider</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTruststoreType</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>autoCommitIntervalMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>checkCrcs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>clientId</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>fetchWaitMaxMs&#160;</p></td></tr><tr><td colspan="1" rowspan=
 "1" class="confluenceTd"><p>metadataMaxAgeMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricReporters</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>noOfMetricsSample&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>metricsSampleWindowMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>reconnectBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>retryBackoffMs&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosInitCmd&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosBeforeReloginMinTime&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewJitter&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>kerberosRenewWindowFactor&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslCipherSuites</p></td></t
 r><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslEndpointAlgorithm</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslKeymanagerAlgorithm&#160;</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>sslTrustmanagerAlgorithm&#160;</p></td></tr></tbody></table></div><p>&#160;</p><h3 id="Kafka-Samples">Samples</h3><h4 id="Kafka-Camel2.16orolder.1">Camel 2.16 or older</h4><p>Consuming messages:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
 <script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[from(&quot;kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&quot;).to(&quot;log:input&quot;);
 ]]></script>
 </div></div><p>Producing messages:</p><p>See unit tests of camel-kafka for more examples</p><h4 id="Kafka-Camel2.17ornewer.1">Camel 2.17 or newer</h4><p>Consuming messages:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
@@ -149,6 +152,42 @@
 					}
 				}).to(&quot;kafka:localhost:9092?topic=test&quot;);
 ]]></script>
+</div></div><h3 id="Kafka-UsingtheKafkaidempotentrepository(AvailablefromCamel2.19)">Using the Kafka idempotent repository (Available from Camel 2.19)</h3><p>The <code>camel-kafka</code> library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.</p><p>The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions; as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.</p><p>Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.</p><p>On start
 up, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of <code>pollDurationMs</code> in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.</p><p>A <code>KafkaIdempotentRepository</code> has the following properties:</p><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh">Property</th><th colspan="1" rowspan="1" class="confluenceTh">Description</th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><pre><span style="color: rgb(102,14,122);">topic</span></pre></td><td colspan="1" rowspan="1" class="confluenceTd">The name of the Kafka topic to use to broadcast changes. (required)</td></tr><tr><td 
 colspan="1" rowspan="1" class="confluenceTd"><code>bootstrapServers</code></td><td colspan="1" rowspan="1" class="confluenceTd">The <code>bootstrap.servers</code> property on the internal Kafka producer and consumer. Use this as shorthand if not setting <code>consumerConfig</code> and <code>producerConfig</code>. If used, this component will apply sensible default configurations for the producer and consumer.</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><code>producerConfig</code></td><td colspan="1" rowspan="1" class="confluenceTd">Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides <code>bootstrapServers</code>, so must define the Kafka <code>bootstrap.servers</code> property itself</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><code>consumerConfig</code></td><td colspan="1" rowspan="1" class="confluenceTd">Sets the properties that will be used by the Kafka consumer that populates the cache from the topic.
  Overrides <code>bootstrapServers</code>, so must define the Kafka <code>bootstrap.servers</code> property itself</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><code>maxCacheSize</code></td><td colspan="1" rowspan="1" class="confluenceTd">How many of the most recently used keys should be stored in memory (default 1000).</td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><pre><span style="color: rgb(0,0,0);">pollDurationMs</span></pre></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The poll duration of the Kafka consumer. The local caches are updated immediately; this value will affect how far behind other peers in the cluster are, which are updating their caches from the topic, relative to the idempotent consumer instance issued the cache action message.</p><p>The default value of this is 100 ms. If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repo
 sitory's consumer and the Kafka brokers.</p></td></tr></tbody></table></div><p>The repository can be instantiated by defining the topic and <code>bootstrapServers</code>, or the <code>producerConfig</code> and <code>consumerConfig</code> property sets can be explicitly defined to enable features such as SSL/SASL.</p><p>To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is <code>CamelContext</code> aware.</p><p>Sample usage is as follows:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository(&quot;idempotent-db-inserts&quot;, &quot;localhost:9091&quot;);
+
+SimpleRegistry registry = new SimpleRegistry();
+registry.put(&quot;insertDbIdemRepo&quot;, kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
+CamelContext context = new CamelContext(registry);
+
+
+// later in RouteBuilder...
+from(&quot;direct:performInsert&quot;)
+    .idempotentConsumer(header(&quot;id&quot;)).messageIdRepositoryRef(&quot;insertDbIdemRepo&quot;)
+        // once-only insert into database
+    .end()]]></script>
+</div></div><p>In XML:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[&lt;!-- simple --&gt;
+&lt;bean id=&quot;insertDbIdemRepo&quot; class=&quot;org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository&quot;&gt;
+  &lt;property name=&quot;topic&quot; value=&quot;idempotent-db-inserts&quot;/&gt;
+  &lt;property name=&quot;bootstrapServers&quot; value=&quot;localhost:9091&quot;/&gt;
+&lt;/bean&gt;
+
+&lt;!-- complex --&gt;
+&lt;bean id=&quot;insertDbIdemRepo&quot; class=&quot;org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository&quot;&gt;
+  &lt;property name=&quot;topic&quot; value=&quot;idempotent-db-inserts&quot;/&gt;
+  &lt;property name=&quot;maxCacheSize&quot; value=&quot;10000&quot;/&gt;
+  &lt;property name=&quot;consumerConfig&quot;&gt;
+    &lt;props&gt;
+      &lt;prop key=&quot;bootstrap.servers&quot;&gt;localhost:9091&lt;/prop&gt;
+    &lt;/props&gt;
+  &lt;/property&gt;
+  &lt;property name=&quot;producerConfig&quot;&gt;
+    &lt;props&gt;
+      &lt;prop key=&quot;bootstrap.servers&quot;&gt;localhost:9091&lt;/prop&gt;
+    &lt;/props&gt;
+  &lt;/property&gt;
+&lt;/bean&gt;
+]]></script>
 </div></div><p>&#160;</p><p></p><h2 id="Kafka-Endpoints">Endpoints</h2>
 
 <p></p><p>Camel supports the <a shape="rect" href="message-endpoint.html">Message Endpoint</a> pattern using the <a shape="rect" class="external-link" href="http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Endpoint.html">Endpoint</a> interface. Endpoints are usually created by a <a shape="rect" href="component.html">Component</a> and Endpoints are usually referred to in the <a shape="rect" href="dsl.html">DSL</a> via their <a shape="rect" href="uris.html">URIs</a>. </p>



Mime
View raw message