ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [31/50] [abbrv] ignite git commit: Reverted committed versions for txs.
Date Thu, 03 Sep 2015 01:03:40 GMT
Reverted committed versions for txs.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6dd5ee2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6dd5ee2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6dd5ee2

Branch: refs/heads/ignite-264
Commit: a6dd5ee2e6d1bcd44bef5f2276ad386db6418836
Parents: d97a340
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Tue Aug 25 17:49:20 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Tue Aug 25 17:49:20 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvcc.java         | 143 ++++-
 .../distributed/GridDistributedBaseMessage.java |  34 ++
 .../distributed/GridDistributedCacheEntry.java  | 108 +++-
 .../GridDistributedLockResponse.java            |   7 +-
 .../GridDistributedTxFinishRequest.java         |  22 +-
 .../GridDistributedTxRemoteAdapter.java         |  18 +-
 .../distributed/dht/GridDhtCacheEntry.java      |   3 +-
 .../distributed/dht/GridDhtLockFuture.java      |  38 +-
 .../distributed/dht/GridDhtLockRequest.java     |  60 ++
 .../dht/GridDhtTransactionalCacheAdapter.java   |  54 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +
 .../distributed/dht/GridDhtTxFinishRequest.java |  58 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  32 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |  40 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   5 +
 .../distributed/near/GridNearLockFuture.java    |   5 +-
 .../distributed/near/GridNearLockResponse.java  |  25 +
 .../near/GridNearTransactionalCache.java        |  11 +-
 .../near/GridNearTxFinishFuture.java            |   9 +
 .../near/GridNearTxFinishRequest.java           |  27 +-
 .../cache/distributed/near/GridNearTxLocal.java |  11 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   2 +-
 .../near/GridNearTxPrepareResponse.java         |  23 +
 .../distributed/near/GridNearTxRemote.java      |   3 +-
 .../cache/transactions/IgniteInternalTx.java    |  21 +
 .../cache/transactions/IgniteTxAdapter.java     |  31 +
 .../cache/transactions/IgniteTxHandler.java     |  10 +-
 .../transactions/IgniteTxLocalAdapter.java      |  52 ++
 .../cache/transactions/IgniteTxManager.java     | 119 +++-
 .../cache/transactions/IgniteTxRemoteEx.java    |   6 +-
 .../messages/GridQueryNextPageResponse.java     |   8 +-
 .../h2/twostep/messages/GridQueryRequest.java   |  28 +-
 .../cache/GridCacheMvccPartitionedSelfTest.java | 259 +++++++-
 .../processors/cache/GridCacheMvccSelfTest.java | 586 ++++++++++++++++---
 .../processors/cache/GridCacheTestEntryEx.java  |  35 +-
 35 files changed, 1722 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 7d83da2..f5eefff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -324,6 +324,103 @@ public final class GridCacheMvcc {
     }
 
     /**
+     * Moves completed candidates right before the base one. Note that
+     * if base is not found, then nothing happens and the current owner
+     * (possibly {@code null}) is returned.
+     *
+     * @param baseVer Base version.
+     * @param committedVers Committed versions relative to base.
+     * @param rolledbackVers Rolled back versions relative to base.
+     * @return Lock owner.
+     */
+    @Nullable public GridCacheMvccCandidate orderCompleted(GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
+        assert baseVer != null;
+
+        if (rmts != null && !F.isEmpty(committedVers)) {
+            Deque<GridCacheMvccCandidate> mvAfter = null;
+
+            int maxIdx = -1;
+
+            for (ListIterator<GridCacheMvccCandidate> it = rmts.listIterator(rmts.size()); it.hasPrevious(); ) {
+                GridCacheMvccCandidate cur = it.previous();
+
+                if (!cur.version().equals(baseVer) && committedVers.contains(cur.version())) {
+                    cur.setOwner();
+
+                    assert localOwner() == null || localOwner().nearLocal(): "Cannot not have local owner and " +
+                        "remote completed transactions at the same time [baseVer=" + baseVer +
+                        ", committedVers=" + committedVers + ", rolledbackVers=" + rolledbackVers +
+                        ", localOwner=" + localOwner() + ", locs=" + locs + ", rmts=" + rmts + ']';
+
+                    if (maxIdx < 0)
+                        maxIdx = it.nextIndex();
+                }
+                else if (maxIdx >= 0 && cur.version().isGreaterEqual(baseVer)) {
+                    if (--maxIdx >= 0) {
+                        if (mvAfter == null)
+                            mvAfter = new LinkedList<>();
+
+                        it.remove();
+
+                        mvAfter.addFirst(cur);
+                    }
+                }
+
+                // If base is completed, then set it to owner too.
+                if (!cur.owner() && cur.version().equals(baseVer) && committedVers.contains(cur.version()))
+                    cur.setOwner();
+            }
+
+            if (maxIdx >= 0 && mvAfter != null) {
+                ListIterator<GridCacheMvccCandidate> it = rmts.listIterator(maxIdx + 1);
+
+                for (GridCacheMvccCandidate cand : mvAfter)
+                    it.add(cand);
+            }
+
+            // Remove rolled back versions.
+            if (!F.isEmpty(rolledbackVers)) {
+                for (Iterator<GridCacheMvccCandidate> it = rmts.iterator(); it.hasNext(); ) {
+                    GridCacheMvccCandidate cand = it.next();
+
+                    if (rolledbackVers.contains(cand.version())) {
+                        cand.setUsed(); // Mark as used to be consistent, even though we are about to remove it.
+
+                        it.remove();
+                    }
+                }
+
+                if (rmts.isEmpty())
+                    rmts = null;
+            }
+        }
+
+        return anyOwner();
+    }
+
+    /**
+     * Records the given owned version on the remote candidate for the base version, if present.
+     *
+     * @param baseVer Base version.
+     * @param owned Owned version.
+     * @return Current owner.
+     */
+    @Nullable public GridCacheMvccCandidate markOwned(GridCacheVersion baseVer, GridCacheVersion owned) {
+        if (owned == null)
+            return anyOwner();
+
+        if (rmts != null) {
+            GridCacheMvccCandidate baseCand = candidate(rmts, baseVer);
+
+            if (baseCand != null)
+                baseCand.ownerVersion(owned);
+        }
+
+        return anyOwner();
+    }
+
+    /**
      * @param parent Parent entry.
      * @param threadId Thread ID.
      * @param ver Lock version.
@@ -554,9 +651,14 @@ public final class GridCacheMvcc {
      *
      * @param ver Version to mark as ready.
      * @param mappedVer Mapped dht version.
+     * @param committedVers Committed versions.
+     * @param rolledBackVers Rolled back versions.
+     * @param pending Pending dht versions that are not owned and whose version is less than mapped.
      * @return Lock owner after reassignment.
      */
-    @Nullable public GridCacheMvccCandidate readyNearLocal(GridCacheVersion ver, GridCacheVersion mappedVer) {
+    @Nullable public GridCacheMvccCandidate readyNearLocal(GridCacheVersion ver, GridCacheVersion mappedVer,
+        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledBackVers,
+        Collection<GridCacheVersion> pending) {
         GridCacheMvccCandidate cand = candidate(locs, ver);
 
         if (cand != null) {
@@ -601,6 +703,24 @@ public final class GridCacheMvcc {
                 }
             }
 
+            // Mark all remote candidates with less version as owner unless it is pending.
+            if (rmts != null) {
+                for (GridCacheMvccCandidate rmt : rmts) {
+                    GridCacheVersion rmtVer = rmt.version();
+
+                    if (rmtVer.isLess(mappedVer)) {
+                        if (!pending.contains(rmtVer) &&
+                            !mappedVer.equals(rmt.ownerVersion()))
+                            rmt.setOwner();
+                    }
+                    else {
+                        // Remote version is greater, so need to check if it was committed or rolled back.
+                        if (committedVers.contains(rmtVer) || rolledBackVers.contains(rmtVer))
+                            rmt.setOwner();
+                    }
+                }
+            }
+
             reassign();
         }
 
@@ -611,10 +731,16 @@ public final class GridCacheMvcc {
      * Sets remote candidate to done.
      *
      * @param ver Version.
+     * @param pending Pending versions.
+     * @param committed Committed versions.
+     * @param rolledback Rolledback versions.
      * @return Lock owner.
      */
     @Nullable public GridCacheMvccCandidate doneRemote(
-        GridCacheVersion ver) {
+        GridCacheVersion ver,
+        Collection<GridCacheVersion> pending,
+        Collection<GridCacheVersion> committed,
+        Collection<GridCacheVersion> rolledback) {
         assert ver != null;
 
         if (log.isDebugEnabled())
@@ -646,6 +772,15 @@ public final class GridCacheMvcc {
 
                     break;
                 }
+                else if (!committed.contains(c.version()) && !rolledback.contains(c.version()) &&
+                    pending.contains(c.version())) {
+                    it.remove();
+
+                    if (mvAfter == null)
+                        mvAfter = new LinkedList<>();
+
+                    mvAfter.add(c);
+                }
             }
         }
 
@@ -709,10 +844,6 @@ public final class GridCacheMvcc {
             }
         }
 
-        // No assignment can happen in near local cache when remote candidate is present.
-        if (cctx.isNear() && firstRmt != null)
-            return;
-
         if (locs != null) {
             for (ListIterator<GridCacheMvccCandidate> it = locs.listIterator(); it.hasNext(); ) {
                 GridCacheMvccCandidate cand = it.next();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index fa69836..276f678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -54,6 +54,16 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
     @GridToStringExclude
     private byte[] candsByIdxBytes;
 
+    /** Committed versions with order higher than one for this message (needed for commit ordering). */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> committedVers;
+
+    /** Rolled back versions with order higher than one for this message (needed for commit ordering). */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> rolledbackVers;
+
     /** Count of keys referenced in candidates array (needed only locally for optimization). */
     @GridToStringInclude
     @GridDirectTransient
@@ -119,6 +129,30 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
     }
 
     /**
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
+     */
+    public void completedVersions(Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers) {
+        this.committedVers = committedVers;
+        this.rolledbackVers = rolledbackVers;
+    }
+
+    /**
+     * @return Committed versions.
+     */
+    public Collection<GridCacheVersion> committedVersions() {
+        return committedVers == null ? Collections.<GridCacheVersion>emptyList() : committedVers;
+    }
+
+    /**
+     * @return Rolled back versions.
+     */
+    public Collection<GridCacheVersion> rolledbackVersions() {
+        return rolledbackVers == null ? Collections.<GridCacheVersion>emptyList() : rolledbackVers;
+    }
+
+    /**
      * @param idx Key index.
      * @param candsByIdx List of candidates for that key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index 0f6fdbf..074edd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -163,6 +163,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
      * @param timeout Lock acquire timeout.
      * @param tx Transaction flag.
      * @param implicitSingle Implicit flag.
+     * @param owned Owned candidate version.
      * @throws GridDistributedLockCancelledException If lock has been canceled.
      * @throws GridCacheEntryRemovedException If this entry is obsolete.
      */
@@ -173,9 +174,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
         GridCacheVersion ver,
         long timeout,
         boolean tx,
-        boolean implicitSingle
-    ) throws GridDistributedLockCancelledException,
-        GridCacheEntryRemovedException {
+        boolean implicitSingle,
+        @Nullable GridCacheVersion owned
+    ) throws GridDistributedLockCancelledException, GridCacheEntryRemovedException {
         GridCacheMvccCandidate prev;
         GridCacheMvccCandidate owner;
 
@@ -211,6 +212,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
                 /*near-local*/false
             );
 
+            if (owned != null)
+                mvcc.markOwned(ver, owned);
+
             owner = mvcc.anyOwner();
 
             boolean emptyAfter = mvcc.isEmpty();
@@ -508,12 +512,17 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
      *
      * @param ver Lock version.
      * @param mapped Mapped dht lock version.
+     * @param committed Committed versions.
+     * @param rolledBack Rolled back versions.
+     * @param pending Pending locks on dht node with version less than mapped.
      * @return Current lock owner.
      *
      * @throws GridCacheEntryRemovedException If entry is removed.
      */
-    @Nullable public GridCacheMvccCandidate readyNearLock(GridCacheVersion ver, GridCacheVersion mapped)
-        throws GridCacheEntryRemovedException {
+    @Nullable public GridCacheMvccCandidate readyNearLock(GridCacheVersion ver, GridCacheVersion mapped,
+        Collection<GridCacheVersion> committed,
+        Collection<GridCacheVersion> rolledBack,
+        Collection<GridCacheVersion> pending) throws GridCacheEntryRemovedException {
         GridCacheMvccCandidate prev = null;
         GridCacheMvccCandidate owner = null;
 
@@ -529,7 +538,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
                 boolean emptyBefore = mvcc.isEmpty();
 
-                owner = mvcc.readyNearLocal(ver, mapped);
+                owner = mvcc.readyNearLocal(ver, mapped, committed, rolledBack, pending);
 
                 assert owner == null || owner.owner() : "Owner flag is not set for owner: " + owner;
 
@@ -551,9 +560,79 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
+     * Reorders completed versions.
+     *
+     * @param baseVer Base version for reordering.
+     * @param committedVers Completed versions.
+     * @param rolledbackVers Rolled back versions.
+     * @throws GridCacheEntryRemovedException If entry has been removed.
+     */
+    public void orderCompleted(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers)
+        throws GridCacheEntryRemovedException {
+        if (!F.isEmpty(committedVers) || !F.isEmpty(rolledbackVers)) {
+            GridCacheMvccCandidate prev = null;
+            GridCacheMvccCandidate owner = null;
+
+            CacheObject val;
+
+            synchronized (this) {
+                checkObsolete();
+
+                GridCacheMvcc mvcc = mvccExtras();
+
+                if (mvcc != null) {
+                    prev = mvcc.anyOwner();
+
+                    boolean emptyBefore = mvcc.isEmpty();
+
+                    owner = mvcc.orderCompleted(baseVer, committedVers, rolledbackVers);
+
+                    boolean emptyAfter = mvcc.isEmpty();
+
+                    checkCallbacks(emptyBefore, emptyAfter);
+
+                    if (emptyAfter)
+                        mvccExtras(null);
+                }
+
+                val = this.val;
+            }
+
+            // This call must be made outside of synchronization.
+            checkOwnerChanged(prev, owner, val);
+        }
+    }
+
+    /**
+     *
+     * @param lockVer Done version.
+     * @param baseVer Base version.
+     * @param committedVers Completed versions for reordering.
+     * @param rolledbackVers Rolled back versions for reordering.
+     * @param sysInvalidate Flag indicating if this entry is done from invalidated transaction (in case of tx
+     *      salvage). In this case all locks before salvaged lock will be marked as used and corresponding
+     *      transactions will be invalidated.
+     * @throws GridCacheEntryRemovedException If entry has been removed.
+     * @return Owner.
+     */
+    @Nullable public GridCacheMvccCandidate doneRemote(
+        GridCacheVersion lockVer,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
+        boolean sysInvalidate) throws GridCacheEntryRemovedException {
+        return doneRemote(lockVer, baseVer, Collections.<GridCacheVersion>emptySet(), committedVers,
+            rolledbackVers, sysInvalidate);
+    }
+
+    /**
      *
      * @param lockVer Done version.
      * @param baseVer Base version.
+     * @param pendingVers Pending versions that are less than lock version.
+     * @param committedVers Completed versions for reordering.
+     * @param rolledbackVers Rolled back versions for reordering.
      * @param sysInvalidate Flag indicating if this entry is done from invalidated transaction (in case of tx
      *      salvage). In this case all locks before salvaged lock will marked as used and corresponding
      *      transactions will be invalidated.
@@ -563,6 +642,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     @Nullable public GridCacheMvccCandidate doneRemote(
         GridCacheVersion lockVer,
         GridCacheVersion baseVer,
+        @Nullable Collection<GridCacheVersion> pendingVers,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
         boolean sysInvalidate) throws GridCacheEntryRemovedException {
         GridCacheMvccCandidate prev = null;
         GridCacheMvccCandidate owner = null;
@@ -579,10 +661,19 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
                 boolean emptyBefore = mvcc.isEmpty();
 
+                // Order completed versions.
+                if (!F.isEmpty(committedVers) || !F.isEmpty(rolledbackVers)) {
+                    mvcc.orderCompleted(lockVer, committedVers, rolledbackVers);
+
+                    if (!baseVer.equals(lockVer))
+                        mvcc.orderCompleted(baseVer, committedVers, rolledbackVers);
+                }
+
                 if (sysInvalidate && baseVer != null)
                     mvcc.salvageRemote(baseVer);
 
-                owner = mvcc.doneRemote(lockVer);
+                owner = mvcc.doneRemote(lockVer, maskNull(pendingVers), maskNull(committedVers),
+                    maskNull(rolledbackVers));
 
                 boolean emptyAfter = mvcc.isEmpty();
 
@@ -661,7 +752,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
                 tx.xidVersion(),
                 tx.timeout(),
                 true,
-                tx.implicitSingle()
+                tx.implicitSingle(),
+                tx.ownedVersion(txKey())
             );
 
             return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index aec1f40..ee0f65f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -164,10 +164,15 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
     /**
      * @param idx Candidates index.
      * @param cands Collection of candidates.
+     * @param committedVers Committed versions relative to lock version.
+     * @param rolledbackVers Rolled back versions relative to lock version.
      */
-    public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands) {
+    public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands,
+        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
         assert idx >= 0;
 
+        completedVersions(committedVers, rolledbackVers);
+
         candidatesByIndex(idx, cands);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 89601e3..897a714 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -27,6 +27,7 @@ import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
+import java.util.*;
 
 /**
  * Transaction completion message.
@@ -56,6 +57,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
     /** Sync commit flag. */
     private boolean syncRollback;
 
+    /** Min version used as base for completed versions. */
+    private GridCacheVersion baseVer;
+
     /** Expected txSize. */
     private int txSize;
 
@@ -79,8 +83,11 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
      * @param commitVer Commit version.
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
-     * @param sys System flag.
+     * @param sys System transaction flag.
      * @param plc IO policy.
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
      */
     public GridDistributedTxFinishRequest(
@@ -94,6 +101,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
         byte plc,
         boolean syncCommit,
         boolean syncRollback,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
         int txSize
     ) {
         super(xidVer, 0);
@@ -108,7 +118,10 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
         this.plc = plc;
         this.syncCommit = syncCommit;
         this.syncRollback = syncRollback;
+        this.baseVer = baseVer;
         this.txSize = txSize;
+
+        completedVersions(committedVers, rolledbackVers);
     }
 
     /**
@@ -176,6 +189,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
     }
 
     /**
+     * @return Base version.
+     */
+    public GridCacheVersion baseVersion() {
+        return baseVer;
+    }
+
+    /**
      * @return Expected tx size.
      */
     public int txSize() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3685968..54e9be8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -225,16 +225,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
     /**
      * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
      */
-    @Override public void doneRemote(GridCacheVersion baseVer) {
+    @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) {
         if (readMap != null && !readMap.isEmpty()) {
             for (IgniteTxEntry txEntry : readMap.values())
-                doneRemote(txEntry, baseVer);
+                doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
         }
 
         if (writeMap != null && !writeMap.isEmpty()) {
             for (IgniteTxEntry txEntry : writeMap.values())
-                doneRemote(txEntry, baseVer);
+                doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers);
         }
     }
 
@@ -243,8 +246,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
      *
      * @param txEntry Entry.
      * @param baseVer Base version for completed versions.
+     * @param committedVers Completed versions relative to base version.
+     * @param rolledbackVers Rolled back versions relative to base version.
+     * @param pendingVers Pending versions.
      */
-    private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer) {
+    private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers,
+        Collection<GridCacheVersion> pendingVers) {
         while (true) {
             GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached();
 
@@ -252,7 +260,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                 // Handle explicit locks.
                 GridCacheVersion doneVer = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer;
 
-                entry.doneRemote(doneVer, baseVer, isSystemInvalidate());
+                entry.doneRemote(doneVer, baseVer, pendingVers, committedVers, rolledbackVers, isSystemInvalidate());
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index ccd146d..3b411b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -261,7 +261,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                 tx.xidVersion(),
                 tx.timeout(),
                 /*tx*/true,
-                tx.implicit());
+                tx.implicit(),
+                null);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 6996cbd..e160529 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -749,12 +749,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
             if (log.isDebugEnabled())
                 log.debug("Mapping entry for DHT lock future: " + this);
 
+            boolean hasRmtNodes = false;
+
             // Assign keys to primary nodes.
             for (GridDhtCacheEntry entry : entries) {
                 try {
                     while (true) {
                         try {
-                            cctx.dhtMap(
+                            hasRmtNodes = cctx.dhtMap(
                                 nearNodeId,
                                 topVer,
                                 entry,
@@ -788,6 +790,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                 }
             }
 
+            if (tx != null)
+                tx.needsCompletedVersions(hasRmtNodes);
+
             if (isDone()) {
                 if (log.isDebugEnabled())
                     log.debug("Mapping won't proceed because future is done: " + this);
@@ -879,6 +884,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
                                         txEntry.op(GridCacheOperation.NOOP);
                                 }
                             }
+
+                            it.set(addOwned(req, e));
                         }
 
                         if (!F.isEmpty(req.keys())) {
@@ -908,6 +915,35 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         }
     }
 
+    /**
+     * @param req Request.
+     * @param e Entry.
+     * @return Entry.
+     */
+    private GridDhtCacheEntry addOwned(GridDhtLockRequest req, GridDhtCacheEntry e) {
+        while (true) {
+            try {
+                GridCacheMvccCandidate added = e.candidate(lockVer);
+
+                assert added != null;
+                assert added.dhtLocal();
+
+                if (added.ownerVersion() != null)
+                    req.owned(e.key(), added.ownerVersion());
+
+                break;
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("Got removed entry when creating DHT lock request (will retry): " + e);
+
+                e = cctx.dht().entryExx(e.key(), topVer);
+            }
+        }
+
+        return e;
+    }
+
     /** {@inheritDoc} */
     @Override public int hashCode() {
         return futId.hashCode();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 267b430..903898a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -52,6 +53,19 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
     /** Mini future ID. */
     private IgniteUuid miniId;
 
+    /** Owner mapped version, if any. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<KeyCacheObject, GridCacheVersion> owned;
+
+    /** Array of keys from {@link #owned}. Used during marshalling and unmarshalling. */
+    @GridToStringExclude
+    private KeyCacheObject[] ownedKeys;
+
+    /** Array of values from {@link #owned}. Used during marshalling and unmarshalling. */
+    @GridToStringExclude
+    private GridCacheVersion[] ownedValues;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -237,6 +251,27 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
     }
 
     /**
+     * Sets owner and its mapped version.
+     *
+     * @param key Key.
+     * @param ownerMapped Owner mapped version.
+     */
+    public void owned(KeyCacheObject key, GridCacheVersion ownerMapped) {
+        if (owned == null)
+            owned = new GridLeanMap<>(3);
+
+        owned.put(key, ownerMapped);
+    }
+
+    /**
+     * @param key Key.
+     * @return Owner mapped version for the key, or {@code null} if none was recorded.
+     */
+    @Nullable public GridCacheVersion owned(KeyCacheObject key) {
+        return owned == null ? null : owned.get(key);
+    }
+
+    /**
      * @param idx Entry index to check.
      * @return {@code True} if near entry should be invalidated.
      */
@@ -264,6 +299,19 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
         super.prepareMarshal(ctx);
 
         prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId));
+
+        if (owned != null) {
+            ownedKeys = new KeyCacheObject[owned.size()];
+            ownedValues = new GridCacheVersion[ownedKeys.length];
+
+            int i = 0;
+
+            for (Map.Entry<KeyCacheObject, GridCacheVersion> entry : owned.entrySet()) {
+                ownedKeys[i] = entry.getKey();
+                ownedValues[i] = entry.getValue();
+                i++;
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -271,6 +319,18 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
         super.finishUnmarshal(ctx, ldr);
 
         finishUnmarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId), ldr);
+
+        if (ownedKeys != null) {
+            owned = new GridLeanMap<>(ownedKeys.length);
+
+            for (int i = 0; i < ownedKeys.length; i++) {
+                ownedKeys[i].finishUnmarshal(ctx.cacheContext(cacheId).cacheObjectContext(), ldr);
+                owned.put(ownedKeys[i], ownedValues[i]);
+            }
+
+            ownedKeys = null;
+            ownedValues = null;
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index a630642..6181ab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -233,7 +233,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                             req.version(),
                             req.timeout(),
                             tx != null,
-                            tx != null && tx.implicitSingle()
+                            tx != null && tx.implicitSingle(),
+                            null
                         );
 
                         // Invalidate key in near cache, if any.
@@ -1060,6 +1061,13 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 null);
 
             if (err == null) {
+                res.pending(localDhtPendingVersions(entries, mappedVer));
+
+                // We have to add completed versions for cases when nearLocal and remote transactions
+                // execute concurrently.
+                res.completedVersions(ctx.tm().committedVersions(req.version()),
+                    ctx.tm().rolledbackVersions(req.version()));
+
                 int i = 0;
 
                 for (ListIterator<GridCacheEntryEx> it = entries.listIterator(); it.hasNext();) {
@@ -1208,6 +1216,40 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
     }
 
     /**
+     * Collects versions of pending candidates that are less than the base version.
+     *
+     * @param entries Tx entries to process.
+     * @param baseVer Base version.
+     * @return Collection of pending candidates versions.
+     */
+    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<GridCacheEntryEx> entries,
+        GridCacheVersion baseVer) {
+        Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
+
+        for (GridCacheEntryEx entry : entries) {
+            // Since entries were collected before locks are added, some of them may become obsolete.
+            while (true) {
+                try {
+                    for (GridCacheMvccCandidate cand : entry.localCandidates()) {
+                        if (cand.version().isLess(baseVer))
+                            lessPending.add(cand.version());
+                    }
+
+                    break; // While.
+                }
+                catch (GridCacheEntryRemovedException ignored) {
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry is localDhtPendingVersions (will retry): " + entry);
+
+                    entry = entryExx(entry.key());
+                }
+            }
+        }
+
+        return lessPending;
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param req Request.
      */
@@ -1230,6 +1272,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                         entry.doneRemote(
                             req.version(),
                             req.version(),
+                            null,
+                            null,
+                            null,
                             /*system invalidate*/false);
 
                         // Note that we don't reorder completed versions here,
@@ -1446,6 +1491,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             }
         }
 
+        Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
+        Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+
         // Backups.
         for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) {
             ClusterNode n = entry.getKey();
@@ -1466,6 +1514,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     for (KeyCacheObject key : keyBytes)
                         req.addNearKey(key, ctx.shared());
 
+                req.completedVersions(committed, rolledback);
+
                 ctx.io().send(n, req, ctx.ioPolicy());
             }
             catch (ClusterTopologyCheckedException ignore) {
@@ -1492,6 +1542,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     for (KeyCacheObject key : keyBytes)
                         req.addNearKey(key, ctx.shared());
 
+                    req.completedVersions(committed, rolledback);
+
                     ctx.io().send(n, req, ctx.ioPolicy());
                 }
                 catch (ClusterTopologyCheckedException ignore) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 8bd518b..72e2f97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -304,6 +304,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.isSystemInvalidate(),
                 sync,
                 sync,
+                tx.completedBase(),
+                tx.committedVersions(),
+                tx.rolledbackVersions(),
+                tx.pendingVersions(),
                 tx.size(),
                 tx.subjectId(),
                 tx.taskNameHash());
@@ -377,6 +381,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.isSystemInvalidate(),
                 sync,
                 sync,
+                tx.completedBase(),
+                tx.committedVersions(),
+                tx.rolledbackVersions(),
+                tx.pendingVersions(),
                 tx.size(),
                 tx.subjectId(),
                 tx.taskNameHash());
@@ -426,6 +434,10 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                     tx.isSystemInvalidate(),
                     sync,
                     sync,
+                    tx.completedBase(),
+                    tx.committedVersions(),
+                    tx.rolledbackVersions(),
+                    tx.pendingVersions(),
                     tx.size(),
                     tx.subjectId(),
                     tx.taskNameHash());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 18191a7..3c33e87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -17,13 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.transactions.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
@@ -52,6 +54,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
+    /** Pending versions with order less than one for this message (needed for commit ordering). */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> pendingVers;
+
     /** Check comitted flag. */
     private boolean checkCommitted;
 
@@ -86,6 +93,10 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param sysInvalidate System invalidation flag.
      * @param syncCommit Synchronous commit flag.
      * @param syncRollback Synchronous rollback flag.
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
+     * @param pendingVers Pending versions.
      * @param txSize Expected transaction size.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
@@ -106,28 +117,35 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean sysInvalidate,
         boolean syncCommit,
         boolean syncRollback,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
+        Collection<GridCacheVersion> pendingVers,
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash
     ) {
         super(
-            xidVer, 
-            futId, 
-            commitVer, 
-            threadId, 
-            commit, 
-            invalidate, 
-            sys, 
-            plc, 
-            syncCommit, 
-            syncRollback, 
-            txSize
-        );
+            xidVer,
+            futId,
+            commitVer,
+            threadId,
+            commit,
+            invalidate,
+            sys,
+            plc,
+            syncCommit,
+            syncRollback,
+            baseVer,
+            committedVers,
+            rolledbackVers,
+            txSize);
 
         assert miniId != null;
         assert nearNodeId != null;
         assert isolation != null;
 
+        this.pendingVers = pendingVers;
         this.topVer = topVer;
         this.nearNodeId = nearNodeId;
         this.isolation = isolation;
@@ -199,13 +217,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
-     * @return Check committed flag.
+     * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
     /**
+     * Gets versions of not acquired locks with version less than that of the transaction being committed.
+     *
+     * @return Versions of locks for entries participating in transaction that have not been acquired yet and
+     *      have a version less than that of the transaction being committed.
+     */
+    public Collection<GridCacheVersion> pendingVersions() {
+        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
+    }
+
+    /**
      * @return Check committed flag.
      */
     public boolean checkCommitted() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 30cee9d..58576bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -65,6 +65,12 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     /** */
     protected boolean explicitLock;
 
+    /** */
+    private boolean needsCompletedVers;
+
+    /** Versions of pending locks for entries of this tx. */
+    private Collection<GridCacheVersion> pendingVers;
+
     /** Flag indicating that originating node has near cache. */
     private boolean nearOnOriginatingNode;
 
@@ -212,6 +218,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
     protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err);
 
     /**
+     * @param needsCompletedVers {@code True} if needs completed versions.
+     */
+    public void needsCompletedVersions(boolean needsCompletedVers) {
+        this.needsCompletedVers |= needsCompletedVers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needsCompletedVersions() {
+        return needsCompletedVers;
+    }
+
+    /**
+     * @return Versions for all pending locks that were in queue before tx locks were released.
+     */
+    public Collection<GridCacheVersion> pendingVersions() {
+        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
+    }
+
+    /**
+     * @param pendingVers Versions for all pending locks that were in queue before tx locks were released.
+     */
+    public void pendingVersions(Collection<GridCacheVersion> pendingVers) {
+        this.pendingVers = pendingVers;
+    }
+
+    /**
      * @return DHT thread ID.
      */
     long dhtThreadId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 67f6b0c..f117de6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -626,6 +626,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         if (prepErr == null) {
             addDhtValues(res);
 
+            GridCacheVersion min = tx.minVersion();
+
+            res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min));
+
+            res.pending(localDhtPendingVersions(tx.writeEntries(), min));
+
             tx.implicitSingleResult(ret);
         }
 
@@ -879,16 +885,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>();
                 Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>();
 
+                boolean hasRemoteNodes = false;
+
                 // Assign keys to primary nodes.
                 if (!F.isEmpty(writes)) {
                     for (IgniteTxEntry write : writes)
-                        map(tx.entry(write.txKey()), futDhtMap, futNearMap);
+                        hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
                 }
 
                 if (!F.isEmpty(reads)) {
                     for (IgniteTxEntry read : reads)
-                        map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+                        hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
                 }
+
+                tx.needsCompletedVersions(hasRemoteNodes);
             }
 
             // We are holding transaction-level locks for entries here, so we can get next write version.
@@ -1202,6 +1212,32 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         return ret;
     }
 
+    /**
+     * Collects versions of pending candidates versions less than base.
+     *
+     * @param entries Tx entries to process.
+     * @param baseVer Base version.
+     * @return Collection of pending candidates versions.
+     */
+    private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEntry> entries,
+        GridCacheVersion baseVer) {
+        Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
+
+        for (IgniteTxEntry entry : entries) {
+            try {
+                for (GridCacheMvccCandidate cand : entry.cached().localCandidates()) {
+                    if (cand.version().isLess(baseVer))
+                        lessPending.add(cand.version());
+                }
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op, no candidates.
+            }
+        }
+
+        return lessPending;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxPrepareFuture.class, this, "xid", tx.xidVersion(), "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index e93812e..eb7c78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -579,12 +579,17 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             if (map == null || map.isEmpty())
                 return;
 
+            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
+            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+
             for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();
 
                 GridDistributedUnlockRequest req = mapping.getValue();
 
                 if (!F.isEmpty(req.keys())) {
+                    req.completedVersions(committed, rolledback);
+
                     try {
                         // We don't wait for reply to this message.
                         ctx.io().send(n, req, ctx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index a61f08d..5db76ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1074,7 +1074,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                         // returned value if any.
                                         entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer.get());
 
-                                        entry.readyNearLock(lockVer, mappedVer);
+                                        entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
+                                            res.rolledbackVersions(), res.pending());
 
                                         if (inTx() && implicitTx() && tx.onePhaseCommit()) {
                                             boolean pass = res.filterResult(i);
@@ -1456,7 +1457,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                                     }
                                 }
                                 
-                                entry.readyNearLock(lockVer, mappedVer);
+                                entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), res.pending());
 
                                 if (retval) {
                                     if (readRecordable)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 5418dd1..7ab71aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -30,6 +31,7 @@ import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.nio.*;
+import java.util.*;
 
 /**
  * Near cache lock response.
@@ -38,6 +40,11 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Collection of versions that are pending and less than lock version. */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> pending;
+
     /** */
     private IgniteUuid miniId;
 
@@ -104,6 +111,24 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     }
 
     /**
+     * Gets pending versions that are less than {@link #version()}.
+     *
+     * @return Pending versions.
+     */
+    public Collection<GridCacheVersion> pending() {
+        return pending;
+    }
+
+    /**
+     * Sets pending versions that are less than {@link #version()}.
+     *
+     * @param pending Pending versions.
+     */
+    public void pending(Collection<GridCacheVersion> pending) {
+        this.pending = pending;
+    }
+
+    /**
      * @return Mini future ID.
      */
     public IgniteUuid miniId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c451bc8..a1f1383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -207,6 +207,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                             entry.doneRemote(
                                 req.version(),
                                 req.version(),
+                                null,
+                                req.committedVersions(),
+                                req.rolledbackVersions(),
                                 /*system invalidate*/false);
 
                             // Note that we don't reorder completed versions here,
@@ -329,7 +332,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                                 req.version(),
                                 req.timeout(),
                                 tx != null,
-                                tx != null && tx.implicitSingle()
+                                tx != null && tx.implicitSingle(),
+                                req.owned(entry.key())
                             );
 
                             assert cands.isEmpty() : "Received non-empty candidates in dht lock request: " + cands;
@@ -686,12 +690,17 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
             if (map == null || map.isEmpty())
                 return;
 
+            Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver);
+            Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver);
+
             for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) {
                 ClusterNode n = mapping.getKey();
 
                 GridDistributedUnlockRequest req = mapping.getValue();
 
                 if (!F.isEmpty(req.keys())) {
+                    req.completedVersions(committed, rolledback);
+
                     // We don't wait for reply to this message.
                     ctx.io().send(n, req, ctx.ioPolicy());
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 95f5149..7b164b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -323,6 +323,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             // If checkBackup is set, it means that primary node has crashed and we will not need to send
             // finish request to it, so we can mark future as initialized.
             markInitialized();
+
+            return;
         }
 
         try {
@@ -405,6 +407,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         false,
                         true,
                         true,
+                        null,
+                        null,
+                        null,
+                        null,
                         0,
                         null,
                         0);
@@ -486,6 +492,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             m.explicitLock(),
             tx.storeEnabled(),
             tx.topologyVersion(),
+            null,
+            null,
+            null,
             tx.size(),
             tx.subjectId(),
             tx.taskNameHash()

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 028a9de..d20b8ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -71,6 +71,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param explicitLock Explicit lock flag.
      * @param storeEnabled Store enabled flag.
      * @param topVer Topology version.
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
      */
     public GridNearTxFinishRequest(
@@ -85,21 +88,27 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean syncRollback,
         boolean explicitLock,
         boolean storeEnabled,
-        AffinityTopologyVersion topVer,
+        @NotNull AffinityTopologyVersion topVer,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash) {
         super(
-            xidVer, 
-            futId, 
-            null, 
-            threadId, 
-            commit, 
-            invalidate, 
+            xidVer,
+            futId,
+            null,
+            threadId,
+            commit,
+            invalidate,
             sys,
             plc,
-            syncCommit, 
-            syncRollback, 
+            syncCommit,
+            syncRollback,
+            baseVer,
+            committedVers,
+            rolledbackVers,
             txSize
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index fbbddeb..ccdc4e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -559,8 +559,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
     /**
      * @param mapping Mapping to order.
+     * @param pendingVers Pending versions.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
      */
-    void readyNearLocks(GridDistributedTxMapping mapping) {
+    void readyNearLocks(GridDistributedTxMapping mapping,
+        Collection<GridCacheVersion> pendingVers,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers)
+    {
         Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), mapping.writes());
 
         for (IgniteTxEntry txEntry : entries) {
@@ -576,7 +583,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
                     GridCacheVersion explicit = txEntry.explicitVersion();
 
                     if (explicit == null)
-                        entry.readyNearLock(xidVer, mapping.dhtVersion());
+                        entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers);
 
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 257d214..b7a2fee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -225,7 +225,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundIdentit
             m.dhtVersion(res.dhtVersion(), writeVer);
 
             if (m.near())
-                tx.readyNearLocks(m);
+                tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 1cc8130..ea5e97a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -41,6 +41,11 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Collection of versions that are pending and less than lock version. */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    private Collection<GridCacheVersion> pending;
+
     /** Future ID.  */
     private IgniteUuid futId;
 
@@ -132,6 +137,24 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     }
 
     /**
+     * Gets pending versions that are less than {@link #version()}.
+     *
+     * @return Pending versions.
+     */
+    public Collection<GridCacheVersion> pending() {
+        return pending == null ? Collections.<GridCacheVersion>emptyList() : pending;
+    }
+
+    /**
+     * Sets pending versions that are less than {@link #version()}.
+     *
+     * @param pending Pending versions.
+     */
+    public void pending(Collection<GridCacheVersion> pending) {
+        this.pending = pending;
+    }
+
+    /**
      * @return Mini future ID.
      */
     public IgniteUuid miniId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index d5d0205..3f64224 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -207,8 +207,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
-    public GridCacheVersion ownedVersion(IgniteTxKey key) {
-        // TODO ignite-264 do we need this method?
+    @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
         return owned == null ? null : owned.get(key);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index c572575..4e43d97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -305,6 +305,14 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public Map<Integer, Set<Integer>> invalidPartitions();
 
     /**
+     * Gets owned version for near remote transaction.
+     *
+     * @param key Key to get version for.
+     * @return Owned version, if any.
+     */
+    @Nullable public GridCacheVersion ownedVersion(IgniteTxKey key);
+
+    /**
      * Gets ID of additional node involved. For example, in DHT case, other node is
      * near node ID.
      *
@@ -665,6 +673,19 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public Collection<GridCacheVersion> alternateVersions();
 
     /**
+     * @return {@code True} if transaction needs completed versions for processing.
+     */
+    public boolean needsCompletedVersions();
+
+    /**
+     * @param base Base for committed versions.
+     * @param committed Committed transactions relative to base.
+     * @param rolledback Rolled back transactions relative to base.
+     */
+    public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
+        Collection<GridCacheVersion> rolledback);
+
+    /**
      * @return {@code True} if transaction has at least one internal entry.
      */
     public boolean internal();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index badcd46..f9b2437 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -696,6 +696,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public long startTime() {
         return startTime;
     }
@@ -859,6 +864,17 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         awaitCompletion();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean needsCompletedVersions() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed,
+        Collection<GridCacheVersion> txs) {
+        /* No-op. */
+    }
+
     /**
      * Awaits transaction completion.
      *
@@ -1780,6 +1796,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Nullable @Override public UUID otherNodeId() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }
@@ -2083,6 +2104,16 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public boolean needsCompletedVersions() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void completedVersions(GridCacheVersion base, Collection committed, Collection rolledback) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean internal() {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b5d850..7523af6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -919,9 +919,9 @@ public class IgniteTxHandler {
         if (tx == null) {
             if (req.commit())
                 // Must be some long time duplicate, but we add it anyway.
-                ctx.tm().addCommittedTx(req.writeVersion(), null);
+                ctx.tm().addCommittedTx(req.version(), null);
             else
-                ctx.tm().addRolledbackTx(req.writeVersion());
+                ctx.tm().addRolledbackTx(req.version());
 
             if (log.isDebugEnabled())
                 log.debug("Received finish request for non-existing transaction (added to completed set) " +
@@ -940,12 +940,12 @@ public class IgniteTxHandler {
                 tx.systemInvalidate(req.isSystemInvalidate());
 
                 // Complete remote candidates.
-                tx.doneRemote(req.version());
+                tx.doneRemote(req.baseVersion(), null, null, null);
 
                 tx.commit();
             }
             else {
-                tx.doneRemote(req.version());
+                tx.doneRemote(req.baseVersion(), null, null, null);
 
                 tx.rollback();
             }
@@ -985,7 +985,7 @@ public class IgniteTxHandler {
             tx.invalidate(req.isInvalidate());
 
             // Complete remote candidates.
-            tx.doneRemote(req.version());
+            tx.doneRemote(req.version(), null, null, null);
 
             tx.commit();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6dd5ee2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 31a9171..f41a819 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -77,6 +77,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     /** Flag indicating with TM commit happened. */
     protected AtomicBoolean doneFlag = new AtomicBoolean(false);
 
+    /** Committed versions, relative to base. */
+    private Collection<GridCacheVersion> committedVers = Collections.emptyList();
+
+    /** Rolled back versions, relative to base. */
+    private Collection<GridCacheVersion> rolledbackVers = Collections.emptyList();
+
+    /** Base for completed versions. */
+    private GridCacheVersion completedBase;
+
     /** Flag indicating that transformed values should be sent to remote nodes. */
     private boolean sndTransformedVals;
 
@@ -1021,6 +1030,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             if (doneFlag.compareAndSet(false, true)) {
                 // Unlock all locks.
                 cctx.tm().commitTx(this);
+
+                boolean needsCompletedVersions = needsCompletedVersions();
+
+                assert !needsCompletedVersions || completedBase != null;
+                assert !needsCompletedVersions || committedVers != null;
+                assert !needsCompletedVersions || rolledbackVers != null;
             }
         }
     }
@@ -1039,10 +1054,47 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 cctx.tm().rollbackTx(this);
 
             state(commit ? COMMITTED : ROLLED_BACK);
+
+            boolean needsCompletedVersions = needsCompletedVersions();
+
+            assert !needsCompletedVersions || completedBase != null;
+            assert !needsCompletedVersions || committedVers != null;
+            assert !needsCompletedVersions || rolledbackVers != null;
         }
     }
 
     /** {@inheritDoc} */
+    @Override public void completedVersions(
+        GridCacheVersion completedBase,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers) {
+        this.completedBase = completedBase;
+        this.committedVers = committedVers;
+        this.rolledbackVers = rolledbackVers;
+    }
+
+    /**
+     * @return Completed base for ordering.
+     */
+    public GridCacheVersion completedBase() {
+        return completedBase;
+    }
+
+    /**
+     * @return Committed versions.
+     */
+    public Collection<GridCacheVersion> committedVersions() {
+        return committedVers;
+    }
+
+    /**
+     * @return Rolled back versions.
+     */
+    public Collection<GridCacheVersion> rolledbackVersions() {
+        return rolledbackVers;
+    }
+
+    /** {@inheritDoc} */
     @Override public void userRollback() throws IgniteCheckedException {
         TransactionState state = state();
 


Mime
View raw message