camel-dev mailing list archives

From Jakub Korab <>
Subject Re: Aggregated JMS component
Date Mon, 13 Jul 2015 13:19:56 GMT
Thanks Claus. I'll make those changes and ping back when done.


On 13/07/15 13:10, Claus Ibsen wrote:
> Hi Jakub
> Looks good, a few comments.
> The component should extend UriEndpointComponent,
> and the endpoint should have @UriEndpoint and other
> annotations for the options. You can take a look at how it's done elsewhere.
> We use ObjectHelper.notNull to validate whether a parameter is configured
> or not; you can use that instead of the commons-lang code.
> The endpoint should be a singleton as it's thread-safe.
> And we usually prefix all those keys with Camel, e.g. CamelJmsBatchSize,
> and don't use dots in the key.
> In the consumer stop you do an await, which may in theory wait forever
> if something goes wrong and keeps the thread running so that the latch
> is never counted down. Maybe have a timeout as a fallback.
> The consumer should likely not have ActiveMQ-dependent code like the
> prefetch option you set. Maybe expose that as a configuration option that
> people can set. Also, AMQ has a default of 1000 for prefetch.
> The wait time capped at a maximum of 100 millis may cause somewhat chatty IO,
> as each concurrent consumer will poll 10 times per second when idle. That
> may lead to a bit of extra IO over the network. The Spring JMS component
> uses 1000 millis as the default. Maybe put that into an option people
> can configure.
> You may look into using StopWatch for all that time tracking - we have
> that in camel-core which may be easier to use.
> Maybe consumer.close should be in a try/catch that ignores the exception,
> so the aggregated data can still be processed?
> On Tue, Jul 7, 2015 at 12:04 PM, Jakub Korab
> <> wrote:
>> Hi Claus,
>> I have a copy of it at Feel free
>> to take a look.
>> Thanks,
>> Jakub
>> On 01/07/15 19:26, Claus Ibsen wrote:
>>> Hi
>>> Ah cool. Maybe you could put it on a github repo we can use to take a peek
>>> at?
>>> As JMS is a "big thing" and can also be complicated, we should have some
>>> time to review and see where it fits the best in the Camel family.
>>> On Wed, Jul 1, 2015 at 11:03 AM, Jakub Korab
>>> <> wrote:
>>>> Hi all,
>>>> I have written a consumer-only component that combines aggregation logic
>>>> with transacted JMS sessions that I would like to contribute. The
>>>> component vastly speeds up message consumption and aggregation without
>>>> message loss on failure when compared with using a regular JMS component
>>>> and aggregator.
>>>> The problem that it solves is that when you want to aggregate a set of
>>>> messages from JMS and avoid message loss, you typically reach for a
>>>> JdbcAggregationRepository. This in turn fetches and writes progressively
>>>> larger blobs from the database on receipt of each message, slowing down
>>>> linearly in relation to the number of messages consumed - i.e. it
>>>> performs progressively worse the larger the batch.
>>>> Old way:
>>>> from("jms:myQueue")
>>>>       .transacted()
>>>>       .aggregate(constant(true), myAggStrategy)
>>>>           .aggregationRepository(jdbcAggregationRepository)
>>>>           .completionSize(100)
>>>>           .completionTimeout(500)
>>>> This also suffers from a problem that message loss is still possible
>>>> between the message broker and the database that stores the aggregated
>>>> message (unless you use XA transactions).
>>>> The component that I have developed starts a JMS session, and receives
>>>> messages synchronously until it meets a completion size, or until a
>>>> completion timeout is met, each time calling an AggregationStrategy. Only
>>>> when the completion conditions have been matched does it emit the
>>>> aggregated message.
>>>> The component will commit the batch transaction if the Exchange is
>>>> processed successfully, or roll the entire thing back on exception - so
>>>> all of the original messages will end up back on the queue for re-processing.
>>>> In the event of failure of the Camel process, the messages remain on the
>>>> broker for re-dispatch.
>>>> So in terms of "where is my data stored?", the answer is it remains on
>>>> the broker until the batch is successfully processed.
>>>> New way:
>>>> from("aggjms:myQueue?completionSize=100&completionTimeout=500&aggregationStrategy=#myAggStrategy")
>>>> The component also allows for setting the number of JMS consumers on the
>>>> endpoint, so you can scale out the number of threads that pick up
>>>> batches.
>>>> The transactional behaviour of this (and so its usage) is so different to
>>>> the regular JMS and SJMS components, that I believe it needs to be its
>>>> own component, as opposed to being integrated into one of the others.
>>>> I would like to contribute this to Camel. What is the process for doing
>>>> this?
>>>> Thanks,
>>>> Jakub
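
The receive loop described in the original proposal (receive synchronously until a completion size or completion timeout is reached, with each individual wait capped at a poll interval) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the component's actual code: a BlockingQueue stands in for the transacted JMS consumer, a plain List stands in for the AggregationStrategy, and the names BatchReceiveSketch, receiveBatch and pollIntervalMillis are hypothetical. Commit and rollback of the JMS session are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a BlockingQueue plays the role of the JMS consumer.
final class BatchReceiveSketch {

    // Collects messages until completionSize is reached or
    // completionTimeoutMillis has elapsed, whichever comes first.
    static <T> List<T> receiveBatch(BlockingQueue<T> source,
                                    int completionSize,
                                    long completionTimeoutMillis,
                                    long pollIntervalMillis) throws InterruptedException {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime()
                + TimeUnit.MILLISECONDS.toNanos(completionTimeoutMillis);
        while (batch.size() < completionSize) {
            long remainingMillis =
                    TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                break; // completion timeout reached
            }
            // Cap each wait at the poll interval (assumed configurable here).
            long wait = Math.min(remainingMillis, pollIntervalMillis);
            T message = source.poll(wait, TimeUnit.MILLISECONDS);
            if (message != null) {
                batch.add(message); // the real component would call the AggregationStrategy
            }
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("msg-" + i);
        }
        // First batch completes on size, second on timeout.
        System.out.println(receiveBatch(queue, 3, 500, 1000));
        System.out.println(receiveBatch(queue, 100, 200, 1000));
    }
}
```

A longer poll interval means fewer idle round trips to the broker, at the cost of the loop reacting more slowly to a stop request; that is the trade-off behind the 100 versus 1000 millis discussion in the review.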

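The review comment about the consumer stop waiting forever on a latch could be addressed with a bounded await. The sketch below uses only java.util.concurrent; the class and method names (StopWithTimeoutSketch, awaitStopped) are hypothetical and not taken from the component, and what to do after a timeout (log, interrupt the thread, and so on) is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a stop routine that waits on a latch with a timeout.
final class StopWithTimeoutSketch {

    // Returns true if the worker counted the latch down in time,
    // false if the timeout elapsed first.
    static boolean awaitStopped(CountDownLatch stopped, long timeoutMillis)
            throws InterruptedException {
        return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // A well-behaved worker counts the latch down when it finishes.
        CountDownLatch clean = new CountDownLatch(1);
        Thread worker = new Thread(clean::countDown);
        worker.start();
        System.out.println("clean stop: " + awaitStopped(clean, 1000));

        // A stuck worker never counts down; the await gives up after the timeout.
        CountDownLatch stuck = new CountDownLatch(1);
        System.out.println("stuck stop: " + awaitStopped(stuck, 100));
    }
}
```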