Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7B2418467 for ; Thu, 15 Oct 2015 16:22:31 +0000 (UTC) Received: (qmail 76255 invoked by uid 500); 15 Oct 2015 16:22:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 76223 invoked by uid 500); 15 Oct 2015 16:22:31 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 76214 invoked by uid 99); 15 Oct 2015 16:22:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Oct 2015 16:22:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 59745E0C6E; Thu, 15 Oct 2015 16:22:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Debugging slowdowns Date: Thu, 15 Oct 2015 16:22:31 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-1.4-slow-server-debug 4ce4ff198 -> 96c6bebac Debugging slowdowns Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c6beba Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c6beba Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c6beba Branch: refs/heads/ignite-1.4-slow-server-debug Commit: 96c6bebac5a9f32efcdd448d9f13bc3e9071a367 Parents: 4ce4ff1 Author: Yakov Zhdanov Authored: Thu Oct 15 19:21:33 2015 +0300 Committer: Yakov Zhdanov Committed: Thu Oct 15 19:21:33 2015 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/internal/Bench.java | 99 ++++++++++ .../processors/cache/GridCacheMvccManager.java | 111 +++++------ .../distributed/GridDistributedTxMapping.java | 8 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 28 +-- .../distributed/dht/GridDhtTxPrepareFuture.java | 29 +-- .../cache/distributed/dht/GridDhtTxRemote.java | 47 ++--- .../near/GridNearOptimisticTxPrepareFuture.java | 11 +- .../near/GridNearTxFinishFuture.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 4 +- .../cache/transactions/IgniteTxManager.java | 183 ++++++++++--------- .../ignite/internal/util/IgniteUuidCache.java | 6 +- .../util/future/GridCompoundFuture.java | 175 ++++++++++++------ .../java/org/jsr166/ConcurrentHashMap8.java | 2 +- .../java/org/jsr166/ConcurrentLinkedDeque8.java | 2 +- .../org/jsr166/ConcurrentLinkedHashMap.java | 2 +- 15 files changed, 442 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/Bench.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Bench.java b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java new file mode 100644 index 0000000..994156e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java @@ -0,0 +1,99 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jsr166.LongAdder8; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + */ +public class Bench { + public static void main(String[] args) throws InterruptedException { + Ignition.start(config("1", + false)); + Ignition.start(config("2", + false)); + + final boolean client = false; + + final Ignite ignite = Ignition.start(config("0", + client)); + + final IgniteCache cache = + ignite.getOrCreateCache(new CacheConfiguration<>() + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setBackups(1).setRebalanceMode(CacheRebalanceMode.SYNC) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)); + + Thread.sleep(2000); + + + final LongAdder8 cnt = new LongAdder8(); + + final AtomicLong time = new AtomicLong(U.currentTimeMillis()); + + for (int i = 0; i < 3; i++) { + new Thread( + new Runnable() { + @Override public void run() { + for (;;) { + int key; + + if (client) + key = ThreadLocalRandom.current().nextInt(10000); + + else + for (;;) { + key = ThreadLocalRandom.current().nextInt(10000); + + if (ignite.affinity(null).isPrimary(ignite.cluster().localNode(), key)) + break; + } + + cache.put(key, 0); + + cnt.increment(); + + long l = time.get(); + long now = U.currentTimeMillis(); + + if (now - l > 1000 && time.compareAndSet(l, now)) + System.out.println("TPS [client=" + client + ", cnt=" + cnt.sumThenReset() + ']'); + } + } + } + ).start(); + } + } + + private static IgniteConfiguration config( + String name, + boolean client + ) { + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + return new IgniteConfiguration().setGridName(name).setLocalHost("127.0.0.1").setClientMode(client).setCommunicationSpi(commSpi); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index e2d0302..bc0f634 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -17,19 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -57,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; @@ -64,6 +52,19 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; @@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap near2dht = newMap(); /** Finish futures. */ - private final Queue finishFuts = new ConcurrentLinkedDeque8<>(); + //private final Queue finishFuts = new ConcurrentLinkedDeque8<>(); /** Logger. */ @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -171,8 +172,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { else if (log.isDebugEnabled()) log.debug("Failed to find transaction for changed owner: " + owner); - for (FinishLockFuture f : finishFuts) - f.recheck(entry); +// for (FinishLockFuture f : finishFuts) +// f.recheck(entry); } /** {@inheritDoc} */ @@ -443,28 +444,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return true; while (true) { - Collection> old = futs.putIfAbsent(fut.version(), - new ConcurrentLinkedDeque8>() { - /** */ - private int hash; - - { - // Make sure that we add future to queue before - // adding queue to the map of futures. - add(fut); - } + // TODO Properly optimize + Collection> col = new HashSet>(U.capacity(4), 0.75f) { + { + // Make sure that we add future to queue before + // adding queue to the map of futures. + add(fut); + } - @Override public int hashCode() { - if (hash == 0) - hash = System.identityHashCode(this); + @Override public int hashCode() { + return System.identityHashCode(this); + } - return hash; - } + @Override public boolean equals(Object obj) { + return obj == this; + } + }; - @Override public boolean equals(Object obj) { - return obj == this; - } - }); + Collection> old = futs.putIfAbsent(fut.version(), + col); if (old != null) { boolean empty, dup = false; @@ -477,6 +475,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (!empty && !dup) old.add(fut); + + if (old.size() > 4) + System.out.println("Old: " + old); } // Future is being removed, so we force-remove here and try again. @@ -630,7 +631,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (cacheCtx.isNear() || cacheCtx.isLocal()) return true; - boolean ret = rmvLocks.add(ver); + boolean ret = true;//rmvLocks.add(ver); if (log.isDebugEnabled()) log.debug("Added removed lock version: " + ver); @@ -944,7 +945,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> lockedSize: " + locked.size()); X.println(">>> futsSize: " + futs.size()); X.println(">>> near2dhtSize: " + near2dht.size()); - X.println(">>> finishFutsSize: " + finishFuts.size()); +// X.println(">>> finishFutsSize: " + finishFuts.size()); } /** @@ -964,10 +965,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public Map> unfinishedLocks(AffinityTopologyVersion topVer) { Map> cands = new HashMap<>(); - for (FinishLockFuture fut : finishFuts) { - if (fut.topologyVersion().equals(topVer)) - cands.putAll(fut.pendingLocks()); - } +// for (FinishLockFuture fut : finishFuts) { +// if (fut.topologyVersion().equals(topVer)) +// cands.putAll(fut.pendingLocks()); +// } return cands; } @@ -1059,17 +1060,17 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { ), topVer); - finishFuts.add(finishFut); - - finishFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture e) { - finishFuts.remove(finishFut); - - // This call is required to make sure that the concurrent queue - // clears memory occupied by internal nodes. - finishFuts.peek(); - } - }); +// finishFuts.add(finishFut); +// +// finishFut.listen(new CI1>() { +// @Override public void apply(IgniteInternalFuture e) { +// finishFuts.remove(finishFut); +// +// // This call is required to make sure that the concurrent queue +// // clears memory occupied by internal nodes. +// finishFuts.peek(); +// } +// }); finishFut.recheck(); @@ -1083,8 +1084,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (exchLog.isDebugEnabled()) exchLog.debug("Rechecking pending locks for completion."); - for (FinishLockFuture fut : finishFuts) - fut.recheck(); +// for (FinishLockFuture fut : finishFuts) +// fut.recheck(); } /** @@ -1245,4 +1246,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return S.toString(FinishLockFuture.class, this, super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 1e78ba2..2d2d935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -23,13 +23,13 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable { public GridDistributedTxMapping(ClusterNode node) { this.node = node; - entries = new GridConcurrentLinkedHashSet<>(); + entries = new LinkedHashSet<>(); } /** @@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable { */ private void ensureModifiable() { if (readOnly) { - entries = new GridConcurrentLinkedHashSet<>(entries); + entries = new LinkedHashSet<>(entries); readOnly = false; } @@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable { @Override public String toString() { return S.toString(GridDistributedTxMapping.class, this, "node", node.id()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8c7d985..dfeffb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -58,7 +59,6 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; @@ -78,10 +78,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { private static final long serialVersionUID = 0L; /** Near mappings. */ - protected Map nearMap = new ConcurrentHashMap8<>(); + protected Map nearMap = new HashMap<>(); /** DHT mappings. */ - protected Map dhtMap = new ConcurrentHashMap8<>(); + protected Map dhtMap = new HashMap<>(); /** Mapped flag. */ protected AtomicBoolean mapped = new AtomicBoolean(); @@ -141,20 +141,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { int taskNameHash ) { super( - cctx, - xidVer, - implicit, - implicitSingle, - sys, - plc, - concurrency, - isolation, - timeout, + cctx, + xidVer, + implicit, + implicitSingle, + sys, + plc, + concurrency, + isolation, + timeout, invalidate, storeEnabled, onePhaseCommit, - txSize, - subjId, + txSize, + subjId, taskNameHash ); http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 165c8a9..89a435a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -17,18 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -79,6 +67,21 @@ import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; @@ -178,7 +181,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture lockKeys = new GridConcurrentHashSet<>(); + private Set lockKeys = new HashSet<>(); /** Force keys future for correct transforms. */ private IgniteInternalFuture forceKeysFut; http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f8be2a7..3996d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -39,11 +40,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; @@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { int taskNameHash ) { super( - ctx, - nodeId, - rmtThreadId, - xidVer, - commitVer, + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, sys, plc, - concurrency, - isolation, - invalidate, - timeout, + concurrency, + isolation, + invalidate, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); + writeMap = new LinkedHashMap<>(U.capacity(txSize)); topologyVersion(topVer); } @@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { int taskNameHash ) { super( - ctx, - nodeId, - rmtThreadId, - xidVer, - commitVer, + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, sys, plc, - concurrency, - isolation, - invalidate, - timeout, + concurrency, + isolation, + invalidate, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.rmtFutId = rmtFutId; readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); + writeMap = new LinkedHashMap<>(U.capacity(txSize)); topologyVersion(topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 25028c4..d11db2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -60,7 +62,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionOptimisticException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -478,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd txMapping = new GridDhtTxMapping(); - ConcurrentLinkedDeque8 mappings = new ConcurrentLinkedDeque8<>(); + Queue mappings = new ArrayDeque<>(); if (!F.isEmpty(reads) || !F.isEmpty(writes)) { for (int cacheId : tx.activeCacheIds()) { @@ -555,7 +556,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd * * @param mappings Queue of mappings. */ - private void proceedPrepare(final ConcurrentLinkedDeque8 mappings) { + private void proceedPrepare(final Queue mappings) { if (isDone()) return; @@ -757,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd private AtomicBoolean rcvRes = new AtomicBoolean(false); /** Mappings to proceed prepare. */ - private ConcurrentLinkedDeque8 mappings; + private Queue mappings; /** * @param m Mapping. @@ -765,7 +766,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd */ MiniFuture( GridDistributedTxMapping m, - ConcurrentLinkedDeque8 mappings + Queue mappings ) { this.m = m; this.mappings = mappings; http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index b62bbea..d058b67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -92,7 +92,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu private AtomicReference err = new AtomicReference<>(null); /** Node mappings. */ - private ConcurrentMap mappings; + private Map mappings; /** Trackable flag. */ private boolean trackable = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 216f978..17e3ac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -87,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { private static final long serialVersionUID = 0L; /** DHT mappings. */ - private ConcurrentMap mappings = new ConcurrentHashMap8<>(); + private Map mappings = new HashMap<>(); /** Future. */ @GridToStringExclude @@ -424,7 +424,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** * @return DHT map. */ - ConcurrentMap mappings() { + Map mappings() { return mappings; } http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 4074eee..032e043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -851,8 +851,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { GridCacheVersion finishTn = cctx.versions().last(); // Add future to prepare queue only on first prepare call. - if (tx.markPreparing()) - prepareQ.offer(tx); +// if (tx.markPreparing()) +// prepareQ.offer(tx); + + tx.markPreparing(); // Check that our read set does not intersect with write set // of all transactions that completed their write phase @@ -888,50 +890,50 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Check that our read and write sets do not intersect with write // sets of all active transactions. - for (Iterator iter = prepareQ.iterator(); iter.hasNext();) { - IgniteInternalTx prepareTx = iter.next(); - - if (prepareTx == tx) - // Skip yourself. - continue; - - // Optimistically remove completed transactions. - if (prepareTx.done()) { - iter.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed finished transaction from active queue: " + prepareTx); - - continue; - } - - // Check if originating node left. - if (cctx.discovery().node(prepareTx.nodeId()) == null) { - iter.remove(); - - rollbackTx(prepareTx); - - if (log.isDebugEnabled()) - log.debug("Removed and rolled back transaction because sender node left grid: " + - CU.txString(prepareTx)); - - continue; - } - - if (tx.serializable() && !prepareTx.isRollbackOnly()) { - Set prepareWriteSet = prepareTx.writeSet(); - - if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { - // Remove from active set. - iter.remove(); - - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticCheckedException( - "Failed to prepare transaction (read-set/write-set conflict): " + tx); - } - } - } +// for (Iterator iter = prepareQ.iterator(); iter.hasNext();) { +// IgniteInternalTx prepareTx = iter.next(); +// +// if (prepareTx == tx) +// // Skip yourself. +// continue; +// +// // Optimistically remove completed transactions. +// if (prepareTx.done()) { +// iter.remove(); +// +// if (log.isDebugEnabled()) +// log.debug("Removed finished transaction from active queue: " + prepareTx); +// +// continue; +// } +// +// // Check if originating node left. +// if (cctx.discovery().node(prepareTx.nodeId()) == null) { +// iter.remove(); +// +// rollbackTx(prepareTx); +// +// if (log.isDebugEnabled()) +// log.debug("Removed and rolled back transaction because sender node left grid: " + +// CU.txString(prepareTx)); +// +// continue; +// } +// +// if (tx.serializable() && !prepareTx.isRollbackOnly()) { +// Set prepareWriteSet = prepareTx.writeSet(); +// +// if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { +// // Remove from active set. +// iter.remove(); +// +// tx.setRollbackOnly(); +// +// throw new IgniteTxOptimisticCheckedException( +// "Failed to prepare transaction (read-set/write-set conflict): " + tx); +// } +// } +// } } // Optimistic. @@ -1097,23 +1099,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return If transaction was not already present in completed set. */ public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { - if (nearXidVer != null) - xidVer = new CommittedVersion(xidVer, nearXidVer); - - Boolean committed = completedVers.putIfAbsent(xidVer, true); - - if (committed == null || committed) { - if (log.isDebugEnabled()) - log.debug("Added transaction to committed version set: " + xidVer); - - return true; - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in rolled back version set: " + xidVer); - - return false; - } +// if (nearXidVer != null) +// xidVer = new CommittedVersion(xidVer, nearXidVer); +// +// Boolean committed = completedVers.putIfAbsent(xidVer, true); +// +// if (committed == null || committed) { +// if (log.isDebugEnabled()) +// log.debug("Added transaction to committed version set: " + xidVer); +// +// return true; +// } +// else { +// if (log.isDebugEnabled()) +// log.debug("Transaction is already present in rolled back version set: " + xidVer); +// +// return false; +// } + return true; } /** @@ -1121,20 +1124,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return If transaction was not already present in completed set. */ public boolean addRolledbackTx(GridCacheVersion xidVer) { - Boolean committed = completedVers.putIfAbsent(xidVer, false); - - if (committed == null || !committed) { - if (log.isDebugEnabled()) - log.debug("Added transaction to rolled back version set: " + xidVer); - return true; - } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in committed version set: " + xidVer); - - return false; - } + return true; +// Boolean committed = completedVers.putIfAbsent(xidVer, false); +// +// if (committed == null || !committed) { +// if (log.isDebugEnabled()) +// log.debug("Added transaction to rolled back version set: " + xidVer); +// +// return true; +// } +// else { +// if (log.isDebugEnabled()) +// log.debug("Transaction is already present in committed version set: " + xidVer); +// +// return false; +// } } /** @@ -1261,19 +1266,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * so we don't do it here. */ - Boolean committed = completedVers.get(tx.xidVersion()); - - // 1. Make sure that committed version has been recorded. - if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { - uncommitTx(tx); - - GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey(); - GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey(); - - throw new IgniteException("Missing commit version (consider increasing " + - IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + - first + ", lastVer=" + last + ", tx=" + tx.xid() + ']'); - } +// Boolean committed = completedVers.get(tx.xidVersion()); +// +// // 1. Make sure that committed version has been recorded. +// if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { +// uncommitTx(tx); +// +// GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey(); +// GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey(); +// +// throw new IgniteException("Missing commit version (consider increasing " + +// IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + +// first + ", lastVer=" + last + ", tx=" + tx.xid() + ']'); +// } ConcurrentMap txIdMap = transactionMap(tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java index 4ca00d9..d9ffdd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.util; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; + /** * */ @@ -29,7 +31,7 @@ public final class IgniteUuidCache { /** Cache. */ private static final ConcurrentMap cache = - new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64); + new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q); /** * Gets cached UUID to preserve memory. @@ -56,4 +58,4 @@ public final class IgniteUuidCache { private IgniteUuidCache() { // No-op. } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 0a6d9aa..c795578 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -17,25 +17,23 @@ package org.apache.ignite.internal.util.future; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicMarkableReference; -import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Future composed of multiple inner futures. @@ -44,33 +42,44 @@ public class GridCompoundFuture extends GridFutureAdapter { /** */ private static final long serialVersionUID = 0L; - /** Futures. */ - private final ConcurrentLinkedDeque8> futs = new ConcurrentLinkedDeque8<>(); + /** */ + private static final int INITED = 0b1; - /** Pending futures. */ - private final Collection> pending = new ConcurrentLinkedDeque8<>(); + /** */ + private static final AtomicIntegerFieldUpdater flagsUpd = + AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags"); - /** Listener call count. */ - private final AtomicInteger lsnrCalls = new AtomicInteger(); + /** */ + private static final AtomicIntegerFieldUpdater lsnrCallsUpd = + AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls"); - /** Finished flag. */ - private final AtomicBoolean finished = new AtomicBoolean(); + /** */ + private static final AtomicReferenceFieldUpdater errUpd = + AtomicReferenceFieldUpdater.newUpdater(GridCompoundFuture.class, Throwable.class, "err"); + + /** Futures. */ + private final Collection> futs = new ArrayList<>(); /** Reducer. */ @GridToStringInclude private IgniteReducer rdc; - /** Initialize flag. */ - private AtomicBoolean init = new AtomicBoolean(false); - - /** Result with a flag to control if reducer has been called. */ - private AtomicMarkableReference res = new AtomicMarkableReference<>(null, false); - /** Exceptions to ignore. */ private Class[] ignoreChildFailures; /** Error. */ - private AtomicReference err = new AtomicReference<>(); + private volatile Throwable err; + + /** + * @see #INITED + */ + private volatile int flags; + + /** */ + private volatile int lsnrCalls; + + /** */ + private final Object mux = new Object(); /** * @@ -104,7 +113,7 @@ public class GridCompoundFuture extends GridFutureAdapter { /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { if (onCancelled()) { - for (IgniteInternalFuture fut : futs) + for (IgniteInternalFuture fut : futures()) fut.cancel(); return true; @@ -118,8 +127,25 @@ public class GridCompoundFuture extends GridFutureAdapter { * * @return Collection of futures. */ + private Collection> futures(boolean pending) { + synchronized (mux) { + Collection> res = new ArrayList<>(futs.size()); + + for (IgniteInternalFuture fut : futs) { + if (!pending || !fut.isDone()) + res.add(fut); + } + + return res; + } + } + /** + * Gets collection of futures. + * + * @return Collection of futures. + */ public Collection> futures() { - return futs; + return futures(false); } /** @@ -128,7 +154,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @return Pending futures. */ public Collection> pending() { - return pending; + return futures(true); } /** @@ -147,7 +173,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @return {@code True} if there are pending futures. */ public boolean hasPending() { - return !pending.isEmpty(); + return !pending().isEmpty(); } /** @@ -155,7 +181,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * {@link #markInitialized()} method is called on future. */ public boolean initialized() { - return init.get(); + return flagSet(INITED); } /** @@ -166,8 +192,9 @@ public class GridCompoundFuture extends GridFutureAdapter { public void add(IgniteInternalFuture fut) { assert fut != null; - pending.add(fut); - futs.add(fut); + synchronized (mux) { + futs.add(fut); + } fut.listen(new Listener()); @@ -219,10 +246,34 @@ public class GridCompoundFuture extends GridFutureAdapter { } /** + * @param flag + * @return + */ + private boolean casFlag(int flag) { + for (;;) { + int flags0 = flags; + + if ((flags0 & flag) != 0) + return false; + + if (flagsUpd.compareAndSet(this, flags0, flags0 | flag)) + return true; + } + } + + /** + * @param flag + * @return + */ + private boolean flagSet(int flag) { + return (flags & flag) != 0; + } + + /** * Mark this future as initialized. */ public void markInitialized() { - if (init.compareAndSet(false, true)) + if (casFlag(INITED)) // Check complete to make sure that we take care // of all the ignored callbacks. checkComplete(); @@ -232,32 +283,44 @@ public class GridCompoundFuture extends GridFutureAdapter { * Check completeness of the future. */ private void checkComplete() { - Throwable err = this.err.get(); + Throwable err = this.err; boolean ignore = ignoreFailure(err); - if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore)) - && finished.compareAndSet(false, true)) { + if (flagSet(INITED) && !isDone() && + ((err != null && !ignore) || lsnrCalls == futuresSize())) { try { - if (err == null && rdc != null && !res.isMarked()) - res.compareAndSet(null, rdc.reduce(), false, true); + onDone( + rdc != null ? rdc.reduce() : null, + ignore ? null : err); } catch (RuntimeException e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + U.error( + null, + "Failed to execute compound future reducer: " + this, + e); onDone(e); - - return; } catch (AssertionError e) { - U.error(null, "Failed to execute compound future reducer: " + this, e); + U.error( + null, + "Failed to execute compound future reducer: " + this, + e); onDone(e); throw e; } + } + } - onDone(res.getReference(), ignore ? null : err); + /** + * @return + */ + private int futuresSize() { + synchronized (mux) { + return futs.size(); } } @@ -286,13 +349,15 @@ public class GridCompoundFuture extends GridFutureAdapter { return S.toString(GridCompoundFuture.class, this, "done", isDone(), "cancelled", isCancelled(), - "err", error(), - "futs", - F.viewReadOnly(futs, new C1, String>() { - @Override public String apply(IgniteInternalFuture f) { - return Boolean.toString(f.isDone()); - } - }) + "err", error() +// , +// +// "futs", +// F.viewReadOnly(futs, new C1, String>() { +// @Override public String apply(IgniteInternalFuture f) { +// return Boolean.toString(f.isDone()); +// } +// }) ); } @@ -305,14 +370,12 @@ public class GridCompoundFuture extends GridFutureAdapter { /** {@inheritDoc} */ @Override public void apply(IgniteInternalFuture fut) { - pending.remove(fut); - try { T t = fut.get(); try { - if (rdc != null && !rdc.collect(t) && !res.isMarked()) - res.compareAndSet(null, rdc.reduce(), false, true); + if (rdc != null && !rdc.collect(t)) + onDone(rdc.reduce()); } catch (RuntimeException e) { U.error(null, "Failed to execute compound future reducer: " + this, e); @@ -331,18 +394,18 @@ public class GridCompoundFuture extends GridFutureAdapter { } catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | ClusterTopologyCheckedException e) { - err.compareAndSet(null, e); + errUpd.compareAndSet(GridCompoundFuture.this, null, e); } catch (IgniteCheckedException e) { if (!ignoreFailure(e)) U.error(null, "Failed to execute compound future reducer: " + this, e); - err.compareAndSet(null, e); + errUpd.compareAndSet(GridCompoundFuture.this, null, e); } catch (RuntimeException e) { U.error(null, "Failed to execute compound future reducer: " + this, e); - err.compareAndSet(null, e); + errUpd.compareAndSet(GridCompoundFuture.this, null, e); } catch (AssertionError e) { U.error(null, "Failed to execute compound future reducer: " + this, e); @@ -353,7 +416,7 @@ public class GridCompoundFuture extends GridFutureAdapter { throw e; } - lsnrCalls.incrementAndGet(); + lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this); checkComplete(); } @@ -363,4 +426,4 @@ public class GridCompoundFuture extends GridFutureAdapter { return "Compound future listener: " + GridCompoundFuture.this; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java index d93f12e..b3747d7 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java @@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8 } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java index 75db13c..28e38d7 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java @@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8 } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java index 5b7381e..22baa46 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java @@ -2163,4 +2163,4 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements */ PER_SEGMENT_Q_OPTIMIZED_RMV } -} \ No newline at end of file +}