camel-users mailing list archives

From Zoran Regvart
Subject Re: Race Condition in Aggregation using HazelcastAggregationRepository in a cluster
Date Sat, 19 Aug 2017 22:52:25 GMT
Hi Michael,
it's a bit hard to follow, so I could be misunderstanding your issue.
Is the problem a race condition between the aggregator on node A, which
expects the reply, and another aggregator on node B that is not aware
of the initial request?

If you're doing only request-reply correlation, perhaps take a look at
the InOut message exchange pattern with a correlation property[1], with
the replying application setting the ReplyToQMgr to the requester's
queue manager.

Or, place the reply in a Hazelcast queue regardless of the queue
manager the reply landed on and process the reply from there.

Also, I think it would be better to set up the reply coordination
expectation (with timeouts and without transactions, since those would
block) before sending the message.
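
To illustrate the ordering I mean (register the expectation first, send
second), here is a minimal sketch using only JDK classes. It is not Camel
or Hazelcast API: the class ReplyExpectationSketch and its methods are
hypothetical names, and the pending map lives in a single JVM. In a
two-node setup that map would have to be backed by something shared
across the cluster, which is an assumption this sketch does not cover.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReplyExpectationSketch {
    // Pending expectations keyed by correlation ID (single JVM only).
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers the expectation; call this BEFORE the request is sent. */
    public CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> existing = pending.putIfAbsent(correlationId, future);
        return existing != null ? existing : future;
    }

    /** Called by the reply consumer; returns false if nobody is waiting. */
    public boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(body);
    }

    /** Waits for the reply with a timeout and always cleans up the entry. */
    public String await(String correlationId, CompletableFuture<String> future, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(correlationId, future);
        }
    }

    public static void main(String[] args) throws Exception {
        ReplyExpectationSketch replies = new ReplyExpectationSketch();
        CompletableFuture<String> future = replies.expect("corr-1"); // 1. register
        // 2. the request would be sent here (omitted in this sketch)
        replies.onReply("corr-1", "response body");                  // 3. reply may arrive at once
        System.out.println(replies.await("corr-1", future, 1000));   // prints "response body"
    }
}
```

Because the expectation exists before the request leaves, a reply that
comes back very quickly still finds something to complete.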



On Wed, Aug 16, 2017 at 5:10 PM, Michael Lück wrote:
> Hi there,
> we just had an issue in one of our systems and it looks like there is an
> issue with locking in the AggregateProcessor in a
> distributed environment.
> I’ll try to explain it:
> ===================================================
> Scenario
> ===================================================
> We use camel-core and camel-hazelcast 2.16.5 and hazelcast 3.5.2.
> We have a route which sends a message to a WebSphere MQ queue (via
> JMSComponent), and after that we put
> the message into an aggregator which uses the JMSCorrelationID to correlate
> the request and the response.
> from(epAggregation)
>   .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
>   .completionTimeout(Integer.parseInt(
>       getContext().resolvePropertyPlaceholders("{{timeout}}")))
>   .completionSize(2)
>   .aggregationRepository(aggrRepo)
> The aggregationRepository aggrRepo is created like this:
>   HazelcastAggregationRepository aggrRepo =
>       new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);
> where hcInst is an instance of com.hazelcast.core.HazelcastInstance.
> We also have another route which reads the response from the response queue
> and forwards it to the aggregator.
> The environment consists of two nodes on which the same code is running (so
> essentially the send and response routes
> and the aggregation).
> The problem arises when the response is returned really fast and is consumed
> on the node that didn't send the request.
> ========================================
> Analysis
> ========================================
> I dug a bit into the Camel code, and it seems to me that the problem here is
> the lock in the AggregateProcessor, as it is local
> to the VM in which the code runs. I'll try to make this clearer with an
> example:
> - Node A sends an MQ message and then puts the message into the
>   aggregator. The AggregateProcessor runs and checks the lock before
>   going into doAggregation().
>   - In doAggregation() it tries to get the Exchange from the repository
>     and doesn't find one. So it continues to aggregate the first
>     message and writes it into the repository.
> - At about the same time, between Node A reading the Exchange from the
>   repository and writing the "aggregated" first message into it, Node B
>   fetches the reply from the response queue and sends it to the
>   aggregator. As on Node A, the lock is checked, and because the code
>   runs in another VM the lock is free, so the AggregateProcessor can
>   go into doAggregation().
>   - In doAggregation() Node B tries to get the Exchange from the
>     repository before Node A has written it. Like Node A, the code
>     proceeds to create the first Exchange for the aggregation and
>     writes it into the repository.
> The result is that one of the nodes overwrites the Exchange the other
> created before, and the aggregation never
> completes (actually it does, but only because of the timeout).
> =========================================
> Ideas to solve the problem
> =========================================
> - probably optimistic locking is an option here as
> HazelcastAggregationRepository supports this by implementing
>   OptimisticLockingAggregationRepository
> => I'd like to hear your thoughts on this.
> - currently we can stop the route consuming from the response queue on one
> node to eliminate the error. But this is not
>   an option in the long run because we lose the ability to fail over
> - probably it's an idea to make the AggregateProcessor get the Lock Object
> from the repository. So for example for the
>   HazelcastAggregationRepository the repository can return the lock object
> for the hazelcast map which would lock it for the
>   whole cluster.
> - I thought about resending the MQ message in case of a timeout, but as the
> request has side effects on the system that
>   processes the message this is not really an option.
> I hope I could make myself clear.
> If you have any questions which would help you to help me, I'd be happy to
> answer them.
> Regards,
> Michael
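
For reference, the lost update described in the quoted analysis, and the
way an optimistic (compare-and-set) write avoids it, can be sketched with
plain JDK classes. This is only an illustration of the idea behind an
optimistic-locking repository, not Camel or Hazelcast code: the class
name, the string "exchanges" and the ConcurrentHashMap standing in for a
cluster-wide map are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OptimisticAggregationSketch {
    // Stand-in for a map shared by both nodes (assumption for this sketch).
    static final Map<String, String> REPO = new ConcurrentHashMap<>();

    /**
     * Aggregates one part under the key. The write only succeeds if the
     * stored value is still the one that was read; otherwise it re-reads
     * and retries, so neither writer can silently overwrite the other.
     */
    public static String aggregate(String key, String part) {
        while (true) {
            String old = REPO.get(key);
            String merged = (old == null) ? part : old + "+" + part;
            boolean stored = (old == null)
                    ? REPO.putIfAbsent(key, merged) == null // first writer wins
                    : REPO.replace(key, old, merged);       // compare-and-set
            if (stored) {
                return merged;
            }
            // Lost the race: another writer changed the entry, so retry.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads play the roles of node A (request) and node B (reply).
        Thread nodeA = new Thread(() -> aggregate("corr-1", "request"));
        Thread nodeB = new Thread(() -> aggregate("corr-1", "reply"));
        nodeA.start();
        nodeB.start();
        nodeA.join();
        nodeB.join();
        // Both parts are present; their order depends on thread scheduling.
        System.out.println(REPO.get("corr-1"));
    }
}
```

With a plain read-then-write (no putIfAbsent/replace check) both writers
could see an empty entry and the later write would replace the earlier
one, which matches the behaviour described in the quoted analysis.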

Zoran Regvart
