ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anovi...@apache.org
Subject [4/8] ignite git commit: Performance optimizations.
Date Mon, 09 Nov 2015 10:45:11 GMT
Performance optimizations.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4848a70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4848a70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4848a70

Branch: refs/heads/ignite-843-rc1
Commit: a4848a702fea1573af7f36af91d02f7df3ab64f4
Parents: 7393227
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 9 12:16:16 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 9 12:16:16 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoMessage.java   |   4 +-
 .../processors/cache/GridCacheContext.java      |  29 +--
 .../processors/cache/GridCacheEntryEx.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     |  55 ++--
 .../processors/cache/GridCacheMvccManager.java  | 145 +++++------
 .../distributed/GridDistributedCacheEntry.java  |   2 +-
 .../distributed/GridDistributedTxMapping.java   |   8 +-
 .../GridDistributedTxRemoteAdapter.java         |   5 +-
 .../distributed/dht/GridDhtLockFuture.java      |   7 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  13 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  43 ++--
 .../distributed/dht/GridDhtTxPrepareFuture.java |  78 +++---
 .../cache/distributed/dht/GridDhtTxRemote.java  |  45 ++--
 .../dht/atomic/GridDhtAtomicCache.java          |   1 -
 .../dht/colocated/GridDhtColocatedCache.java    |   7 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  11 +-
 .../near/GridNearTransactionalCache.java        |   7 +-
 .../near/GridNearTxFinishFuture.java            | 157 ++++++------
 .../cache/distributed/near/GridNearTxLocal.java |  21 +-
 .../cache/transactions/IgniteInternalTx.java    |   3 +-
 .../cache/transactions/IgniteTxAdapter.java     | 251 ++++++++++---------
 .../cache/transactions/IgniteTxHandler.java     |  37 +--
 .../transactions/IgniteTxLocalAdapter.java      |  26 +-
 .../cache/transactions/IgniteTxManager.java     | 171 ++++++-------
 .../GridBoundedConcurrentLinkedHashMap.java     |   7 +-
 .../GridBoundedConcurrentLinkedHashSet.java     |   7 +-
 .../util/GridBoundedConcurrentOrderedMap.java   |  39 +--
 .../internal/util/GridConcurrentFactory.java    |  11 +-
 .../util/GridConcurrentLinkedHashSet.java       |   9 +-
 .../ignite/internal/util/IgniteUuidCache.java   |   6 +-
 .../util/future/GridCompoundFuture.java         | 155 ++++++++----
 .../java/org/jsr166/ConcurrentHashMap8.java     |   2 +-
 .../java/org/jsr166/ConcurrentLinkedDeque8.java |   2 +-
 .../org/jsr166/ConcurrentLinkedHashMap.java     | 195 +++++++++++---
 .../GridCacheAffinityBackupsSelfTest.java       |   8 +
 .../cache/GridCacheAbstractFullApiSelfTest.java |   2 +-
 .../GridCacheMissingCommitVersionSelfTest.java  |  40 +--
 .../processors/cache/GridCacheTestEntryEx.java  |   3 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |   2 +-
 .../continuous/GridEventConsumeSelfTest.java    |   2 +-
 ...dBoundedConcurrentLinkedHashMapSelfTest.java |   2 +-
 .../GridConcurrentLinkedHashMapSelfTest.java    |  62 ++++-
 .../junits/common/GridCommonAbstractTest.java   |   4 +-
 ...rrentLinkedHashMapMultiThreadedSelfTest.java | 104 ++++----
 .../yardstick/cache/IgnitePutTxBenchmark.java   |  10 +
 .../cache/IgnitePutTxPrimaryOnlyBenchmark.java  |  65 +++++
 .../IgnitePutTxSkipLocalBackupBenchmark.java    |  65 +++++
 .../cache/WaitMapExchangeFinishCallable.java    |  95 +++++++
 48 files changed, 1220 insertions(+), 807 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index c83a281..cb19ba0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -143,7 +143,7 @@ public class GridIoMessage implements Message {
     /**
      * @return Message.
      */
-    public Object message() {
+    public Message message() {
         return msg;
     }
 
@@ -320,4 +320,4 @@ public class GridIoMessage implements Message {
     @Override public String toString() {
         return S.toString(GridIoMessage.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 1f4852c..ee4da46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1488,10 +1488,9 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @param log Log.
      * @param dhtMap Dht mappings.
      * @param nearMap Near mappings.
-     * @return {@code True} if mapped.
      * @throws GridCacheEntryRemovedException If reader for entry is removed.
      */
-    public boolean dhtMap(
+    public void dhtMap(
         UUID nearNodeId,
         AffinityTopologyVersion topVer,
         GridDhtCacheEntry entry,
@@ -1509,7 +1508,7 @@ public class GridCacheContext<K, V> implements Externalizable {
 
         Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node.
 
-        boolean ret = map(entry, dhtRemoteNodes, dhtMap);
+        map(entry, dhtRemoteNodes, dhtMap);
 
         Collection<ClusterNode> nearRemoteNodes = null;
 
@@ -1530,7 +1529,7 @@ public class GridCacheContext<K, V> implements Externalizable {
             if (nearNodes != null && !nearNodes.isEmpty()) {
                 nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes));
 
-                ret |= map(entry, nearRemoteNodes, nearMap);
+                map(entry, nearRemoteNodes, nearMap);
             }
         }
 
@@ -1540,8 +1539,6 @@ public class GridCacheContext<K, V> implements Externalizable {
 
             entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
         }
-
-        return ret;
     }
 
     /**
@@ -1549,10 +1546,9 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @param log Log.
      * @param dhtMap Dht mappings.
      * @param nearMap Near mappings.
-     * @return {@code True} if mapped.
      * @throws GridCacheEntryRemovedException If reader for entry is removed.
      */
-    public boolean dhtMap(
+    public void dhtMap(
         GridDhtCacheEntry entry,
         GridCacheVersion explicitLockVer,
         IgniteLogger log,
@@ -1571,27 +1567,20 @@ public class GridCacheContext<K, V> implements Externalizable {
 
             Collection<ClusterNode> nearNodes = cand.mappedNearNodes();
 
-            boolean ret = map(entry, dhtNodes, dhtMap);
+            map(entry, dhtNodes, dhtMap);
 
             if (nearNodes != null && !nearNodes.isEmpty())
-                ret |= map(entry, nearNodes, nearMap);
-
-            return ret;
+                map(entry, nearNodes, nearMap);
         }
-
-        return false;
     }
 
     /**
      * @param entry Entry.
      * @param nodes Nodes.
      * @param map Map.
-     * @return {@code True} if mapped.
      */
-    private boolean map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
+    private void map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes,
         Map<ClusterNode, List<GridDhtCacheEntry>> map) {
-        boolean ret = false;
-
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 List<GridDhtCacheEntry> entries = map.get(n);
@@ -1600,12 +1589,8 @@ public class GridCacheContext<K, V> implements Externalizable {
                     map.put(n, entries = new LinkedList<>());
 
                 entries.add(entry);
-
-                ret = true;
             }
         }
-
-        return ret;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 50b01c8..af62e39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -389,7 +389,6 @@ public interface GridCacheEntryEx {
      * @param tx Cache transaction.
      * @param evtNodeId ID of node responsible for this change.
      * @param affNodeId Partitioned node iD.
-     * @param writeThrough If {@code true}, persist to the storage.
      * @param retval {@code True} if value should be returned (and unmarshalled if needed).
      * @param evt Flag to signal event notification.
      * @param metrics Flag to signal metrics notification.
@@ -409,7 +408,6 @@ public interface GridCacheEntryEx {
         @Nullable IgniteInternalTx tx,
         UUID evtNodeId,
         UUID affNodeId,
-        boolean writeThrough,
         boolean retval,
         boolean evt,
         boolean metrics,
@@ -1014,4 +1012,4 @@ public interface GridCacheEntryEx {
      * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
      */
     public void onUnlock();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ca0995a..df9f5c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1108,7 +1108,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 CacheLazyEntry e = new CacheLazyEntry(cctx, key, old);
 
-                Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(cctx, key, old),
+                Object interceptorVal = cctx.config().getInterceptor().onBeforePut(
+                    new CacheLazyEntry(cctx, key, old),
                     val0);
 
                 key0 = e.key();
@@ -1212,7 +1213,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable IgniteInternalTx tx,
         UUID evtNodeId,
         UUID affNodeId,
-        boolean writeThrough,
         boolean retval,
         boolean evt,
         boolean metrics,
@@ -1244,6 +1244,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Cache.Entry entry0 = null;
 
+        boolean deferred;
+
+        boolean marked = false;
+
         synchronized (this) {
             checkObsolete();
 
@@ -1349,40 +1353,33 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
 
             cctx.dataStructures().onEntryUpdated(key, true);
-        }
-
-        // Persist outside of synchronization. The correctness of the
-        // value will be handled by current transaction.
-        if (writeThrough)
-            cctx.store().remove(tx, keyValue(false));
 
-        if (cctx.deferredDelete() && !detached() && !isInternal())
-            cctx.onDeferredDelete(this, newVer);
-        else {
-            boolean marked = false;
+            deferred = cctx.deferredDelete() && !detached() && !isInternal();
 
-            synchronized (this) {
+            if (!deferred) {
                 // If entry is still removed.
-                if (newVer == ver) {
-                    if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
-                        if (log.isDebugEnabled())
-                            log.debug("Entry could not be marked obsolete (it is still used): " + this);
-                    }
-                    else {
-                        recordNodeId(affNodeId, topVer);
+                assert newVer == ver;
 
-                        // If entry was not marked obsolete, then removed lock
-                        // will be registered whenever removeLock is called.
-                        cctx.mvcc().addRemoved(cctx, obsoleteVer);
+                if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
+                    if (log.isDebugEnabled())
+                        log.debug("Entry could not be marked obsolete (it is still used): " + this);
+                }
+                else {
+                    recordNodeId(affNodeId, topVer);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Entry was marked obsolete: " + this);
-                    }
+                    if (log.isDebugEnabled())
+                        log.debug("Entry was marked obsolete: " + this);
                 }
             }
+        }
 
-            if (marked)
-                onMarkedObsolete();
+        if (deferred)
+            cctx.onDeferredDelete(this, newVer);
+
+        if (marked) {
+            assert !deferred;
+
+            onMarkedObsolete();
         }
 
         if (intercept)
@@ -4247,4 +4244,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return "IteratorEntry [key=" + key + ']';
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0960c9d..2c14209 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -24,7 +25,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
@@ -88,7 +89,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit;
 
     /** Set of removed lock versions. */
-    private Collection<GridCacheVersion> rmvLocks =
+    private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
         new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
 
     /** Current local candidates. */
@@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap();
 
     /** Finish futures. */
-    private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
+    private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>();
 
     /** Logger. */
     @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
@@ -143,17 +144,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 Collection<? extends GridCacheFuture> futCol = futs.get(owner.version());
 
                 if (futCol != null) {
-                    for (GridCacheFuture fut : futCol) {
-                        if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
-                            GridCacheMvccFuture<Boolean> mvccFut =
-                                (GridCacheMvccFuture<Boolean>)fut;
-
-                            // Since this method is called outside of entry synchronization,
-                            // we can safely invoke any method on the future.
-                            // Also note that we don't remove future here if it is done.
-                            // The removal is initiated from within future itself.
-                            if (mvccFut.onOwnerChanged(entry, owner))
-                                return;
+                    synchronized (futCol) {
+                        for (GridCacheFuture fut : futCol) {
+                            if (fut instanceof GridCacheMvccFuture && !fut.isDone()) {
+                                GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
+
+                                // Since this method is called outside of entry synchronization,
+                                // we can safely invoke any method on the future.
+                                // Also note that we don't remove future here if it is done.
+                                // The removal is initiated from within future itself.
+                                if (mvccFut.onOwnerChanged(entry, owner))
+                                    return;
+                            }
                         }
                     }
                 }
@@ -171,8 +173,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             else if (log.isDebugEnabled())
                 log.debug("Failed to find transaction for changed owner: " + owner);
 
-            for (FinishLockFuture f : finishFuts)
-                f.recheck(entry);
+            if (!finishFuts.isEmptyx()) {
+                for (FinishLockFuture f : finishFuts)
+                    f.recheck(entry);
+            }
         }
 
         /** {@inheritDoc} */
@@ -203,21 +207,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             if (log.isDebugEnabled())
                 log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]");
 
-            for (Collection<GridCacheFuture<?>> futsCol : futs.values()) {
-                for (GridCacheFuture<?> fut : futsCol) {
-                    if (!fut.trackable()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping non-trackable future: " + fut);
-
-                        continue;
-                    }
-
-                    fut.onNodeLeft(discoEvt.eventNode().id());
-
-                    if (fut.isCancelled() || fut.isDone())
-                        removeFuture(fut);
-                }
-            }
+            for (GridCacheFuture<?> fut : activeFutures())
+                fut.onNodeLeft(discoEvt.eventNode().id());
 
             for (IgniteInternalFuture<?> fut : atomicFuts.values()) {
                 if (fut instanceof GridCacheFuture) {
@@ -272,7 +263,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @return Collection of active futures.
      */
     public Collection<GridCacheFuture<?>> activeFutures() {
-        return F.flatCollections(futs.values());
+        ArrayList<GridCacheFuture<?>> col = new ArrayList<>();
+
+        for (Collection<GridCacheFuture<?>> verFuts : futs.values()) {
+            synchronized (verFuts) {
+                col.addAll(verFuts);
+            }
+        }
+
+        return col;
     }
 
     /**
@@ -345,10 +344,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param err Error.
      */
     private void cancelClientFutures(IgniteCheckedException err) {
-        for (Collection<GridCacheFuture<?>> futures : futs.values()) {
-            for (GridCacheFuture<?> future : futures)
-                ((GridFutureAdapter)future).onDone(err);
-        }
+        for (GridCacheFuture<?> fut : activeFutures())
+            ((GridFutureAdapter)fut).onDone(err);
 
         for (GridCacheAtomicFuture<?> future : atomicFuts.values())
             ((GridFutureAdapter)future).onDone(err);
@@ -444,11 +441,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             return true;
 
         while (true) {
-            Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(),
-                new ConcurrentLinkedDeque8<GridCacheFuture<?>>() {
-                    /** */
-                    private int hash;
+            Collection<GridCacheFuture<?>> old = futs.get(fut.version());
 
+            if (old == null) {
+                Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) {
                     {
                         // Make sure that we add future to queue before
                         // adding queue to the map of futures.
@@ -456,16 +452,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     }
 
                     @Override public int hashCode() {
-                        if (hash == 0)
-                            hash = System.identityHashCode(this);
-
-                        return hash;
+                        return System.identityHashCode(this);
                     }
 
                     @Override public boolean equals(Object obj) {
                         return obj == this;
                     }
-                });
+                };
+
+                old = futs.putIfAbsent(fut.version(), col);
+            }
 
             if (old != null) {
                 boolean empty, dup = false;
@@ -474,10 +470,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                     empty = old.isEmpty();
 
                     if (!empty)
-                        dup = old.contains(fut);
-
-                    if (!empty && !dup)
-                        old.add(fut);
+                        dup = !old.add(fut);
                 }
 
                 // Future is being removed, so we force-remove here and try again.
@@ -594,14 +587,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) {
         Collection<? extends GridCacheFuture> futs = this.futs.get(ver);
 
-        if (futs != null)
-            for (GridCacheFuture<?> fut : futs)
-                if (fut.futureId().equals(futId)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Found future in futures map: " + fut);
+        if (futs != null) {
+            synchronized (futs) {
+                for (GridCacheFuture<?> fut : futs) {
+                    if (fut.futureId().equals(futId)) {
+                        if (log.isDebugEnabled())
+                            log.debug("Found future in futures map: " + fut);
 
-                    return fut;
+                        return fut;
+                    }
                 }
+            }
+        }
 
         if (log.isDebugEnabled())
             log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']');
@@ -619,7 +616,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
         Collection c = futs.get(ver);
 
-        return c == null ? Collections.<IgniteInternalFuture<T>>emptyList() : (Collection<IgniteInternalFuture<T>>)c;
+        if (c == null)
+            return Collections.<IgniteInternalFuture<T>>emptyList();
+        else {
+            synchronized (c) {
+                return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c);
+            }
+        }
     }
 
     /**
@@ -949,12 +952,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
-        X.println(">>>   rmvLocksSize: " + rmvLocks.size());
+        X.println(">>>   rmvLocksSize: " + rmvLocks.sizex());
         X.println(">>>   dhtLocCandsSize: " + dhtLocCands.size());
         X.println(">>>   lockedSize: " + locked.size());
         X.println(">>>   futsSize: " + futs.size());
         X.println(">>>   near2dhtSize: " + near2dht.size());
-        X.println(">>>   finishFutsSize: " + finishFuts.size());
+        X.println(">>>   finishFutsSize: " + finishFuts.sizex());
     }
 
     /**
@@ -974,9 +977,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
         Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
 
-        for (FinishLockFuture fut : finishFuts) {
-            if (fut.topologyVersion().equals(topVer))
-                cands.putAll(fut.pendingLocks());
+        if (!finishFuts.isEmptyx()) {
+            for (FinishLockFuture fut : finishFuts) {
+                if (fut.topologyVersion().equals(topVer))
+                    cands.putAll(fut.pendingLocks());
+            }
         }
 
         return cands;
@@ -1054,8 +1059,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
      * @param topVer Topology version.
      * @return Future that signals when all locks for given partitions will be released.
      */
-    private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
-        AffinityTopologyVersion topVer) {
+    private IgniteInternalFuture<?> finishLocks(
+        @Nullable final IgnitePredicate<GridDistributedCacheEntry> filter,
+        AffinityTopologyVersion topVer
+    ) {
         assert topVer.topologyVersion() != 0;
 
         if (topVer.equals(AffinityTopologyVersion.NONE))
@@ -1069,10 +1076,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
             @Override public void apply(IgniteInternalFuture<?> e) {
                 finishFuts.remove(finishFut);
-
-                // This call is required to make sure that the concurrent queue
-                // clears memory occupied by internal nodes.
-                finishFuts.peek();
             }
         });
 
@@ -1088,8 +1091,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
         if (exchLog.isDebugEnabled())
             exchLog.debug("Rechecking pending locks for completion.");
 
-        for (FinishLockFuture fut : finishFuts)
-            fut.recheck();
+        if (!finishFuts.isEmptyx()) {
+            for (FinishLockFuture fut : finishFuts)
+                fut.recheck();
+        }
     }
 
     /**
@@ -1250,4 +1255,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 return S.toString(FinishLockFuture.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index a138d30..a3eb723 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -403,7 +403,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
             doomed = mvcc == null ? null : mvcc.candidate(ver);
 
-            if (doomed == null || doomed.dhtLocal() || (!doomed.local() && !doomed.nearLocal()))
+            if (doomed == null)
                 addRemoved(ver);
 
             GridCacheVersion obsoleteVer = obsoleteVersionExtras();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index 1e78ba2..2d2d935 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -23,13 +23,13 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable {
     public GridDistributedTxMapping(ClusterNode node) {
         this.node = node;
 
-        entries = new GridConcurrentLinkedHashSet<>();
+        entries = new LinkedHashSet<>();
     }
 
     /**
@@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable {
      */
     private void ensureModifiable() {
         if (readOnly) {
-            entries = new GridConcurrentLinkedHashSet<>(entries);
+            entries = new LinkedHashSet<>(entries);
 
             readOnly = false;
         }
@@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable {
     @Override public String toString() {
         return S.toString(GridDistributedTxMapping.class, this, "node", node.id());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index fcbf58d..93303c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -583,13 +583,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 eventNodeId(),
                                                 nodeId,
                                                 false,
-                                                false,
                                                 true,
                                                 true,
                                                 topVer,
                                                 null,
                                                 replicate ? DR_BACKUP : DR_NONE,
-                                                near() ? null : explicitVer, CU.subjectId(this, cctx),
+                                                near() ? null : explicitVer,
+                                                CU.subjectId(this, cctx),
                                                 resolveTaskName(),
                                                 dhtVer);
                                         else {
@@ -629,7 +629,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             eventNodeId(),
                                             nodeId,
                                             false,
-                                            false,
                                             true,
                                             true,
                                             topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c175b0b..579d701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -782,14 +782,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             if (log.isDebugEnabled())
                 log.debug("Mapping entry for DHT lock future: " + this);
 
-            boolean hasRmtNodes = false;
-
             // Assign keys to primary nodes.
             for (GridDhtCacheEntry entry : entries) {
                 try {
                     while (true) {
                         try {
-                            hasRmtNodes = cctx.dhtMap(
+                            cctx.dhtMap(
                                 nearNodeId,
                                 topVer,
                                 entry,
@@ -823,9 +821,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                 }
             }
 
-            if (tx != null)
-                tx.needsCompletedVersions(hasRmtNodes);
-
             if (isDone()) {
                 if (log.isDebugEnabled())
                     log.debug("Mapping won't proceed because future is done: " + this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 4ce4759..3069afd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanSet;
-import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -1090,8 +1090,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                 // We have to add completed versions for cases when nearLocal and remote transactions
                 // execute concurrently.
-                res.completedVersions(ctx.tm().committedVersions(req.version()),
-                    ctx.tm().rolledbackVersions(req.version()));
+                IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version());
+
+                res.completedVersions(versPair.get1(), versPair.get2());
 
                 int i = 0;
 
@@ -1510,8 +1511,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             }
         }
 
-        Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
-        Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+        IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+        Collection<GridCacheVersion> committed = versPair.get1();
+        Collection<GridCacheVersion> rolledback = versPair.get2();
 
         // Backups.
         for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8c7d985..6de8795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -84,7 +83,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>();
 
     /** Mapped flag. */
-    protected AtomicBoolean mapped = new AtomicBoolean();
+    protected volatile boolean mapped;
 
     /** */
     private long dhtThreadId;
@@ -92,9 +91,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /** */
     protected boolean explicitLock;
 
-    /** */
-    private boolean needsCompletedVers;
-
     /** Versions of pending locks for entries of this tx. */
     private Collection<GridCacheVersion> pendingVers;
 
@@ -141,20 +137,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         int taskNameHash
     ) {
         super(
-            cctx, 
-            xidVer, 
-            implicit, 
-            implicitSingle, 
-            sys, 
-            plc, 
-            concurrency, 
-            isolation, 
-            timeout, 
+            cctx,
+            xidVer,
+            implicit,
+            implicitSingle,
+            sys,
+            plc,
+            concurrency,
+            isolation,
+            timeout,
             invalidate,
             storeEnabled,
             onePhaseCommit,
-            txSize, 
-            subjId, 
+            txSize,
+            subjId,
             taskNameHash
         );
 
@@ -244,16 +240,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      */
     protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err);
 
-    /**
-     * @param needsCompletedVers {@code True} if needs completed versions.
-     */
-    public void needsCompletedVersions(boolean needsCompletedVers) {
-        this.needsCompletedVers |= needsCompletedVers;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean needsCompletedVersions() {
-        return needsCompletedVers;
+        return nearOnOriginatingNode;
     }
 
     /**
@@ -281,10 +270,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * Map explicit locks.
      */
     protected void mapExplicitLocks() {
-        if (!mapped.get()) {
+        if (!mapped) {
             // Explicit locks may participate in implicit transactions only.
             if (!implicit()) {
-                mapped.set(true);
+                mapped = true;
 
                 return;
             }
@@ -343,7 +332,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
             if (!F.isEmpty(nearEntryMap))
                 addNearNodeEntryMapping(nearEntryMap);
 
-            mapped.set(true);
+            mapped = true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 61975d7..1d6f633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -58,10 +60,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.util.F0;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -177,7 +179,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
     /** Keys that should be locked. */
     @GridToStringInclude
-    private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+    private final Set<IgniteTxKey> lockKeys = new HashSet<>();
 
     /** Force keys future for correct transforms. */
     private IgniteInternalFuture<?> forceKeysFut;
@@ -267,7 +269,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (log.isDebugEnabled())
             log.debug("Transaction future received owner changed callback: " + entry);
 
-        boolean rmv = lockKeys.remove(entry.txKey());
+        boolean rmv;
+
+        synchronized (lockKeys) {
+            rmv = lockKeys.remove(entry.txKey());
+        }
 
         return rmv && mapIfLocked();
     }
@@ -293,7 +299,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @return {@code True} if all locks are owned.
      */
     private boolean checkLocks() {
-        return locksReady && lockKeys.isEmpty();
+        if (!locksReady)
+            return false;
+
+        synchronized (lockKeys) {
+            return lockKeys.isEmpty();
+        }
     }
 
     /** {@inheritDoc} */
@@ -495,8 +506,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 txEntry.cached(entry);
             }
 
-            if (tx.optimistic() && txEntry.explicitVersion() == null)
-                lockKeys.add(txEntry.txKey());
+            if (tx.optimistic() && txEntry.explicitVersion() == null) {
+                synchronized (lockKeys) {
+                    lockKeys.add(txEntry.txKey());
+                }
+            }
 
             while (true) {
                 try {
@@ -689,7 +703,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
             GridCacheVersion min = tx.minVersion();
 
-            res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
+            if (tx.needsCompletedVersions()) {
+                IgnitePair<Collection<GridCacheVersion>> versPair = cctx.tm().versions(min);
+
+                res.completedVersions(versPair.get1(), versPair.get2());
+            }
 
             res.pending(localDhtPendingVersions(tx.writeEntries(), min));
 
@@ -987,21 +1005,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 if (err0 != null) {
                     err.compareAndSet(null, err0);
 
+                    tx.rollbackAsync();
+
                     final GridNearTxPrepareResponse res = createPrepareResponse(err.get());
 
-                    tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
-                            if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) {
-                                try {
-                                    if (replied.compareAndSet(false, true))
-                                        sendPrepareResponse(res);
-                                }
-                                catch (IgniteCheckedException e) {
-                                    U.error(log, "Failed to send prepare response for transaction: " + tx, e);
-                                }
-                            }
-                        }
-                    });
+                    onDone(res, res.error());
 
                     return;
                 }
@@ -1017,20 +1025,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
                 Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
 
-                boolean hasRemoteNodes = false;
-
                 // Assign keys to primary nodes.
                 if (!F.isEmpty(writes)) {
                     for (IgniteTxEntry write : writes)
-                        hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+                        map(tx.entry(write.txKey()), futDhtMap, futNearMap);
                 }
 
                 if (!F.isEmpty(reads)) {
                     for (IgniteTxEntry read : reads)
-                        hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+                        map(tx.entry(read.txKey()), futDhtMap, futNearMap);
                 }
-
-                tx.needsCompletedVersions(hasRemoteNodes);
             }
 
             if (isDone())
@@ -1223,15 +1227,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param entry Transaction entry.
      * @param futDhtMap DHT mapping.
      * @param futNearMap Near mapping.
-     * @return {@code True} if mapped.
      */
-    private boolean map(
+    private void map(
         IgniteTxEntry entry,
         Map<UUID, GridDistributedTxMapping> futDhtMap,
         Map<UUID, GridDistributedTxMapping> futNearMap
     ) {
         if (entry.cached().isLocal())
-            return false;
+            return;
 
         GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
@@ -1247,8 +1250,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
         }
 
-        boolean ret;
-
         while (true) {
             try {
                 Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
@@ -1272,10 +1273,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     log.debug("Entry has no near readers: " + entry);
 
                 // Exclude local node.
-                ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
+                map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap);
 
                 // Exclude DHT nodes.
-                ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
+                map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap);
 
                 break;
             }
@@ -1285,8 +1286,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 entry.cached(cached);
             }
         }
-
-        return ret;
     }
 
     /**
@@ -1294,16 +1293,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param nodes Nodes.
      * @param globalMap Map.
      * @param locMap Exclude map.
-     * @return {@code True} if mapped.
      */
-    private boolean map(
+    private void map(
         IgniteTxEntry entry,
         Iterable<ClusterNode> nodes,
         Map<UUID, GridDistributedTxMapping> globalMap,
         Map<UUID, GridDistributedTxMapping> locMap
     ) {
-        boolean ret = false;
-
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping global = globalMap.get(n.id());
@@ -1332,12 +1328,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     locMap.put(n.id(), loc = new GridDistributedTxMapping(n));
 
                 loc.add(entry);
-
-                ret = true;
             }
         }
-
-        return ret;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index f8be2a7..e268a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         int taskNameHash
     ) {
         super(
-            ctx, 
-            nodeId, 
-            rmtThreadId, 
-            xidVer, 
-            commitVer, 
+            ctx,
+            nodeId,
+            rmtThreadId,
+            xidVer,
+            commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
-            timeout, 
+            concurrency,
+            isolation,
+            invalidate,
+            timeout,
             txSize,
-            subjId, 
+            subjId,
             taskNameHash
         );
 
@@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
 
         readMap = Collections.emptyMap();
 
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+        writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
 
         topologyVersion(topVer);
     }
@@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         int taskNameHash
     ) {
         super(
-            ctx, 
-            nodeId, 
-            rmtThreadId, 
-            xidVer, 
-            commitVer, 
+            ctx,
+            nodeId,
+            rmtThreadId,
+            xidVer,
+            commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
-            timeout, 
+            concurrency,
+            isolation,
+            invalidate,
+            timeout,
             txSize,
-            subjId, 
+            subjId,
             taskNameHash
         );
 
@@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
         this.rmtFutId = rmtFutId;
 
         readMap = Collections.emptyMap();
-        writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f);
+        writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1);
 
         topologyVersion(topVer);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4cd9e84..7f9edb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index f03b461..83c220d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -688,8 +689,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (map == null || map.isEmpty())
                 return;
 
-            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
-            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+            IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+            Collection<GridCacheVersion> committed = versPair.get1();
+            Collection<GridCacheVersion> rolledback = versPair.get2();
 
             for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index af43113..0002180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -55,7 +57,6 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -293,7 +294,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
 
         txMapping = new GridDhtTxMapping();
 
-        ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+        Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
         if (!F.isEmpty(writes)) {
             for (int cacheId : tx.activeCacheIds()) {
@@ -353,7 +354,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
      *
      * @param mappings Queue of mappings.
      */
-    private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) {
+    private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) {
         if (isDone())
             return;
 
@@ -556,7 +557,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         private AtomicBoolean rcvRes = new AtomicBoolean(false);
 
         /** Mappings to proceed prepare. */
-        private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+        private Queue<GridDistributedTxMapping> mappings;
 
         /**
          * @param m Mapping.
@@ -564,7 +565,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
          */
         MiniFuture(
             GridDistributedTxMapping m,
-            ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+            Queue<GridDistributedTxMapping> mappings
         ) {
             this.m = m;
             this.mappings = mappings;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 0e8aa0d..5ab85b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -712,8 +713,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             if (map == null || map.isEmpty())
                 return;
 
-            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
-            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+            IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver);
+
+            Collection<GridCacheVersion> committed = versPair.get1();
+            Collection<GridCacheVersion> rolledback = versPair.get2();
 
             for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 46c9f3e..a9dbda2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -88,15 +87,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     /** Commit flag. */
     private boolean commit;
 
-    /** Error. */
-    private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
     /** Node mappings. */
-    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings;
+    private Map<UUID, GridDistributedTxMapping> mappings;
 
     /** Trackable flag. */
     private boolean trackable = true;
 
+    /** */
+    private boolean finishOnePhaseCalled;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -176,38 +175,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     * @param e Error.
-     */
-    void onError(Throwable e) {
-        tx.commitError(e);
-
-        if (err.compareAndSet(null, e)) {
-            boolean marked = tx.setRollbackOnly();
-
-            if (e instanceof IgniteTxRollbackCheckedException) {
-                if (marked) {
-                    try {
-                        tx.rollback();
-                    }
-                    catch (IgniteCheckedException ex) {
-                        U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
-                    }
-                }
-            }
-            else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
-                try {
-                    tx.close();
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(log, "Failed to invalidate transaction: " + tx, ex);
-                }
-            }
-
-            onComplete();
-        }
-    }
-
-    /**
      * @param nodeId Sender.
      * @param res Result.
      */
@@ -247,24 +214,56 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) {
-        if ((initialized() || err != null)) {
-            if (tx.needCheckBackup()) {
-                assert tx.onePhaseCommit();
+        if (isDone())
+            return false;
 
-                if (err != null)
-                    err = new TransactionRollbackException("Failed to commit transaction.", err);
+        synchronized (this) {
+            if (isDone())
+                return false;
 
-                try {
-                    tx.finish(err == null);
+            if (err != null) {
+                tx.commitError(err);
+
+                boolean marked = tx.setRollbackOnly();
+
+                if (err instanceof IgniteTxRollbackCheckedException) {
+                    if (marked) {
+                        try {
+                            tx.rollback();
+                        }
+                        catch (IgniteCheckedException ex) {
+                            U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
+                        }
+                    }
                 }
-                catch (IgniteCheckedException e) {
-                    if (err != null)
-                        err.addSuppressed(e);
-                    else
-                        err = e;
+                else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error.
+                    try {
+                        tx.close();
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(log, "Failed to invalidate transaction: " + tx, ex);
+                    }
                 }
             }
 
+            if (initialized() || err != null) {
+                if (tx.needCheckBackup()) {
+                    assert tx.onePhaseCommit();
+
+                    if (err != null)
+                        err = new TransactionRollbackException("Failed to commit transaction.", err);
+
+                    try {
+                        tx.finish(err == null);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (err != null)
+                            err.addSuppressed(e);
+                        else
+                            err = e;
+                    }
+                }
+
             if (tx.onePhaseCommit()) {
                 boolean commit = this.commit && err == null;
 
@@ -273,36 +272,35 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 tx.tmFinish(commit);
             }
 
-            Throwable th = this.err.get();
-
-            if (super.onDone(tx0, th != null ? th : err)) {
-                if (error() instanceof IgniteTxHeuristicCheckedException) {
-                    AffinityTopologyVersion topVer = tx.topologyVersion();
+                if (super.onDone(tx0, err)) {
+                    if (error() instanceof IgniteTxHeuristicCheckedException) {
+                        AffinityTopologyVersion topVer = tx.topologyVersion();
 
-                    for (IgniteTxEntry e : tx.writeMap().values()) {
-                        GridCacheContext cacheCtx = e.context();
+                        for (IgniteTxEntry e : tx.writeMap().values()) {
+                            GridCacheContext cacheCtx = e.context();
 
-                        try {
-                            if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
-                                GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
+                            try {
+                                if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) {
+                                    GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key());
 
-                                if (entry != null)
-                                    entry.invalidate(null, tx.xidVersion());
+                                    if (entry != null)
+                                        entry.invalidate(null, tx.xidVersion());
+                                }
                             }
-                        }
-                        catch (Throwable t) {
-                            U.error(log, "Failed to invalidate entry.", t);
+                            catch (Throwable t) {
+                                U.error(log, "Failed to invalidate entry.", t);
 
-                            if (t instanceof Error)
-                                throw (Error)t;
+                                if (t instanceof Error)
+                                    throw (Error)t;
+                            }
                         }
                     }
-                }
 
-                // Don't forget to clean up.
-                cctx.mvcc().removeFuture(this);
+                    // Don't forget to clean up.
+                    cctx.mvcc().removeFuture(this);
 
-                return true;
+                    return true;
+                }
             }
         }
 
@@ -321,7 +319,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * Completeness callback.
      */
     private void onComplete() {
-        onDone(tx, err.get());
+        onDone(tx);
     }
 
     /**
@@ -354,7 +352,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 markInitialized();
 
-                if (!isSync()) {
+                if (!isSync() && !isDone()) {
                     boolean complete = true;
 
                     for (IgniteInternalFuture<?> f : pending())
@@ -367,15 +365,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 }
             }
             else
-                onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
+                onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx)));
         }
         catch (Error | RuntimeException e) {
-            onError(e);
+            onDone(e);
 
             throw e;
         }
         catch (IgniteCheckedException e) {
-            onError(e);
+            onDone(e);
         }
     }
 
@@ -415,7 +413,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         "(backup has left grid): " + tx.xidVersion(), cause));
                 }
                 else if (backup.isLocal()) {
-                    boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion());
+                    boolean committed = !cctx.tm().addRolledbackTx(tx);
 
                     readyNearMappingFromBackup(mapping);
 
@@ -515,6 +513,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param commit Commit flag.
      */
     private void finishOnePhase(boolean commit) {
+        assert Thread.holdsLock(this);
+
+        if (finishOnePhaseCalled)
+            return;
+
+        finishOnePhaseCalled = true;
+
         // No need to send messages as transaction was already committed on remote node.
         // Finish local mapping only as we need send commit message to backups.
         for (GridDistributedTxMapping m : mappings.values()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 883c285..db4a4b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
@@ -88,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     private static final long serialVersionUID = 0L;
 
     /** DHT mappings. */
-    private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
+    private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>();
 
     /** Future. */
     @GridToStringExclude
@@ -217,7 +216,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override protected IgniteInternalFuture<Boolean> addReader(
-        long msgId, 
+        long msgId,
         GridDhtCacheEntry cached,
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer
@@ -472,7 +471,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     /**
      * @return DHT map.
      */
-    ConcurrentMap<UUID, GridDistributedTxMapping> mappings() {
+    Map<UUID, GridDistributedTxMapping> mappings() {
         return mappings;
     }
 
@@ -798,14 +797,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                 catch (Error | RuntimeException e) {
                     commitErr.compareAndSet(null, e);
 
-                    fut0.onError(e);
+                    fut0.onDone(e);
 
                     throw e;
                 }
                 catch (IgniteCheckedException e) {
                     commitErr.compareAndSet(null, e);
 
-                    fut0.onError(e);
+                    fut0.onDone(e);
                 }
             }
         });
@@ -1152,8 +1151,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /** {@inheritDoc} */
     @Override protected GridCacheEntryEx entryEx(
-        GridCacheContext cacheCtx, 
-        IgniteTxKey key, 
+        GridCacheContext cacheCtx,
+        IgniteTxKey key,
         AffinityTopologyVersion topVer
     ) {
         if (cacheCtx.isColocated()) {
@@ -1245,7 +1244,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     @Override public void onRemap(AffinityTopologyVersion topVer) {
         assert cctx.kernalContext().clientNode();
 
-        mapped.set(false);
+        mapped = false;
         nearLocallyMapped = false;
         colocatedLocallyMapped = false;
         txNodes = null;
@@ -1254,7 +1253,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         dhtMap.clear();
         mappings.clear();
 
-        this.topVer.set(topVer);
+        synchronized (this) {
+            this.topVer = topVer;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 20fb8c2..94af6bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -536,9 +536,8 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
 
     /**
      * @param commitVer Commit version.
-     * @return {@code True} if version was set.
      */
-    public boolean commitVersion(GridCacheVersion commitVer);
+    public void commitVersion(GridCacheVersion commitVer);
 
     /**
      * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)


Mime
View raw message