Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A96A18A9A for ; Fri, 18 Mar 2016 10:21:02 +0000 (UTC) Received: (qmail 33319 invoked by uid 500); 18 Mar 2016 10:21:02 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 33272 invoked by uid 500); 18 Mar 2016 10:21:02 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 33263 invoked by uid 99); 18 Mar 2016 10:21:02 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Mar 2016 10:21:02 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id C8197C0217 for ; Fri, 18 Mar 2016 10:21:01 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.999 X-Spam-Level: X-Spam-Status: No, score=0.999 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id p_O9yZBlbbJ7 for ; Fri, 18 Mar 2016 10:20:51 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 13A125F23C for ; Fri, 18 Mar 2016 10:20:50 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 32575E0113 for ; Fri, 18 Mar 2016 10:20:49 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 2E5153A0217 for ; Fri, 18 Mar 2016 10:20:49 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r983097 [1/4] - in /websites/production/camel/content: book-component-appendix.html book-in-one-page.html cache/main.pageCache camel-2170-release.html sql-component.html Date: Fri, 18 Mar 2016 10:20:48 -0000 To: commits@camel.apache.org From: buildbot@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160318102049.2E5153A0217@svn01-us-west.apache.org> Author: buildbot Date: Fri Mar 18 10:20:48 2016 New Revision: 983097 Log: Production update by buildbot for camel Modified: websites/production/camel/content/book-component-appendix.html websites/production/camel/content/book-in-one-page.html websites/production/camel/content/cache/main.pageCache websites/production/camel/content/camel-2170-release.html websites/production/camel/content/sql-component.html Modified: websites/production/camel/content/book-component-appendix.html ============================================================================== --- websites/production/camel/content/book-component-appendix.html (original) +++ websites/production/camel/content/book-component-appendix.html Fri Mar 18 10:20:48 2016 @@ -1016,11 +1016,11 @@ template.send("direct:alias-verify& ]]>

See Also

CXF Component

When using CXF as a consumer, the CXF Bean Component allows you to factor out how message payloads are received from their processing as a RESTful or SOAP web service. This has the potential of using a multitude of transports to cons ume web services. The bean component's configuration is also simpler and provides the fastest method to implement web services using Camel and CXF.

When using CXF in streaming modes (see DataFormat option), then also read about Stream caching.

The cxf: component provides integration with Apache CXF for connecting to JAX-WS services hosted in CXF.

+/*]]>*/

Using the JDBC based idempotent repository

Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.

Abstract class

From Camel 2.9 onwards there is an abstract class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository you can extend to build custom JDBC idempotent repository.

First we have to create the database table which will be used by the idempotent repository. For Camel 2.7, we use the following schema:

- -

In Camel 2.8, we added the createdAt column:

- -
The SQL Server TIMESTAMP type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, or TIMESTAMP.

 

We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.

Second we need to setup a javax.sql.DataSource in the spring XML file:

- -
And finally we can create our JDBC idempotent repository in the spring XML file as well:
- -

Customize the JdbcMessageIdRepository

Starting with Camel 2.9.1 you have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository for your needs:

Parameter

Default Value

Description

createTableIfNotExists

true

Defines whether or not Camel should try to create the table if it doesn't exist.

tableExistsString

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist.

createString

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)

The statement which is used to create the table.

queryString

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

The query which is used to figure out whether the message already exists in the repository (the result is not equals to '0'). It takes two parameters. This first one is the processor name (String) and the second one is the message id (String).

insertString

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name (String), the second one is the message id (String) and the third one is the timestamp (java.sql.Timestamp) when this entry was added to the repository.

deleteString

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

The statement which is used to delete the entry from the database. It takes two parameter. This first one is the processor name (String) and the second one is the message id (String).

A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository could look like:

- -

Using the JDBC based aggregation repository

Available as of Camel 2.6

Using JdbcAggregationRepository in Camel 2.6

In Camel 2.6, the JdbcAggregationRepository is provided in the camel-jdbc-aggregator component. From Camel 2.7 onwards, the JdbcAggregationRepository is provided in the camel-sql component.

JdbcAggregationRepository is an AggregationRepository which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only AggregationRepository.
The JdbcAggregationRepository allows together with Camel to provide persistent support for the Aggregator.

It has the following options:

< td colspan="1" rowspan="1" class="confluenceTd">

Mandatory: The name of the repository.

< tr>

Option

Type

Description

dataSource

DataSource

Mandatory: The javax.sql.DataSource to use for accessing the database.

repositoryName

String

transactionManager

TransactionManager

Mandatory: The org.springframework.transaction.PlatformTransactionManager to mange transactions for the database. The TransactionManager must be able to support databases.

lobHandler

LobHandler

A org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle.

returnOldExchange

boolean

Whether the get operation should return the old existing Exchange if any existed. By default this option is false to optimize as we do not need the old exchange when aggregating.

useRecovery

boolean

Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatic recover failed aggregated exchange and have them resubmitted.

recoveryInterval

long

If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.

maximumRedeliveries

int

Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided.

deadLetterUri

String

An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the maximumRedeliveries option must also be provided.

storeBodyAsText

boolean

Camel 2.11: Whether to store the message body as String which is human readable. By default this option is false storing the body in binary format.

headersToStoreAsText

List<String>

Camel 2.11: Allows to store headers as String which is human readable. By default this option is disabled, storing the headers in binary format.

optimisticLocking

false

Camel 2.12: To turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications shared the same JDBC based aggregation repository.

jdbcOptimisticLockingExceptionMapper

 

Camel 2.12: Allows to plugin a custom org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper to map vendor specific error codes to an optimistick locking error, for Camel to p erform a retry. This requires optimisticLocking to be enabled.

What is preserved when persisting

JdbcAggregationRepository will only preserve any Serializable compatible data types. If a data type is not such a type its dropped and a WARN is logged. And it only persists the Message body and the Message headers. The Exchange properties are not persisted.

From Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.

Recovery

The JdbcAggregationRepository will by default recover any failed Exchange. It does this by having a background tasks that scans for failed Exchanges in the persi stent store. You can use the checkInterval option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange which was found to be recovered will be restored from the persistent store and resubmitted and send out again.

The following headers is set when an Exchange is being recovered/redelivered:

Header

Type

Description

Exchange.REDELIVERED

Boolean

Is set to true to indicate the Exchange is being redelivered.

Exchange.REDELIVERY_COUNTER

Integer

The redelivery attempt, starting from 1.

Only when an Exchange has been successfully processed it will be marked as complete which happens when the confirm method is invoked on the AggregationRepository. This means if the same Exchange fails again it will be kept retried until it success.

You can use option maximumRedeliveries to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when the maximumRedeliveries was hit.

You can see some examples in the unit tests of camel-sql, for example this test.

Database

To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED". The name must be configured in the Spring bean with the RepositoryName property. In the following example aggregation will be used.

The table structure definition of both table are identical: in both case a String value is u sed as key (id) whereas a Blob contains the exchange serialized in byte array.
However one difference should be remembered: the id field does not have the same content depending on the table.
In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.

Here is the SQL query used to create the tables, just replace "aggregation" with your aggregator repository name.

- -

Storing body and headers as text

Available as of Camel 2.11

You can configure the JdbcAggregationRepository to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers companyName and accountName use the following SQL:

- -

And then configure the repository to enable this behavior as shown below:

- -

Codec (Serialization)

Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.

The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and use it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.

Transaction

A Spring PlatformTransac tionManager is required to orchestrate transaction.

Service (Start/Stop)

The start method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.

Aggregator configuration

Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler property.

Here is the declaration for Oracle:

- +

In the following route:

+ +

Then the IN query can use a header with the key names with the dynamic values such as:

+ -

Optimistic locking

From Camel 2.12 onwards you can turn on optimisticLocking and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation repository. If there is a race condition there JDBC driver will throw a vendor specific exception which the JdbcAggregationRepository can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper allows you to implement your custom logic if needed. There is a default implementation org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper which works as follows:

The following check is done:

  • If the caused exception is an SQLException then the SQLState is checked if starts with 23.
  • If the caused exception is a DataIntegrityViolationException
  • If the caused exception class name has "ConstraintViolation" in its name.
  • optional checking for FQN class name matches if any class names has been configured

You can in addition add FQN classnames, and if any of the caused exception (or any nested) equals any of the FQN class names, then its an optimistick locking error.

Here is an example, where we define 2 extra FQN class names from the JDBC vendor.

- -

See Also

-

Test Component

Testing of distributed and asynchronous processing is notoriously difficult. The Mock, Test and DataSet endpoints work great with the Camel Testing Framework to simplify your unit and integration testing using Enterprise Integration Patterns and Camel's large range of Components together with the powerful Bean Integration.

The test component extends the Mock component to support pulling messages from another endpoint on startup to set the expected message bodies on the underlying Mock endpoint. That is, you use the test endpoint in a route and messages arriving on it will be implicitly compared to some expected messages extracted from some other location.

So you can use, for example, an expected set of message bodies as files. This will then set up a properly configured Mock endpoint, which is only valid if the received messages match the number of expected messages and their message payloads are equal.

Maven users will need to add the following dependency to their pom.xml for this component when using Camel 2.8 or older:

+template.requestBodyAndHeader("direct:query", "Hi there!", "names", names); + + +// use a string separated values with comma +template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ");]]> +

The query can also be specified in the endpoint instead of being externalized (notice that externalizing makes maintaining the SQL queries easier)

+ +

 

Using the JDBC based idempotent repository

Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.

Abstract class

From Camel 2.9 onwards there is an abstract class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository you can extend to build custom JDBC idempotent repository.

 

First we have to create the database table which will be used by the idempotent repository. For Camel 2.7, we use the following schema:

+ +

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100) )

 

In Camel 2.8, we added the createdAt column:

+ +

CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP )

 

 

The SQL Server TIMESTAMP type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, or TIMESTAMP.

 

 

We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.

Second we need to setup a javax.sql.DataSource in the spring XML file:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-s ql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}

 

And finally we can create our JDBC idempotent repository in the spring XML file as well:{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}

 

Customize the JdbcMessageIdRepository

Starting with Camel 2.9.1 you have a few options to tune the org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository for your needs:

 

 

 

Parameter

 

 

Default Value

 

Description

 

 

 

createTableIfNotExists

 

 

true

 

Defines whether or not Camel should try to create the table if it doesn't exi st.

 

 

 

tableExistsString

 

 

SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0

 

This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist.

 

 

 

createString

 

 

CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)

 

The statement which is used to create the table.

 

 

 

queryString

 

 

SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

 

The query which is used to figure out whether the message already exists in the repository (the result i s not equals to '0'). It takes two parameters. This first one is the processor name (String) and the second one is the message id (String).

 

 

 

insertString

 

 

INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)

 

The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name (String), the second one is the message id (String) and the third one is the timestamp (java.sql.Timestamp) when this entry was added to the repository.

 

 

deleteString

 

 

DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?

 

The statement which is used to delete the entry from the dat abase. It takes two parameter. This first one is the processor name (String) and the second one is the message id (String).

 

A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository could look like:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml}

 

Using the JDBC based aggregation repository

Available as of Camel 2.6

Using JdbcAggregationRepository in Camel 2.6

In Camel 2.6, the JdbcAggregationRepository is provided in the camel-jdbc-aggregator component. From Camel 2.7 onwards, the JdbcAggr egationRepository is provided in the camel-sql component.

 

JdbcAggregationRepository is an AggregationRepository which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only AggregationRepository.
The JdbcAggregationRepository allows together with Camel to provide persistent support for the Aggregator.

It has the following options:

 

 

 

Option

 

 

Type

 

Description

 

 

 

dataSource

 

 

DataSource

 

Mandatory: The javax. sql.DataSource to use for accessing the database.

 

 

 

repositoryName

 

 

String

 

Mandatory: The name of the repository.

 

 

 

transactionManager

 

 

TransactionManager

 

Mandatory: The org.springframework.transaction.PlatformTransactionManager to mange transactions for the database. The TransactionManager must be able to support databases.

 

 

 

lobHandler

 

 

LobHandler

 

A org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle.

 

 

 

returnOldExchange

 

 

boolean

 

Whether the get operation should return the old existing Exchange if any existed. By default this option is false to optimize as we do not need the old exchange when aggregating.

 

 

 

useRecovery

 

 

boolean

 

Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatic recover failed aggregated exchange and have them resubmitted.

 

 

 

recoveryInterval

 

 

l ong

 

If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.

 

 

 

maximumRedeliveries

 

 

int

 

Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided.

 

 

 

deadLetterUri

 

 

String

 

An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the maximumRedeliveries option must also be provided.

 

 

 

storeBodyAsText

 

 

boolean

 

Camel 2.11: Whether to store the message body as String which is human readable. By default this option is false storing the body in binary format.

 

 

 

headersToStoreAsText

 

 

List<String>

 

Camel 2.11: Allows to store headers as String which is human readable. By default this option is disabled, storing the headers in binary format.

 

 

 

optimisticLocking

 

 

false

 

Camel 2.12: To turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications shared the same JDBC based aggregation repository.

 

 

jdbcOptimisticLockingExceptionMapper

 

 

 

 

Camel 2.12: Allows to plugin a custom org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper to map vendor specific error codes to an optimistick locking error, for Camel to perform a retry. This requires optimisticLocking to be enabled.

 

What is preserved when persisting

JdbcAggregationRepository will only preserve any Serializable compatible data types. If a data type is not such a type its dropped and a WARN is logged. And it only persists the Message body and the Message headers. The Exchange properties are not persisted.

From Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.

Recovery

The JdbcAggregationRepository will by default recover any failed Exchange. It does this by having a background tasks that scans for failed Exchanges in the persistent store. You can use the checkInterval option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange which was found to be recovered will be restored from the persistent store and resubmitted and send out again.

The following headers is set when an Exchange is being recovered/redelivered:

 

 

 

Header

 

 

Type

 

Description

 

 

 

Exchange.REDELIVERED

 

 

Boolean

 

Is set to true to indicate the Exchange is being redelivered.

 

 

Exchange.REDELIVERY_COUNTER

 

 

Integer

 

The redelivery attempt, starting from 1.

 

Only when an Exchange has been successfully processed it will be marked as complete which happens when the confirm method is invoked on the AggregationRepository. This means if the s ame Exchange fails again it will be kept retried until it success.

You can use option maximumRedeliveries to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when the maximumRedeliveries was hit.

You can see some examples in the unit tests of camel-sql, for example this test.

Database

To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED". The name must be configured in the Spring bean with the RepositoryName property. In the following example aggregation will be used.

The table structure definition of both table are identical: in both case a String value is used as key (id) whereas a Blob contains the exchange serialized in byte array.
However one difference should be remembered: the id field does not have the same content depending on the table.
In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.

Here is the SQL query used to create the tables, just replace "aggregation" with your aggregator repository name.

+ +

CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );

 

Storing body and headers as text

Available as of Camel 2.11

You can configure the JdbcAggregationRepository to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers companyName and accountName use the following SQL:

+ +

CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );

 

And then configure the repository to enable this behavior as shown below:

+ +

<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>

 

Codec (Serialization)

Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.

The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and use it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.

Transaction

A Spring PlatformTransactionManager is required to orchestrate transaction.

Service (Start/Stop)

The start method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.

Aggregator configuration

Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the correspo nding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler property.

Here is the declaration for Oracle:

+ +

<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>

 

Optimistic locking

From Camel 2.12 onwards you can turn on optimisticLocking and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation r epository. If there is a race condition there JDBC driver will throw a vendor specific exception which the JdbcAggregationRepository can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper allows you to implement your custom logic if needed. There is a default implementation org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper which works as follows:

The following check is done:

If the caused exception is an SQLException then the SQLState is checked if starts with 23.

If the caused exception is a DataIntegrityViolationException

If the caused exception class name has "ConstraintViolation" in its name.

optional checking for FQN class name matches if any class names has been co nfigured

You can in addition add FQN classnames, and if any of the caused exception (or any nested) equals any of the FQN class names, then its an optimistick locking error.

Here is an example, where we define 2 extra FQN class names from the JDBC vendor.

+ +

<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>

 

See Also

+

SQL Stored Procedure

JDBC

Test Component

Testing of distributed and asynchronous processing is notoriously difficult. The Mock, Test and DataSet endpoints work great with the Camel Testing Framework to simplify your unit and integration testing using Enterprise Integration Patterns and Camel's large range of Components together with the powerful Bean Integration.

The test component extends the Mock component to support pulling messages from another endpoint on startup to set the expected message bodies on the underlying Mock endpoint. That is, you use the test endpoint in a route and messages arriving on it will be implicitly compared to some expected messages extracted from some other location.

So you can use, for example, an expected set of message bodies as files. This will then set up a properly configured Mock endpoint, which is only valid if the received messages match the number of expected messages and their message payloads are equal.

Maven users will need to add the following dependency to their pom.xml for this component when us ing Camel 2.8 or older: