camel-issues mailing list archives

From "Aaron Whiteside (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CAMEL-6042) AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment.
Date Wed, 06 Feb 2013 20:57:13 GMT
Aaron Whiteside created CAMEL-6042:
--------------------------------------

             Summary: AggregateProcessor/AggregationRepository does not deal with optimistic
locking - will not work correctly in a distributed environment.
                 Key: CAMEL-6042
                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.10.3
         Environment: Glassfish + Gemini Blueprint + Spring 3.2
            Reporter: Aaron Whiteside


AggregateProcessor/AggregationRepository does not deal with optimistic locking, and will
not work correctly in a distributed environment.

I started to write a Voldemort-specific AggregationRepository, only to find that the AggregateProcessor
does not deal with optimistic locking issues. It uses a lock that is specific to a single
AggregateProcessor instance.

In a distributed environment where many Camel instances on many servers use a shared data
store for the AggregationRepository, this will not work.

Consider the following scenario using a persistent/shared AggregationRepository:

Camel instance A on server A receives Exchange 1:
# AggregateProcessor first acquires instance specific lock.
# AggregateProcessor calls oldExchange = AggregationRepository.get()
# oldExchange is null

Camel instance B on server B receives Exchange 2:
# AggregateProcessor first acquires instance specific lock.
# AggregateProcessor calls oldExchange = AggregationRepository.get()
# oldExchange is null

Camel instances A and B then, at the same time, both call:
# aggregationRepository.add() with the new exchange and the old (null) exchange.

# Camel instance A succeeds in storing the new exchange.
# Camel instance B fails with an exception stating that something is already stored under
that exchange id.
## At this point I could write my AggregationRepository implementation to ignore the existing
entry and overwrite it, at which point the existing exchange is lost and never aggregated.
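The lost-update race above can be sketched in plain Java. This is illustrative only (not Camel API): a plain Map stands in for the shared AggregationRepository, and the aggregate() helper stands in for the AggregationStrategy.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lost-update race: both "instances" read before either writes,
// so the second write clobbers the first. Names are illustrative, not Camel API.
class LostUpdateSketch {
    public static void main(String[] args) {
        Map<String, String> sharedRepo = new HashMap<>();
        String key = "correlation-1";

        // Instance A and instance B both call get() before either calls add():
        String oldSeenByA = sharedRepo.get(key); // null
        String oldSeenByB = sharedRepo.get(key); // null

        // A stores its aggregate first...
        sharedRepo.put(key, aggregate(oldSeenByA, "exchange-1"));
        // ...then B, having also seen null, overwrites it: exchange-1 is lost.
        sharedRepo.put(key, aggregate(oldSeenByB, "exchange-2"));

        System.out.println(sharedRepo.get(key)); // prints "exchange-2" - exchange-1 was never aggregated
    }

    // Stand-in for the AggregationStrategy.
    static String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "+" + newBody;
    }
}
```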

A possible solution would be:
a) Either remove the lock from AggregateProcessor and put it in the MemoryAggregationRepository,
or use ConcurrentHashMap's putIfAbsent method (and then do b).
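A rough sketch of option (a)'s putIfAbsent idea, assuming a memory-backed repository with illustrative (non-Camel) names: putIfAbsent covers the first-add race, and replace(key, expected, new) covers concurrent updates of an existing aggregate.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch (not the actual Camel API): a memory-backed repository that
// detects concurrent add()s via ConcurrentHashMap's atomic operations instead of
// relying on a processor-level lock.
class OptimisticMemoryRepository {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Stores newValue for key only if the caller's view of the old value is still
    // current. Returns false when another caller won the race.
    boolean add(String key, String expectedOld, String newValue) {
        if (expectedOld == null) {
            // putIfAbsent returns null only when no mapping existed, i.e. we won.
            return store.putIfAbsent(key, newValue) == null;
        }
        // replace succeeds only if the current value still equals expectedOld.
        return store.replace(key, expectedOld, newValue);
    }

    String get(String key) {
        return store.get(key);
    }
}
```

A false return here is exactly the condition that option (b) would surface as an exception.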

b) Introduce an AggregationRepositoryOptimisticLockException (name it whatever you want) that
is thrown when an AggregationRepository detects that someone is trying to add() the same exchange
id at the same time (or rather, during the race-condition window).

Upon receiving this exception the AggregateProcessor would re-get() the oldExchange from the
AggregationRepository and call the AggregationStrategy again to aggregate it.

This would ensure that no exchanges fail to aggregate in a distributed environment, provided
that the underlying AggregationRepository can detect concurrent add()s, which most should
be able to.
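A minimal sketch of the proposed re-get()/re-aggregate loop. Both AggregationRepositoryOptimisticLockException and a repository add() with compare-and-set semantics are assumptions taken from this proposal, not existing Camel API; strings stand in for exchanges.

```java
// Hypothetical exception proposed above, not an existing Camel class.
class AggregationRepositoryOptimisticLockException extends RuntimeException {
}

class RetryingAggregator {
    // Illustrative repository contract: add() is assumed to throw the exception
    // above when it detects a concurrent add() for the same key.
    interface Repository {
        String get(String key);
        void add(String key, String oldValue, String newValue);
    }

    // On an optimistic-lock failure, re-get() the old exchange and call the
    // aggregation step again, as the proposal describes.
    static String aggregate(Repository repo, String key, String incoming) {
        while (true) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : old + "+" + incoming;
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (AggregationRepositoryOptimisticLockException e) {
                // Someone else added concurrently: loop, re-get and re-aggregate.
            }
        }
    }
}
```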

For example:
A SQL-backed repository could insert into a table with a unique constraint on the exchange
id. When the constraint is violated, JPA/JDBC/whatever will throw a unique-constraint-violation
exception, which can be converted into an AggregationRepositoryOptimisticLockException.
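The translation step could look something like this. SQLIntegrityConstraintViolationException is the standard JDBC subclass thrown for unique-constraint violations; the target exception type is the hypothetical one proposed above, not existing Camel code.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

// Sketch of translating a JDBC unique-constraint violation into the proposed
// optimistic-lock exception. The exception class is an assumption from this
// proposal, not an existing Camel type.
class OptimisticLockTranslation {
    static class AggregationOptimisticLockException extends RuntimeException {
        AggregationOptimisticLockException(Throwable cause) {
            super(cause);
        }
    }

    // Map a unique-constraint violation to the optimistic-lock exception;
    // wrap anything else as a plain runtime failure.
    static RuntimeException translate(SQLException e) {
        if (e instanceof SQLIntegrityConstraintViolationException) {
            return new AggregationOptimisticLockException(e);
        }
        return new RuntimeException(e);
    }
}
```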

And HawtDB supports only optimistic locking, throwing an OptimisticUpdateException when
it detects concurrent updates.




--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
