camel-dev mailing list archives

From Serge Smertin <serg.smer...@gmail.com>
Subject [Aggregation] [Optimistic Locking] [Hazelcast] DefaultExchangeHolder for distributed concurrent maps
Date Thu, 05 Mar 2015 13:54:16 GMT
Greetings,

Currently I have an aggregation route with approximately the following
configuration:
                .aggregate(header("kind"))
                .aggregationRepository(getHdfsFlushRepo())
                .optimisticLockRetryPolicy(new OptimisticLockRetryPolicy().maximumRetries(10).randomBackOff())
                .optimisticLocking()

Adding new exchanges results in this error:
*Optimistic locking failed for exchange with key other: IMap#replace
returned no Exchanges, while it's expected to replace one*

Code from HazelcastAggregationRepository that is related to the issue:
        if (!cache.replace(key, oldHolder, newHolder)) {
            LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one",
                    key);
            throw new OptimisticLockingException();
        }
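
The check above is a compare-and-swap: the update only goes through if the stored value still matches the one that was read earlier. A minimal sketch of that pattern, assuming a plain java.util.concurrent.ConcurrentHashMap as a stand-in for the Hazelcast IMap (the class and method names are hypothetical, and unlike Hazelcast's default binary storage, ConcurrentHashMap compares values with equals()):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CasSketch {
    // Stand-in for the distributed map; this is NOT a Hazelcast IMap.
    static final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

    /**
     * Appends 'part' to the value stored under 'key', but only if the stored
     * value is still 'expectedOld'. Returns false when another writer got
     * there first, which is where a repository would signal a lock failure.
     */
    static boolean tryAppend(String key, String expectedOld, String part) {
        if (expectedOld == null) {
            // First value for this key: succeed only if nothing is stored yet.
            return cache.putIfAbsent(key, part) == null;
        }
        // Atomic replace: succeeds only if the current value equals expectedOld.
        return cache.replace(key, expectedOld, expectedOld + "," + part);
    }

    public static void main(String[] args) {
        System.out.println(tryAppend("other", null, "a")); // first insert
        System.out.println(tryAppend("other", "a", "b"));  // up-to-date view
        System.out.println(tryAppend("other", "a", "c"));  // stale view
        System.out.println(cache.get("other"));
    }
}
```

The third call works from a stale view of the value, so the replace is refused and the caller is expected to re-read and retry.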

I guess the source of the problem is the marshalling of the exchange holder:
Hazelcast does its compare-and-swap on the serialized binary content, not on
equals() or hashCode(). If the DefaultExchangeHolder contains any HashMaps
(headers, properties, or a HashMap within the body), the binary
representations can differ even though the hash codes are the same:
70 3F 40  0C 77 74 vs 70 3F 40  77 74.
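
The effect can be reproduced outside Camel and Hazelcast. The sketch below is only an illustration using plain Java serialization (class and method names are hypothetical): two HashMaps with the same content but different initial capacities are equal and share a hash code, yet their serialized forms are expected to differ, because the OpenJDK HashMap writes internal sizing details into the stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SerializedFormDemo {
    /** Serializes an object with standard Java serialization. */
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Builds a header-like map; only the initial capacity varies. */
    static Map<String, Object> headers(int initialCapacity) {
        Map<String, Object> m = new HashMap<>(initialCapacity);
        m.put("kind", "other");
        return m;
    }

    public static void main(String[] args) {
        Map<String, Object> a = headers(16);
        Map<String, Object> b = headers(64);
        System.out.println("equals:         " + a.equals(b));
        System.out.println("same hashCode:  " + (a.hashCode() == b.hashCode()));
        System.out.println("same bytes:     " + Arrays.equals(serialize(a), serialize(b)));
    }
}
```

A byte-wise comparison of such values reports a mismatch even though the maps are logically identical, which is consistent with the failed IMap#replace above.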

I'm not sure whether this problem should be fixed at the Hazelcast component
level or in Camel core, so I reported a bug:
https://issues.apache.org/jira/browse/CAMEL-8438.

There are some possible ways to get optimistic locking working with
Hazelcast (and probably other stores that have atomic update
functionality, like Infinispan):
- Hazelcast supports InMemoryFormat.OBJECT storage, so that ObjectRecordFactory
would invoke DefaultExchangeHolder#equals. However, that currently falls
through to Object#equals, because the implementation in 2.14 doesn't define
an equals() method.
- Think about making the serialisation of the exchange holder produce a more
consistent binary form.
- Don't use HashMaps in aggregated exchanges at all. That means no headers
either.
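
As a rough illustration of the second option, one way to get a stable binary form is to copy each map into a sorted TreeMap before serializing it. This is only a sketch under assumptions: the helper name canonicalBytes is hypothetical, it is not part of Camel or Hazelcast, it uses plain Java serialization, and it only handles a flat map with String keys (nested HashMaps inside values would need the same treatment).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class CanonicalFormSketch {
    /**
     * Hypothetical helper: serializes a sorted copy of the map, so the bytes
     * depend on the entries only, not on HashMap capacity or insertion order.
     */
    static byte[] canonicalBytes(Map<String, ?> map) {
        TreeMap<String, Object> sorted = new TreeMap<>(map);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sorted);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, Object> a = new HashMap<>(16);
        a.put("kind", "other");
        a.put("count", 1);

        // Same entries, different capacity and insertion order.
        Map<String, Object> b = new HashMap<>(64);
        b.put("count", 1);
        b.put("kind", "other");

        System.out.println("canonical bytes match: "
                + Arrays.equals(canonicalBytes(a), canonicalBytes(b)));
    }
}
```

The trade-off is an extra copy on every write, and the result is only as deterministic as the serialized form of the keys and values themselves.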

Something like that, thank you for reading this.

Cheers,
Serge
