camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Lück <>
Subject Race Condition in Aggregation using HazelcastAggregationRepository in a cluster
Date Wed, 16 Aug 2017 15:10:41 GMT
Hi there,

we just had an issue in one of our systems and it looks like there is an
issue with locking in the AggregateProcessor in a 
distributed environment.

I’ll try to explain it:

We use camel-core and camel-hazelcast 2.16.5  and hazelcast 3.5.2

We have a route which sends a message to an Websphere MQ Queue (via
JMSComponent) and after that we put 
the message into an aggregator which uses the JMSCorrelationId to correlate
the request and the response.

  .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())

The aggregationRepository aggrRepo is created like this 
  HazelcastAggregationRepository  aggrRepo = new
HazelcastAggregationRepository ("aggrRepoDrsBatch", hcInst));
where hcInst is an Instance of com.hazelcast.core.HazelcastInstance.

We also have another route which reads the response from the response queue
and forwards it to the aggregator. 

The environment consists of two nodes on which the same code is running (so
essentially the send and response routes 
and the aggregation)

The problem arises when the response is returned really fast and is consumed
on the node that didn't sent the response.


I digged a bit in the camel code and it seems to me that the problem here is
the lock in the AggregateProcessor as it is local 
to the VM in which the code runs. I'll try for an example to make this more

- Node A sends a MQ message and after that it puts the message into the
aggregator. The AggregateProcessor runs and 
  checks the lock before going into doAggregation()
	- in doAggregation it tries to get the Exchange from the repository
and doesn't find any. So it continues to aggregate 
                the first message an writes this into the repository
- In about the same time between reading the exchange from the repository
and before writing the "aggregated" first 
  message into the repository Node B fetches the reply from the response
queue and sends it to the aggregator. As in node A 
  the lock is checked and as the code runs on another VM the lock is free
and the AggregateProcessor can go to doAggregation
	- in doAggregation the Node tries to get the Exchange from the
repository before the other node has written it. 
	  And like Node A the code proceeds with creating the first Exchange
for the aggregation and writes in into the 

The result is that one of the nodes will override the Exchange the other
created before. And the Aggreagtion will never 
complete (actually it does but because of the timeout)

Ideas to solve the problem
- probably optimistic locking is an option here as
HazelcastAggregationRepository supports this by implementing 
=> I'd like to hear your thoughts on this.

- currently we can stop the route consuming from the response route on one
Node to eliminate the error. But this is not 
  an option for a long time because we lose the ability for fail over 
- probably it's an idea to make the AggregateProcessor get the Lock Object
from the repository. So for example for the 
  HazelcastAggregationRepository the repository can return the lock object
for the hazelcast map which would lock it for the 
  whole cluster. 
- I thought about resending the MQ message in case of an timeout but as the
request has side effects on the system that 
  processes the message this is not really an option.

So I hope I could make myself clear,
If you have any questions which would help you to help me, I'd happy to
answer them. 


View raw message