camel-issues mailing list archives

From "Aaron Whiteside (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CAMEL-6042) AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
Date Mon, 11 Feb 2013 15:21:13 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13575830#comment-13575830 ]

Aaron Whiteside commented on CAMEL-6042:
----------------------------------------

Hi Claus,

You are correct, I meant exchanges with the same correlation key. Sorry, I didn't explicitly state that.

If the repository supports optimistic locking, then only one Camel instance should be able to recover any one exchange, because no matter how many concurrent updates are attempted, optimistic locking lets only one succeed. I haven't gotten that far with writing my VoldemortAggregationRepository yet, as I stopped to work on this patch.
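
Just to illustrate what I mean by "only one will succeed" - a standalone sketch (plain Java, nothing Camel-specific, not part of the patch) of a conditional update where two instances race and exactly one wins:

import java.util.concurrent.ConcurrentHashMap;

public class OptimisticUpdateRace {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentHashMap<String, Long> versions = new ConcurrentHashMap<String, Long>();
        versions.put("exchange-1", 1L); // the version both instances read before updating

        Runnable recover = new Runnable() {
            public void run() {
                // conditional update: succeeds only if the version is still the one we read
                boolean won = versions.replace("exchange-1", 1L, 2L);
                System.out.println(Thread.currentThread().getName() + " recovered: " + won);
            }
        };

        Thread a = new Thread(recover, "instance-A");
        Thread b = new Thread(recover, "instance-B");
        a.start();
        b.start();
        a.join();
        b.join(); // exactly one of the two threads prints "recovered: true"
    }
}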

I also need to write a test for the distributed timeout scenario, as that is the same problem.
                
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>             Fix For: Future
>
>         Attachments: aggregate-optimistic-locking-support.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that the AggregateProcessor does not deal with optimistic locking. It uses a single AggregateProcessor instance-specific lock.
> In a distributed environment, where there are many Camel instances on many servers using a shared data store for the AggregationRepository, this will not work.
> Consider the following scenario using a persistent/shared AggregationRepository:
> Camel instance A on server A receives Exchange 1:
> # AggregateProcessor first acquires its instance-specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B receives Exchange 2:
> # AggregateProcessor first acquires its instance-specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instances A & B at the same time both:
> # call the AggregationStrategy with the new exchange and the old (null) exchange
> # call aggregationRepository.add() with the result (the new exchange)
> # Camel instance A succeeds in storing the new exchange.
> # Camel instance B fails with an exception stating that something is already stored using that exchange id.
> ## At this point I could write my AggregationRepository implementation to ignore the existing entry and overwrite it, but this would mean the existing exchange is lost and never aggregated.
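
A rough sketch of how a shared repository could detect that concurrent add() (the class and exception names here are made up for illustration; the exception anticipates the proposal in (b) below):

import java.util.concurrent.ConcurrentMap;

import org.apache.camel.Exchange;

// Placeholder for the exception proposed in (b) below.
class AggregationRepositoryOptimisticLockException extends RuntimeException {
}

class ConditionalAddRepository {
    private final ConcurrentMap<String, Exchange> store;

    ConditionalAddRepository(ConcurrentMap<String, Exchange> store) {
        this.store = store;
    }

    // oldExchange is whatever get() returned before aggregating (null for both A and B above)
    Exchange add(String correlationKey, Exchange oldExchange, Exchange aggregated) {
        if (oldExchange == null) {
            // both instances saw "nothing stored yet"; only one putIfAbsent can win
            if (store.putIfAbsent(correlationKey, aggregated) != null) {
                throw new AggregationRepositoryOptimisticLockException();
            }
        } else if (!store.replace(correlationKey, oldExchange, aggregated)) {
            // the stored exchange is no longer the one we read, someone else updated it
            throw new AggregationRepositoryOptimisticLockException();
        }
        return oldExchange;
    }
}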
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do (b) below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it whatever you want) that is thrown when an AggregationRepository detects that someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception, the AggregateProcessor would re-get() the oldExchange (now not null) from the AggregationRepository and call the AggregationStrategy again to aggregate the old and the new exchanges.
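
Roughly what that retry could look like (AggregationRepository and AggregationStrategy are the existing Camel interfaces; the exception is the placeholder class from the sketch above, and exactly where this loop would live inside AggregateProcessor is an assumption, not the patch itself):

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.AggregationRepository;

class OptimisticAggregationLoop {
    private final CamelContext camelContext;
    private final AggregationRepository repository;
    private final AggregationStrategy strategy;

    OptimisticAggregationLoop(CamelContext camelContext, AggregationRepository repository,
                              AggregationStrategy strategy) {
        this.camelContext = camelContext;
        this.repository = repository;
        this.strategy = strategy;
    }

    Exchange aggregate(String correlationKey, Exchange newExchange) {
        while (true) {
            // null the first time, non-null after a lost race
            Exchange oldExchange = repository.get(camelContext, correlationKey);
            Exchange aggregated = strategy.aggregate(oldExchange, newExchange);
            try {
                repository.add(camelContext, correlationKey, aggregated); // conditional add/update
                return aggregated;
            } catch (AggregationRepositoryOptimisticLockException e) {
                // another instance stored an exchange for this key in the meantime;
                // loop and re-get() the now non-null oldExchange, then aggregate again
            }
        }
    }
}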
> This would ensure that no exchanges fail to aggregate in a distributed environment, given that the underlying AggregationRepository is able to detect concurrent add()s, which most should be able to do (using conditional updates).
> For example:
> A SQL-based implementation could try to insert into a table with a unique constraint on the exchange id. When the constraint is violated, JPA/JDBC/whatever will throw a unique constraint violation exception, which can be converted into an AggregationRepositoryOptimisticLockException.
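
For illustration only (made-up table and column names, not the camel-jpa/camel-jdbc code, reusing the placeholder exception class from the sketch further up), the translation could look roughly like this; depending on the driver the failure may also surface as a plain SQLException with an SQLState in the 23xxx range:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

class JdbcAggregationInsert {
    void insert(Connection connection, String correlationKey, byte[] marshalledExchange) throws SQLException {
        // correlation_key carries a UNIQUE constraint, so the second concurrent insert fails
        String sql = "INSERT INTO camel_aggregation (correlation_key, exchange_body) VALUES (?, ?)";
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.setString(1, correlationKey);
            statement.setBytes(2, marshalledExchange);
            statement.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException duplicateKey) {
            // translate the store-specific conflict into the proposed exception
            throw new AggregationRepositoryOptimisticLockException();
        }
    }
}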
> And HawtDB supports optimistic locking out of the box, throwing an OptimisticUpdateException when it detects concurrent updates, so updating this component to take advantage of this feature should be very simple.
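
The same idea for HawtDB, very much a guess at the shape only: org.fusesource.hawtdb.api.OptimisticUpdateException is assumed to be the exception referred to above, the doAddInTransaction() helper is invented, and the placeholder exception class from the earlier sketch is reused:

import org.fusesource.hawtdb.api.OptimisticUpdateException;

class HawtDbAdd {
    void add(String correlationKey, byte[] marshalledExchange) {
        try {
            doAddInTransaction(correlationKey, marshalledExchange); // hypothetical helper
        } catch (OptimisticUpdateException concurrentUpdate) {
            // HawtDB detected a concurrent update; surface it as the common exception
            throw new AggregationRepositoryOptimisticLockException();
        }
    }

    private void doAddInTransaction(String correlationKey, byte[] marshalledExchange)
            throws OptimisticUpdateException {
        // placeholder for the real HawtDB index/transaction logic
    }
}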

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
