camel-issues mailing list archives

From "Aaron Whiteside (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (CAMEL-6042) AggregateProcessor/AggregationRepository does not deal with optimistic locking - will not work correctly in a distributed environment
Date Fri, 15 Feb 2013 19:23:15 GMT

    [ https://issues.apache.org/jira/browse/CAMEL-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13579421#comment-13579421 ]

Aaron Whiteside edited comment on CAMEL-6042 at 2/15/13 7:21 PM:
-----------------------------------------------------------------

Oh, somehow I missed that DistributedCompletionIntervalTest isn't correct; I'll provide another
patch soon. Thanks for pointing it out. I think it was a copy-and-paste error on my part.

Thank you for getting this committed.

I'll also prepare another patch with javadoc updates to OptimisticLockingAggregationRepository,
along with some more unit tests for various scenarios.
                
      was (Author: aaronjwhiteside):
    DistributedCompletionIntervalTest extends AbstractDistributedTest, which sets up the
two CamelContext instances. You'll see that we call getMockEndpoint() and getMockEndpoint2()...

Thank you for getting this committed.

I'll prepare another patch with javadoc updates to OptimisticLockingAggregationRepository,
along with some more unit tests for various scenarios.
                  
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - will
not work correctly in a distributed environment
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-6042
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6042
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Glassfish + Gemini Blueprint + Spring 3.2
>            Reporter: Aaron Whiteside
>            Assignee: Claus Ibsen
>             Fix For: 2.11.0
>
>         Attachments: aggregate-optimistic-locking-support1.patch, aggregate-optimistic-locking-support2.patch,
aggregate-optimistic-locking-support.patch
>
>
> AggregateProcessor/AggregationRepository does not deal with optimistic locking - and
will not work correctly in a distributed environment.
> When I started to write a Voldemort-specific AggregationRepository, I saw that the AggregateProcessor
does not deal with optimistic locking. It uses a single lock that is specific to the AggregateProcessor
instance.
> In a distributed environment, where many Camel instances on many servers use
a shared data store for the AggregationRepository, this will not work.
> Consider the following scenario using a persistent/shared AggregationRepository:
> Camel instance A on server A, receives Exchange 1..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instance B on server B, receives Exchange 2..
> # AggregateProcessor first acquires instance specific lock.
> # AggregateProcessor calls oldExchange = AggregationRepository.get()
> # oldExchange is null
> Camel instances A and B then both do the following at the same time:
> # AggregateProcessor calls AggregationStrategy with the new exchange and a null old exchange
> # aggregationRepository.add() is called with the result (the new exchange)
> # Camel instance A succeeds in storing the new exchange.
> # Camel instance B fails with an exception stating that something is already stored under
that exchange id.
> ## At this point I could write my AggregationRepository implementation to ignore the
existing entry and overwrite it. But this would mean the existing exchange is lost and never
aggregated.
> A possible solution would be:
> a) Remove the lock from AggregateProcessor 
>  a1) Put the lock in the MemoryAggregationRepository or 
>  a2) Use a ConcurrentHashMap.putIfAbsent method (and then continue on to do B below).
> b) Introduce an AggregationRepositoryOptimisticLockException (name it whatever you want)
that is thrown when an AggregationRepository detects that someone is trying to add() the same
exchange id at the same time.
> Upon receiving this exception the AggregateProcessor would re-get() the oldExchange (now
not null) from the AggregationRepository and call the AggregationStrategy again to aggregate
the old and the new exchanges.
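
The re-get() and retry idea described above can be sketched roughly as follows. This is only
an illustration under stated assumptions, not the code from the attached patches: the class
names, the SharedRepository type and the three-argument add() are hypothetical, a String stands
in for an Exchange, and a ConcurrentHashMap stands in for the shared data store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

public class OptimisticAggregationSketch {

    /** Hypothetical exception, named after the one proposed in the issue. */
    static class AggregationRepositoryOptimisticLockException extends RuntimeException {
    }

    /** Hypothetical shared store; a String stands in for an Exchange. */
    static class SharedRepository {
        private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

        String get(String key) {
            return store.get(key);
        }

        /** Conditional update: succeeds only if the stored value is still what the caller read. */
        void add(String key, String expectedOld, String updated) {
            boolean stored = (expectedOld == null)
                    ? store.putIfAbsent(key, updated) == null
                    : store.replace(key, expectedOld, updated);
            if (!stored) {
                throw new AggregationRepositoryOptimisticLockException();
            }
        }
    }

    /** Reads, aggregates and stores; on a detected conflict it re-reads and tries again. */
    static String aggregate(SharedRepository repo, String key, String incoming,
                            BinaryOperator<String> strategy) {
        while (true) {
            String old = repo.get(key);
            String merged = (old == null) ? incoming : strategy.apply(old, incoming);
            try {
                repo.add(key, old, merged);
                return merged;
            } catch (AggregationRepositoryOptimisticLockException e) {
                // Another instance stored first; loop to re-get() the now non-null old value.
            }
        }
    }
}
```

In this sketch, if instance A has already stored "1" under a key, a second add() with a null
expected value fails, and aggregate() for "2" with a strategy that joins values with a comma
ends up storing "1,2" instead of overwriting A's entry.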
> This would ensure that no exchanges fail to aggregate in a distributed environment, provided
that the underlying AggregationRepository is able to detect concurrent add() calls, which most
should be able to do (using conditional updates).
> For example:
> An SQL-based repository could try to insert into a table with a unique constraint on the correlation id.
When the constraint is violated JPA/JDBC/whatever will throw a unique constraint violation
exception, which can be converted into an AggregationRepositoryOptimisticLockException.
> And HawtDB supports optimistic locking out of the box, by throwing an OptimisticUpdateException
when it detects concurrent updates. So updating this component to take advantage of this feature
should be very simple.
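
As a rough sketch of the SQL case, a JDBC-based repository might translate a constraint
violation like this. The class, the SqlInsert interface and the exception type are hypothetical
names used only for illustration. The sketch assumes the driver either throws
SQLIntegrityConstraintViolationException or reports an SQLState in class 23 (integrity
constraint violation), which covers more than unique constraints, so a real implementation
may want a narrower check.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class ConstraintViolationTranslator {

    /** Hypothetical exception, named after the one proposed in the issue. */
    static class AggregationRepositoryOptimisticLockException extends RuntimeException {
        AggregationRepositoryOptimisticLockException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical callback wrapping the actual INSERT statement. */
    @FunctionalInterface
    interface SqlInsert {
        void run() throws SQLException;
    }

    /** True for the JDBC subclass or any SQLState in class 23. */
    static boolean isIntegrityViolation(SQLException e) {
        String state = e.getSQLState();
        return e instanceof SQLIntegrityConstraintViolationException
                || (state != null && state.startsWith("23"));
    }

    /** Runs the insert and converts a constraint violation into the optimistic lock exception. */
    static void insertOrSignalConflict(SqlInsert insert) {
        try {
            insert.run();
        } catch (SQLException e) {
            if (isIntegrityViolation(e)) {
                throw new AggregationRepositoryOptimisticLockException(e);
            }
            // Any other SQL failure is not a concurrency conflict.
            throw new IllegalStateException("insert failed", e);
        }
    }
}
```

The caller would then handle the optimistic lock exception by re-reading the stored exchange
and aggregating again, as described in the issue.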

