camel-dev mailing list archives

From "Claus Ibsen (JIRA)" <>
Subject [jira] Assigned: (CAMEL-3502) JDBC Persistent Aggregator
Date Thu, 06 Jan 2011 08:22:48 GMT


Claus Ibsen reassigned CAMEL-3502:

    Assignee: Claus Ibsen

> JDBC Persistent Aggregator
> --------------------------
>                 Key: CAMEL-3502
>                 URL:
>             Project: Camel
>          Issue Type: New Feature
>    Affects Versions: 2.6.0
>         Environment: JDBC Aggregator Database
>            Reporter: Olivier Roger
>            Assignee: Claus Ibsen
>             Fix For: 2.7.0
>         Attachments:, jdbc-aggregator-other-changes.patch,
jdbc-aggregator.patch, jdbc-aggregator.patch
> The patch provided is an implementation of an Aggregator for JDBC.
> It implements *org.apache.camel.spi.AggregationRepository* (2.4+) and has been tested
on Camel 2.4 and 2.6.
> Some details have been discussed on the [mailing list|]
with Claus.
> The implementation is very similar to the one for HawtDB. 
> Unit tests have been included and use the H2 in-memory database.
> About that, the test *JdbcAggregateLoadAndRecoverTest* has been adapted to exclude the
redeliveries from the failing messages. Is it possible for a message to be sent by a
_scan()_ background operation even before it is sent by the _onCompletion()_ method?
> Here is a small documentation
> {quote}
> h2. Database
> To be operational, each aggregator uses two tables: the aggregation table and the completed table.
By convention, the completed table has the same name as the aggregation table, suffixed with "_COMPLETED".
The name must be configured in the Spring bean with the _RepositoryName_ property. In the
following example, "aggregation" will be used.
> The structure of both tables is identical: in both cases a String value
is used as the key (*id*), whereas a Blob contains the exchange serialized as a byte array.
> One difference should be kept in mind, however: the content of the *id* field depends
on the table.
> In the aggregation table, *id* holds the correlation id used by the component to aggregate
the messages. In the completed table, *id* holds the id of the exchange stored in the
corresponding blob field.
> Here are the SQL statements used to create the tables; just replace "aggregation" with your
aggregator repository name.
> {code}
> CREATE TABLE aggregation (
>     id varchar(255) NOT NULL,
>     exchange blob NOT NULL,
>     constraint aggregation_pk PRIMARY KEY (id)
> );
> CREATE TABLE aggregation_completed (
>     id varchar(255) NOT NULL,
>     exchange blob NOT NULL,
>     constraint aggregation_completed_pk PRIMARY KEY (id)
> );
> {code}
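The naming convention above can be sketched in Java. This is a minimal illustration, not code from the patch: the class name `AggregationTableDdl`, the method `createStatements`, and the identifier check are assumptions made for this example. It only builds the two statements shown above for a given repository name.

```java
import java.util.List;

/** Hypothetical helper (not part of the patch): builds the DDL for one repository. */
final class AggregationTableDdl {

    private AggregationTableDdl() {
    }

    /** Returns the CREATE TABLE statements for the aggregation table and its completed table. */
    static List<String> createStatements(String repositoryName) {
        // The name is concatenated into SQL, so this sketch only accepts plain identifiers.
        if (repositoryName == null || !repositoryName.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Invalid repository name: " + repositoryName);
        }
        return List.of(createTable(repositoryName), createTable(repositoryName + "_completed"));
    }

    /** Same structure for both tables: a String key and a blob holding the serialized exchange. */
    private static String createTable(String table) {
        return "CREATE TABLE " + table + " (\n"
                + "    id varchar(255) NOT NULL,\n"
                + "    exchange blob NOT NULL,\n"
                + "    constraint " + table + "_pk PRIMARY KEY (id)\n"
                + ")";
    }

    public static void main(String[] args) {
        // Prints the statements for a repository named "orders".
        createStatements("orders").forEach(sql -> System.out.println(sql + ";"));
    }
}
```

The statements could then be run once per repository through any JDBC connection before the aggregator is started.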
> h3. Codec (Serialization)
> Since they can contain any type of payload, Exchanges are not serializable by design.
Each exchange is therefore converted into a byte array to be stored in a database BLOB field.
> All those conversions are handled by the *JdbcCodec* class. One detail of the code requires
your attention: the *ClassLoadingAwareObjectInputStream*.
> The *ClassLoadingAwareObjectInputStream* has been reused from the ActiveMQ project. It
wraps an *ObjectInputStream* and uses it with the ContextClassLoader rather than the currentThread
one. The benefit is to be able to load classes exposed by other bundles.
> This allows the exchange body and headers to reference objects of custom types.
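The serialization step described above can be illustrated with plain java.io. This is only a sketch, not the *JdbcCodec* or the ActiveMQ class: the names `BlobCodecSketch` and `ContextAwareObjectInputStream` are hypothetical, and it assumes the intent is to resolve classes through the thread context class loader first, falling back to the default lookup.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;

/** Illustrative codec (hypothetical, not the JdbcCodec from the patch). */
final class BlobCodecSketch {

    /** Resolves classes through the thread context class loader before the default lookup. */
    static final class ContextAwareObjectInputStream extends ObjectInputStream {
        ContextAwareObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            if (loader != null) {
                try {
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    // Fall through: the default lookup also handles primitive types.
                }
            }
            return super.resolveClass(desc);
        }
    }

    /** Serializes a value into the byte array that would be stored in the blob column. */
    static byte[] marshal(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Reads a value back from the bytes loaded from the blob column. */
    static Object unmarshal(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ContextAwareObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Object> state = new HashMap<>();
        state.put("body", "hello");
        state.put("count", 3);
        // Round trip: the restored map should equal the original one.
        System.out.println(state.equals(unmarshal(marshal(state))));
    }
}
```

In an OSGi container, the caller would typically set the context class loader to one that can see the custom body and header classes before unmarshalling.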
> h2. Transactional 
> TransactionTemplate is used to wrap all the calls to the database.
> Therefore a Transaction Manager is required (see bean-declaration).
> h2. Service (Start/Stop)
> The *JdbcAggregationRepository* extends ServiceSupport, which ties the repository into the
Camel component life cycle.
> The _start()_ method verifies the connection to the database and the presence of the required
tables.
> h2. Aggregator configuration
> Depending on the targeted environment, the aggregator might need some configuration.
As you already know, each aggregator should have its own repository (with the corresponding
pair of tables created in the database) and a data source. If the default lobHandler is not
suited to your database system, another one can be injected with the _lobHandler_ property.
> Here is the declaration for Oracle:
> {code}
>     <bean id="lobHandler" class="">
>         <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
>     </bean>
>     <bean id="nativeJdbcExtractor" class=""/>
>     <bean id="repo" class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository">
>         <constructor-arg name="transactionManager" ref="transactionManager"/>
>         <constructor-arg name="repositoryName" value="aggregation"/>
>         <constructor-arg name="dataSource" ref="dataSource"/>
>         <!-- Only with Oracle, else use default -->
>         <property name="lobHandler" ref="lobHandler"/>
>     </bean>
> {code}
> h2. Feature
> A feature has been created: *camel-jdbc-aggregator*.
> {quote}

