camel-dev mailing list archives

From Jakub Korab
Subject Re: Aggregated JMS component
Date Tue, 07 Jul 2015 10:04:21 GMT
Hi Claus,

I have a copy of it at Feel 
free to take a look.



On 01/07/15 19:26, Claus Ibsen wrote:
> Hi
> Ah cool. Maybe you could put it on a github repo we can use to take a peek at?
> As JMS is a "big thing" and can also be complicated, we should take some
> time to review it and see where it fits best in the Camel family.
> On Wed, Jul 1, 2015 at 11:03 AM, Jakub Korab wrote:
>> Hi all,
>> I have written a consumer-only component that combines aggregation logic
>> with transacted JMS sessions that I would like to contribute. The component
>> vastly speeds up message consumption and aggregation without message loss
>> on failure when compared with using a regular JMS component and aggregator.
>> The problem that it solves is that when you want to aggregate a set of
>> messages from JMS and avoid message loss, you typically reach for a
>> JdbcAggregationRepository. This in turn fetches and writes progressively
>> larger blobs from the database on receipt of each message, slowing down
>> linearly in relation to the number of messages consumed - i.e. it
>> performs progressively worse the larger the batch.
>> Old way:
>> from("jms:myQueue")
>>      .transacted()
>>      .aggregate(constant(true), myAggStrategy)
>>          .aggregationRepository(jdbcAggregationRepository)
>>          .completionSize(100)
>>          .completionTimeout(500)
>> This also suffers from the problem that message loss is still possible
>> between the message broker and the database that stores the aggregated
>> message (unless you use XA transactions...).
>> The component that I have developed starts a JMS session, and receives
>> messages synchronously until it meets a completion size, or until a
>> completion timeout is met, each time calling an AggregationStrategy. Only
>> when the completion conditions have been matched does it emit the
>> aggregated message.
>> The component will commit the batch transaction if the Exchange is
>> processed successfully, or roll the entire thing back on exception - so all
>> of the original messages will end up back on the queue for re-processing.
>> In the event of failure of the Camel process, the messages remain on the
>> broker for re-dispatch.
>> So in terms of "where is my data stored?", the answer is it remains on the
>> broker until the batch is successfully processed.
>> New way:
>> from("aggjms:myQueue?completionSize=100&completionTimeout=500&aggregationStrategy=#myAggStrategy")
>> The component also allows for setting the number of JMS consumers on the
>> endpoint, so you can scale out the number of threads that pick up batches.
>> The transactional behaviour of this (and so its usage) is so different to
>> the regular JMS and SJMS components, that I believe it needs to be its own
>> component, as opposed to being integrated into one of the others.
>> I would like to contribute this to Camel. What is the process for doing
>> this?
>> Thanks,
>> Jakub
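
The batch-then-commit behaviour described in the quoted message can be illustrated with a small plain-Java sketch. This is not the component's code: it assumes an in-memory BlockingDeque standing in for the broker queue, a BinaryOperator standing in for the AggregationStrategy, and a hypothetical class and method name (BatchingConsumerSketch.consumeBatch). It also assumes the completion timeout is measured from the start of the batch, which the message does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;

// Hypothetical stand-in for the consumer loop; the deque plays the role of the broker queue.
class BatchingConsumerSketch {

    /**
     * Receives messages synchronously until completionSize is reached or
     * completionTimeoutMillis has elapsed (assumed: measured from batch start),
     * folding each message into the aggregate via the strategy.
     * Returns true if the batch was "committed", false if it was "rolled back".
     */
    static boolean consumeBatch(BlockingDeque<String> queue,
                                int completionSize,
                                long completionTimeoutMillis,
                                BinaryOperator<String> strategy,
                                Consumer<String> processor) {
        List<String> received = new ArrayList<>(); // messages held by the open "transaction"
        String aggregate = null;
        long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(completionTimeoutMillis);

        while (received.size() < completionSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // completion timeout reached
            }
            String msg;
            try {
                msg = queue.pollFirst(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop receiving
                break;
            }
            if (msg == null) {
                break; // nothing arrived before the timeout
            }
            received.add(msg);
            aggregate = (aggregate == null) ? msg : strategy.apply(aggregate, msg);
        }

        if (received.isEmpty()) {
            return true; // nothing received, nothing to emit or roll back
        }

        try {
            processor.accept(aggregate); // emit the aggregated message downstream
            return true;                 // success: "commit", the messages are gone from the queue
        } catch (RuntimeException e) {
            // Failure: "roll back" by returning every message to the head of the
            // queue in its original order so the batch can be re-processed.
            for (int i = received.size() - 1; i >= 0; i--) {
                queue.addFirst(received.get(i));
            }
            return false;
        }
    }
}
```

Calling consumeBatch(queue, 100, 500, strategy, processor) would mirror the completionSize=100 and completionTimeout=500 options in the quoted endpoint URI. A real transacted JMS session gets the rollback for free from the broker, so the explicit re-queue step here only simulates that behaviour.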
