camel-issues mailing list archives

From Christian Müller (JIRA) <j...@apache.org>
Subject [jira] [Commented] (CAMEL-6042) AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
Date Fri, 01 Mar 2013 06:43:13 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13590286#comment-13590286 ]

Christian Müller commented on CAMEL-6042:
-----------------------------------------

In general, we are only waiting for Karaf 2.3.1, which is already in vote. This means within
the next couple of days, if possible...
                
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>            Assignee: Claus Ibsen
>             Fix For: 2.11.0
>
>         Attachments: aggregate-optimistic-locking-support1.patch, aggregate-optimistic-locking-support2.patch, aggregate-optimistic-locking-support.patch, javadoc-and-unit-test-updates.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic locking and will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that the AggregateProcessor does not deal with optimistic locking. It uses a single lock that is specific to the AggregateProcessor instance.
> In a distributed environment where many Camel instances on many servers use a shared data store for the AggregationRepository, this will not work.
> Consider the following scenario using a persistent/shared AggregationRepository:
> Camel instance A on server A, receives Exchange 1..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B, receives Exchange 2..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance A & B at the same time both call..
> # AggregateProcessor calls AggregationStrategy with the new exchange and old null exchange
> # aggregationRepository.add() with the result (the new exchange)
> # Camel instance A succeeds to store the new exchange.
> # Camel instance B fails with an exception stating that something is already stored using that exchange id.
> ## At this point I could write my AggregationRepository implementation to ignore the existing entry and overwrite it. But this would mean the existing exchange is lost and never aggregated.
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it whatever you want) that is thrown when an AggregationRepository detects that someone is trying to add() the same exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the oldExchange (now not null) from the AggregationRepository and call the AggregationStrategy again to aggregate the old and the new exchanges.
> This would ensure that no exchanges fail to aggregate in a distributed environment, given that the underlying AggregationRepository is able to detect concurrent add()s, which most should be able to do (using conditional updates).
> For example:
> SQL could try to insert into a table with a unique constraint on the correlation id. When the constraint is violated, JPA/JDBC/whatever will throw a unique constraint violation exception, which can be converted into an AggregationRepositoryOptimisticLockException.
> And HawtDB supports optimistic locking out of the box, by throwing an OptimisticUpdateException when it detects concurrent updates. So updating this component to take advantage of this feature should be very simple.
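
The get / aggregate / conditional add / retry cycle proposed in (b) above could be sketched roughly as follows. This is only an illustration under stated assumptions, not Camel's API or the attached patches: plain strings stand in for exchanges, and the names OptimisticAggregationSketch, OptimisticLockException, InMemoryRepository and aggregate are hypothetical. The in-memory repository detects conflicts with ConcurrentMap.putIfAbsent and ConcurrentMap.replace, which is the kind of conditional update a shared store would need to offer.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

public class OptimisticAggregationSketch {

    /** Hypothetical exception, standing in for the one suggested in the issue. */
    static class OptimisticLockException extends RuntimeException {
        OptimisticLockException(String key) {
            super("Concurrent update detected for key " + key);
        }
    }

    /** Hypothetical repository: add() only succeeds if the stored value is still what the caller read. */
    static class InMemoryRepository {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

        String get(String key) {
            return store.get(key);
        }

        void add(String key, String expectedOld, String newValue) {
            boolean stored = (expectedOld == null)
                    ? store.putIfAbsent(key, newValue) == null      // first writer wins
                    : store.replace(key, expectedOld, newValue);    // conditional update
            if (!stored) {
                throw new OptimisticLockException(key);
            }
        }
    }

    /** Re-reads and re-aggregates until the conditional add succeeds. */
    static String aggregate(InMemoryRepository repo, String key, String incoming,
                            BinaryOperator<String> strategy) {
        while (true) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : strategy.apply(old, incoming);
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (OptimisticLockException e) {
                // Another instance stored a value first; loop to re-get() and aggregate again.
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryRepository repo = new InMemoryRepository();
        BinaryOperator<String> concat = (oldValue, newValue) -> oldValue + "+" + newValue;

        // Two threads play the role of Camel instances A and B sharing one repository.
        Thread a = new Thread(() -> aggregate(repo, "order-1", "A", concat));
        Thread b = new Thread(() -> aggregate(repo, "order-1", "B", concat));
        a.start();
        b.start();
        a.join();
        b.join();

        // Both contributions are present; their order depends on which thread won the race.
        System.out.println(repo.get("order-1"));
    }
}
```

The loop retries without a limit to keep the sketch short; a real implementation would probably want a bounded number of attempts or a back-off between them.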

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
