ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject ignite git commit: Debugging slowdowns
Date Thu, 15 Oct 2015 16:22:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1.4-slow-server-debug 4ce4ff198 -> 96c6bebac


Debugging slowdowns


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96c6beba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96c6beba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96c6beba

Branch: refs/heads/ignite-1.4-slow-server-debug
Commit: 96c6bebac5a9f32efcdd448d9f13bc3e9071a367
Parents: 4ce4ff1
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Thu Oct 15 19:21:33 2015 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Thu Oct 15 19:21:33 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/internal/Bench.java  |  99 ++++++++++
 .../processors/cache/GridCacheMvccManager.java  | 111 +++++------
 .../distributed/GridDistributedTxMapping.java   |   8 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  28 +--
 .../distributed/dht/GridDhtTxPrepareFuture.java |  29 +--
 .../cache/distributed/dht/GridDhtTxRemote.java  |  47 ++---
 .../near/GridNearOptimisticTxPrepareFuture.java |  11 +-
 .../near/GridNearTxFinishFuture.java            |   2 +-
 .../cache/distributed/near/GridNearTxLocal.java |   4 +-
 .../cache/transactions/IgniteTxManager.java     | 183 ++++++++++---------
 .../ignite/internal/util/IgniteUuidCache.java   |   6 +-
 .../util/future/GridCompoundFuture.java         | 175 ++++++++++++------
 .../java/org/jsr166/ConcurrentHashMap8.java     |   2 +-
 .../java/org/jsr166/ConcurrentLinkedDeque8.java |   2 +-
 .../org/jsr166/ConcurrentLinkedHashMap.java     |   2 +-
 15 files changed, 442 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Bench.java b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
new file mode 100644
index 0000000..994156e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/Bench.java
@@ -0,0 +1,99 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jsr166.LongAdder8;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ */
+public class Bench {
+    public static void main(String[] args) throws InterruptedException {
+        Ignition.start(config("1",
+            false));
+        Ignition.start(config("2",
+            false));
+
+        final boolean client = false;
+
+        final Ignite ignite = Ignition.start(config("0",
+            client));
+
+        final IgniteCache<Object, Object> cache =
+            ignite.getOrCreateCache(new CacheConfiguration<>()
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(1).setRebalanceMode(CacheRebalanceMode.SYNC)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+        Thread.sleep(2000);
+
+
+        final LongAdder8 cnt = new LongAdder8();
+
+        final AtomicLong time = new AtomicLong(U.currentTimeMillis());
+
+        for (int i = 0; i < 3; i++) {
+            new Thread(
+                new Runnable() {
+                    @Override public void run() {
+                        for (;;) {
+                            int key;
+
+                            if (client)
+                                key = ThreadLocalRandom.current().nextInt(10000);
+
+                            else
+                                for (;;) {
+                                    key = ThreadLocalRandom.current().nextInt(10000);
+
+                                    if (ignite.affinity(null).isPrimary(ignite.cluster().localNode(), key))
+                                        break;
+                                }
+
+                            cache.put(key, 0);
+
+                            cnt.increment();
+
+                            long l = time.get();
+                            long now = U.currentTimeMillis();
+
+                            if (now - l > 1000 && time.compareAndSet(l, now))
+                                System.out.println("TPS [client=" + client + ", cnt=" + cnt.sumThenReset() + ']');
+                        }
+                    }
+                }
+            ).start();
+        }
+    }
+
+    private static IgniteConfiguration config(
+        String name,
+        boolean client
+    ) {
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        return new IgniteConfiguration().setGridName(name).setLocalHost("127.0.0.1").setClientMode(client).setCommunicationSpi(commSpi);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e2d0302..bc0f634 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,19 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -57,6 +44,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
@@ -64,6 +52,19 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
@@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
     /** Finish futures. */
-    private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+    //private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
 
     /** Logger. */
     @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -171,8 +172,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             else if (log.isDebugEnabled())
                 log.debug("Failed to find transaction for changed owner: " + owner);
 
-            for (FinishLockFuture f : finishFuts)
-                f.recheck(entry);
+//            for (FinishLockFuture f : finishFuts)
+//                f.recheck(entry);
         }
 
         /** {@inheritDoc} */
@@ -443,28 +444,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             return true;
 
         while (true) {
-            Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
-                new ConcurrentLinkedDeque8<GridCacheFuture<?>>() {
-                    /** */
-                    private int hash;
-
-                    {
-                        // Make sure that we add future to queue before
-                        // adding queue to the map of futures.
-                        add(fut);
-                    }
+            // TODO Properly optimize
+            Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
+                {
+                    // Make sure that we add future to queue before
+                    // adding queue to the map of futures.
+                    add(fut);
+                }
 
-                    @Override public int hashCode() {
-                        if (hash == 0)
-                            hash = System.identityHashCode(this);
+                @Override public int hashCode() {
+                    return System.identityHashCode(this);
+                }
 
-                        return hash;
-                    }
+                @Override public boolean equals(Object obj) {
+                    return obj == this;
+                }
+            };
 
-                    @Override public boolean equals(Object obj) {
-                        return obj == this;
-                    }
-                });
+            Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
+                col);
 
             if (old != null) {
                 boolean empty, dup = false;
@@ -477,6 +475,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
                     if (!empty && !dup)
                         old.add(fut);
+
+                    if (old.size() > 4)
+                        System.out.println("Old: " + old);
                 }
 
                 // Future is being removed, so we force-remove here and try again.
@@ -630,7 +631,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (cacheCtx.isNear() || cacheCtx.isLocal())
             return true;
 
-        boolean ret = rmvLocks.add(ver);
+        boolean ret = true;//rmvLocks.add(ver);
 
         if (log.isDebugEnabled())
             log.debug("Added removed lock version: " + ver);
@@ -944,7 +945,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         X.println(">>>   lockedSize: " + locked.size());
         X.println(">>>   futsSize: " + futs.size());
         X.println(">>>   near2dhtSize: " + near2dht.size());
-        X.println(">>>   finishFutsSize: " + finishFuts.size());
+//        X.println(">>>   finishFutsSize: " + finishFuts.size());
     }
 
     /**
@@ -964,10 +965,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
         Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
 
-        for (FinishLockFuture fut : finishFuts) {
-            if (fut.topologyVersion().equals(topVer))
-                cands.putAll(fut.pendingLocks());
-        }
+//        for (FinishLockFuture fut : finishFuts) {
+//            if (fut.topologyVersion().equals(topVer))
+//                cands.putAll(fut.pendingLocks());
+//        }
 
         return cands;
     }
@@ -1059,17 +1060,17 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 ),
             topVer);
 
-        finishFuts.add(finishFut);
-
-        finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> e) {
-                finishFuts.remove(finishFut);
-
-                // This call is required to make sure that the concurrent queue
-                // clears memory occupied by internal nodes.
-                finishFuts.peek();
-            }
-        });
+//        finishFuts.add(finishFut);
+//
+//        finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
+//            @Override public void apply(IgniteInternalFuture<?> e) {
+//                finishFuts.remove(finishFut);
+//
+//                // This call is required to make sure that the concurrent queue
+//                // clears memory occupied by internal nodes.
+//                finishFuts.peek();
+//            }
+//        });
 
         finishFut.recheck();
 
@@ -1083,8 +1084,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (exchLog.isDebugEnabled())
             exchLog.debug("Rechecking pending locks for completion.");
 
-        for (FinishLockFuture fut : finishFuts)
-            fut.recheck();
+//        for (FinishLockFuture fut : finishFuts)
+//            fut.recheck();
     }
 
     /**
@@ -1245,4 +1246,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 return S.toString(FinishLockFuture.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 1e78ba2..2d2d935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -23,13 +23,13 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable {
     public GridDistributedTxMapping(ClusterNode node) {
         this.node = node;
 
-        entries = new GridConcurrentLinkedHashSet<>();
+        entries = new LinkedHashSet<>();
     }
 
     /**
@@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable {
      */
     private void ensureModifiable() {
         if (readOnly) {
-            entries = new GridConcurrentLinkedHashSet<>(entries);
+            entries = new LinkedHashSet<>(entries);
 
             readOnly = false;
         }
@@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable {
     @Override public String toString() {
         return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8c7d985..dfeffb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.io.Externalizable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +59,6 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -78,10 +78,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** Near mappings. */
-    protected Map<UUID, GridDistributedTxMapping> nearMap = new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> nearMap = new HashMap<>();
 
     /** DHT mappings. */
-    protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
+    protected Map<UUID, GridDistributedTxMapping> dhtMap = new HashMap<>();
 
     /** Mapped flag. */
     protected AtomicBoolean mapped = new AtomicBoolean();
@@ -141,20 +141,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         int taskNameHash
     ) {
         super(
-            cctx, 
-            xidVer, 
-            implicit, 
-            implicitSingle, 
-            sys, 
-            plc, 
-            concurrency, 
-            isolation, 
-            timeout, 
+            cctx,
+            xidVer,
+            implicit,
+            implicitSingle,
+            sys,
+            plc,
+            concurrency,
+            isolation,
+            timeout,
             invalidate,
             storeEnabled,
             onePhaseCommit,
-            txSize, 
-            subjId, 
+            txSize,
+            subjId,
             taskNameHash
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 165c8a9..89a435a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -17,18 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.expiry.Duration;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
@@ -79,6 +67,21 @@ import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
@@ -178,7 +181,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /** Keys that should be locked. */
     @GridToStringInclude
-    private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+    private Set<IgniteTxKey> lockKeys = new HashSet<>();
 
     /** Force keys future for correct transforms. */
     private IgniteInternalFuture<?> forceKeysFut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f8be2a7..3996d6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
@@ -39,11 +40,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 
@@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         int taskNameHash
     ) {
         super(
-            ctx, 
-            nodeId, 
-            rmtThreadId, 
-            xidVer, 
-            commitVer, 
+            ctx,
+            nodeId,
+            rmtThreadId,
+            xidVer,
+            commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
-            timeout, 
+            concurrency,
+            isolation,
+            invalidate,
+            timeout,
             txSize,
-            subjId, 
+            subjId,
             taskNameHash
         );
 
@@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
 
         readMap = Collections.emptyMap();
 
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+        writeMap = new LinkedHashMap<>(U.capacity(txSize));
 
         topologyVersion(topVer);
     }
@@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         int taskNameHash
     ) {
         super(
-            ctx, 
-            nodeId, 
-            rmtThreadId, 
-            xidVer, 
-            commitVer, 
+            ctx,
+            nodeId,
+            rmtThreadId,
+            xidVer,
+            commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
-            timeout, 
+            concurrency,
+            isolation,
+            invalidate,
+            timeout,
             txSize,
-            subjId, 
+            subjId,
             taskNameHash
         );
 
@@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         this.rmtFutId = rmtFutId;
 
         readMap = Collections.emptyMap();
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+        writeMap = new LinkedHashMap<>(U.capacity(txSize));
 
         topologyVersion(topVer);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 25028c4..d11db2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -60,7 +62,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -478,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
         txMapping = new GridDhtTxMapping();
 
-        ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+        Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
         if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
             for (int cacheId : tx.activeCacheIds()) {
@@ -555,7 +556,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
      *
      * @param mappings Queue of mappings.
      */
-    private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+    private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
         if (isDone())
             return;
 
@@ -757,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         private AtomicBoolean rcvRes = new AtomicBoolean(false);
 
         /** Mappings to proceed prepare. */
-        private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+        private Queue<GridDistributedTxMapping> mappings;
 
         /**
          * @param m Mapping.
@@ -765,7 +766,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
          */
         MiniFuture(
             GridDistributedTxMapping m,
-            ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+            Queue<GridDistributedTxMapping> mappings
         ) {
             this.m = m;
             this.mappings = mappings;

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index b62bbea..d058b67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -92,7 +92,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     private AtomicReference<Throwable> err = new AtomicReference<>(null);
 
     /** Node mappings. */
-    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings;
+    private Map<UUID, GridDistributedTxMapping> mappings;
 
     /** Trackable flag. */
     private boolean trackable = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 216f978..17e3ac1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -87,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** DHT mappings. */
-    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+    private Map<UUID, GridDistributedTxMapping> mappings = new HashMap<>();
 
     /** Future. */
     @GridToStringExclude
@@ -424,7 +424,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * @return DHT map.
      */
-    ConcurrentMap<UUID, GridDistributedTxMapping> mappings() {
+    Map<UUID, GridDistributedTxMapping> mappings() {
         return mappings;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4074eee..032e043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -851,8 +851,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             GridCacheVersion finishTn = cctx.versions().last();
 
             // Add future to prepare queue only on first prepare call.
-            if (tx.markPreparing())
-                prepareQ.offer(tx);
+//            if (tx.markPreparing())
+//                prepareQ.offer(tx);
+
+            tx.markPreparing();
 
             // Check that our read set does not intersect with write set
             // of all transactions that completed their write phase
@@ -888,50 +890,50 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
             // Check that our read and write sets do not intersect with write
             // sets of all active transactions.
-            for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) {
-                IgniteInternalTx prepareTx = iter.next();
-
-                if (prepareTx == tx)
-                    // Skip yourself.
-                    continue;
-
-                // Optimistically remove completed transactions.
-                if (prepareTx.done()) {
-                    iter.remove();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Removed finished transaction from active queue: " + prepareTx);
-
-                    continue;
-                }
-
-                // Check if originating node left.
-                if (cctx.discovery().node(prepareTx.nodeId()) == null) {
-                    iter.remove();
-
-                    rollbackTx(prepareTx);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Removed and rolled back transaction because sender node left grid: " +
-                            CU.txString(prepareTx));
-
-                    continue;
-                }
-
-                if (tx.serializable() && !prepareTx.isRollbackOnly()) {
-                    Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
-
-                    if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
-                        // Remove from active set.
-                        iter.remove();
-
-                        tx.setRollbackOnly();
-
-                        throw new IgniteTxOptimisticCheckedException(
-                            "Failed to prepare transaction (read-set/write-set conflict): " + tx);
-                    }
-                }
-            }
+//            for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) {
+//                IgniteInternalTx prepareTx = iter.next();
+//
+//                if (prepareTx == tx)
+//                    // Skip yourself.
+//                    continue;
+//
+//                // Optimistically remove completed transactions.
+//                if (prepareTx.done()) {
+//                    iter.remove();
+//
+//                    if (log.isDebugEnabled())
+//                        log.debug("Removed finished transaction from active queue: " + prepareTx);
+//
+//                    continue;
+//                }
+//
+//                // Check if originating node left.
+//                if (cctx.discovery().node(prepareTx.nodeId()) == null) {
+//                    iter.remove();
+//
+//                    rollbackTx(prepareTx);
+//
+//                    if (log.isDebugEnabled())
+//                        log.debug("Removed and rolled back transaction because sender node left grid: " +
+//                            CU.txString(prepareTx));
+//
+//                    continue;
+//                }
+//
+//                if (tx.serializable() && !prepareTx.isRollbackOnly()) {
+//                    Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet();
+//
+//                    if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) {
+//                        // Remove from active set.
+//                        iter.remove();
+//
+//                        tx.setRollbackOnly();
+//
+//                        throw new IgniteTxOptimisticCheckedException(
+//                            "Failed to prepare transaction (read-set/write-set conflict): " + tx);
+//                    }
+//                }
+//            }
         }
 
         // Optimistic.
@@ -1097,23 +1099,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return If transaction was not already present in completed set.
      */
     public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) {
-        if (nearXidVer != null)
-            xidVer = new CommittedVersion(xidVer, nearXidVer);
-
-        Boolean committed = completedVers.putIfAbsent(xidVer, true);
-
-        if (committed == null || committed) {
-            if (log.isDebugEnabled())
-                log.debug("Added transaction to committed version set: " + xidVer);
-
-            return true;
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Transaction is already present in rolled back version set: " + xidVer);
-
-            return false;
-        }
+//        if (nearXidVer != null)
+//            xidVer = new CommittedVersion(xidVer, nearXidVer);
+//
+//        Boolean committed = completedVers.putIfAbsent(xidVer, true);
+//
+//        if (committed == null || committed) {
+//            if (log.isDebugEnabled())
+//                log.debug("Added transaction to committed version set: " + xidVer);
+//
+//            return true;
+//        }
+//        else {
+//            if (log.isDebugEnabled())
+//                log.debug("Transaction is already present in rolled back version set: " + xidVer);
+//
+//            return false;
+//        }
+        return true;
     }
 
     /**
@@ -1121,20 +1124,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * @return If transaction was not already present in completed set.
      */
     public boolean addRolledbackTx(GridCacheVersion xidVer) {
-        Boolean committed = completedVers.putIfAbsent(xidVer, false);
-
-        if (committed == null || !committed) {
-            if (log.isDebugEnabled())
-                log.debug("Added transaction to rolled back version set: " + xidVer);
 
-            return true;
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Transaction is already present in committed version set: " + xidVer);
-
-            return false;
-        }
+        return true;
+//        Boolean committed = completedVers.putIfAbsent(xidVer, false);
+//
+//        if (committed == null || !committed) {
+//            if (log.isDebugEnabled())
+//                log.debug("Added transaction to rolled back version set: " + xidVer);
+//
+//            return true;
+//        }
+//        else {
+//            if (log.isDebugEnabled())
+//                log.debug("Transaction is already present in committed version set: " + xidVer);
+//
+//            return false;
+//        }
     }
 
     /**
@@ -1261,19 +1266,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
          * so we don't do it here.
          */
 
-        Boolean committed = completedVers.get(tx.xidVersion());
-
-        // 1. Make sure that committed version has been recorded.
-        if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
-            uncommitTx(tx);
-
-            GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
-            GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
-
-            throw new IgniteException("Missing commit version (consider increasing " +
-                IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
-                first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
-        }
+//        Boolean committed = completedVers.get(tx.xidVersion());
+//
+//        // 1. Make sure that committed version has been recorded.
+//        if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
+//            uncommitTx(tx);
+//
+//            GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey();
+//            GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey();
+//
+//            throw new IgniteException("Missing commit version (consider increasing " +
+//                IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
+//                first + ", lastVer=" + last + ", tx=" + tx.xid() + ']');
+//        }
 
         ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
index 4ca00d9..d9ffdd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.util;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
+
 /**
  *
  */
@@ -29,7 +31,7 @@ public final class IgniteUuidCache {
 
     /** Cache. */
     private static final ConcurrentMap<UUID, UUID> cache =
-        new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64);
+        new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q);
 
     /**
      * Gets cached UUID to preserve memory.
@@ -56,4 +58,4 @@ public final class IgniteUuidCache {
     private IgniteUuidCache() {
         // No-op.
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 0a6d9aa..c795578 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -17,25 +17,23 @@
 
 package org.apache.ignite.internal.util.future;
 
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
  * Future composed of multiple inner futures.
@@ -44,33 +42,44 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Futures. */
-    private final ConcurrentLinkedDeque8<IgniteInternalFuture<T>> futs = new ConcurrentLinkedDeque8<>();
+    /** */
+    private static final int INITED = 0b1;
 
-    /** Pending futures. */
-    private final Collection<IgniteInternalFuture<T>> pending = new ConcurrentLinkedDeque8<>();
+    /** */
+    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> flagsUpd =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags");
 
-    /** Listener call count. */
-    private final AtomicInteger lsnrCalls = new AtomicInteger();
+    /** */
+    private static final AtomicIntegerFieldUpdater<GridCompoundFuture> lsnrCallsUpd =
+        AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
 
-    /** Finished flag. */
-    private final AtomicBoolean finished = new AtomicBoolean();
+    /** */
+    private static final AtomicReferenceFieldUpdater<GridCompoundFuture, Throwable> errUpd =
+        AtomicReferenceFieldUpdater.newUpdater(GridCompoundFuture.class, Throwable.class, "err");
+
+    /** Futures. */
+    private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
 
     /** Reducer. */
     @GridToStringInclude
     private IgniteReducer<T, R> rdc;
 
-    /** Initialize flag. */
-    private AtomicBoolean init = new AtomicBoolean(false);
-
-    /** Result with a flag to control if reducer has been called. */
-    private AtomicMarkableReference<R> res = new AtomicMarkableReference<>(null, false);
-
     /** Exceptions to ignore. */
     private Class<? extends Throwable>[] ignoreChildFailures;
 
     /** Error. */
-    private AtomicReference<Throwable> err = new AtomicReference<>();
+    private volatile Throwable err;
+
+    /**
+     * @see #INITED
+     */
+    private volatile int flags;
+
+    /** */
+    private volatile int lsnrCalls;
+
+    /** */
+    private final Object mux = new Object();
 
     /**
      *
@@ -104,7 +113,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     /** {@inheritDoc} */
     @Override public boolean cancel() throws IgniteCheckedException {
         if (onCancelled()) {
-            for (IgniteInternalFuture<T> fut : futs)
+            for (IgniteInternalFuture<T> fut : futures())
                 fut.cancel();
 
             return true;
@@ -118,8 +127,25 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      *
      * @return Collection of futures.
      */
+    private Collection<IgniteInternalFuture<T>> futures(boolean pending) {
+        synchronized (mux) {
+            Collection<IgniteInternalFuture<T>> res = new ArrayList<>(futs.size());
+
+            for (IgniteInternalFuture<T> fut : futs) {
+                if (!pending || !fut.isDone())
+                    res.add(fut);
+            }
+
+            return res;
+        }
+    }
+    /**
+     * Gets collection of futures.
+     *
+     * @return Collection of futures.
+     */
     public Collection<IgniteInternalFuture<T>> futures() {
-        return futs;
+        return futures(false);
     }
 
     /**
@@ -128,7 +154,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      * @return Pending futures.
      */
     public Collection<IgniteInternalFuture<T>> pending() {
-        return pending;
+        return futures(true);
     }
 
     /**
@@ -147,7 +173,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      * @return {@code True} if there are pending futures.
      */
     public boolean hasPending() {
-        return !pending.isEmpty();
+        return !pending().isEmpty();
     }
 
     /**
@@ -155,7 +181,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      *      {@link #markInitialized()} method is called on future.
      */
     public boolean initialized() {
-        return init.get();
+        return flagSet(INITED);
     }
 
     /**
@@ -166,8 +192,9 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     public void add(IgniteInternalFuture<T> fut) {
         assert fut != null;
 
-        pending.add(fut);
-        futs.add(fut);
+        synchronized (mux) {
+            futs.add(fut);
+        }
 
         fut.listen(new Listener());
 
@@ -219,10 +246,34 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
     }
 
     /**
+     * @param flag
+     * @return
+     */
+    private boolean casFlag(int flag) {
+        for (;;) {
+            int flags0 = flags;
+
+            if ((flags0 & flag) != 0)
+                return false;
+
+            if (flagsUpd.compareAndSet(this, flags0, flags0 | flag))
+                return true;
+        }
+    }
+
+    /**
+     * @param flag
+     * @return
+     */
+    private boolean flagSet(int flag) {
+        return (flags & flag) != 0;
+    }
+
+    /**
      * Mark this future as initialized.
      */
     public void markInitialized() {
-        if (init.compareAndSet(false, true))
+        if (casFlag(INITED))
             // Check complete to make sure that we take care
             // of all the ignored callbacks.
             checkComplete();
@@ -232,32 +283,44 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
      * Check completeness of the future.
      */
     private void checkComplete() {
-        Throwable err = this.err.get();
+        Throwable err = this.err;
 
         boolean ignore = ignoreFailure(err);
 
-        if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore))
-            && finished.compareAndSet(false, true)) {
+        if (flagSet(INITED) && !isDone() &&
+            ((err != null && !ignore) || lsnrCalls == futuresSize())) {
             try {
-                if (err == null && rdc != null && !res.isMarked())
-                    res.compareAndSet(null, rdc.reduce(), false, true);
+                onDone(
+                    rdc != null ? rdc.reduce() : null,
+                    ignore ? null : err);
             }
             catch (RuntimeException e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                U.error(
+                    null,
+                    "Failed to execute compound future reducer: " + this,
+                    e);
 
                 onDone(e);
-
-                return;
             }
             catch (AssertionError e) {
-                U.error(null, "Failed to execute compound future reducer: " + this, e);
+                U.error(
+                    null,
+                    "Failed to execute compound future reducer: " + this,
+                    e);
 
                 onDone(e);
 
                 throw e;
             }
+        }
+    }
 
-            onDone(res.getReference(), ignore ? null : err);
+    /**
+     * @return
+     */
+    private int futuresSize() {
+        synchronized (mux) {
+            return futs.size();
         }
     }
 
@@ -286,13 +349,15 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
         return S.toString(GridCompoundFuture.class, this,
             "done", isDone(),
             "cancelled", isCancelled(),
-            "err", error(),
-            "futs",
-                F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
-                    @Override public String apply(IgniteInternalFuture<T> f) {
-                        return Boolean.toString(f.isDone());
-                    }
-                })
+            "err", error()
+//            ,
+//
+// "futs",
+//                F.viewReadOnly(futs, new C1<IgniteInternalFuture<T>, String>() {
+//                    @Override public String apply(IgniteInternalFuture<T> f) {
+//                        return Boolean.toString(f.isDone());
+//                    }
+//                })
         );
     }
 
@@ -305,14 +370,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
 
         /** {@inheritDoc} */
         @Override public void apply(IgniteInternalFuture<T> fut) {
-            pending.remove(fut);
-
             try {
                 T t = fut.get();
 
                 try {
-                    if (rdc != null && !rdc.collect(t) && !res.isMarked())
-                        res.compareAndSet(null, rdc.reduce(), false, true);
+                    if (rdc != null && !rdc.collect(t))
+                        onDone(rdc.reduce());
                 }
                 catch (RuntimeException e) {
                     U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -331,18 +394,18 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
             }
             catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException |
                 ClusterTopologyCheckedException e) {
-                err.compareAndSet(null, e);
+                errUpd.compareAndSet(GridCompoundFuture.this, null, e);
             }
             catch (IgniteCheckedException e) {
                 if (!ignoreFailure(e))
                     U.error(null, "Failed to execute compound future reducer: " + this, e);
 
-                err.compareAndSet(null, e);
+                errUpd.compareAndSet(GridCompoundFuture.this, null, e);
             }
             catch (RuntimeException e) {
                 U.error(null, "Failed to execute compound future reducer: " + this, e);
 
-                err.compareAndSet(null, e);
+                errUpd.compareAndSet(GridCompoundFuture.this, null, e);
             }
             catch (AssertionError e) {
                 U.error(null, "Failed to execute compound future reducer: " + this, e);
@@ -353,7 +416,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
                 throw e;
             }
 
-            lsnrCalls.incrementAndGet();
+            lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this);
 
             checkComplete();
         }
@@ -363,4 +426,4 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
             return "Compound future listener: " + GridCompoundFuture.this;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
index d93f12e..b3747d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java
@@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8<K, V>
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
index 75db13c..28e38d7 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java
@@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8<E>
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/96c6beba/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
index 5b7381e..22baa46 100644
--- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
+++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java
@@ -2163,4 +2163,4 @@ public class ConcurrentLinkedHashMap<K, V> extends AbstractMap<K, V> implements
          */
         PER_SEGMENT_Q_OPTIMIZED_RMV
     }
-}
\ No newline at end of file
+}


Mime
View raw message