ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/51] incubator-ignite git commit: IGNITE-674 - Merging 6.6.4 fixes to Ignite.
Date Fri, 10 Apr 2015 12:12:13 GMT
IGNITE-674 - Merging 6.6.4 fixes to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ab01e7dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ab01e7dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ab01e7dd

Branch: refs/heads/ignite-709
Commit: ab01e7dd37803a5a39b71d58429bccae79def1a4
Parents: 7e86251
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Fri Apr 3 15:23:38 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Fri Apr 3 15:23:38 2015 -0700

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   1 -
 .../processors/cache/GridCacheContext.java      |  58 ++++-
 .../cache/GridCacheEvictionManager.java         |   6 +-
 .../cache/GridCacheExplicitLockSpan.java        |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |  28 ++-
 .../processors/cache/GridCacheMvcc.java         |   2 +-
 .../cache/GridCacheMvccCandidate.java           |  50 ++++-
 .../processors/cache/GridCacheMvccManager.java  |  44 +++-
 .../distributed/GridDistributedCacheEntry.java  |  12 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  31 ++-
 .../distributed/dht/GridDhtLockFuture.java      |  15 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  14 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  22 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   2 +
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  30 ++-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  28 ++-
 .../dht/colocated/GridDhtColocatedCache.java    |  32 ++-
 .../colocated/GridDhtColocatedLockFuture.java   |  46 +++-
 .../GridDhtPartitionsExchangeFuture.java        |  39 +++-
 .../distributed/near/GridNearCacheEntry.java    |  10 +-
 .../distributed/near/GridNearLockFuture.java    |  11 +-
 .../near/GridNearTxFinishFuture.java            |   8 +-
 .../cache/distributed/near/GridNearTxLocal.java |   7 +
 .../near/GridNearTxPrepareFuture.java           | 122 +++++++----
 .../near/GridNearTxPrepareRequest.java          |  64 ++++--
 .../cache/transactions/IgniteTxAdapter.java     |   3 +-
 .../cache/transactions/IgniteTxHandler.java     | 120 +++++++---
 .../transactions/IgniteTxLocalAdapter.java      |  76 ++++---
 .../cache/transactions/IgniteTxManager.java     |   7 +
 .../util/future/GridCompoundFuture.java         |   1 +
 .../internal/util/future/GridFutureAdapter.java |  18 +-
 .../ignite/internal/util/worker/GridWorker.java |   6 +-
 .../dht/IgniteCacheLockFailoverSelfTest.java    | 148 +++++++++++++
 .../dht/IgniteCacheMultiTxLockSelfTest.java     | 217 +++++++++++++++++++
 .../testframework/junits/GridAbstractTest.java  |   4 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   3 +
 36 files changed, 1052 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 9551680..f75bcf4 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.codegen;
 
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index dfa82e6..6c3bd60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1416,6 +1416,7 @@ public class GridCacheContext<K, V> implements Externalizable {
         UUID nearNodeId,
         AffinityTopologyVersion topVer,
         GridDhtCacheEntry entry,
+        GridCacheVersion explicitLockVer,
         IgniteLogger log,
         Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap,
         @Nullable Map<ClusterNode, List<GridDhtCacheEntry>> nearMap
@@ -1431,6 +1432,8 @@ public class GridCacheContext<K, V> implements Externalizable {
 
         boolean ret = map(entry, dhtRemoteNodes, dhtMap);
 
+        Collection<ClusterNode> nearRemoteNodes = null;
+
         if (nearMap != null) {
             Collection<UUID> readers = entry.readers();
 
@@ -1445,8 +1448,21 @@ public class GridCacheContext<K, V> implements Externalizable {
             else if (log.isDebugEnabled())
                 log.debug("Entry has no near readers: " + entry);
 
-            if (nearNodes != null && !nearNodes.isEmpty())
-                ret |= map(entry, F.view(nearNodes, F.notIn(dhtNodes)), nearMap);
+            if (nearNodes != null && !nearNodes.isEmpty()) {
+                nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes));
+
+                ret |= map(entry, nearRemoteNodes, nearMap);
+            }
+        }
+
+        if (explicitLockVer != null) {
+            Collection<ClusterNode> dhtNodeIds = new ArrayList<>(dhtRemoteNodes);
+            Collection<ClusterNode> nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes);
+
+            if (!F.isEmpty(nearNodeIds))
+                U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds);
+
+            entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
         }
 
         return ret;
@@ -1454,6 +1470,44 @@ public class GridCacheContext<K, V> implements Externalizable {
 
     /**
      * @param entry Entry.
+     * @param log Log.
+     * @param dhtMap Dht mappings.
+     * @param nearMap Near mappings.
+     * @return {@code True} if mapped.
+     * @throws GridCacheEntryRemovedException If reader for entry is removed.
+     */
+    public boolean dhtMap(
+        GridDhtCacheEntry entry,
+        GridCacheVersion explicitLockVer,
+        IgniteLogger log,
+        Map<ClusterNode, List<GridDhtCacheEntry>> dhtMap,
+        Map<ClusterNode, List<GridDhtCacheEntry>> nearMap
+    ) throws GridCacheEntryRemovedException {
+        assert explicitLockVer != null;
+
+        GridCacheMvccCandidate cand = entry.candidate(explicitLockVer);
+
+        if (cand != null) {
+            Collection<ClusterNode> dhtNodes = cand.mappedDhtNodes();
+
+            if (log.isDebugEnabled())
+                log.debug("Mapping explicit lock to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+            Collection<ClusterNode> nearNodes = cand.mappedNearNodes();
+
+            boolean ret = map(entry, dhtNodes, dhtMap);
+
+            if (nearNodes != null && !nearNodes.isEmpty())
+                ret |= map(entry, nearNodes, nearMap);
+
+            return ret;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param entry Entry.
      * @param nodes Nodes.
      * @param map Map.
      * @return {@code True} if mapped.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index fd56568..b0c9243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -253,11 +253,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         if (plcEnabled && evictSync && !cctx.isNear()) {
             // Add dummy event to worker.
-            ClusterNode locNode = cctx.localNode();
-
-            DiscoveryEvent evt = new DiscoveryEvent(locNode, "Dummy event.", EVT_NODE_JOINED, locNode);
-
-            evt.topologySnapshot(locNode.order(), cctx.discovery().topology(locNode.order()));
+            DiscoveryEvent evt = cctx.discovery().localJoinEvent();
 
             backupWorker.addEvent(evt);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java
index 242e769..0e555c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java
@@ -45,7 +45,7 @@ public class GridCacheExplicitLockSpan extends ReentrantLock {
     private final Map<KeyCacheObject, Deque<GridCacheMvccCandidate>> cands = new HashMap<>();
 
     /** Span lock release future. */
-    @GridToStringInclude
+    @GridToStringExclude
     private final GridFutureAdapter<Object> releaseFut = new GridFutureAdapter<>();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 542d0ad..76fd69b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2623,7 +2623,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
         ttlAndExpireTimeExtras(ttl, expireTime);
 
-        if (expireTime != 0 && expireTime != oldExpireTime && cctx.config().isEagerTtl())
+        if (expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
             cctx.ttl().addTrackedEntry(this);
 
         this.ver = ver;
@@ -3508,26 +3508,32 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
 
         try {
             synchronized (this) {
-                CacheObject expiredVal = val;
+                CacheObject expiredVal = saveValueForIndexUnlocked();
 
                 boolean hasOldBytes = valPtr != 0;
 
                 boolean expired = checkExpired();
 
                 if (expired) {
-                    if (cctx.deferredDelete() && !detached() && !isInternal()) {
-                        if (!deletedUnlocked()) {
-                            update(null, 0L, 0L, ver);
+                    if (!obsolete()) {
+                        if (cctx.deferredDelete() && !detached() && !isInternal()) {
+                            if (!deletedUnlocked()) {
+                                update(null, 0L, 0L, ver);
 
-                            deletedUnlocked(true);
+                                deletedUnlocked(true);
 
-                            deferred = true;
+                                deferred = true;
+                            }
+                        }
+                        else {
+                            if (markObsolete0(obsoleteVer, true))
+                                obsolete = true; // Success, will return "true".
                         }
                     }
-                    else {
-                        if (markObsolete0(obsoleteVer, true))
-                            obsolete = true; // Success, will return "true".
-                    }
+
+                    clearIndex(expiredVal);
+
+                    releaseSwap();
 
                     if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
                         cctx.events().addEvent(partition(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index b0f5b2e..83a108f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -1014,7 +1014,7 @@ public final class GridCacheMvcc {
             for (Iterator<GridCacheMvccCandidate> it = rmts.iterator(); it.hasNext(); ) {
                 GridCacheMvccCandidate cand = it.next();
 
-                if (!cand.tx() && nodeId.equals(cand.nodeId())) {
+                if (!cand.tx() && (nodeId.equals(cand.nodeId()) || nodeId.equals(cand.otherNodeId()))) {
                     cand.setUsed(); // Mark as used to be consistent.
                     cand.setRemoved();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
index c5f26cb..894e465 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
@@ -94,6 +96,14 @@ public class GridCacheMvccCandidate implements Externalizable,
     /** Other lock version (near version vs dht version). */
     private transient GridCacheVersion otherVer;
 
+    /** Mapped DHT node IDs. */
+    @GridToStringInclude
+    private transient volatile Collection<ClusterNode> mappedDhtNodes;
+
+    /** Mapped near node IDs. */
+    @GridToStringInclude
+    private transient volatile Collection<ClusterNode> mappedNearNodes;
+
     /** Owned lock version by the moment this candidate was added. */
     @GridToStringInclude
     private transient volatile GridCacheVersion ownerVer;
@@ -229,13 +239,6 @@ public class GridCacheMvccCandidate implements Externalizable,
     }
 
     /**
-     * @return {@code True} if has reentry.
-     */
-    public boolean hasReentry() {
-        return reentry != null;
-    }
-
-    /**
      * @return Removed reentry candidate or {@code null}.
      */
     @Nullable public GridCacheMvccCandidate unenter() {
@@ -282,6 +285,39 @@ public class GridCacheMvccCandidate implements Externalizable,
     }
 
     /**
+     * @return Mapped node IDs.
+     */
+    public Collection<ClusterNode> mappedDhtNodes() {
+        return mappedDhtNodes;
+    }
+
+    /**
+     * @return Mapped node IDs.
+     */
+    public Collection<ClusterNode> mappedNearNodes() {
+        return mappedNearNodes;
+    }
+
+    /**
+     * @param mappedDhtNodes Mapped DHT node IDs.
+     */
+    public void mappedNodeIds(Collection<ClusterNode> mappedDhtNodes, Collection<ClusterNode> mappedNearNodes) {
+        this.mappedDhtNodes = mappedDhtNodes;
+        this.mappedNearNodes = mappedNearNodes;
+    }
+
+    /**
+     * @param node Node to remove.
+     */
+    public void removeMappedNode(ClusterNode node) {
+        if (mappedDhtNodes.contains(node))
+            mappedDhtNodes = new ArrayList<>(F.view(mappedDhtNodes, F.notEqualTo(node)));
+
+        if (mappedNearNodes != null && mappedNearNodes.contains(node))
+            mappedNearNodes = new ArrayList<>(F.view(mappedNearNodes, F.notEqualTo(node)));
+    }
+
+    /**
      * @return Near version.
      */
     public GridCacheVersion otherVersion() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 081ea78..a569e56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -174,17 +174,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
             if (log.isDebugEnabled())
                 log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]");
 
-            for (GridDistributedCacheEntry entry : locked()) {
-                try {
-                    entry.removeExplicitNodeLocks(discoEvt.eventNode().id());
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    if (log.isDebugEnabled())
-                        log.debug("Attempted to remove node locks from removed entry in mvcc manager " +
-                            "disco callback (will ignore): " + entry);
-                }
-            }
-
             for (Collection<GridCacheFuture<?>> futsCol : futs.values()) {
                 for (GridCacheFuture<?> fut : futsCol) {
                     if (!fut.trackable()) {
@@ -239,6 +228,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Collection of pending explicit locks.
+     */
+    public Collection<GridCacheExplicitLockSpan> activeExplicitLocks() {
+        return pendingExplicit.values();
+    }
+
+    /**
+     * @return Collection of active futures.
+     */
+    public Collection<GridCacheFuture<?>> activeFutures() {
+        return F.flatCollections(futs.values());
+    }
+
+    /**
+     * @param leftNodeId Left node ID.
+     * @param topVer Topology version.
+     */
+    public void removeExplicitNodeLocks(UUID leftNodeId, AffinityTopologyVersion topVer) {
+        for (GridDistributedCacheEntry entry : locked()) {
+            try {
+                entry.removeExplicitNodeLocks(leftNodeId);
+
+                entry.context().evicts().touch(entry, topVer);
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("Attempted to remove node locks from removed entry in mvcc manager " +
+                        "disco callback (will ignore): " + entry);
+            }
+        }
+    }
+
+    /**
      * @param from From version.
      * @param to To version.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index a1d83d5..9fb9f46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -316,8 +316,11 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
                 refreshRemotes();
 
-                if (emptyAfter)
+                if (emptyAfter) {
                     mvccExtras(null);
+
+                    onUnlock();
+                }
             }
         }
 
@@ -326,6 +329,13 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
+     *
+     */
+    public void onUnlock() {
+        // No-op.
+    }
+
+    /**
      * Unlocks local lock.
      *
      * @return Removed candidate, or <tt>null</tt> if thread still holds the lock.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index e6d5173..7762763 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -297,7 +297,9 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     /**
      * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
      */
-    public void onUnlock() {
+    @Override public void onUnlock() {
+        super.onUnlock();
+
         locPart.onUnlock();
     }
 
@@ -644,13 +646,34 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
      * @return Candidate, if one existed for the version, or {@code null} if candidate was not found.
      * @throws GridCacheEntryRemovedException If removed.
      */
-    @Nullable public synchronized GridCacheMvccCandidate mappings(GridCacheVersion ver)
-        throws GridCacheEntryRemovedException {
+    @Nullable public synchronized GridCacheMvccCandidate mappings(
+        GridCacheVersion ver,
+        Collection<ClusterNode> dhtNodeIds,
+        Collection<ClusterNode> nearNodeIds
+    ) throws GridCacheEntryRemovedException {
         checkObsolete();
 
         GridCacheMvcc mvcc = mvccExtras();
 
-        return mvcc == null ? null : mvcc.candidate(ver);
+        GridCacheMvccCandidate cand = mvcc == null ? null : mvcc.candidate(ver);
+
+        if (cand != null)
+            cand.mappedNodeIds(dhtNodeIds, nearNodeIds);
+
+        return cand;
+    }
+
+    /**
+     * @param ver Version.
+     * @param mappedNode Mapped node to remove.
+     */
+    public synchronized void removeMapping(GridCacheVersion ver, ClusterNode mappedNode) {
+        GridCacheMvcc mvcc = mvccExtras();
+
+        GridCacheMvccCandidate cand = mvcc == null ? null : mvcc.candidate(ver);
+
+        if (cand != null)
+            cand.removeMappedNode(mappedNode);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 7e93946..9898cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -188,7 +188,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
             lockVer = cctx.mvcc().mappedVersion(nearLockVer);
 
             if (lockVer == null)
-                lockVer = cctx.versions().onReceivedAndNext(nearNodeId, nearLockVer);
+                lockVer = nearLockVer;
         }
 
         futId = IgniteUuid.randomUuid();
@@ -749,9 +749,16 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
                 try {
                     while (true) {
                         try {
-                            hasRmtNodes = cctx.dhtMap(nearNodeId, topVer, entry, log, dhtMap, null);
+                            hasRmtNodes = cctx.dhtMap(
+                                nearNodeId,
+                                topVer,
+                                entry,
+                                tx == null ? lockVer : null,
+                                log,
+                                dhtMap,
+                                null);
 
-                            GridCacheMvccCandidate cand = entry.mappings(lockVer);
+                            GridCacheMvccCandidate cand = entry.candidate(lockVer);
 
                             // Possible in case of lock cancellation.
                             if (cand == null) {
@@ -1097,6 +1104,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
                             if (tx != null)
                                 tx.removeDhtMapping(node.id(), entry);
+                            else
+                                entry.removeMapping(lockVer, node);
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 737e3ed..78bf4a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -795,6 +795,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                     req.implicitTx(),
                                     req.implicitSingleTx(),
                                     ctx.system(),
+                                    false,
                                     ctx.ioPolicy(),
                                     PESSIMISTIC,
                                     req.isolation(),
@@ -1165,13 +1166,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 while (true) {
                     GridDistributedCacheEntry entry = peekExx(key);
 
-                    boolean created = false;
-
-                    if (entry == null) {
-                        entry = entryExx(key);
-
-                        created = true;
-                    }
+                    if (entry == null)
+                        // Nothing to unlock.
+                        break;
 
                     try {
                         entry.doneRemote(
@@ -1195,9 +1192,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                                     "(added to cancelled locks set): " + req);
                         }
 
-                        if (created && entry.markObsolete(req.version()))
-                            removeEntry(entry);
-
                         ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
 
                         break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 728145a..8e8a0a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -243,13 +243,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
     }
 
     /**
-     * Completes this future.
-     */
-    void complete() {
-        onComplete();
-    }
-
-    /**
      * Initializes future.
      */
     public void finish() {
@@ -279,10 +272,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
         if (tx.onePhaseCommit())
             return false;
 
-        boolean res = false;
-
         boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
 
+        if (tx.explicitLock())
+            sync = true;
+
+        boolean res = false;
+
         // Create mini futures.
         for (GridDistributedTxMapping dhtMapping : dhtMap.values()) {
             ClusterNode n = dhtMapping.node();
@@ -313,8 +309,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.system(),
                 tx.ioPolicy(),
                 tx.isSystemInvalidate(),
-                tx.syncCommit(),
-                tx.syncRollback(),
+                sync,
+                sync,
                 tx.completedBase(),
                 tx.committedVersions(),
                 tx.rolledbackVersions(),
@@ -365,8 +361,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     tx.system(),
                     tx.ioPolicy(),
                     tx.isSystemInvalidate(),
-                    tx.syncCommit(),
-                    tx.syncRollback(),
+                    sync,
+                    sync,
                     tx.completedBase(),
                     tx.committedVersions(),
                     tx.rolledbackVersions(),

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 00e8e3e..f3266df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -107,6 +107,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         boolean implicit,
         boolean implicitSingle,
         boolean sys,
+        boolean explicitLock,
         GridIoPolicy plc,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
@@ -126,6 +127,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             implicit,
             implicitSingle,
             sys,
+            explicitLock,
             plc,
             concurrency,
             isolation,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 5e37d96..e0f2008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -65,6 +65,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     private long dhtThreadId;
 
     /** */
+    protected boolean explicitLock;
+
+    /** */
     private boolean needsCompletedVers;
 
     /** Versions of pending locks for entries of this tx. */
@@ -96,6 +99,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         boolean implicit,
         boolean implicitSingle,
         boolean sys,
+        boolean explicitLock,
         GridIoPolicy plc,
         TransactionConcurrency concurrency,
         TransactionIsolation isolation,
@@ -113,6 +117,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
 
         assert cctx != null;
 
+        this.explicitLock = explicitLock;
+
         threadId = Thread.currentThread().getId();
         dhtThreadId = threadId;
     }
@@ -179,6 +185,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     }
 
     /**
+     * @return Explicit lock flag.
+     */
+    public boolean explicitLock() {
+        return explicitLock;
+    }
+
+    /**
+     * @param explicitLock Explicit lock flag.
+     */
+    public void explicitLock(boolean explicitLock) {
+        this.explicitLock = explicitLock;
+    }
+
+    /**
      * @return DHT thread ID.
      */
     long dhtThreadId() {
@@ -227,8 +247,12 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                             if (nearEntryMap == null)
                                 nearEntryMap = new GridLeanMap<>();
 
-                            cacheCtx.dhtMap(nearNodeId(), topologyVersion(),
-                                (GridDhtCacheEntry)e.cached(), log, dhtEntryMap, nearEntryMap);
+                            cacheCtx.dhtMap(
+                                (GridDhtCacheEntry)e.cached(),
+                                e.explicitVersion(),
+                                log,
+                                dhtEntryMap,
+                                nearEntryMap);
                         }
 
                         break;
@@ -828,6 +852,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
-            "dhtNodes", dhtMap.keySet(), "super", super.toString());
+            "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index e0fc719..87e5a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -856,13 +856,15 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
                     if (!F.isEmpty(nearWrites)) {
                         for (IgniteTxEntry entry : nearWrites) {
                             try {
-                                GridCacheMvccCandidate added = entry.cached().candidate(version());
+                                if (entry.explicitVersion() == null) {
+                                    GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                assert added != null;
-                                assert added.dhtLocal();
+                                    assert added != null : "Missing candidate for cache entry:" + entry;
+                                    assert added.dhtLocal();
 
-                                if (added.ownerVersion() != null)
-                                    req.owned(entry.txKey(), added.ownerVersion());
+                                    if (added.ownerVersion() != null)
+                                        req.owned(entry.txKey(), added.ownerVersion());
+                                }
 
                                 break;
                             }
@@ -912,15 +914,17 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
 
                         for (IgniteTxEntry entry : nearMapping.writes()) {
                             try {
-                                GridCacheMvccCandidate added = entry.cached().candidate(version());
+                                if (entry.explicitVersion() == null) {
+                                    GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
-                                    "[added=" + added + ", entry=" + entry + ']';
-                                assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                                    "[added=" + added + ", entry=" + entry + ']';
+                                    assert added != null || entry.groupLockEntry() : "Null candidate for non-group-lock entry " +
+                                        "[added=" + added + ", entry=" + entry + ']';
+                                    assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+                                        "[added=" + added + ", entry=" + entry + ']';
 
-                                if (added != null && added.ownerVersion() != null)
-                                    req.owned(entry.txKey(), added.ownerVersion());
+                                    if (added != null && added.ownerVersion() != null)
+                                        req.owned(entry.txKey(), added.ownerVersion());
+                                }
 
                                 break;
                             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index eb6d9a6..e941d30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -501,9 +502,20 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 assert !n.isLocal();
 
-                if (!F.isEmpty(req.keys()))
-                    // We don't wait for reply to this message.
-                    ctx.io().send(n, req, ctx.ioPolicy());
+                if (!F.isEmpty(req.keys())) {
+                    try {
+                        // We don't wait for reply to this message.
+                        ctx.io().send(n, req, ctx.ioPolicy());
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() +
+                                ", n=" + n + ", e=" + e + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e);
+                    }
+                }
             }
         }
         catch (IgniteCheckedException ex) {
@@ -584,8 +596,18 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                 if (!F.isEmpty(req.keys())) {
                     req.completedVersions(committed, rolledback);
 
-                    // We don't wait for reply to this message.
-                    ctx.io().send(n, req, ctx.ioPolicy());
+                    try {
+                        // We don't wait for reply to this message.
+                        ctx.io().send(n, req, ctx.ioPolicy());
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() +
+                                ", n=" + n + ", e=" + e + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 3087dff..7b05065 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -141,6 +141,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         this.accessTtl = accessTtl;
         this.filter = filter;
 
+        ignoreInterrupts(true);
+
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
 
         lockVer = tx != null ? tx.xidVersion() : cctx.versions().next();
@@ -517,8 +519,13 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
      */
     void map() {
         // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = tx != null ? tx.topologyVersionSnapshot() :
-            cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+
+        if (topVer != null && tx != null)
+            tx.topologyVersion(topVer);
+
+        if (topVer == null && tx != null)
+            topVer = tx.topologyVersionSnapshot();
 
         if (topVer != null) {
             // Continue mapping on the same topology version as it was before.
@@ -651,15 +658,30 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 
                 for (KeyCacheObject key : mappedKeys) {
-                    boolean explicit;
-
                     IgniteTxKey txKey = cctx.txKey(key);
 
-                    while (true) {
-                        GridDistributedCacheEntry entry = null;
+                    GridDistributedCacheEntry entry = null;
+
+                    if (tx != null) {
+                        IgniteTxEntry txEntry = tx.entry(txKey);
+
+                        if (txEntry != null) {
+                            entry = (GridDistributedCacheEntry)txEntry.cached();
+
+                            if (entry != null && !(loc ^ entry.detached())) {
+                                entry = cctx.colocated().entryExx(key, topVer, true);
+
+                                txEntry.cached(entry);
+                            }
+                        }
+                    }
+
+                    boolean explicit;
 
+                    while (true) {
                         try {
-                            entry = cctx.colocated().entryExx(key, topVer, true);
+                            if (entry == null)
+                                entry = cctx.colocated().entryExx(key, topVer, true);
 
                             if (!cctx.isAll(entry, filter)) {
                                 if (log.isDebugEnabled())
@@ -941,6 +963,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
         // Assign keys to primary nodes.
         Collection<KeyCacheObject> distributedKeys = new ArrayList<>(keys.size());
 
+        boolean explicit = false;
+
         for (KeyCacheObject key : keys) {
             if (!cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                 // Remove explicit locks added so far.
@@ -950,7 +974,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
                 return false;
             }
 
-            addLocalKey(key, topVer, distributedKeys);
+            explicit |= addLocalKey(key, topVer, distributedKeys);
 
             if (isDone())
                 return true;
@@ -958,8 +982,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
 
         trackable = false;
 
-        if (tx != null)
+        if (tx != null) {
+            if (explicit)
+                tx.markExplicit(cctx.localNodeId());
+
             tx.colocatedLocallyMapped(true);
+        }
 
         if (!distributedKeys.isEmpty()) {
             if (tx != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ddf0df1..ccc2d8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -564,7 +565,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (log.isDebugEnabled())
                     log.debug("Before waiting for partition release future: " + this);
 
-                partReleaseFut.get();
+                while (true) {
+                    try {
+                        partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        // Print pending transactions and locks that might have led to hang.
+                        dumpPendingObjects();
+                    }
+                }
 
                 if (log.isDebugEnabled())
                     log.debug("After waiting for partition release future: " + this);
@@ -572,6 +583,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (!F.isEmpty(reqs))
                     blockGateways();
 
+                if (exchId.isLeft())
+                    cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -658,6 +672,29 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     *
+     */
+    private void dumpPendingObjects() {
+        U.warn(log, "Failed to wait for partition release future. Dumping pending objects that might be the cause: " +
+            cctx.localNodeId());
+
+        U.warn(log, "Pending transactions:");
+
+        for (IgniteInternalTx tx : cctx.tm().activeTransactions())
+            U.warn(log, ">>> " + tx);
+
+        U.warn(log, "Pending explicit locks:");
+
+        for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks())
+            U.warn(log, ">>> " + lockSpan);
+
+        U.warn(log, "Pending cache futures:");
+
+        for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures())
+            U.warn(log, ">>> " + fut);
+    }
+
+    /**
      * @param cacheId Cache ID to check.
      * @return {@code True} if cache is stopping by this exchange.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index ce80ad9..74342fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -499,7 +499,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 return null;
 
             // Local lock for near cache is a local lock.
-            cand = mvcc.addNearLocal(this, locId, dhtNodeId, threadId, ver, timeout, tx, implicitSingle);
+            cand = mvcc.addNearLocal(
+                this,
+                locId,
+                dhtNodeId,
+                threadId,
+                ver,
+                timeout,
+                tx,
+                implicitSingle);
 
             owner = mvcc.anyOwner();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index d6d1a1b..a25d546 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -152,6 +152,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         this.accessTtl = accessTtl;
         this.filter = filter;
 
+        ignoreInterrupts(true);
+
         threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
 
         lockVer = tx != null ? tx.xidVersion() : cctx.versions().next();
@@ -649,8 +651,13 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      */
     void map() {
         // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = tx != null ? tx.topologyVersionSnapshot() :
-            cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        if (topVer != null && tx != null)
+            tx.topologyVersion(topVer);
+
+        if (topVer == null && tx != null)
+            topVer = tx.topologyVersionSnapshot();
 
         if (topVer != null) {
             // Continue mapping on the same topology version as it was before.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 5d1a306..3b2418e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -62,7 +62,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     private IgniteUuid futId;
 
     /** Transaction. */
-    @GridToStringExclude
+    @GridToStringInclude
     private GridNearTxLocal tx;
 
     /** Commit flag. */
@@ -85,12 +85,12 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public GridNearTxFinishFuture(GridCacheSharedContext<K, V> cctx, GridNearTxLocal tx, boolean commit) {
         super(cctx.kernalContext(), F.<IgniteInternalTx>identityReducer(tx));
 
-        assert cctx != null;
-
         this.cctx = cctx;
         this.tx = tx;
         this.commit = commit;
 
+        ignoreInterrupts(true);
+
         mappings = tx.mappings();
 
         futId = IgniteUuid.randomUuid();
@@ -264,7 +264,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @return Synchronous flag.
      */
     private boolean isSync() {
-        return commit ? tx.syncCommit() : tx.syncRollback();
+        return tx.explicitLock() || (commit ? tx.syncCommit() : tx.syncRollback());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index fa916b0..1b2d204 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.transactions.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -59,14 +60,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         new ConcurrentHashMap8<>();
 
     /** Future. */
+    @GridToStringExclude
     private final AtomicReference<IgniteInternalFuture<IgniteInternalTx>> prepFut =
         new AtomicReference<>();
 
     /** */
+    @GridToStringExclude
     private final AtomicReference<GridNearTxFinishFuture> commitFut =
         new AtomicReference<>();
 
     /** */
+    @GridToStringExclude
     private final AtomicReference<GridNearTxFinishFuture> rollbackFut =
         new AtomicReference<>();
 
@@ -126,6 +130,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             implicit,
             implicitSingle,
             sys,
+            false,
             plc,
             concurrency,
             isolation,
@@ -527,6 +532,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
      * @return {@code True} if mapping was found.
      */
     public boolean markExplicit(UUID nodeId) {
+        explicitLock = true;
+
         GridDistributedTxMapping m = mappings.get(nodeId);
 
         if (m != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 2da78fb..39a33b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -68,7 +68,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
     private IgniteUuid futId;
 
     /** Transaction. */
-    @GridToStringExclude
+    @GridToStringInclude
     private GridNearTxLocal tx;
 
     /** Error. */
@@ -198,6 +198,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
                 tx.removeKeysMapping(nodeId, mappings);
             }
+
             if (e instanceof IgniteTxRollbackCheckedException) {
                 if (marked) {
                     try {
@@ -308,62 +309,59 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
      */
     public void prepare() {
         if (tx.optimistic()) {
-            GridDhtTopologyFuture topFut = topologyReadLock();
+            // Obtain the topology version to use.
+            AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
-            try {
-                if (topFut == null) {
-                    assert isDone();
+            if (topVer != null) {
+                tx.topologyVersion(topVer);
 
-                    return;
-                }
+                prepare0();
 
-                if (topFut.isDone()) {
-                    try {
-                        if (!tx.state(PREPARING)) {
-                            if (tx.setRollbackOnly()) {
-                                if (tx.timedOut())
-                                    onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
-                                        "was rolled back: " + this));
-                                else
-                                    onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
-                                        "[state=" + tx.state() + ", tx=" + this + ']'));
-                            }
-                            else
-                                onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
-                                    "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+                return;
+            }
 
-                            return;
-                        }
+            prepareOnTopology();
 
-                        tx.topologyVersion(topFut.topologyVersion());
+        }
+        else
+            preparePessimistic();
+    }
 
-                        // Make sure to add future before calling prepare.
-                        cctx.mvcc().addFuture(this);
+    /**
+     *
+     */
+    private void prepareOnTopology() {
+        GridDhtTopologyFuture topFut = topologyReadLock();
 
-                        prepare0();
-                    }
-                    catch (TransactionTimeoutException | TransactionOptimisticException e) {
-                        onError(cctx.localNodeId(), null, e);
-                    }
-                }
-                else {
-                    topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                            cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
-                                @Override public void run() {
-                                    prepare();
-                                }
-                            });
-                        }
-                    });
-                }
+        try {
+            if (topFut == null) {
+                assert isDone();
+
+                return;
+            }
+
+            if (topFut.isDone()) {
+                tx.topologyVersion(topFut.topologyVersion());
+
+                prepare0();
             }
-            finally {
-                topologyReadUnlock();
+            else {
+                topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                    @Override
+                    public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                        cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+                            @Override
+                            public void run() {
+                                prepareOnTopology();
+                            }
+                        });
+                    }
+                });
             }
         }
-        else
-            preparePessimistic();
+        finally {
+            topologyReadUnlock();
+        }
     }
 
     /**
@@ -431,12 +429,34 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
         assert tx.optimistic();
 
         try {
+            if (!tx.state(PREPARING)) {
+                if (tx.setRollbackOnly()) {
+                    if (tx.timedOut())
+                        onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                            "was rolled back: " + this));
+                    else
+                        onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " +
+                            "[state=" + tx.state() + ", tx=" + this + ']'));
+                }
+                else
+                    onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+
+                return;
+            }
+
+            // Make sure to add future before calling prepare.
+            cctx.mvcc().addFuture(this);
+
             prepare(
                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
                 tx.writeEntries());
 
             markInitialized();
         }
+        catch (TransactionTimeoutException | TransactionOptimisticException e) {
+            onError(cctx.localNodeId(), null, e);
+        }
         catch (IgniteCheckedException e) {
             onDone(e);
         }
@@ -592,6 +612,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
                 tx.onePhaseCommit(),
                 tx.needReturnValue() && tx.implicit(),
                 tx.implicitSingle(),
+                m.explicitLock(),
                 tx.subjectId(),
                 tx.taskNameHash());
 
@@ -682,6 +703,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
             tx.onePhaseCommit(),
             tx.needReturnValue() && tx.implicit(),
             tx.implicitSingle(),
+            m.explicitLock(),
             tx.subjectId(),
             tx.taskNameHash());
 
@@ -790,6 +812,12 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
 
         cur.add(entry);
 
+        if (entry.explicitVersion() != null) {
+            tx.markExplicit(primary.id());
+
+            cur.markExplicitLock();
+        }
+
         entry.nodeId(primary.id());
 
         if (cacheCtx.isNear()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 846022c..f0587ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -66,6 +66,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Implicit single flag. */
     private boolean implicitSingle;
 
+    /** Explicit lock flag. Set to true if at leat one entry was explicitly locked. */
+    private boolean explicitLock;
+
     /** Subject ID. */
     private UUID subjId;
 
@@ -109,6 +112,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         boolean onePhaseCommit,
         boolean retVal,
         boolean implicitSingle,
+        boolean explicitLock,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
@@ -123,6 +127,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.lastBackups = lastBackups;
         this.retVal = retVal;
         this.implicitSingle = implicitSingle;
+        this.explicitLock = explicitLock;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
     }
@@ -198,6 +203,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     }
 
     /**
+     * @return Explicit lock flag.
+     */
+    public boolean explicitLock() {
+        return explicitLock;
+    }
+
+    /**
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
@@ -259,60 +271,66 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
         switch (writer.state()) {
             case 25:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeBoolean("explicitLock", explicitLock))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("implicitSingle", implicitSingle))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeBoolean("last", last))
+                if (!writer.writeBoolean("implicitSingle", implicitSingle))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
+                if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeBoolean("near", near))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeBoolean("retVal", retVal))
+                if (!writer.writeBoolean("near", near))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
             case 33:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 34:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 35:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -335,7 +353,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
         switch (reader.state()) {
             case 25:
-                futId = reader.readIgniteUuid("futId");
+                explicitLock = reader.readBoolean("explicitLock");
 
                 if (!reader.isLastRead())
                     return false;
@@ -343,7 +361,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                implicitSingle = reader.readBoolean("implicitSingle");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -351,7 +369,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                last = reader.readBoolean("last");
+                implicitSingle = reader.readBoolean("implicitSingle");
 
                 if (!reader.isLastRead())
                     return false;
@@ -359,7 +377,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
+                last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
                     return false;
@@ -367,7 +385,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                miniId = reader.readIgniteUuid("miniId");
+                lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;
@@ -375,7 +393,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                near = reader.readBoolean("near");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -383,7 +401,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                retVal = reader.readBoolean("retVal");
+                near = reader.readBoolean("near");
 
                 if (!reader.isLastRead())
                     return false;
@@ -391,7 +409,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                subjId = reader.readUuid("subjId");
+                retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -399,7 +417,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -407,6 +425,14 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 34:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 35:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -426,7 +452,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 35;
+        return 36;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1b66b4a..735e373 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1164,7 +1164,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        state(MARKED_ROLLBACK, true);
+        if (local() && !dht())
+            state(MARKED_ROLLBACK, true);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ab01e7dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index da30a94..8e47105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -268,6 +268,7 @@ public class IgniteTxHandler {
                 req.implicitSingle(),
                 req.implicitSingle(),
                 req.system(),
+                req.explicitLock(),
                 req.policy(),
                 req.concurrency(),
                 req.isolation(),
@@ -292,6 +293,9 @@ public class IgniteTxHandler {
         }
 
         if (tx != null) {
+            if (req.explicitLock())
+                tx.explicitLock(req.explicitLock());
+
             tx.transactionNodes(req.transactionNodes());
 
             if (req.onePhaseCommit()) {
@@ -538,6 +542,7 @@ public class IgniteTxHandler {
                             true,
                             false, /* we don't know, so assume false. */
                             req.system(),
+                            req.explicitLock(),
                             req.policy(),
                             PESSIMISTIC,
                             READ_COMMITTED,
@@ -556,6 +561,10 @@ public class IgniteTxHandler {
 
                     tx.topologyVersion(req.topologyVersion());
                 }
+                else {
+                    if (req.explicitLock())
+                        tx.explicitLock(req.explicitLock());
+                }
 
                 tx.storeEnabled(req.storeEnabled());
 
@@ -580,20 +589,44 @@ public class IgniteTxHandler {
                 return commitFut;
             }
             else {
-                assert tx != null : "Transaction is null for near rollback request [nodeId=" +
+                assert tx != null || req.explicitLock() : "Transaction is null for near rollback request [nodeId=" +
                     nodeId + ", req=" + req + "]";
 
-                tx.syncRollback(req.syncRollback());
+                if (tx != null) {
+                    tx.syncRollback(req.syncRollback());
 
-                tx.nearFinishFutureId(req.futureId());
-                tx.nearFinishMiniId(req.miniId());
+                    tx.nearFinishFutureId(req.futureId());
+                    tx.nearFinishMiniId(req.miniId());
 
-                IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+                    IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
 
-                // Only for error logging.
-                rollbackFut.listen(CU.errorLogger(log));
+                    // Only for error logging.
+                    rollbackFut.listen(CU.errorLogger(log));
 
-                return rollbackFut;
+                    return rollbackFut;
+                }
+                else {
+                    // Always send finish response.
+                    GridCacheMessage res = new GridNearTxFinishResponse(req.version(), req.threadId(),
+                        req.futureId(), req.miniId(), null);
+
+                    try {
+                        ctx.io().send(nodeId, res, req.policy());
+                    }
+                    catch (Throwable e) {
+                        // Double-check.
+                        if (ctx.discovery().node(nodeId) == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res +
+                                    ']');
+                        }
+                        else
+                            U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", " +
+                                "res=" + res + ']', e);
+                    }
+
+                    return null;
+                }
             }
         }
         catch (Throwable e) {
@@ -757,15 +790,36 @@ public class IgniteTxHandler {
         if (nearTx != null)
             finish(nodeId, nearTx, req);
 
-        if (dhtTx != null && !dhtTx.done()) {
-            dhtTx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
-                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
-                    sendReply(nodeId, req);
-                }
-            });
+        if (req.replyRequired()) {
+            IgniteInternalFuture completeFut;
+
+            IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture();
+            IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture();
+
+            if (dhtFin != null && nearFin != null) {
+                GridCompoundFuture fut = new GridCompoundFuture();
+
+                fut.add(dhtFin);
+                fut.add(nearFin);
+
+                fut.markInitialized();
+
+                completeFut = fut;
+            }
+            else
+                completeFut = dhtFin != null ? dhtFin : nearFin;
+
+            if (completeFut != null) {
+                completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                    @Override
+                    public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) {
+                        sendReply(nodeId, req);
+                    }
+                });
+            }
+            else
+                sendReply(nodeId, req);
         }
-        else
-            sendReply(nodeId, req);
     }
 
     /**
@@ -874,21 +928,19 @@ public class IgniteTxHandler {
      * @param req Request.
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) {
-        if (req.replyRequired()) {
-            GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
+        GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
 
-            try {
-                ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
-            }
-            catch (Throwable e) {
-                // Double-check.
-                if (ctx.discovery().node(nodeId) == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
-                }
-                else
-                    U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+        try {
+            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+        }
+        catch (Throwable e) {
+            // Double-check.
+            if (ctx.discovery().node(nodeId) == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Node left while sending finish response [nodeId=" + nodeId + ", res=" + res + ']');
             }
+            else
+                U.error(log, "Failed to send finish response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
         }
     }
 
@@ -942,6 +994,16 @@ public class IgniteTxHandler {
 
                     return null;
                 }
+
+                if (ctx.discovery().node(nodeId) == null) {
+                    tx.state(ROLLING_BACK);
+
+                    tx.state(ROLLED_BACK);
+
+                    ctx.tm().uncommitTx(tx);
+
+                    return null;
+                }
             }
 
             if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) {


Mime
View raw message