camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From build...@apache.org
Subject svn commit: r983097 [4/4] - in /websites/production/camel/content: book-component-appendix.html book-in-one-page.html cache/main.pageCache camel-2170-release.html sql-component.html
Date Fri, 18 Mar 2016 10:20:48 GMT
Modified: websites/production/camel/content/sql-component.html
==============================================================================
--- websites/production/camel/content/sql-component.html (original)
+++ websites/production/camel/content/sql-component.html Fri Mar 18 10:20:48 2016
@@ -179,136 +179,53 @@ assertEquals("Linux", row.get(
   .setBody(constant("ASF"))
   .setProperty("min", constant(123))
   .to(&quot;sql:select * from projects where license = :#${body} and id &gt; :#${property.min} order by id&quot;)]]></script>
-</div></div><h3 id="SQLComponent-UsingtheJDBCbasedidempotentrepository">Using the JDBC based idempotent repository</h3><p><strong>Available as of Camel 2.7</strong>: In this section we will use the JDBC based idempotent repository.</p><div class="confluence-information-macro confluence-information-macro-tip"><p class="title">Abstract class</p><span class="aui-icon aui-icon-small aui-iconfont-approve confluence-information-macro-icon"></span><div class="confluence-information-macro-body"><p>From Camel 2.9 onwards there is an abstract class <code>org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository</code> you can extend to build custom JDBC idempotent repository.</p></div></div><p>First we have to create the database table which will be used by the idempotent repository. For <strong>Camel 2.7</strong>, we use the following schema:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
-  processorName VARCHAR(255),
-  messageId VARCHAR(100)
-)
-]]></script>
-</div></div><p>In <strong>Camel 2.8</strong>, we added the createdAt column:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
-  processorName VARCHAR(255),
-  messageId VARCHAR(100),
-  createdAt TIMESTAMP
-)
-]]></script>
-</div></div><div class="confluence-information-macro confluence-information-macro-warning"><span class="aui-icon aui-icon-small aui-iconfont-error confluence-information-macro-icon"></span><div class="confluence-information-macro-body">The SQL Server&#160;<strong>TIMESTAMP</strong> type is a fixed-length binary-string type. It does not map to any of the JDBC time types: <strong>DATE</strong>, <strong>TIME</strong>, or <strong>TIMESTAMP</strong>.</div></div><p>&#160;</p><p>We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.</p><p>Second we need to setup a <code>javax.sql.DataSource</code> in the spring XML file:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[
-&lt;jdbc:embedded-database id=&quot;dataSource&quot; type=&quot;DERBY&quot; /&gt;
-]]></script>
-</div></div>And finally we can create our JDBC idempotent repository in the spring XML file as well:<div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[
-&lt;bean id=&quot;messageIdRepository&quot; class=&quot;org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository&quot;&gt;
-	&lt;constructor-arg ref=&quot;dataSource&quot; /&gt;
-	&lt;constructor-arg value=&quot;myProcessorName&quot; /&gt;
-&lt;/bean&gt;
+</div></div><h4 id="SQLComponent-UsingINquerieswithdynamicvalues">Using IN queries with dynamic values</h4><p><strong>Available as of Camel 2.17</strong></p><p>From Camel 2.17 onwards the SQL producer allows to use SQL queries with IN statements where the IN values is dynamic computed. For example from the message body or a header etc.</p><p>To use IN you need to:</p><ul><li><span style="line-height: 1.42857;">prefix the parameter name with&#160;<code>in:</code></span></li><li><span style="line-height: 1.42857;">add <code>( )</code>&#160;around the parameter</span></li></ul><p><span style="line-height: 1.42857;">An example explains this better. The following query is used:</span></p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[-- this is a comment
+select *
+from projects
+where project in (:#in:names)
+order by id]]></script>
+</div></div><p><span style="line-height: 1.42857;">In the following route:</span></p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[from(&quot;direct:query&quot;)
+    .to(&quot;sql:classpath:sql/selectProjectsIn.sql&quot;)
+    .to(&quot;log:query&quot;)
+    .to(&quot;mock:query&quot;);]]></script>
+</div></div><p><span style="line-height: 1.42857;">Then the IN query can use a header with the key names with the dynamic values such as:</span></p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[// use an array
+template.requestBodyAndHeader(&quot;direct:query&quot;, &quot;Hi there!&quot;, &quot;names&quot;, new String[]{&quot;Camel&quot;, &quot;AMQ&quot;});
 
-&lt;camel:camelContext&gt;
-	&lt;camel:errorHandler id=&quot;deadLetterChannel&quot; type=&quot;DeadLetterChannel&quot; deadLetterUri=&quot;mock:error&quot;&gt;
-		&lt;camel:redeliveryPolicy maximumRedeliveries=&quot;0&quot; maximumRedeliveryDelay=&quot;0&quot; logStackTrace=&quot;false&quot; /&gt;
-	&lt;/camel:errorHandler&gt;
-	
-	&lt;camel:route id=&quot;JdbcMessageIdRepositoryTest&quot; errorHandlerRef=&quot;deadLetterChannel&quot;&gt;
-		&lt;camel:from uri=&quot;direct:start&quot; /&gt;
-		&lt;camel:idempotentConsumer messageIdRepositoryRef=&quot;messageIdRepository&quot;&gt;
-			&lt;camel:header&gt;messageId&lt;/camel:header&gt;
-			&lt;camel:to uri=&quot;mock:result&quot; /&gt;
-		&lt;/camel:idempotentConsumer&gt;
-	&lt;/camel:route&gt;
-&lt;/camel:camelContext&gt;
-]]></script>
-</div></div><h4 id="SQLComponent-CustomizetheJdbcMessageIdRepository">Customize the JdbcMessageIdRepository</h4><p>Starting with <strong>Camel 2.9.1</strong> you have a few options to tune the <code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code> for your needs:</p><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Parameter</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Default Value</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>createTableIfNotExists</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Defines whether or not Camel should try to create the table if it doesn't exist.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>tableExistsString</p></td><td colspan="1" rowspan="1" class="con
 fluenceTd"><p>SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>createString</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The statement which is used to create the table.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>queryString</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The query which is used to figure out whether the message already exists in the reposito
 ry (the result is not equals to '0'). It takes two parameters. This first one is the processor name (<code>String</code>) and the second one is the message id (<code>String</code>).</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>insertString</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name (<code>String</code>), the second one is the message id (<code>String</code>) and the third one is the timestamp (<code>java.sql.Timestamp</code>) when this entry was added to the repository.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p>deleteString</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND mes
 sageId = ?</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The statement which is used to delete the entry from the database. It takes two parameter. This first one is the processor name (<code>String</code>) and the second one is the message id (<code>String</code>).</p></td></tr></tbody></table></div><p>A customized <code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code> could look like:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[
-&lt;bean id=&quot;messageIdRepository&quot; class=&quot;org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository&quot;&gt;
-	&lt;constructor-arg ref=&quot;dataSource&quot; /&gt;
-	&lt;constructor-arg value=&quot;myProcessorName&quot; /&gt;
-	&lt;property name=&quot;tableExistsString&quot; value=&quot;SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0&quot; /&gt;
-	&lt;property name=&quot;createString&quot; value=&quot;CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)&quot; /&gt;
-	&lt;property name=&quot;queryString&quot; value=&quot;SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?&quot; /&gt;
-	&lt;property name=&quot;insertString&quot; value=&quot;INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)&quot; /&gt;
-	&lt;property name=&quot;deleteString&quot; value=&quot;DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?&quot; /&gt;
-&lt;/bean&gt;
-]]></script>
-</div></div><h3 id="SQLComponent-UsingtheJDBCbasedaggregationrepository">Using the JDBC based aggregation repository</h3><p><strong>Available as of Camel 2.6</strong></p><div class="confluence-information-macro confluence-information-macro-information"><p class="title">Using JdbcAggregationRepository in Camel 2.6</p><span class="aui-icon aui-icon-small aui-iconfont-info confluence-information-macro-icon"></span><div class="confluence-information-macro-body"><p>In Camel 2.6, the JdbcAggregationRepository is provided in the <code>camel-jdbc-aggregator</code> component. From Camel 2.7 onwards, the <code>JdbcAggregationRepository</code> is provided in the <code>camel-sql</code> component.</p></div></div><p><code>JdbcAggregationRepository</code> is an <code>AggregationRepository</code> which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only <code>AggregationRepository</code>.<br clear="none"> T
 he <code>JdbcAggregationRepository</code> allows together with Camel to provide persistent support for the <a shape="rect" href="aggregator2.html">Aggregator</a>.</p><p>It has the following options:</p><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Type</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>dataSource</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>DataSource</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong> The <code>javax.sql.DataSource</code> to use for accessing the database.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>repositoryName</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>String</code></p></td><td colspa
 n="1" rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong> The name of the repository.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>transactionManager</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>TransactionManager</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong> The <code>org.springframework.transaction.PlatformTransactionManager</code> to mange transactions for the database. The TransactionManager must be able to support databases.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>lobHandler</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>LobHandler</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>A <code>org.springframework.jdbc.support.lob.LobHandler</code> to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle.</p></td></tr><tr><td co
 lspan="1" rowspan="1" class="confluenceTd"><p><code>returnOldExchange</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Whether the get operation should return the old existing Exchange if any existed. By default this option is <code>false</code> to optimize as we do not need the old exchange when aggregating.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>useRecovery</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not recovery is enabled. This option is by default <code>true</code>. When enabled the Camel <a shape="rect" href="aggregator2.html">Aggregator</a> automatic recover failed aggregated exchange and have them resubmitted.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>recoveryInterval</code></p></td><td colspan="1" rowspan="1" class="confl
 uenceTd"><p>long</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>maximumRedeliveries</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>int</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the <code>deadLetterUri</code> option must also be provided.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>deadLetterUri</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>String</p></td><td colspan="1" rowspan="1" class="conflu
 enceTd"><p>An endpoint uri for a <a shape="rect" href="dead-letter-channel.html">Dead Letter Channel</a> where exhausted recovered Exchanges will be moved. If this option is used then the <code>maximumRedeliveries</code> option must also be provided.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>storeBodyAsText</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong> Whether to store the message body as String which is human readable. By default this option is <code>false</code> storing the body in binary format.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>headersToStoreAsText</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>List&lt;String&gt;</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong> Allows to store headers as String which is human readable.
  By default this option is disabled, storing the headers in binary format.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>optimisticLocking</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> To turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications shared the same JDBC based aggregation repository.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>jdbcOptimisticLockingExceptionMapper</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> Allows to plugin a custom <code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code> to map vendor specific error codes to an optimistick locking error, for Camel to perform a 
 retry. This requires <code>optimisticLocking</code> to be enabled.</p></td></tr></tbody></table></div><h4 id="SQLComponent-Whatispreservedwhenpersisting">What is preserved when persisting</h4><p><code>JdbcAggregationRepository</code> will only preserve any <code>Serializable</code> compatible data types. If a data type is not such a type its dropped and a <code>WARN</code> is logged. And it only persists the <code>Message</code> body and the <code>Message</code> headers. The <code>Exchange</code> properties are <strong>not</strong> persisted.</p><p>From Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.</p><h4 id="SQLComponent-Recovery">Recovery</h4><p>The <code>JdbcAggregationRepository</code> will by default recover any failed <a shape="rect" href="exchange.html">Exchange</a>. It does this by having a background tasks that scans for failed <a shape="rect" href="exchange.html">Exchange</a>s in the persistent store. You can use th
 e <code>checkInterval</code> option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed <a shape="rect" href="exchange.html">Exchange</a>. Any <a shape="rect" href="exchange.html">Exchange</a> which was found to be recovered will be restored from the persistent store and resubmitted and send out again.</p><p>The following headers is set when an <a shape="rect" href="exchange.html">Exchange</a> is being recovered/redelivered:</p><div class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" class="confluenceTh"><p>Header</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Type</p></th><th colspan="1" rowspan="1" class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>Exchange.REDELIVERED</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Boolean</p></td><td colspan="1" rowspan="1" class="co
 nfluenceTd"><p>Is set to true to indicate the <a shape="rect" href="exchange.html">Exchange</a> is being redelivered.</p></td></tr><tr><td colspan="1" rowspan="1" class="confluenceTd"><p><code>Exchange.REDELIVERY_COUNTER</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Integer</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The redelivery attempt, starting from 1.</p></td></tr></tbody></table></div><p>Only when an <a shape="rect" href="exchange.html">Exchange</a> has been successfully processed it will be marked as complete which happens when the <code>confirm</code> method is invoked on the <code>AggregationRepository</code>. This means if the same <a shape="rect" href="exchange.html">Exchange</a> fails again it will be kept retried until it success.</p><p>You can use option <code>maximumRedeliveries</code> to limit the maximum number of redelivery attempts for a given recovered <a shape="rect" href="exchange.html">Exchange</a>. You must also set the <
 code>deadLetterUri</code> option so Camel knows where to send the <a shape="rect" href="exchange.html">Exchange</a> when the <code>maximumRedeliveries</code> was hit.</p><p>You can see some examples in the unit tests of camel-sql, for example <a shape="rect" class="external-link" href="https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java">this test</a>.</p><h4 id="SQLComponent-Database">Database</h4><p>To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with <code>"_COMPLETED"</code>. The name must be configured in the Spring bean with the <code>RepositoryName</code> property. In the following example aggregation will be used.</p><p>The table structure definition of both table are identical: in both case a String value is used as key (<strong>id</strong>) whe
 reas a Blob contains the exchange serialized in byte array.<br clear="none"> However one difference should be remembered: the <strong>id</strong> field does not have the same content depending on the table.<br clear="none"> In the aggregation table <strong>id</strong> holds the correlation Id used by the component to aggregate the messages. In the completed table, <strong>id</strong> holds the id of the exchange stored in corresponding the blob field.</p><p>Here is the SQL query used to create the tables, just replace <code>"aggregation"</code> with your aggregator repository name.</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregation (
-    id varchar(255) NOT NULL,
-    exchange blob NOT NULL,
-    constraint aggregation_pk PRIMARY KEY (id)
-);
-CREATE TABLE aggregation_completed (
-    id varchar(255) NOT NULL,
-    exchange blob NOT NULL,
-    constraint aggregation_completed_pk PRIMARY KEY (id)
-);
-]]></script>
-</div></div><h4 id="SQLComponent-Storingbodyandheadersastext">Storing body and headers as text</h4><p><strong>Available as of Camel 2.11</strong></p><p>You can configure the <code>JdbcAggregationRepository</code> to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers <code>companyName</code> and <code>accountName</code> use the following SQL:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregationRepo3 (
-    id varchar(255) NOT NULL,
-    exchange blob NOT NULL,
-    body varchar(1000),
-    companyName varchar(1000),
-    accountName varchar(1000),
-    constraint aggregationRepo3_pk PRIMARY KEY (id)
-);
-CREATE TABLE aggregationRepo3_completed (
-    id varchar(255) NOT NULL,
-    exchange blob NOT NULL,
-    body varchar(1000),
-    companyName varchar(1000),
-    accountName varchar(1000),
-    constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
-);
-]]></script>
-</div></div><p>And then configure the repository to enable this behavior as shown below:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[    &lt;bean id=&quot;repo3&quot; class=&quot;org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository&quot;&gt;
-        &lt;property name=&quot;repositoryName&quot; value=&quot;aggregationRepo3&quot;/&gt;
-        &lt;property name=&quot;transactionManager&quot; ref=&quot;txManager3&quot;/&gt;
-        &lt;property name=&quot;dataSource&quot; ref=&quot;dataSource3&quot;/&gt;
-        &lt;!-- configure to store the message body and following headers as text in the repo --&gt;
-        &lt;property name=&quot;storeBodyAsText&quot; value=&quot;true&quot;/&gt;
-        &lt;property name=&quot;headersToStoreAsText&quot;&gt;
-          &lt;list&gt;
-      	    &lt;value&gt;companyName&lt;/value&gt;
-    	    &lt;value&gt;accountName&lt;/value&gt;
-          &lt;/list&gt;
-        &lt;/property&gt;
-    &lt;/bean&gt;
-]]></script>
-</div></div><h4 id="SQLComponent-Codec(Serialization)">Codec (Serialization)</h4><p>Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the <code>JdbcCodec</code> class. One detail of the code requires your attention: the <code>ClassLoadingAwareObjectInputStream</code>.</p><p>The <code>ClassLoadingAwareObjectInputStream</code> has been reused from the <a shape="rect" class="external-link" href="http://activemq.apache.org/">Apache ActiveMQ</a> project. It wraps an <code>ObjectInputStream</code> and use it with the <code>ContextClassLoader</code> rather than the <code>currentThread</code> one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.</p><h4 id="SQLComponent-Transaction">Transaction</h4><p>A Spring <code>PlatformTransactionManager</code>
  is required to orchestrate transaction.</p><h4 id="SQLComponent-Service(Start/Stop)">Service (Start/Stop)</h4><p>The <code>start</code> method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.</p><h4 id="SQLComponent-Aggregatorconfiguration">Aggregator configuration</h4><p>Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the <code>lobHandler</code> property.</p><p>Here is the declaration for Oracle:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[    &lt;bean id=&quot;lobHandler&quot; class=&quot;org.springframework.jdbc.support.lob.OracleLobHandler&quot;&gt;
-        &lt;property name=&quot;nativeJdbcExtractor&quot; ref=&quot;nativeJdbcExtractor&quot;/&gt;
-    &lt;/bean&gt;
 
-    &lt;bean id=&quot;nativeJdbcExtractor&quot; class=&quot;org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor&quot;/&gt;
+// use a list
+List&lt;String&gt; names = new ArrayList&lt;String&gt;();
+names.add(&quot;Camel&quot;);
+names.add(&quot;AMQ&quot;);
 
-    &lt;bean id=&quot;repo&quot; class=&quot;org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository&quot;&gt;
-        &lt;property name=&quot;transactionManager&quot; ref=&quot;transactionManager&quot;/&gt;
-        &lt;property name=&quot;repositoryName&quot; value=&quot;aggregation&quot;/&gt;
-        &lt;property name=&quot;dataSource&quot; ref=&quot;dataSource&quot;/&gt;
-        &lt;!-- Only with Oracle, else use default --&gt;
-        &lt;property name=&quot;lobHandler&quot; ref=&quot;lobHandler&quot;/&gt;
-    &lt;/bean&gt;
-]]></script>
-</div></div><h3 id="SQLComponent-Optimisticlocking">Optimistic locking</h3><p>From <strong>Camel 2.12</strong> onwards you can turn on <code>optimisticLocking</code> and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation repository. If there is a race condition there JDBC driver will throw a vendor specific exception which the <code>JdbcAggregationRepository</code> can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a <code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code> allows you to implement your custom logic if needed. There is a default implementation <code>org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper</code> which works as follows:</p><p>The following check is done:</p><ul class="alternate"><li>If
  the caused exception is an <code>SQLException</code> then the SQLState is checked if starts with 23.</li><li>If the caused exception is a <code>DataIntegrityViolationException</code></li><li>If the caused exception class name has "ConstraintViolation" in its name.</li><li>optional checking for FQN class name matches if any class names has been configured</li></ul><p>You can in addition add FQN classnames, and if any of the caused exception (or any nested) equals any of the FQN class names, then its an optimistick locking error.</p><p>Here is an example, where we define 2 extra FQN class names from the JDBC vendor.</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[    &lt;bean id=&quot;repo&quot; class=&quot;org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository&quot;&gt;
-        &lt;property name=&quot;transactionManager&quot; ref=&quot;transactionManager&quot;/&gt;
-        &lt;property name=&quot;repositoryName&quot; value=&quot;aggregation&quot;/&gt;
-        &lt;property name=&quot;dataSource&quot; ref=&quot;dataSource&quot;/&gt;
-        &lt;property name&quot;jdbcOptimisticLockingExceptionMapper&quot; ref=&quot;myExceptionMapper&quot;/&gt;
-    &lt;/bean&gt;
+template.requestBodyAndHeader(&quot;direct:query&quot;, &quot;Hi there!&quot;, &quot;names&quot;, names);
 
-    &lt;!-- use the default mapper with extra FQN class names from our JDBC driver --&gt;
-    &lt;bean id=&quot;myExceptionMapper&quot; class=&quot;org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper&quot;&gt;
-      &lt;property name=&quot;classNames&quot;&gt;
-        &lt;util:set&gt;
-          &lt;value&gt;com.foo.sql.MyViolationExceptoion&lt;/value&gt;
-          &lt;value&gt;com.foo.sql.MyOtherViolationExceptoion&lt;/value&gt;
-        &lt;/util:set&gt;
-      &lt;/property&gt;
-    &lt;/bean&gt;
-]]></script>
-</div></div><p></p><h3 id="SQLComponent-SeeAlso">See Also</h3>
-<ul><li><a shape="rect" href="configuring-camel.html">Configuring Camel</a></li><li><a shape="rect" href="component.html">Component</a></li><li><a shape="rect" href="endpoint.html">Endpoint</a></li><li><a shape="rect" href="getting-started.html">Getting Started</a></li></ul><ul class="alternate"><li><a shape="rect" href="sql-stored-procedure.html">SQL Stored Procedure</a></li><li><a shape="rect" href="jdbc.html">JDBC</a></li></ul></div>
+
+// use a string separated values with comma
+template.requestBodyAndHeader(&quot;direct:query&quot;, &quot;Hi there!&quot;, &quot;names&quot;, &quot;Camel,AMQ&quot;);]]></script>
+</div></div><p>The query can also be specified in the endpoint instead of being externalized (notice that externalizing makes maintaining the SQL queries easier)</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[from(&quot;direct:query&quot;)
+    .to(&quot;sql:select * from projects where project in (:#in:names) order by id&quot;)
+    .to(&quot;log:query&quot;)
+    .to(&quot;mock:query&quot;);]]></script>
+</div></div><p>&#160;</p><h2 id="SQLComponent-UsingtheJDBCbasedidempotentrepository">Using the JDBC based idempotent repository</h2><p><strong>Available as of Camel 2.7</strong>: In this section we will use the JDBC based idempotent repository.</p><div class="confluence-information-macro confluence-information-macro-tip"><p class="title">Abstract class</p><span class="aui-icon aui-icon-small aui-iconfont-approve confluence-information-macro-icon"></span><div class="confluence-information-macro-body"></div></div><p>From Camel 2.9 onwards there is an abstract class <code>org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository</code> you can extend to build custom JDBC idempotent repository.</p><p>&#160;</p><p>First we have to create the database table which will be used by the idempotent repository. For <strong>Camel 2.7</strong>, we use the following schema:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100) )</p><p>&#160;</p><p>In <strong>Camel 2.8</strong>, we added the createdAt column:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP )</p><p>&#160;</p><p>&#160;</p><div class="confluence-information-macro confluence-information-macro-warning"><span class="aui-icon aui-icon-small aui-iconfont-error confluence-information-macro-icon"></span><div class="confluence-information-macro-body"></div></div><p class="wysiwyg-macro-body">The SQL Server&#160;<strong>TIMESTAMP</strong> type is a fixed-length binary-string type. It does not map to any of the JDBC time types: <strong>DATE</strong>, <strong>TIME</strong>, or <strong>TIMESTAMP</strong>.</p><p>&#160;</p><p>&#160;</p><p>We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.</p><p>Second we need to setup a <code>javax.sql.DataSource</code> in the spring XML file:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-s
 ql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}</p><p>&#160;</p><p>And finally we can create our JDBC idempotent repository in the spring XML file as well:{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}</p><p>&#160;</p><p>Customize the JdbcMessageIdRepository</p><p>Starting with <strong>Camel 2.9.1</strong> you have a few options to tune the <code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code> for your needs:</p><p class="confluenceTable">&#160;</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Parameter</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Default Value</p><p>&#160;</p><p>Description</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>createTableIfNotExists</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>true</p><p>&#160;</p><p>Defines whether or not Camel should try to create the table if it doesn't exi
 st.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>tableExistsString</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0</p><p>&#160;</p><p>This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>createString</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)</p><p>&#160;</p><p>The statement which is used to create the table.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>queryString</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?</p><p>&#160;</p><p>The query which is used to figure out whether the message already exists in the repository (the result i
 s not equals to '0'). It takes two parameters. This first one is the processor name (<code>String</code>) and the second one is the message id (<code>String</code>).</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>insertString</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)</p><p>&#160;</p><p>The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name (<code>String</code>), the second one is the message id (<code>String</code>) and the third one is the timestamp (<code>java.sql.Timestamp</code>) when this entry was added to the repository.</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>deleteString</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?</p><p>&#160;</p><p>The statement which is used to delete the entry from the dat
 abase. It takes two parameter. This first one is the processor name (<code>String</code>) and the second one is the message id (<code>String</code>).</p><p>&#160;</p><p>A customized <code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code> could look like:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml}</p><p>&#160;</p><p>Using the JDBC based aggregation repository</p><p><strong>Available as of Camel 2.6</strong></p><div class="confluence-information-macro confluence-information-macro-information"><p class="title">Using JdbcAggregationRepository in Camel 2.6</p><span class="aui-icon aui-icon-small aui-iconfont-info confluence-information-macro-icon"></span><div class="confluence-information-macro-body"></div></div><p>In Camel 2.6, the JdbcAggregationRepository is provided in the <code>camel-jdbc-aggregator</code> component. From Camel 2.7 onwards, the <code>JdbcAggr
 egationRepository</code> is provided in the <code>camel-sql</code> component.</p><p>&#160;</p><p><code>JdbcAggregationRepository</code> is an <code>AggregationRepository</code> which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only <code>AggregationRepository</code>.<br clear="none"> The <code>JdbcAggregationRepository</code> allows together with Camel to provide persistent support for the <a shape="rect" href="aggregator2.html">Aggregator</a>.</p><p>It has the following options:</p><p class="confluenceTable">&#160;</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Option</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Type</p><p>&#160;</p><p>Description</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>dataSource</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>DataSource</code></p><p>&#160;</p><p><strong>Mandatory:</strong> The <code>javax.
 sql.DataSource</code> to use for accessing the database.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>repositoryName</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>String</code></p><p>&#160;</p><p><strong>Mandatory:</strong> The name of the repository.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>transactionManager</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>TransactionManager</code></p><p>&#160;</p><p><strong>Mandatory:</strong> The <code>org.springframework.transaction.PlatformTransactionManager</code> to mange transactions for the database. The TransactionManager must be able to support databases.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>lobHandler</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>LobHandler</code></p><p>&#160;</p><p>A <code>org.springframework.jdbc.support.lob.LobHandler</code> to handle Lob types in the database. Use this
  option to use a vendor specific LobHandler, for example when using Oracle.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>returnOldExchange</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>boolean</p><p>&#160;</p><p>Whether the get operation should return the old existing Exchange if any existed. By default this option is <code>false</code> to optimize as we do not need the old exchange when aggregating.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>useRecovery</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>boolean</p><p>&#160;</p><p>Whether or not recovery is enabled. This option is by default <code>true</code>. When enabled the Camel <a shape="rect" href="aggregator2.html">Aggregator</a> automatic recover failed aggregated exchange and have them resubmitted.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>recoveryInterval</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>l
 ong</p><p>&#160;</p><p>If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>maximumRedeliveries</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>int</p><p>&#160;</p><p>Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the <code>deadLetterUri</code> option must also be provided.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>deadLetterUri</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>String</p><p>&#160;</p><p>An endpoint uri for a <a shape="rect" href="dead-letter-channel.html">Dead Letter Channel</a> where exhausted recovered Exchanges will be moved. 
 If this option is used then the <code>maximumRedeliveries</code> option must also be provided.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>storeBodyAsText</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>boolean</p><p>&#160;</p><p><strong>Camel 2.11:</strong> Whether to store the message body as String which is human readable. By default this option is <code>false</code> storing the body in binary format.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>headersToStoreAsText</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>List&lt;String&gt;</code></p><p>&#160;</p><p><strong>Camel 2.11:</strong> Allows to store headers as String which is human readable. By default this option is disabled, storing the headers in binary format.</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>optimisticLocking</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>false</code></p><p>&#160;</
 p><p><strong>Camel 2.12:</strong> To turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications shared the same JDBC based aggregation repository.</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>jdbcOptimisticLockingExceptionMapper</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>&#160;</p><p>&#160;</p><p><strong>Camel 2.12:</strong> Allows to plugin a custom <code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code> to map vendor specific error codes to an optimistick locking error, for Camel to perform a retry. This requires <code>optimisticLocking</code> to be enabled.</p><p>&#160;</p><p>What is preserved when persisting</p><p><code>JdbcAggregationRepository</code> will only preserve any <code>Serializable</code> compatible data types. If a data type is not such a type its dropped and a <code>WARN</code> is logged. And it only persists the <code>Message</code> body and 
 the <code>Message</code> headers. The <code>Exchange</code> properties are <strong>not</strong> persisted.</p><p>From Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.</p><p>Recovery</p><p>The <code>JdbcAggregationRepository</code> will by default recover any failed <a shape="rect" href="exchange.html">Exchange</a>. It does this by having a background tasks that scans for failed <a shape="rect" href="exchange.html">Exchange</a>s in the persistent store. You can use the <code>checkInterval</code> option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed <a shape="rect" href="exchange.html">Exchange</a>. Any <a shape="rect" href="exchange.html">Exchange</a> which was found to be recovered will be restored from the persistent store and resubmitted and send out again.</p><p>The following headers is set when an <a shape="rect" href="exchange.html">
 Exchange</a> is being recovered/redelivered:</p><p class="confluenceTable">&#160;</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Header</p><p>&#160;</p><p class="confluenceTh">&#160;</p><p>Type</p><p>&#160;</p><p>Description</p><p>&#160;</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>Exchange.REDELIVERED</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>Boolean</p><p>&#160;</p><p>Is set to true to indicate the <a shape="rect" href="exchange.html">Exchange</a> is being redelivered.</p><p>&#160;</p><p class="confluenceTd">&#160;</p><p><code>Exchange.REDELIVERY_COUNTER</code></p><p>&#160;</p><p class="confluenceTd">&#160;</p><p>Integer</p><p>&#160;</p><p>The redelivery attempt, starting from 1.</p><p>&#160;</p><p>Only when an <a shape="rect" href="exchange.html">Exchange</a> has been successfully processed it will be marked as complete which happens when the <code>confirm</code> method is invoked on the <code>AggregationRepository</code>. This means if the s
 ame <a shape="rect" href="exchange.html">Exchange</a> fails again it will be kept retried until it success.</p><p>You can use option <code>maximumRedeliveries</code> to limit the maximum number of redelivery attempts for a given recovered <a shape="rect" href="exchange.html">Exchange</a>. You must also set the <code>deadLetterUri</code> option so Camel knows where to send the <a shape="rect" href="exchange.html">Exchange</a> when the <code>maximumRedeliveries</code> was hit.</p><p>You can see some examples in the unit tests of camel-sql, for example <a shape="rect" class="external-link" href="https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java">this test</a>.</p><p>Database</p><p>To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with <code>"_COMPLETED"</co
 de>. The name must be configured in the Spring bean with the <code>RepositoryName</code> property. In the following example aggregation will be used.</p><p>The table structure definition of both table are identical: in both case a String value is used as key (<strong>id</strong>) whereas a Blob contains the exchange serialized in byte array.<br clear="none"> However one difference should be remembered: the <strong>id</strong> field does not have the same content depending on the table.<br clear="none"> In the aggregation table <strong>id</strong> holds the correlation Id used by the component to aggregate the messages. In the completed table, <strong>id</strong> holds the id of the exchange stored in corresponding the blob field.</p><p>Here is the SQL query used to create the tables, just replace <code>"aggregation"</code> with your aggregator repository name.</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );</p><p>&#160;</p><p>Storing body and headers as text</p><p><strong>Available as of Camel 2.11</strong></p><p>You can configure the <code>JdbcAggregationRepository</code> to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers <code>companyName</code> and <code>accountName</code> use the following SQL:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );</p><p>&#160;</p><p>And then configure the repository to enable this behavior as shown below:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>&lt;bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"&gt; &lt;property name="repositoryName" value="aggregationRepo3"/&gt; &lt;property name="transactionManager" ref="txManager3"/&gt; &lt;property name="dataSource" ref="dataSource3"/&gt; &lt;!-- configure to store the message body and following headers as text in the repo --&gt; &lt;property name="storeBodyAsText" value="true"/&gt; &lt;property name="headersToStoreAsText"&gt; &lt;list&gt; &lt;value&gt;companyName&lt;/value&gt; &lt;value&gt;accountName&lt;/value&gt; &lt;/list&gt; &lt;/property&gt; &lt;/bean&gt;</p><p>&#160;</p><p>Codec (Serialization)</p><p>Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the <code>JdbcCodec</code> class. One detail of the code requires your attention: the <code>ClassLoadingAwareObjectInputStream</co
 de>.</p><p>The <code>ClassLoadingAwareObjectInputStream</code> has been reused from the <a shape="rect" class="external-link" href="http://activemq.apache.org/">Apache ActiveMQ</a> project. It wraps an <code>ObjectInputStream</code> and use it with the <code>ContextClassLoader</code> rather than the <code>currentThread</code> one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.</p><p>Transaction</p><p>A Spring <code>PlatformTransactionManager</code> is required to orchestrate transaction.</p><p>Service (Start/Stop)</p><p>The <code>start</code> method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.</p><p>Aggregator configuration</p><p>Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the correspo
 nding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the <code>lobHandler</code> property.</p><p>Here is the declaration for Oracle:</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>&lt;bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"&gt; &lt;property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/&gt; &lt;/bean&gt; &lt;bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/&gt; &lt;bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"&gt; &lt;property name="transactionManager" ref="transactionManager"/&gt; &lt;property name="repositoryName" value="aggregation"/&gt; &lt;property name="dataSource" ref="dataSource"/&gt; &lt;!-- Only with Oracle, else use default --&gt; &lt;property name="lobHandler" ref="lobHandler"/&gt; &lt;/bean&gt;</p><p>&#160;</p><p>Optimistic locking</p><p>From <strong>Camel 2.12</strong> onwards you can turn on <code>optimisticLocking</code> and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation r
 epository. If there is a race condition there JDBC driver will throw a vendor specific exception which the <code>JdbcAggregationRepository</code> can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a <code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code> allows you to implement your custom logic if needed. There is a default implementation <code>org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper</code> which works as follows:</p><p>The following check is done:</p><p>If the caused exception is an <code>SQLException</code> then the SQLState is checked if starts with 23.</p><p>If the caused exception is a <code>DataIntegrityViolationException</code></p><p>If the caused exception class name has "ConstraintViolation" in its name.</p><p>optional checking for FQN class name matches if any class names has been co
 nfigured</p><p>You can in addition add FQN classnames, and if any of the caused exception (or any nested) equals any of the FQN class names, then its an optimistick locking error.</p><p>Here is an example, where we define 2 extra FQN class names from the JDBC vendor.</p><div class="code panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default" type="syntaxhighlighter"><![CDATA[]]></script>
+</div></div><p>&lt;bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"&gt; &lt;property name="transactionManager" ref="transactionManager"/&gt; &lt;property name="repositoryName" value="aggregation"/&gt; &lt;property name="dataSource" ref="dataSource"/&gt; &lt;property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/&gt; &lt;/bean&gt; &lt;!-- use the default mapper with extra FQN class names from our JDBC driver --&gt; &lt;bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"&gt; &lt;property name="classNames"&gt; &lt;util:set&gt; &lt;value&gt;com.foo.sql.MyViolationExceptoion&lt;/value&gt; &lt;value&gt;com.foo.sql.MyOtherViolationExceptoion&lt;/value&gt; &lt;/util:set&gt; &lt;/property&gt; &lt;/bean&gt;</p><p>&#160;</p><p></p><h3 id="SQLComponent-SeeAlso">See Also</h3>
+<ul><li><a shape="rect" href="configuring-camel.html">Configuring Camel</a></li><li><a shape="rect" href="component.html">Component</a></li><li><a shape="rect" href="endpoint.html">Endpoint</a></li><li><a shape="rect" href="getting-started.html">Getting Started</a></li></ul><p><a shape="rect" href="sql-stored-procedure.html">SQL Stored Procedure</a></p><p><a shape="rect" href="jdbc.html">JDBC</a></p></div>
         </td>
         <td valign="top">
           <div class="navigation">



Mime
View raw message