ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/36] ignite git commit: ignite-3478
Date Fri, 08 Sep 2017 12:35:40 GMT
ignite-3478


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08be7310
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08be7310
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08be7310

Branch: refs/heads/ignite-6149
Commit: 08be7310a93d3ce455215b97cf8ab1a2c3f0ab31
Parents: 855c2d4
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Aug 31 12:52:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Aug 31 16:59:15 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |   1 -
 .../discovery/GridDiscoveryManager.java         |   3 -
 .../processors/cache/GridCacheEntryEx.java      |   7 +-
 .../processors/cache/GridCacheMapEntry.java     |   7 +-
 .../GridDistributedTxRemoteAdapter.java         |  12 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  49 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  47 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 487 ++++++++++---------
 .../dht/GridDhtTxPrepareRequest.java            |  63 ++-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +
 .../GridNearPessimisticTxPrepareFuture.java     |  37 ++
 .../near/GridNearTxFinishFuture.java            |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java |  20 +-
 .../near/GridNearTxPrepareRequest.java          |  17 +
 .../near/GridNearTxPrepareResponse.java         |  18 +
 .../mvcc/CacheCoordinatorsSharedManager.java    |  77 ++-
 .../mvcc/CoordinatorAssignmentHistory.java      |  71 +++
 .../cache/transactions/IgniteInternalTx.java    |   5 +
 .../cache/transactions/IgniteTxAdapter.java     |  36 ++
 .../cache/transactions/IgniteTxHandler.java     |  17 +-
 .../IgniteTxImplicitSingleStateImpl.java        |   7 +
 .../transactions/IgniteTxLocalAdapter.java      |  15 +-
 .../IgniteTxRemoteSingleStateImpl.java          |   5 +
 .../transactions/IgniteTxRemoteStateImpl.java   |  10 +
 .../cache/transactions/IgniteTxState.java       |   6 +
 .../cache/transactions/IgniteTxStateImpl.java   |   9 +
 .../processors/cache/GridCacheTestEntryEx.java  |   8 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   2 +-
 29 files changed, 714 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 4b57eb8..95e855a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.managers.discovery;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8e3f9fc..cbd2738 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,7 +30,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,9 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.zip.CRC32;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index b2cabac..5b97195 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -381,7 +382,8 @@ public interface GridCacheEntryEx {
         @Nullable UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -423,7 +425,8 @@ public interface GridCacheEntryEx {
         @Nullable UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 61f6fb4..5336b22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
@@ -888,7 +889,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject old;
 
@@ -1082,7 +1084,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert cctx.transactional();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index ea6461d..db1e2dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -474,6 +475,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                     cctx.database().checkpointReadLock();
 
                     try {
+                        TxMvccVersion mvccVer = createMvccVersion();
+
                         Collection<IgniteTxEntry> entries = near() ? allEntries() : writeEntries();
 
                         List<DataEntry> dataEntries = null;
@@ -594,7 +597,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         CU.subjectId(this, cctx),
                                                         resolveTaskName(),
                                                         dhtVer,
-                                                        txEntry.updateCounter());
+                                                        txEntry.updateCounter(),
+                                                        mvccVer);
                                                 else {
                                                     assert val != null : txEntry;
 
@@ -618,7 +622,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         CU.subjectId(this, cctx),
                                                         resolveTaskName(),
                                                         dhtVer,
-                                                        txEntry.updateCounter());
+                                                        txEntry.updateCounter(),
+                                                        mvccVer);
 
                                                     // Keep near entry up to date.
                                                     if (nearCached != null) {
@@ -650,7 +655,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                     CU.subjectId(this, cctx),
                                                     resolveTaskName(),
                                                     dhtVer,
-                                                    txEntry.updateCounter());
+                                                    txEntry.updateCounter(),
+                                                    mvccVer);
 
                                                 // Keep near entry up to date.
                                                 if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 5311ddc..9ca1412 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -347,7 +348,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 tx.taskNameHash(),
                 tx.activeCachesDeploymentEnabled(),
                 false,
-                false);
+                false,
+                TxMvccVersion.COUNTER_NA);
 
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
@@ -395,6 +397,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
         if (tx.onePhaseCommit())
             return false;
 
+        assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA;
+
         boolean sync = tx.syncMode() == FULL_SYNC;
 
         if (tx.explicitLock())
@@ -450,7 +454,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 tx.activeCachesDeploymentEnabled(),
                 updCntrs,
                 false,
-                false);
+                false,
+                tx.mvccCoordinatorCounter());
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 
@@ -519,7 +524,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                     tx.taskNameHash(),
                     tx.activeCachesDeploymentEnabled(),
                     false,
-                    false);
+                    false,
+                    tx.mvccCoordinatorCounter());
 
                 req.writeVersion(tx.writeVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 90f3687..976a534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -66,6 +67,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** One phase commit write version. */
     private GridCacheVersion writeVer;
 
+    /** */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -121,7 +125,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         int taskNameHash,
         boolean addDepInfo,
         boolean retVal,
-        boolean waitRemoteTxs
+        boolean waitRemoteTxs,
+        long mvccCrdCntr
     ) {
         super(
             xidVer,
@@ -150,6 +155,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         this.nearNodeId = nearNodeId;
         this.isolation = isolation;
         this.miniId = miniId;
+        this.mvccCrdCntr = mvccCrdCntr;
 
         needReturnValue(retVal);
         waitRemoteTransactions(waitRemoteTxs);
@@ -206,7 +212,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean addDepInfo,
         Collection<Long> updateIdxs,
         boolean retVal,
-        boolean waitRemoteTxs
+        boolean waitRemoteTxs,
+        long mvccCrdCntr
     ) {
         this(nearNodeId,
             futId,
@@ -231,7 +238,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
             taskNameHash,
             addDepInfo,
             retVal,
-            waitRemoteTxs);
+            waitRemoteTxs,
+            mvccCrdCntr);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -242,6 +250,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
+     * @return Counter.
+     */
+    public long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
+    }
+
+    /**
      * @return Partition update counters.
      */
     public GridLongList partUpdateCounters(){
@@ -367,24 +382,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
                     return false;
 
                 writer.incrementState();
 
             case 26:
+                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 27:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -427,7 +448,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 23:
-                nearNodeId = reader.readUuid("nearNodeId");
+                mvccCrdCntr = reader.readLong("mvccCrdCntr");
 
                 if (!reader.isLastRead())
                     return false;
@@ -435,7 +456,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 24:
-                partUpdateCnt = reader.readMessage("partUpdateCnt");
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -443,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 25:
-                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+                partUpdateCnt = reader.readMessage("partUpdateCnt");
 
                 if (!reader.isLastRead())
                     return false;
@@ -451,6 +472,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 26:
+                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 27:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -470,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 27;
+        return 28;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 5b8a7b5..44e2a54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -313,24 +314,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param verMap Version map.
-     * @param msgId Message ID.
-     * @param nearMiniId Near mini future ID.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<IgniteTxKey, GridCacheVersion> verMap,
-        long msgId,
-        int nearMiniId,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNearTxPrepareRequest req) {
         // In optimistic mode prepare still can be called explicitly from salvageTx.
         GridDhtTxPrepareFuture fut = prepFut;
 
@@ -344,14 +331,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 cctx,
                 this,
                 timeout,
-                nearMiniId,
-                verMap,
-                last,
+                req.miniId(),
+                req.dhtVersions(),
+                req.last(),
                 needReturnValue()))) {
                 GridDhtTxPrepareFuture f = prepFut;
 
-                assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+                assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']';
 
                 if (timeout == -1)
                     f.onError(timeoutException());
@@ -360,8 +347,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             }
         }
         else {
-            assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
+            assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']';
 
             // Prepare was called explicitly.
             return chainOnePhasePrepare(fut);
@@ -389,14 +376,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         }
 
         try {
-            if (reads != null) {
-                for (IgniteTxEntry e : reads)
-                    addEntry(msgId, e);
+            if (req.reads() != null) {
+                for (IgniteTxEntry e : req.reads())
+                    addEntry(req.messageId(), e);
             }
 
-            if (writes != null) {
-                for (IgniteTxEntry e : writes)
-                    addEntry(msgId, e);
+            if (req.writes() != null) {
+                for (IgniteTxEntry e : req.writes())
+                    addEntry(req.messageId(), e);
             }
 
             userPrepare(null);
@@ -407,7 +394,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 03d99fc..a3d67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -84,6 +86,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -168,14 +171,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     @SuppressWarnings("UnusedDeclaration")
     private volatile int mapped;
 
-    /** Prepare reads. */
-    private Iterable<IgniteTxEntry> reads;
-
-    /** Prepare writes. */
-    private Iterable<IgniteTxEntry> writes;
-
-    /** Tx nodes. */
-    private Map<UUID, Collection<UUID>> txNodes;
+    /** Prepare request. */
+    private GridNearTxPrepareRequest req;
 
     /** Trackable flag. */
     private boolean trackable = true;
@@ -341,7 +338,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     private void onEntriesLocked() {
         ret = new GridCacheReturn(null, tx.localResult(), true, null, true);
 
-        for (IgniteTxEntry writeEntry : writes) {
+        for (IgniteTxEntry writeEntry : req.writes()) {
             IgniteTxEntry txEntry = tx.entry(writeEntry.txKey());
 
             assert txEntry != null : writeEntry;
@@ -597,10 +594,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
         if (log.isDebugEnabled())
             log.debug("Marking all local candidates as ready: " + this);
 
-        readyLocks(writes);
+        readyLocks(req.writes());
 
         if (tx.serializable() && tx.optimistic())
-            readyLocks(reads);
+            readyLocks(req.reads());
 
         locksReady = true;
     }
@@ -869,6 +866,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             tx.onePhaseCommit(),
             tx.activeCachesDeploymentEnabled());
 
+        res.mvccCoordinatorCounter(tx.mvccCoordinatorCounter());
+
         if (prepErr == null) {
             if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor())
                 addDhtValues(res);
@@ -896,8 +895,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      */
     private void addDhtValues(GridNearTxPrepareResponse res) {
         // Interceptor on near node needs old values to execute callbacks.
-        if (!F.isEmpty(writes)) {
-            for (IgniteTxEntry e : writes) {
+        if (!F.isEmpty(req.writes())) {
+            for (IgniteTxEntry e : req.writes()) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
                 assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
@@ -1002,33 +1001,30 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     /**
      * Initializes future.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
+     * @param req Prepare request.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes) {
+    public void prepare(GridNearTxPrepareRequest req) {
+        assert req != null;
+
         if (tx.empty()) {
             tx.setRollbackOnly();
 
             onDone((GridNearTxPrepareResponse)null);
         }
 
-        this.reads = reads;
-        this.writes = writes;
-        this.txNodes = txNodes;
+        this.req = req;
 
         boolean ser = tx.serializable() && tx.optimistic();
 
-        if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
+        if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
-            for (IgniteTxEntry entry : writes)
+            for (IgniteTxEntry entry : req.writes())
                 forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
 
             if (ser) {
-                for (IgniteTxEntry entry : reads)
+                for (IgniteTxEntry entry : req.reads())
                     forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
             }
 
@@ -1191,15 +1187,17 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      *
      */
     private void prepare0() {
+        boolean skipInit = false;
+
         try {
             if (tx.serializable() && tx.optimistic()) {
                 IgniteCheckedException err0;
 
                 try {
-                    err0 = checkReadConflict(writes);
+                    err0 = checkReadConflict(req.writes());
 
                     if (err0 == null)
-                        err0 = checkReadConflict(reads);
+                        err0 = checkReadConflict(req.reads());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to check entry version: " + e, e);
@@ -1225,264 +1223,317 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 }
             }
 
+            IgniteInternalFuture<Long> waitCoordCntrFut = null;
+
+            if (req.requestMvccCounter()) {
+                assert tx.txState().mvccEnabled(cctx);
+
+                ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
+
+                assert crd != null : tx.topologyVersion();
+
+                if (crd.isLocal())
+                    tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion()));
+                else {
+                    IgniteInternalFuture<Long> coordCntrFut = cctx.coordinators().requestTxCounter(crd, tx);
+
+                    if (tx.onePhaseCommit())
+                        waitCoordCntrFut = coordCntrFut;
+                }
+            }
+
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
 
             // We are holding transaction-level locks for entries here, so we can get next write version.
             tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
 
-            {
-                // Assign keys to primary nodes.
-                if (!F.isEmpty(writes)) {
-                    for (IgniteTxEntry write : writes)
-                        map(tx.entry(write.txKey()));
-                }
+            // Assign keys to primary nodes.
+            if (!F.isEmpty(req.writes())) {
+                for (IgniteTxEntry write : req.writes())
+                    map(tx.entry(write.txKey()));
+            }
 
-                if (!F.isEmpty(reads)) {
-                    for (IgniteTxEntry read : reads)
-                        map(tx.entry(read.txKey()));
-                }
+            if (!F.isEmpty(req.reads())) {
+                for (IgniteTxEntry read : req.reads())
+                    map(tx.entry(read.txKey()));
             }
 
             if (isDone())
                 return;
 
             if (last) {
-                if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
-                    for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                        if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                            tx.onePhaseCommit(false);
+                if (waitCoordCntrFut != null) {
+                    skipInit = true;
 
-                            break;
-                        }
-                    }
-                }
-
-                int miniId = 0;
+                    waitCoordCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
+                        @Override public void apply(IgniteInternalFuture<Long> fut) {
+                            try {
+                                fut.get();
 
-                assert tx.transactionNodes() != null;
+                                sendPrepareRequests();
+                            }
+                            catch (Throwable e) {
+                                U.error(log, "Failed to get coordinator counter: " + e, e);
 
-                final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+                                GridNearTxPrepareResponse res = createPrepareResponse(e);
 
-                // Create mini futures.
-                for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
-                    assert !dhtMapping.empty();
+                                onDone(res, res.error());
+                            }
+                            finally {
+                                markInitialized();
+                            }
+                        }
+                    });
+                }
+                else
+                    sendPrepareRequests();
+            }
+        }
+        finally {
+            if (!skipInit)
+                markInitialized();
+        }
+    }
 
-                    ClusterNode n = dhtMapping.primary();
+    /**
+     *
+     */
+    private void sendPrepareRequests() {
+        if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+            for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                    tx.onePhaseCommit(false);
 
-                    assert !n.isLocal();
+                    break;
+                }
+            }
+        }
 
-                    GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
+        assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA;
 
-                    Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
+        int miniId = 0;
 
-                    Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
+        assert tx.transactionNodes() != null;
 
-                    if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
-                        continue;
+        final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
-                    if (tx.remainingTime() == -1)
-                        return;
+        // Create mini futures.
+        for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
+            assert !dhtMapping.empty();
 
-                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+            ClusterNode n = dhtMapping.primary();
 
-                    add(fut); // Append new future.
+            assert !n.isLocal();
 
-                    assert txNodes != null;
+            GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
 
-                    GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                        futId,
-                        fut.futureId(),
-                        tx.topologyVersion(),
-                        tx,
-                        timeout,
-                        dhtWrites,
-                        nearWrites,
-                        txNodes,
-                        tx.nearXidVersion(),
-                        true,
-                        tx.onePhaseCommit(),
-                        tx.subjectId(),
-                        tx.taskNameHash(),
-                        tx.activeCachesDeploymentEnabled(),
-                        tx.storeWriteThrough(),
-                        retVal);
+            Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
 
-                    int idx = 0;
+            Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
 
-                    for (IgniteTxEntry entry : dhtWrites) {
-                        try {
-                            GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+            if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+                continue;
 
-                            GridCacheContext<?, ?> cacheCtx = cached.context();
+            if (tx.remainingTime() == -1)
+                return;
 
-                            // Do not invalidate near entry on originating transaction node.
-                            req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
-                                cached.readerId(n.id()) != null);
+            MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+            add(fut); // Append new future.
+
+            assert req.transactionNodes() != null;
+
+            GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                futId,
+                fut.futureId(),
+                tx.topologyVersion(),
+                tx,
+                timeout,
+                dhtWrites,
+                nearWrites,
+                this.req.transactionNodes(),
+                tx.nearXidVersion(),
+                true,
+                tx.onePhaseCommit(),
+                tx.subjectId(),
+                tx.taskNameHash(),
+                tx.activeCachesDeploymentEnabled(),
+                tx.storeWriteThrough(),
+                retVal,
+                tx.mvccCoordinatorCounter());
+
+            int idx = 0;
+
+            for (IgniteTxEntry entry : dhtWrites) {
+                try {
+                    GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-                            if (cached.isNewLocked()) {
-                                List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
-                                    tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
+                    GridCacheContext<?, ?> cacheCtx = cached.context();
 
-                                // Do not preload if local node is a partition owner.
-                                if (!owners.contains(cctx.localNode()))
-                                    req.markKeyForPreload(idx);
-                            }
+                    // Do not invalidate near entry on originating transaction node.
+                    req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
+                        cached.readerId(n.id()) != null);
 
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                        }
+                    if (cached.isNewLocked()) {
+                        List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
+                            tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
 
-                        idx++;
+                        // Do not preload if local node is a partition owner.
+                        if (!owners.contains(cctx.localNode()))
+                            req.markKeyForPreload(idx);
                     }
 
-                    if (!F.isEmpty(nearWrites)) {
-                        for (IgniteTxEntry entry : nearWrites) {
-                            try {
-                                if (entry.explicitVersion() == null) {
-                                    GridCacheMvccCandidate added = entry.cached().candidate(version());
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                }
 
-                                    assert added != null : "Missing candidate for cache entry:" + entry;
-                                    assert added.dhtLocal();
+                idx++;
+            }
 
-                                    if (added.ownerVersion() != null)
-                                        req.owned(entry.txKey(), added.ownerVersion());
-                                }
+            if (!F.isEmpty(nearWrites)) {
+                for (IgniteTxEntry entry : nearWrites) {
+                    try {
+                        if (entry.explicitVersion() == null) {
+                            GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                break;
-                            }
-                            catch (GridCacheEntryRemovedException ignore) {
-                                assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                            }
+                            assert added != null : "Missing candidate for cache entry:" + entry;
+                            assert added.dhtLocal();
+
+                            if (added.ownerVersion() != null)
+                                req.owned(entry.txKey(), added.ownerVersion());
                         }
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        assert false : "Got removed exception on entry with dht local candidate: " + entry;
                     }
+                }
+            }
 
-                    assert req.transactionNodes() != null;
+            assert req.transactionNodes() != null;
 
-                    try {
-                        cctx.io().send(n, req, tx.ioPolicy());
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
 
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
-                                ", dhtTxId=" + tx.xidVersion() +
-                                ", node=" + n.id() + ']');
-                        }
-                    }
-                    catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + n.id() + ']');
+                }
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                fut.onNodeLeft();
+            }
+            catch (IgniteCheckedException e) {
+                if (!cctx.kernalContext().isStopping()) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() + ']');
                     }
-                    catch (IgniteCheckedException e) {
-                        if (!cctx.kernalContext().isStopping()) {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() + ']');
-                            }
 
-                            fut.onResult(e);
-                        }
-                        else {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() +
-                                    ", err=" + e + ']');
-                            }
-                        }
+                    fut.onResult(e);
+                }
+                else {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() +
+                            ", err=" + e + ']');
                     }
                 }
+            }
+        }
 
-                for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                    if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                        if (tx.remainingTime() == -1)
-                            return;
-
-                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
-
-                        add(fut); // Append new future.
-
-                        GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                            futId,
-                            fut.futureId(),
-                            tx.topologyVersion(),
-                            tx,
-                            timeout,
-                            null,
-                            nearMapping.writes(),
-                            tx.transactionNodes(),
-                            tx.nearXidVersion(),
-                            true,
-                            tx.onePhaseCommit(),
-                            tx.subjectId(),
-                            tx.taskNameHash(),
-                            tx.activeCachesDeploymentEnabled(),
-                            tx.storeWriteThrough(),
-                            retVal);
-
-                        for (IgniteTxEntry entry : nearMapping.entries()) {
-                            if (CU.writes().apply(entry)) {
-                                try {
-                                    if (entry.explicitVersion() == null) {
-                                        GridCacheMvccCandidate added = entry.cached().candidate(version());
+        for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+            if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                if (tx.remainingTime() == -1)
+                    return;
 
-                                        assert added != null : "Null candidate for non-group-lock entry " +
-                                            "[added=" + added + ", entry=" + entry + ']';
-                                        assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                                            "[added=" + added + ", entry=" + entry + ']';
+                MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
+
+                add(fut); // Append new future.
+
+                GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                    futId,
+                    fut.futureId(),
+                    tx.topologyVersion(),
+                    tx,
+                    timeout,
+                    null,
+                    nearMapping.writes(),
+                    tx.transactionNodes(),
+                    tx.nearXidVersion(),
+                    true,
+                    tx.onePhaseCommit(),
+                    tx.subjectId(),
+                    tx.taskNameHash(),
+                    tx.activeCachesDeploymentEnabled(),
+                    tx.storeWriteThrough(),
+                    retVal,
+                    tx.mvccCoordinatorCounter());
+
+                for (IgniteTxEntry entry : nearMapping.entries()) {
+                    if (CU.writes().apply(entry)) {
+                        try {
+                            if (entry.explicitVersion() == null) {
+                                GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                        if (added != null && added.ownerVersion() != null)
-                                            req.owned(entry.txKey(), added.ownerVersion());
-                                    }
+                                assert added != null : "Null candidate for non-group-lock entry " +
+                                    "[added=" + added + ", entry=" + entry + ']';
+                                assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+                                    "[added=" + added + ", entry=" + entry + ']';
 
-                                    break;
-                                } catch (GridCacheEntryRemovedException ignore) {
-                                    assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                                }
+                                if (added != null && added.ownerVersion() != null)
+                                    req.owned(entry.txKey(), added.ownerVersion());
                             }
+
+                            break;
+                        } catch (GridCacheEntryRemovedException ignore) {
+                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
                         }
+                    }
+                }
 
-                        assert req.transactionNodes() != null;
+                assert req.transactionNodes() != null;
 
-                        try {
-                            cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
+                try {
+                    cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
 
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + nearMapping.primary().id() + ']');
-                            }
-                        }
-                        catch (ClusterTopologyCheckedException ignored) {
-                            fut.onNodeLeft();
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + nearMapping.primary().id() + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    if (!cctx.kernalContext().isStopping()) {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() + ']');
                         }
-                        catch (IgniteCheckedException e) {
-                            if (!cctx.kernalContext().isStopping()) {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() + ']');
-                                }
 
-                                fut.onResult(e);
-                            }
-                            else {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() +
-                                        ", err=" + e + ']');
-                                }
-                            }
+                        fut.onResult(e);
+                    }
+                    else {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() +
+                                ", err=" + e + ']');
                         }
                     }
                 }
             }
         }
-        finally {
-            markInitialized();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index d334850..805c34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -103,6 +104,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     @GridDirectTransient
     private List<IgniteTxKey> nearWritesCacheMissed;
 
+    /** */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -141,7 +145,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         int taskNameHash,
         boolean addDepInfo,
         boolean storeWriteThrough,
-        boolean retVal) {
+        boolean retVal,
+        long mvccCrdCntr) {
         super(tx,
             timeout,
             null,
@@ -169,6 +174,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size());
 
         nearNodeId = tx.nearNodeId();
+        this.mvccCrdCntr = mvccCrdCntr;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
     }
 
     /**
@@ -407,54 +420,60 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeBitSet("preloadKeys", preloadKeys))
+                if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 31:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 32:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -501,7 +520,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 23:
-                nearNodeId = reader.readUuid("nearNodeId");
+                mvccCrdCntr = reader.readLong("mvccCrdCntr");
 
                 if (!reader.isLastRead())
                     return false;
@@ -509,7 +528,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -517,7 +536,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -525,7 +544,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -533,7 +552,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
+                ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -541,7 +560,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                preloadKeys = reader.readBitSet("preloadKeys");
+                ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -549,7 +568,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                subjId = reader.readUuid("subjId");
+                preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -557,7 +576,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -565,6 +584,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 32:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -584,6 +611,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 32;
+        return 33;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6d85222..82bd463 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1418,6 +1418,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
        }
 
         if (err == null) {
+            cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache());
+
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index d017d7d..0cccce3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -262,6 +263,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
+        ClusterNode mvccCrd = null;
+
+        if (tx.txState().mvccEnabled(cctx)) {
+            mvccCrd = cctx.coordinators().coordinator(topVer);
+
+            if (mvccCrd == null) {
+                onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+                return;
+            }
+        }
+
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
         boolean hasNearCache = false;
@@ -326,6 +339,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         for (final GridDistributedTxMapping m : mappings.values()) {
             final ClusterNode primary = m.primary();
 
+            boolean needCntr = false;
+
+            if (mvccCrd != null) {
+                if (tx.onePhaseCommit() || mvccCrd.equals(primary)) {
+                    needCntr = true;
+
+                    mvccCrd = null;
+                }
+            }
+
             if (primary.isLocal()) {
                 if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) {
                     GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(),
@@ -334,6 +357,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                         m.nearEntriesReads(),
                         m.nearEntriesWrites());
 
+                    nearReq.requestMvccCounter(needCntr);
+
                     prepareLocal(nearReq, m, ++miniId, true);
 
                     GridNearTxPrepareRequest colocatedReq = createRequest(txNodes,
@@ -347,6 +372,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 else {
                     GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes());
 
+                    req.requestMvccCounter(needCntr);
+
                     prepareLocal(req, m, ++miniId, m.hasNearCacheEntries());
                 }
             }
@@ -357,6 +384,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     m.reads(),
                     m.writes());
 
+                req.requestMvccCounter(needCntr);
+
                 final MiniFuture fut = new MiniFuture(m, ++miniId);
 
                 req.miniId(fut.futureId());
@@ -389,6 +418,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             }
         }
 
+        if (mvccCrd != null) {
+            assert !tx.onePhaseCommit();
+
+            IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx);
+
+            add((IgniteInternalFuture)cntrFut);
+        }
+
         markInitialized();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index c45eb7b..e093eeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -847,7 +848,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             0,
             tx.activeCachesDeploymentEnabled(),
             !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
-            waitRemoteTxs);
+            waitRemoteTxs,
+            TxMvccVersion.COUNTER_NA);
 
         finishReq.checkCommitted(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 55d6bdd..8ecf21f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
-import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
-import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
 /**
@@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(GridNearTxPrepareRequest req) {
         long timeout = remainingTime();
 
         if (state() != PREPARING) {
@@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             timeout,
             0,
             Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-            last,
+            req.last(),
             needReturnValue() && implicit());
 
         try {
-            userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
+            userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes());
 
             // Make sure to add future before calling prepare on it.
             cctx.mvcc().addFuture(fut);
@@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e352c87..e1c6636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -57,6 +57,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** */
     private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
 
+    /** */
+    private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -149,6 +152,20 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
     }
 
     /**
+     * @return {@code True} if need request MVCC counter on primary node on prepare step.
+     */
+    public boolean requestMvccCounter() {
+        return isFlag(REQUEST_MVCC_CNTR_FLAG_MASK);
+    }
+
+    /**
+     * @param val {@code True} if need request MVCC counter on primary node on prepare step.
+     */
+    public void requestMvccCounter(boolean val) {
+        setFlag(val, REQUEST_MVCC_CNTR_FLAG_MASK);
+    }
+
+    /**
      * @return {@code True} if it is safe for first client request to wait for topology future
      *      completion.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 8162168..4233371 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -97,6 +98,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     /** Not {@code null} if client node should remap transaction. */
     private AffinityTopologyVersion clientRemapVer;
 
+    /** */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -146,6 +150,20 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     }
 
     /**
+     * @param mvccCrdCntr Counter.
+     */
+    public void mvccCoordinatorCounter(long mvccCrdCntr) {
+        this.mvccCrdCntr = mvccCrdCntr;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
+    }
+
+    /**
      * @return One-phase commit state on primary node.
      */
     public boolean onePhaseCommit() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index e5d07ea..ec29002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -31,9 +30,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -47,6 +49,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /** */
+    private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory();
+
+    /** */
     private final AtomicLong mvccCntr = new AtomicLong(0L);
 
     /** */
@@ -74,20 +79,28 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
     }
 
+    public long requestTxCounterOnCoordinator(GridCacheVersion txVer) {
+        assert cctx.localNode().equals(assignHist.currentCoordinator());
+
+        return assignTxCounter(txVer);
+    }
+
     /**
      * @param crd Coordinator.
-     * @param txId Transaction ID.
+     * @param tx Transaction.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, GridCacheVersion txId) {
-        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);
+    public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, IgniteInternalTx tx) {
+        assert !crd.isLocal() : crd;
+
+        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, tx);
 
         cntrFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorTxCounterRequest(fut.id, txId),
+                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()),
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
@@ -98,8 +111,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         return fut;
     }
 
+    /**
+     * @param crd Coordinator.
+     * @return Counter request future.
+     */
     public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
-        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd);
+        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, null);
 
         cntrFuts.put(fut.id, fut);
 
@@ -118,6 +135,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
+     * @param crd Coordinator.
      * @param txId Transaction ID.
      * @return Acknowledge future.
      */
@@ -144,6 +162,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         return fut;
     }
 
+    /**
+     * @param crd Coordinator.
+     * @param txId Transaction ID.
+     */
     public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) {
         CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId);
 
@@ -329,14 +351,32 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
-     * @param discoCache Cluster topology.
-     * @return Assigned coordinator.
+     * @param topVer Topology version.
+     * @return MVCC coordinator for given topology version.
      */
-    @Nullable public ClusterNode assignCoordinator(DiscoCache discoCache) {
-        // TODO IGNITE-3478
-        List<ClusterNode> srvNodes = discoCache.serverNodes();
+    @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) {
+        return assignHist.coordinator(topVer);
+    }
+
+    /**
+     * @param discoCache Discovery snapshot.
+     */
+    public void assignCoordinator(DiscoCache discoCache) {
+        ClusterNode curCrd = assignHist.currentCoordinator();
+
+        if (curCrd == null || !discoCache.allNodes().contains(curCrd)) {
+            ClusterNode newCrd = null;
+
+            if (!discoCache.serverNodes().isEmpty())
+                newCrd = discoCache.serverNodes().get(0);
 
-        return srvNodes.isEmpty() ? null : srvNodes.get(0);
+            if (!F.eq(curCrd, newCrd)) {
+                assignHist.addAssignment(discoCache.version(), newCrd);
+
+                log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
+                    ", crd=" + newCrd + ']');
+            }
+        }
     }
 
     /**
@@ -347,21 +387,30 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         private final Long id;
 
         /** */
+        private IgniteInternalTx tx;
+
+        /** */
         private final ClusterNode crd;
 
         /**
          * @param id Future ID.
          * @param crd Coordinator.
          */
-        MvccCounterFuture(Long id, ClusterNode crd) {
+        MvccCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) {
             this.id = id;
             this.crd = crd;
+            this.tx = tx;
         }
 
         /**
          * @param cntr Counter.
          */
         void onResponse(long cntr) {
+            assert cntr != TxMvccVersion.COUNTER_NA;
+
+            if (tx != null)
+                tx.mvccCoordinatorCounter(cntr);
+
             onDone(cntr);
         }
 
@@ -448,7 +497,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             else if (msg instanceof CoordinatorQueryCounterRequest)
                 processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
             else
-                U.warn(log, "Unexpected message received: " + msg);
+                U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }
     }
 }


Mime
View raw message