camel-users mailing list archives

From <Archis.Kulka...@nomura.com>
Subject Aggregator race condition in recovery task?
Date Wed, 29 Oct 2014 18:46:26 GMT
Hi,

We’re using Camel 2.12.3 for aggregation in one of our applications, with a JdbcAggregationRepository
maintaining the state. The aggregator is meant to complete on a time interval,
i.e. every ‘n’ ms it should send out the aggregated output.
We’re frequently seeing duplicate messages generated by the recovery task,
which is configured to run every 5 seconds.

After enabling debug logging and investigating further, I see the following sequence of events
in the logs:

1.  [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Completion interval triggered for correlation key
2.  [AggregateTimeoutChecker thread] - JdbcAggregationRepository.doInTransactionWithoutResult() : Removing key
3.  [AggregateRecoverChecker thread] - AggregateProcessor.run() : Starting recover check
4.  [AggregateRecoverChecker thread] - JdbcAggregationRepository.mapRow() : getKey
5.  [AggregateTimeoutChecker thread] - AggregateProcessor.onSubmitCompletion() : Aggregation complete for correlation key
6.  [AggregateRecoverChecker thread] - AggregateProcessor.run() : Loading aggregated exchange with id
7.  [AggregateTimeoutChecker thread] - AggregateProcessor.run() : Processing aggregated exchange
8.  [AggregateRecoverChecker thread] - JdbcAggregationRepository.recover() : Recovering exchangeId
9.  [AggregateRecoverChecker thread] - AggregateProcessor.run() : Delivery attempt: 1 to recover aggregated exchange with id
10. [AggregateRecoverChecker thread] - AggregateProcessor.onSubmitCompletion() : Aggregation complete for correlation key


In short, the timeout checker removes the exchange from the XXX table and adds it to the XXX_completed
table. A little later in its processing, it adds the exchange ID to the “inProgressCompleteExchanges”
structure.
The recover task scans the XXX_completed table and then checks whether each exchange it finds is present
in “inProgressCompleteExchanges” before attempting to recover it.
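
The window described above can be replayed deterministically in a small model. This is a minimal sketch, not Camel code: the class RecoveryRaceSketch, the method replay, the in-memory completedTable, and the "recover:"/"timeout:" labels are all hypothetical stand-ins, and the interleaving is one ordering consistent with the description rather than a reproduction of the actual log. It assumes only what is stated here: the exchange becomes visible in the completed table before its ID is added to the in-progress set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RecoveryRaceSketch {

    /** Replays one problematic interleaving on a single thread and returns the deliveries in order. */
    static List<String> replay(String exchangeId) {
        // Hypothetical in-memory stand-in for the XXX_completed table.
        Set<String> completedTable = new HashSet<String>();
        // Same construction as the set quoted from AggregateProcessor.
        Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        List<String> deliveries = new ArrayList<String>();

        // Timeout checker: the exchange moves from XXX to XXX_completed.
        completedTable.add(exchangeId);

        // Recover checker: scans XXX_completed before the timeout checker has marked the id.
        List<String> scanned = new ArrayList<String>(completedTable);
        for (String id : scanned) {
            // Check-then-act: the id is not marked yet, so it looks abandoned.
            if (!inProgress.contains(id)) {
                deliveries.add("recover:" + id);
            }
        }

        // Timeout checker: only now marks the id as in progress and delivers it.
        inProgress.add(exchangeId);
        deliveries.add("timeout:" + exchangeId);
        return deliveries;
    }

    public static void main(String[] args) {
        // Prints [recover:ex-1, timeout:ex-1]: the same exchange is delivered twice.
        System.out.println(replay("ex-1"));
    }
}
```

In this model the set itself behaves correctly on every call; the duplicate comes from the gap between the two writes made by the timeout checker.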

This use of “inProgressCompleteExchanges” looks suspect to me, as I don’t see any explicit
locking around it in the recover task while the timeout task may be updating it at the same time.
This is how the set is created in AggregateProcessor:

    private final Set<String> inProgressCompleteExchanges =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

How is synchronization ensured in this case?
Is there something I’m missing in this analysis? I don’t understand how the add and contains
calls on this set, made from different threads, are synchronized. Any help
in this regard would be much appreciated.
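
For context on the question: individual add and contains calls on a set backed by ConcurrentHashMap are thread-safe on their own, so what remains open is the order of the two writes rather than locking on the set. The following is a hedged sketch of that idea only. MarkBeforePublishSketch, replay, and the in-memory completedTable are hypothetical names, and the model assumes the exchange ID can be marked as in progress before it becomes visible in the completed table. Whether such a reordering is feasible inside AggregateProcessor is not established here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class MarkBeforePublishSketch {

    /** Replays a recover scan that runs right after the publish; returns the deliveries in order. */
    static List<String> replay(String exchangeId) {
        // Hypothetical in-memory stand-in for the XXX_completed table.
        Set<String> completedTable = new HashSet<String>();
        Set<String> inProgress =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        List<String> deliveries = new ArrayList<String>();

        // Timeout checker: mark the id first (assumed ordering for this sketch)...
        inProgress.add(exchangeId);
        // ...and only then make the exchange visible in the completed table.
        completedTable.add(exchangeId);

        // Recover checker: scans immediately after the publish.
        for (String id : new ArrayList<String>(completedTable)) {
            // The id is already marked, so the recover task skips it.
            if (!inProgress.contains(id)) {
                deliveries.add("recover:" + id);
            }
        }

        // Timeout checker: delivers the exchange once.
        deliveries.add("timeout:" + exchangeId);
        return deliveries;
    }

    public static void main(String[] args) {
        // Prints [timeout:ex-1]: no recovery delivery in this model.
        System.out.println(replay("ex-1"));
    }
}
```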

Thanks,
Archis



