ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] ignite git commit: ignite-3484
Date Thu, 07 Sep 2017 14:23:00 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 0559ccd..8e998b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -37,7 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -427,7 +427,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             assert !tx.onePhaseCommit();
 
             if (mvccCrd.isLocal())
-                tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion()));
+                tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx));
             else {
                 IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx);
 
@@ -471,9 +471,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                         ", loc=" + ((MiniFuture)f).primary().isLocal() +
                         ", done=" + f.isDone() + "]";
                 }
-                else if (f instanceof CacheCoordinatorsSharedManager.MvccCounterFuture) {
-                    CacheCoordinatorsSharedManager.MvccCounterFuture crdFut =
-                        (CacheCoordinatorsSharedManager.MvccCounterFuture)f;
+                else if (f instanceof CacheCoordinatorsSharedManager.TxCounterFuture) {
+                    CacheCoordinatorsSharedManager.TxCounterFuture crdFut =
+                        (CacheCoordinatorsSharedManager.TxCounterFuture)f;
 
                     return "[crdNode=" + crdFut.crd.id() +
                         ", loc=" + crdFut.crd.isLocal() +
@@ -530,7 +530,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
             if (res.error() != null)
                 onError(res.error());
             else {
-                if (res.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA)
+                if (res.mvccCoordinatorCounter() != MvccUpdateVersion.COUNTER_NA)
                     tx.mvccCoordinatorCounter(res.mvccCoordinatorCounter());
 
                 onPrepareResponse(m, res, updateMapping);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index a399421..a7d9e60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -49,7 +49,7 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
                 @Override public void apply(final GridNearTxFinishFuture fut) {
                     GridNearTxLocal tx = fut.tx();
 
-                    if (tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA) {
+                    if (tx.mvccCoordinatorCounter() != MvccUpdateVersion.COUNTER_NA) {
                         ClusterNode crd = fut.context().coordinators().coordinator(tx.topologyVersion());
 
                         assert crd != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index eb1f79f..d949d35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,7 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -393,7 +393,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
     @SuppressWarnings("ForLoopReplaceableByForEach")
     /** {@inheritDoc} */
     public void finish(boolean commit) {
-        if (!commit && tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA) {
+        if (!commit && tx.mvccCoordinatorCounter() != MvccUpdateVersion.COUNTER_NA) {
             ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion());
 
             assert crd != null;
@@ -861,7 +861,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
             tx.activeCachesDeploymentEnabled(),
             !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
             waitRemoteTxs,
-            TxMvccVersion.COUNTER_NA);
+            MvccUpdateVersion.COUNTER_NA);
 
         finishReq.checkCommitted(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index eb6d580..53ba43d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -24,7 +24,7 @@ import java.util.UUID;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.lang.IgniteUuid;
@@ -44,7 +44,7 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
     private int miniId;
 
     /** */
-    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+    private long mvccCrdCntr = MvccUpdateVersion.COUNTER_NA;
 
     /**
      * Empty constructor required for {@link Externalizable}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index e390932..6aefeb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -1168,7 +1167,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                         resolveTaskName(),
                                         null,
                                         keepBinary,
-                                        TxMvccVersion.COUNTER_NA,
+                                        null,
                                         null) : null; // TODO IGNITE-3478
 
                                 if (res != null) {
@@ -1188,7 +1187,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     resolveTaskName(),
                                     null,
                                     keepBinary,
-                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                    null); // TODO IGNITE-3478
                             }
                         }
                         catch (ClusterTopologyCheckedException e) {
@@ -1773,7 +1772,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            TxMvccVersion.COUNTER_NA,
+                                            null,
                                             null); // TODO IGNITE-3478
 
                                         if (getRes != null) {
@@ -1793,7 +1792,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                             resolveTaskName(),
                                             null,
                                             txEntry.keepBinary(),
-                                            TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                            null); // TODO IGNITE-3478
                                     }
 
                                     // If value is in cache and passed the filter.
@@ -2069,7 +2068,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     resolveTaskName(),
                                     null,
                                     txEntry.keepBinary(),
-                                    TxMvccVersion.COUNTER_NA,
+                                    null,
                                     null); // TODO IGNITE-3478
 
                                 if (getRes != null) {
@@ -2089,7 +2088,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     resolveTaskName(),
                                     null,
                                     txEntry.keepBinary(),
-                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                    null); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2157,7 +2156,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                         resolveTaskName(),
                                         accessPlc,
                                         !deserializeBinary,
-                                        TxMvccVersion.COUNTER_NA,
+                                        null,
                                         null) : null; // TODO IGNITE-3478
 
                                 if (getRes != null) {
@@ -2177,7 +2176,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     resolveTaskName(),
                                     accessPlc,
                                     !deserializeBinary,
-                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                    null); // TODO IGNITE-3478
                             }
 
                             if (val != null) {
@@ -2645,7 +2644,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                             resolveTaskName(),
                             expiryPlc0,
                             txEntry == null ? keepBinary : txEntry.keepBinary(),
-                            TxMvccVersion.COUNTER_NA,
+                            null,
                             null); // TODO IGNITE-3478
 
                         if (res == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index a23ae4b..2371e99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -99,7 +99,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     private AffinityTopologyVersion clientRemapVer;
 
     /** */
-    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+    private long mvccCrdCntr = MvccUpdateVersion.COUNTER_NA;
 
     /**
      * Empty constructor required by {@link Externalizable}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index f53087d..0324d73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -54,9 +54,8 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.resource.GridResourceIoc;
@@ -463,7 +462,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                     taskName,
                                     expiry,
                                     !deserializeBinary,
-                                    TxMvccVersion.COUNTER_NA,
+                                    null,
                                     null);
 
                                 if (res != null) {
@@ -492,7 +491,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                     taskName,
                                     expiry,
                                     !deserializeBinary,
-                                    TxMvccVersion.COUNTER_NA);
+                                    null);
 
                                 if (v != null) {
                                     ctx.addResult(vals,
@@ -1048,7 +1047,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                             taskName,
                             null,
                             keepBinary,
-                            TxMvccVersion.COUNTER_NA);
+                            null);
 
                         Object oldVal = null;
 
@@ -1169,7 +1168,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 taskName,
                                 null,
                                 keepBinary,
-                                TxMvccVersion.COUNTER_NA);
+                                null);
 
                             Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(
                                 ctx, entry.key(), old, keepBinary), val);
@@ -1203,7 +1202,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
                                 taskName,
                                 null,
                                 keepBinary,
-                                TxMvccVersion.COUNTER_NA);
+                                null);
 
                             IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor()
                                 .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index f780922..d1f445e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,13 +63,17 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
 
     /** */
-    private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<GridCacheVersion, MvccUpdateVersion> activeTxs = new ConcurrentHashMap<>();
 
     /** */
     private final Map<Long, Integer> activeQueries = new HashMap<>();
 
     /** */
-    private final ConcurrentMap<Long, MvccCounterFuture> cntrFuts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, TxCounterFuture> cntrFuts = new ConcurrentHashMap<>();
+
+    /** */
+    private final ConcurrentMap<Long, QueryVersionFuture> qryVerFuts = new ConcurrentHashMap<>();
+
 
     /** */
     private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>();
@@ -86,13 +92,17 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
-     * @param txVer Tx version.
+     * @param tx Transaction.
      * @return Counter.
      */
-    public long requestTxCounterOnCoordinator(GridCacheVersion txVer) {
+    public long requestTxCounterOnCoordinator(IgniteInternalTx tx) {
         assert cctx.localNode().equals(assignHist.currentCoordinator());
 
-        return assignTxCounter(txVer);
+        AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
+
+        assert txTopVer != null && txTopVer.initialized() : txTopVer;
+
+        return assignTxCounter(tx.nearXidVersion(), txTopVer.topologyVersion());
     }
 
     /**
@@ -103,14 +113,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, IgniteInternalTx tx) {
         assert !crd.isLocal() : crd;
 
-        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, tx);
+        AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
+
+        assert txTopVer != null && txTopVer.initialized() : txTopVer;
+
+        TxCounterFuture fut = new TxCounterFuture(futIdCntr.incrementAndGet(), crd, tx);
 
         cntrFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()),
+                new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion(), txTopVer.topologyVersion()),
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
@@ -145,15 +159,15 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param crd Coordinator.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<Long> requestQueryCounter(ClusterNode crd) {
-        MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, null);
+    public IgniteInternalFuture<MvccQueryVersion> requestQueryCounter(ClusterNode crd, long topVer) {
+        QueryVersionFuture fut = new QueryVersionFuture(futIdCntr.incrementAndGet(), topVer, crd);
 
-        cntrFuts.put(fut.id, fut);
+        qryVerFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorQueryCounterRequest(fut.id),
+                new CoordinatorQueryVersionRequest(fut.id),
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
@@ -230,12 +244,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             return;
         }
 
-        long nextCtr = assignTxCounter(msg.txId());
+        long nextCtr = assignTxCounter(msg.txId(), msg.topologyVersion());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorMvccCounterResponse(nextCtr, msg.futureId()),
+                new CoordinatorTxCounterResponse(nextCtr, msg.futureId()),
                 SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException e) {
@@ -251,8 +265,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorCounterResponse(UUID nodeId, CoordinatorMvccCounterResponse msg) {
-        MvccCounterFuture fut = cntrFuts.remove(msg.futureId());
+    private void processCoordinatorCounterResponse(UUID nodeId, CoordinatorTxCounterResponse msg) {
+        TxCounterFuture fut = cntrFuts.remove(msg.futureId());
 
         if (fut != null)
             fut.onResponse(msg.counter());
@@ -260,15 +274,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             if (cctx.discovery().alive(nodeId))
                 U.warn(log, "Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
             else if (log.isDebugEnabled())
-                log.debug("Failed to find query counter future [node=" + nodeId + ", msg=" + msg + ']');
+                log.debug("Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
         }
     }
+
     /**
      *
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryCounterRequest(UUID nodeId, CoordinatorQueryCounterRequest msg) {
+    private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) {
         ClusterNode node = cctx.discovery().node(nodeId);
 
         if (node == null) {
@@ -278,9 +293,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             return;
         }
 
-        long qryCntr = assignQueryCounter(nodeId);
-
-        CoordinatorMvccCounterResponse res = new CoordinatorMvccCounterResponse(qryCntr, msg.futureId());
+        CoordinatorQueryVersionResponse res = assignQueryCounter(nodeId, msg.futureId());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
@@ -292,12 +305,29 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             if (log.isDebugEnabled())
                 log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
 
-            onQueryDone(qryCntr);
+            onQueryDone(res.counter());
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
 
-            onQueryDone(qryCntr);
+            onQueryDone(res.counter());
+        }
+    }
+
+    /**
+     * @param nodeId Sender node ID.
+     * @param msg Message.
+     */
+    private void processCoordinatorQueryVersionResponse(UUID nodeId, CoordinatorQueryVersionResponse msg) {
+        QueryVersionFuture fut = qryVerFuts.remove(msg.futureId());
+
+        if (fut != null)
+            fut.onResponse(msg);
+        else {
+            if (cctx.discovery().alive(nodeId))
+                U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
+            else if (log.isDebugEnabled())
+                log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
         }
     }
 
@@ -351,12 +381,15 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
     /**
      * @param txId Transaction ID.
+     * @param topVer Topology version.
      * @return Counter.
      */
-    private synchronized long assignTxCounter(GridCacheVersion txId) {
-        long nextCtr = mvccCntr.getAndIncrement();
+    private synchronized long assignTxCounter(GridCacheVersion txId, long topVer) {
+        long nextCtr = mvccCntr.incrementAndGet();
 
-        Object old = activeTxs.put(txId, nextCtr);
+        MvccUpdateVersion ver = new MvccUpdateVersion(topVer, nextCtr);
+
+        Object old = activeTxs.put(txId, ver);
 
         assert old == null : txId;
 
@@ -367,24 +400,28 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param txId Transaction ID.
      */
     private synchronized void onTxDone(GridCacheVersion txId) {
-        Long cntr = activeTxs.remove(txId);
+        MvccUpdateVersion ver = activeTxs.remove(txId);
 
-        assert cntr != null;
+        assert ver != null;
 
-        committedCntr.setIfGreater(cntr);
+        committedCntr.setIfGreater(ver.counter());
     }
 
     /**
      * @param qryNodeId Node initiated query.
      * @return Counter for query.
      */
-    private synchronized long assignQueryCounter(UUID qryNodeId) {
+    private synchronized CoordinatorQueryVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
         Long mvccCntr = committedCntr.get();
 
-        Long minActive = minActiveTx();
+        List<MvccUpdateVersion> txs = null;
 
-        if (minActive != null && minActive < mvccCntr)
-            mvccCntr = minActive - 1;
+        for (MvccUpdateVersion txVer : activeTxs.values()) {
+            if (txs == null)
+                txs = new ArrayList<>();
+
+            txs.add(txVer);
+        }
 
         Integer queries = activeQueries.get(mvccCntr);
 
@@ -393,7 +430,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         else
             activeQueries.put(mvccCntr, 1);
 
-        return mvccCntr;
+        return new CoordinatorQueryVersionResponse(futId, mvccCntr, txs);
     }
 
     /**
@@ -431,9 +468,9 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     @Nullable private Long minActiveTx() {
         Long min = null;
 
-        for (Map.Entry<GridCacheVersion, Long> e : activeTxs.entrySet()) {
-            if (min == null || e.getValue() < min)
-                min = e.getValue();
+        for (Map.Entry<GridCacheVersion, MvccUpdateVersion> e : activeTxs.entrySet()) {
+            if (min == null || e.getValue().counter() < min)
+                min = e.getValue().counter();
         }
 
         return min;
@@ -475,7 +512,51 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     /**
      *
      */
-    public class MvccCounterFuture extends GridFutureAdapter<Long> {
+    public class QueryVersionFuture extends GridFutureAdapter<MvccQueryVersion> {
+        /** Future ID; also the key used to unregister this future from {@code qryVerFuts}. */
+        private final Long id;
+
+        /** Topology version that is stamped onto the response in {@link #onResponse}. */
+        private long topVer;
+
+        /** Coordinator node; this future fails when that node leaves. */
+        public final ClusterNode crd;
+
+        /**
+         * @param id Future ID.
+         * @param topVer Topology version.
+         * @param crd Coordinator.
+         */
+        QueryVersionFuture(Long id, long topVer, ClusterNode crd) {
+            this.id = id;
+            this.topVer = topVer;
+            this.crd = crd;
+        }
+
+        /**
+         * @param res Response; receives this future's topology version and then completes the future.
+         */
+        void onResponse(CoordinatorQueryVersionResponse res) {
+            assert res.counter() != MvccUpdateVersion.COUNTER_NA;
+
+            res.topologyVersion(topVer);
+
+            onDone(res);
+        }
+
+        /**
+         * @param nodeId Failed node ID; the future fails only if it is the coordinator and the future is still registered.
+         */
+        void onNodeLeft(UUID nodeId) {
+            if (crd.id().equals(nodeId) && qryVerFuts.remove(id) != null)
+                onDone(new ClusterTopologyCheckedException("Failed to request query version, coordinator failed: " + nodeId));
+        }
+    }
+
+    /**
+     *
+     */
+    public class TxCounterFuture extends GridFutureAdapter<Long> {
         /** */
         private final Long id;
 
@@ -489,7 +570,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          * @param id Future ID.
          * @param crd Coordinator.
          */
-        MvccCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) {
+        TxCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) {
             this.id = id;
             this.crd = crd;
             this.tx = tx;
@@ -499,7 +580,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          * @param cntr Counter.
          */
         void onResponse(long cntr) {
-            assert cntr != TxMvccVersion.COUNTER_NA;
+            assert cntr != MvccUpdateVersion.COUNTER_NA;
 
             if (tx != null)
                 tx.mvccCoordinatorCounter(cntr);
@@ -512,7 +593,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          */
         void onNodeLeft(UUID nodeId) {
             if (crd.id().equals(nodeId) && cntrFuts.remove(id) != null)
-                onDone(new ClusterTopologyCheckedException("Failed to request counter, node failed: " + nodeId));
+                onDone(new ClusterTopologyCheckedException("Failed to request counter, coordinator failed: " + nodeId));
         }
     }
 
@@ -563,7 +644,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
             UUID nodeId = discoEvt.eventNode().id();
 
-            for (MvccCounterFuture fut : cntrFuts.values())
+            for (TxCounterFuture fut : cntrFuts.values())
+                fut.onNodeLeft(nodeId);
+
+            for (QueryVersionFuture fut : qryVerFuts.values())
                 fut.onNodeLeft(nodeId);
 
             for (TxAckFuture fut : ackFuts.values())
@@ -578,16 +662,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             if (msg instanceof CoordinatorTxCounterRequest)
                 processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorMvccCounterResponse)
-                processCoordinatorCounterResponse(nodeId, (CoordinatorMvccCounterResponse)msg);
+            else if (msg instanceof CoordinatorTxCounterResponse)
+                processCoordinatorCounterResponse(nodeId, (CoordinatorTxCounterResponse)msg);
             else if (msg instanceof CoordinatorTxAckRequest)
                 processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
             else if (msg instanceof CoordinatorTxAckResponse)
                 processCoordinatorTxAckResponse(nodeId, (CoordinatorTxAckResponse)msg);
             else if (msg instanceof CoordinatorQueryAckRequest)
                 processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
-            else if (msg instanceof CoordinatorQueryCounterRequest)
-                processCoordinatorQueryCounterRequest(nodeId, (CoordinatorQueryCounterRequest)msg);
+            else if (msg instanceof CoordinatorQueryVersionRequest)
+                processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
+            else if (msg instanceof CoordinatorQueryVersionResponse)
+                processCoordinatorQueryVersionResponse(nodeId, (CoordinatorQueryVersionResponse) msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
deleted file mode 100644
index 5005477..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorMvccCounterResponse.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorMvccCounterResponse implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /** */
-    private long futId;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorMvccCounterResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Counter.
-     * @param futId Future ID.
-     */
-    CoordinatorMvccCounterResponse(long cntr, long futId) {
-        this.cntr = cntr;
-        this.futId = futId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorMvccCounterResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 130;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorMvccCounterResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
deleted file mode 100644
index 5dda247..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryCounterRequest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryCounterRequest implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long futId;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorQueryCounterRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param futId Future ID.
-     */
-    CoordinatorQueryCounterRequest(long futId) {
-        this.futId = futId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorQueryCounterRequest.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 133;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorQueryCounterRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
new file mode 100644
index 0000000..9d1cd5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Query version request message; its only serialized field is a future ID.
+ */
+public class CoordinatorQueryVersionRequest implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorQueryVersionRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     */
+    CoordinatorQueryVersionRequest(long futId) {
+        this.futId = futId;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Writer state selects the next field to write.
+            case 0:
+                if (!writer.writeLong("futId", futId))
+                    return false; // State is not advanced, so this field is the one attempted again.
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Reader state selects the next field to read.
+            case 0:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorQueryVersionRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 133;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1; // Matches the single field handled in writeTo/readFrom.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorQueryVersionRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
new file mode 100644
index 0000000..ea3e8d8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Query version response message: counter, future ID, topology version and the list of active transactions.
+ */
+public class CoordinatorQueryVersionResponse implements Message, MvccQueryVersion {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter. */
+    private long cntr;
+
+    /** Topology version; not a constructor argument, assigned via {@link #topologyVersion(long)} or when read. */
+    public long topVer;
+
+    /** Active transactions, stored exactly as passed to the constructor (the {@code txs} argument). */
+    @GridDirectCollection(MvccUpdateVersion.class)
+    private List<MvccUpdateVersion> txs;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorQueryVersionResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param futId Future ID.
+     * @param cntr Counter.
+     */
+    CoordinatorQueryVersionResponse(long futId, long cntr, List<MvccUpdateVersion> txs) {
+        this.futId = futId;
+        this.cntr = cntr;
+        this.txs = txs;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<MvccUpdateVersion> activeTransactions() {
+        return txs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void topologyVersion(long topVer) {
+        assert topVer > 0;
+
+        this.topVer = topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: once a field is written the next case runs.
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeCollection("txs", txs, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Same field order as writeTo; cases fall through.
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                topVer = reader.readLong("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                txs = reader.readCollection("txs", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorQueryVersionResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 136;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4; // Matches the four fields handled in writeTo/readFrom.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorQueryVersionResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
index dbdefda..8d5f699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
@@ -38,6 +38,9 @@ public class CoordinatorTxCounterRequest implements Message {
     /** */
     private GridCacheVersion txId;
 
+    /** */
+    private long topVer;
+
     /**
      * Required by {@link GridIoMessageFactory}.
      */
@@ -49,11 +52,16 @@ public class CoordinatorTxCounterRequest implements Message {
      * @param futId Future ID.
      * @param txId Transaction ID.
      */
-    CoordinatorTxCounterRequest(long futId, GridCacheVersion txId) {
+    CoordinatorTxCounterRequest(long futId, GridCacheVersion txId, long topVer) {
         assert txId != null;
 
         this.futId = futId;
         this.txId = txId;
+        this.topVer = topVer;
+    }
+
+    public long topologyVersion() {
+        return topVer;
     }
 
     /**
@@ -89,6 +97,12 @@ public class CoordinatorTxCounterRequest implements Message {
                 writer.incrementState();
 
             case 1:
+                if (!writer.writeLong("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -116,6 +130,14 @@ public class CoordinatorTxCounterRequest implements Message {
                 reader.incrementState();
 
             case 1:
+                topVer = reader.readLong("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -135,7 +157,7 @@ public class CoordinatorTxCounterRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 3;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
new file mode 100644
index 0000000..9a8064e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Transaction counter response message: a counter together with a future ID.
+ */
+public class CoordinatorTxCounterResponse implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter. */
+    private long cntr;
+
+    /** Future ID. */
+    private long futId;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public CoordinatorTxCounterResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Counter.
+     * @param futId Future ID.
+     */
+    CoordinatorTxCounterResponse(long cntr, long futId) {
+        this.cntr = cntr;
+        this.futId = futId;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks: once a field is written the next case runs.
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Same field order as writeTo; cases fall through.
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(CoordinatorTxCounterResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 130;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2; // Matches the two fields handled in writeTo/readFrom.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CoordinatorTxCounterResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
new file mode 100644
index 0000000..2c269dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.List;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ * {@link Message} exposing an MVCC query version: a topology version, a counter and the active transactions.
+ */
+public interface MvccQueryVersion extends Message {
+    /**
+     * @return Active transactions, each given as an {@link MvccUpdateVersion}.
+     */
+    public List<MvccUpdateVersion> activeTransactions();
+
+    /**
+     * @return Topology version.
+     */
+    public long topologyVersion();
+
+    /**
+     * @param topVer Topology version to set.
+     */
+    public void topologyVersion(long topVer);
+
+    /**
+     * @return Counter.
+     */
+    public long counter();}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
new file mode 100644
index 0000000..d285782
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * MVCC update version: a (topology version, counter) pair that is comparable and is itself a {@link Message}.
+ */
+public class MvccUpdateVersion implements Comparable<MvccUpdateVersion>, Message {
+    /** Counter value rejected by the two-argument constructor; presumably means "not available". */
+    public static final long COUNTER_NA = 0L;
+
+    /** Topology version; the primary key of {@link #compareTo}. Not final because {@link #readFrom} assigns it. */
+    private long topVer;
+
+    /** Counter; the secondary key of {@link #compareTo}. Not final because {@link #readFrom} assigns it. */
+    private long cntr;
+
+    /**
+     * No-arg constructor: leaves both fields at zero until {@link #readFrom} assigns them.
+     */
+    public MvccUpdateVersion() {
+        // No-op.
+    }
+
+    /**
+     * @param topVer Topology version; asserted to be positive.
+     * @param cntr Coordinator counter; asserted not to be {@link #COUNTER_NA}.
+     */
+    public MvccUpdateVersion(long topVer, long cntr) {
+        assert topVer > 0 : topVer;
+        assert cntr != COUNTER_NA;
+
+        this.topVer = topVer;
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} Orders by topology version first, then by counter. */
+    @Override public int compareTo(@NotNull MvccUpdateVersion other) {
+        int cmp = Long.compare(topVer, other.topVer);
+
+        if (cmp != 0)
+            return cmp;
+
+        return Long.compare(cntr, other.cntr);
+    }
+
+    /** {@inheritDoc} Equal only to an instance of exactly this class with the same topology version and counter. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MvccUpdateVersion that = (MvccUpdateVersion) o;
+
+        return topVer == that.topVer && cntr == that.cntr;
+    }
+
+    /** {@inheritDoc} Combines the folded 64-bit values of both fields, the same fields {@link #equals} compares. */
+    @Override public int hashCode() {
+        int res = (int) (topVer ^ (topVer >>> 32));
+
+        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
+
+        return res;
+    }
+
+    /**
+     * @return Coordinator topology version.
+     */
+    public long topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Counter.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} Writes the header once, then {@code cntr} followed by {@code topVer}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Cases intentionally fall through: state 0 continues into state 1.
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false; // State is not advanced, so a later call re-enters this case.
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads fields in the order {@link #writeTo} writes them: {@code cntr}, then {@code topVer}. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Cases intentionally fall through: state 0 continues into state 1.
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false; // Bail out before advancing the state, so a later call re-enters this case.
+
+                reader.incrementState();
+
+            case 1:
+                topVer = reader.readLong("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccUpdateVersion.class);
+    }
+
+    /** {@inheritDoc} This class always reports the constant {@code 135}. */
+    @Override public short directType() {
+        return 135;
+    }
+
+    /** {@inheritDoc} Matches the two fields written by {@link #writeTo}: {@code cntr} and {@code topVer}. */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Delegates to {@code S.toString} with this class and instance. */
+    @Override public String toString() {
+        return S.toString(MvccUpdateVersion.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java
deleted file mode 100644
index b47ed3c..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/TxMvccVersion.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.NotNull;
-
-/**
- *
- */
-public class TxMvccVersion implements Comparable<TxMvccVersion> {
-    /** */
-    public static final long COUNTER_NA = 0L;
-
-    /** */
-    private final long topVer;
-
-    /** */
-    private final long cntr;
-
-    /** */
-    private final GridCacheVersion txId;
-
-    /**
-     * @param topVer Topology version.
-     * @param cntr Coordinator counter.
-     * @param txId Transaction ID.
-     */
-    public TxMvccVersion(long topVer, long cntr, GridCacheVersion txId) {
-        assert topVer > 0 : topVer;
-        assert cntr != COUNTER_NA;
-        assert txId != null;
-
-        this.topVer = topVer;
-        this.cntr = cntr;
-        this.txId = txId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(@NotNull TxMvccVersion other) {
-        int cmp = Long.compare(topVer, other.topVer);
-
-        if (cmp != 0)
-            return cmp;
-
-        return Long.compare(cntr, other.cntr);
-    }
-
-    /**
-     * @return Coordinators topology version.
-     */
-    public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Counters.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /**
-     * @return Transaction ID.
-     */
-    public GridCacheVersion txId() {
-        return txId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(TxMvccVersion.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 9ba2f39..c53aa25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryVersion;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -1304,12 +1305,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public CacheDataRow findMvcc(GridCacheContext cctx, KeyCacheObject key, long topVer, long mvccCntr)
+        @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccQueryVersion mvccVer)
             throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 
             if (delegate != null)
-                return delegate.findMvcc(cctx, key, topVer, mvccCntr);
+                return delegate.mvccFind(cctx, key, mvccVer);
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 2d96f72..05d2eb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -56,7 +56,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -247,7 +247,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected boolean storeEnabled = true;
 
     /** */
-    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+    private long mvccCrdCntr = MvccUpdateVersion.COUNTER_NA;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -1445,7 +1445,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     resolveTaskName(),
                     null,
                     keepBinary,
-                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                    null); // TODO IGNITE-3478
             }
 
             boolean modified = false;
@@ -1546,7 +1546,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
      * @return Mvcc version.
      */
     protected final long mvccCounterForCommit() {
-        assert !txState().mvccEnabled(cctx) || mvccCrdCntr != TxMvccVersion.COUNTER_NA : mvccCrdCntr;
+        assert !txState().mvccEnabled(cctx) || mvccCrdCntr != MvccUpdateVersion.COUNTER_NA : mvccCrdCntr;
 
         return mvccCrdCntr;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b29b702..beab17a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
@@ -1658,7 +1657,7 @@ public class IgniteTxHandler {
                                                 tx.resolveTaskName(),
                                                 /*expiryPlc*/null,
                                                 /*keepBinary*/true,
-                                                TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                                null); // TODO IGNITE-3478
 
                                             if (val == null)
                                                 val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 82be466..1eb70c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -54,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1098,7 +1097,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                     resolveTaskName(),
                                     null,
                                     txEntry.keepBinary(),
-                                    TxMvccVersion.COUNTER_NA); // TODO IGNITE-3478
+                                    null); // TODO IGNITE-3478
                             }
                         }
                         else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 0b4664e..4c7e431 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -60,7 +60,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert row.mvccUpdateTopologyVersion() > 0 : row;
-            assert row.mvccUpdateCounter() != TxMvccVersion.COUNTER_NA : row;
+            assert row.mvccUpdateCounter() != MvccUpdateVersion.COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccUpdateTopologyVersion());
             off += 8;
@@ -79,7 +79,12 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
             long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
 
-            return ((CacheDataTree)tree).rowStore().mvccKeySearchRow(cacheId, hash, link, mvccTopVer, mvccCntr);
+            return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
+                hash,
+                link,
+                CacheDataRowAdapter.RowData.KEY_ONLY,
+                mvccTopVer,
+                mvccCntr);
         }
 
         return ((CacheDataTree)tree).rowStore().keySearchRow(cacheId, hash, link);
@@ -118,7 +123,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != TxMvccVersion.COUNTER_NA;
+            assert mvcCntr != MvccUpdateVersion.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index d496103..e10f753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -62,7 +62,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = row.mvccUpdateCounter();
 
             assert mvccUpdateTopVer > 0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != TxMvccVersion.COUNTER_NA;
+            assert mvccUpdateCntr != MvccUpdateVersion.COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccUpdateTopVer);
             off += 8;
@@ -98,7 +98,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != TxMvccVersion.COUNTER_NA;
+            assert mvccUpdateCntr != MvccUpdateVersion.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;
@@ -117,9 +117,10 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
             long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
 
-            return ((CacheDataTree)tree).rowStore().mvccKeySearchRow(cacheId,
+            return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,
                 link,
+                CacheDataRowAdapter.RowData.KEY_ONLY,
                 mvccTopVer,
                 mvccCntr);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e40b4d9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 6774d3e..f9e1eb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -69,12 +69,12 @@ public class CacheDataRowStore extends RowStore {
      * @param mvccCntr
      * @return Search row.
      */
-    CacheSearchRow mvccKeySearchRow(int cacheId, int hash, long link, long mvccTopVer, long mvccCntr) {
+    MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
         MvccDataRow dataRow = new MvccDataRow(grp,
             hash,
             link,
             partId,
-            CacheDataRowAdapter.RowData.KEY_ONLY,
+            rowData,
             mvccTopVer,
             mvccCntr);
 


Mime
View raw message