ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [34/36] ignite git commit: ignite-3484
Date Fri, 08 Sep 2017 12:36:06 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index d1f445e..ccd22d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -24,21 +24,25 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -54,6 +58,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /** */
+    public static final long COUNTER_NA = 0L;
+    /** */
     private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory();
 
     /** */
@@ -63,23 +69,25 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
 
     /** */
-    private final ConcurrentHashMap<GridCacheVersion, MvccUpdateVersion> activeTxs = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<GridCacheVersion, Long> activeTxs = new ConcurrentHashMap<>();
 
     /** */
     private final Map<Long, Integer> activeQueries = new HashMap<>();
 
     /** */
-    private final ConcurrentMap<Long, TxCounterFuture> cntrFuts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, MvccVersionFuture> verFuts = new ConcurrentHashMap<>();
 
     /** */
-    private final ConcurrentMap<Long, QueryVersionFuture> qryVerFuts = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>();
 
+    /** */
+    private final AtomicLong futIdCntr = new AtomicLong();
 
     /** */
-    private final ConcurrentMap<Long, TxAckFuture> ackFuts = new ConcurrentHashMap<>();
+    private final CountDownLatch crdLatch = new CountDownLatch(1);
 
     /** */
-    private final AtomicLong futIdCntr = new AtomicLong();
+    private long crdVer;
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
@@ -95,14 +103,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param tx Transaction.
      * @return Counter.
      */
-    public long requestTxCounterOnCoordinator(IgniteInternalTx tx) {
+    public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
         assert cctx.localNode().equals(assignHist.currentCoordinator());
 
         AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
 
         assert txTopVer != null && txTopVer.initialized() : txTopVer;
 
-        return assignTxCounter(tx.nearXidVersion(), txTopVer.topologyVersion());
+        return assignTxCounter(tx.nearXidVersion(), 0L, txTopVer.topologyVersion());
     }
 
     /**
@@ -110,16 +118,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param tx Transaction.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<Long> requestTxCounter(ClusterNode crd, IgniteInternalTx tx) {
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd, GridDhtTxLocalAdapter tx) {
         assert !crd.isLocal() : crd;
 
         AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
 
         assert txTopVer != null && txTopVer.initialized() : txTopVer;
 
-        TxCounterFuture fut = new TxCounterFuture(futIdCntr.incrementAndGet(), crd, tx);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
+            crd,
+            tx);
 
-        cntrFuts.put(fut.id, fut);
+        verFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
@@ -128,7 +138,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
-            if (cntrFuts.remove(fut.id) != null)
+            if (verFuts.remove(fut.id) != null)
                 fut.onDone(e);
         }
 
@@ -159,10 +169,13 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param crd Coordinator.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<MvccQueryVersion> requestQueryCounter(ClusterNode crd, long topVer) {
-        QueryVersionFuture fut = new QueryVersionFuture(futIdCntr.incrementAndGet(), topVer, crd);
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(ClusterNode crd) {
+        assert crd != null;
 
-        qryVerFuts.put(fut.id, fut);
+        // TODO IGNITE-3478: special case for local?
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
+
+        verFuts.put(fut.id, fut);
 
         try {
             cctx.gridIO().sendToGridTopic(crd,
@@ -171,7 +184,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
-            if (cntrFuts.remove(fut.id) != null)
+            if (verFuts.remove(fut.id) != null)
                 fut.onDone(e);
         }
 
@@ -195,11 +208,11 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException e) {
-            if (cntrFuts.remove(fut.id) != null)
-                fut.onDone();
+            if (ackFuts.remove(fut.id) != null)
+                fut.onDone(); // No need to ack, finish without error.
         }
         catch (IgniteCheckedException e) {
-            if (cntrFuts.remove(fut.id) != null)
+            if (ackFuts.remove(fut.id) != null)
                 fut.onDone(e);
         }
 
@@ -244,12 +257,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             return;
         }
 
-        long nextCtr = assignTxCounter(msg.txId(), msg.topologyVersion());
+        MvccCoordinatorVersionResponse res = assignTxCounter(msg.txId(), msg.futureId(), msg.topologyVersion());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
                 TOPIC_CACHE_COORDINATOR,
-                new CoordinatorTxCounterResponse(nextCtr, msg.futureId()),
+                res,
                 SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException e) {
@@ -262,23 +275,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
-     * @param nodeId Sender node ID.
-     * @param msg Message.
-     */
-    private void processCoordinatorCounterResponse(UUID nodeId, CoordinatorTxCounterResponse msg) {
-        TxCounterFuture fut = cntrFuts.remove(msg.futureId());
-
-        if (fut != null)
-            fut.onResponse(msg.counter());
-        else {
-            if (cctx.discovery().alive(nodeId))
-                U.warn(log, "Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
-            else if (log.isDebugEnabled())
-                log.debug("Failed to find coordinator counter future [node=" + nodeId + ", msg=" + msg + ']');
-        }
-    }
-
-    /**
      *
      * @param nodeId Sender node ID.
      * @param msg Message.
@@ -293,7 +289,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             return;
         }
 
-        CoordinatorQueryVersionResponse res = assignQueryCounter(nodeId, msg.futureId());
+        MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId());
 
         try {
             cctx.gridIO().sendToGridTopic(node,
@@ -318,8 +314,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param nodeId Sender node ID.
      * @param msg Message.
      */
-    private void processCoordinatorQueryVersionResponse(UUID nodeId, CoordinatorQueryVersionResponse msg) {
-        QueryVersionFuture fut = qryVerFuts.remove(msg.futureId());
+    private void processCoordinatorQueryVersionResponse(UUID nodeId, MvccCoordinatorVersionResponse msg) {
+        MvccVersionFuture fut = verFuts.remove(msg.futureId());
 
         if (fut != null)
             fut.onResponse(msg);
@@ -384,41 +380,60 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param topVer Topology version.
      * @return Counter.
      */
-    private synchronized long assignTxCounter(GridCacheVersion txId, long topVer) {
+    private synchronized MvccCoordinatorVersionResponse assignTxCounter(GridCacheVersion txId, long futId, long topVer) {
+        assert crdVer != 0;
+
         long nextCtr = mvccCntr.incrementAndGet();
 
-        MvccUpdateVersion ver = new MvccUpdateVersion(topVer, nextCtr);
+        // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
+        GridLongList txs = null;
+
+        for (Long txVer : activeTxs.values()) {
+            if (txs == null)
+                txs = new GridLongList();
+
+            txs.add(txVer);
+        }
 
-        Object old = activeTxs.put(txId, ver);
+        Object old = activeTxs.put(txId, nextCtr);
 
         assert old == null : txId;
 
-        return nextCtr;
+        long minQry = 0;
+
+        for (Long qryCntr : activeQueries.keySet()) {
+            if (qryCntr < minQry)
+                minQry = qryCntr;
+        }
+
+        return new MvccCoordinatorVersionResponse(futId, crdVer, nextCtr, txs, minQry);
     }
 
     /**
      * @param txId Transaction ID.
      */
     private synchronized void onTxDone(GridCacheVersion txId) {
-        MvccUpdateVersion ver = activeTxs.remove(txId);
+        Long cntr = activeTxs.remove(txId);
 
-        assert ver != null;
+        assert cntr != null;
 
-        committedCntr.setIfGreater(ver.counter());
+        committedCntr.setIfGreater(cntr);
     }
 
     /**
      * @param qryNodeId Node initiated query.
      * @return Counter for query.
      */
-    private synchronized CoordinatorQueryVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
+    private synchronized MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
+        assert crdVer != 0;
+
         Long mvccCntr = committedCntr.get();
 
-        List<MvccUpdateVersion> txs = null;
+        GridLongList txs = null;
 
-        for (MvccUpdateVersion txVer : activeTxs.values()) {
+        for (Long txVer : activeTxs.values()) {
             if (txs == null)
-                txs = new ArrayList<>();
+                txs = new GridLongList();
 
             txs.add(txVer);
         }
@@ -430,7 +445,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         else
             activeQueries.put(mvccCntr, 1);
 
-        return new CoordinatorQueryVersionResponse(futId, mvccCntr, txs);
+        return new MvccCoordinatorVersionResponse(futId, crdVer, mvccCntr, txs, COUNTER_NA);
     }
 
     /**
@@ -449,33 +464,6 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             activeQueries.remove(mvccCntr);
     }
 
-    private synchronized long cleanupVersion() {
-        long cntr = committedCntr.get();
-
-        Long minActive = minActiveTx();
-
-        if (minActive != null && minActive < cntr)
-            cntr = minActive - 1;
-
-        for (Long qryCntr : activeQueries.keySet()) {
-            if (qryCntr <= cntr)
-                cntr = qryCntr - 1;
-        }
-
-        return cntr;
-    }
-
-    @Nullable private Long minActiveTx() {
-        Long min = null;
-
-        for (Map.Entry<GridCacheVersion, MvccUpdateVersion> e : activeTxs.entrySet()) {
-            if (min == null || e.getValue().counter() < min)
-                min = e.getValue().counter();
-        }
-
-        return min;
-    }
-
     /**
      * @param topVer Topology version.
      * @return MVCC coordinator for given topology version.
@@ -499,6 +487,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             if (!F.eq(curCrd, newCrd)) {
                 assignHist.addAssignment(discoCache.version(), newCrd);
 
+                if (cctx.localNode().equals(newCrd)) {
+                    crdVer = discoCache.version().topologyVersion();
+
+                    crdLatch.countDown();
+                }
+
                 log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
                     ", crd=" + newCrd + ']');
 
@@ -512,88 +506,46 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     /**
      *
      */
-    public class QueryVersionFuture extends GridFutureAdapter<MvccQueryVersion> {
+    public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
         /** */
         private final Long id;
 
         /** */
-        private long topVer;
+        private GridDhtTxLocalAdapter tx;
 
         /** */
         public final ClusterNode crd;
 
         /**
          * @param id Future ID.
-         * @param topVer Topology version.
          * @param crd Coordinator.
          */
-        QueryVersionFuture(Long id, long topVer, ClusterNode crd) {
-            this.id = id;
-            this.topVer = topVer;
-            this.crd = crd;
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(CoordinatorQueryVersionResponse res) {
-            assert res.counter() != MvccUpdateVersion.COUNTER_NA;
-
-            res.topologyVersion(topVer);
-
-            onDone(res);
-        }
-
-        /**
-         * @param nodeId Failed node ID.
-         */
-        void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId) && cntrFuts.remove(id) != null)
-                onDone(new ClusterTopologyCheckedException("Failed to request query version, coordinator failed: " + nodeId));
-        }
-    }
-
-    /**
-     *
-     */
-    public class TxCounterFuture extends GridFutureAdapter<Long> {
-        /** */
-        private final Long id;
-
-        /** */
-        private IgniteInternalTx tx;
-
-        /** */
-        public final ClusterNode crd;
-
-        /**
-         * @param id Future ID.
-         * @param crd Coordinator.
-         */
-        TxCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) {
+        MvccVersionFuture(Long id, ClusterNode crd, @Nullable GridDhtTxLocalAdapter tx) {
             this.id = id;
             this.crd = crd;
             this.tx = tx;
         }
 
         /**
-         * @param cntr Counter.
+         * @param res Response.
          */
-        void onResponse(long cntr) {
-            assert cntr != MvccUpdateVersion.COUNTER_NA;
+        void onResponse(MvccCoordinatorVersionResponse res) {
+            assert res.counter() != COUNTER_NA;
 
             if (tx != null)
-                tx.mvccCoordinatorCounter(cntr);
+                tx.mvccCoordinatorVersion(res);
 
-            onDone(cntr);
+            onDone(res);
         }
 
         /**
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId) && cntrFuts.remove(id) != null)
-                onDone(new ClusterTopologyCheckedException("Failed to request counter, coordinator failed: " + nodeId));
+            if (crd.id().equals(nodeId) && verFuts.remove(id) != null) {
+                onDone(new ClusterTopologyCheckedException("Failed to request coordinator version, " +
+                    "coordinator failed: " + nodeId));
+            }
         }
     }
 
@@ -627,7 +579,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId) && cntrFuts.remove(id) != null)
+            if (crd.id().equals(nodeId) && verFuts.remove(id) != null)
                 onDone();
         }
     }
@@ -644,10 +596,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
             UUID nodeId = discoEvt.eventNode().id();
 
-            for (TxCounterFuture fut : cntrFuts.values())
-                fut.onNodeLeft(nodeId);
-
-            for (QueryVersionFuture fut : qryVerFuts.values())
+            for (MvccVersionFuture fut : verFuts.values())
                 fut.onNodeLeft(nodeId);
 
             for (TxAckFuture fut : ackFuts.values())
@@ -660,10 +609,26 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private class CoordinatorMessageListener implements GridMessageListener {
         /** {@inheritDoc} */
         @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+            MvccCoordinatorMessage msg0 = (MvccCoordinatorMessage)msg;
+
+            if (msg0.waitForCoordinatorInit()) {
+                if (crdVer == 0) {
+                    try {
+                        U.await(crdLatch);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        U.warn(log, "Failed to wait for coordinator initialization, thread interrupted [" +
+                            "msgNode=" + nodeId + ", msg=" + msg + ']');
+
+                        return;
+                    }
+
+                    assert crdVer != 0L;
+                }
+            }
+
             if (msg instanceof CoordinatorTxCounterRequest)
                 processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
-            else if (msg instanceof CoordinatorTxCounterResponse)
-                processCoordinatorCounterResponse(nodeId, (CoordinatorTxCounterResponse)msg);
             else if (msg instanceof CoordinatorTxAckRequest)
                 processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
             else if (msg instanceof CoordinatorTxAckResponse)
@@ -672,8 +637,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
                 processCoordinatorQueryAckRequest((CoordinatorQueryAckRequest)msg);
             else if (msg instanceof CoordinatorQueryVersionRequest)
                 processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
-            else if (msg instanceof CoordinatorQueryVersionResponse)
-                processCoordinatorQueryVersionResponse(nodeId, (CoordinatorQueryVersionResponse) msg);
+            else if (msg instanceof MvccCoordinatorVersionResponse)
+                processCoordinatorQueryVersionResponse(nodeId, (MvccCoordinatorVersionResponse) msg);
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
index d7e865a..96c0ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
@@ -20,14 +20,13 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class CoordinatorQueryAckRequest implements Message {
+public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -48,6 +47,11 @@ public class CoordinatorQueryAckRequest implements Message {
         this.cntr = cntr;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
     /**
      * @return Counter.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
index 9d1cd5f..f329cd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionRequest.java
@@ -20,14 +20,13 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class CoordinatorQueryVersionRequest implements Message {
+public class CoordinatorQueryVersionRequest implements MvccCoordinatorMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -48,6 +47,11 @@ public class CoordinatorQueryVersionRequest implements Message {
         this.futId = futId;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return true;
+    }
+
     /**
      * @return Future ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
deleted file mode 100644
index ea3e8d8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryVersionResponse.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorQueryVersionResponse implements Message, MvccQueryVersion {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /** */
-    public long topVer;
-
-    /** */
-    @GridDirectCollection(MvccUpdateVersion.class)
-    private List<MvccUpdateVersion> txs;
-
-    /** */
-    private long futId;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorQueryVersionResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Counter.
-     * @param futId Future ID.
-     */
-    CoordinatorQueryVersionResponse(long futId, long cntr, List<MvccUpdateVersion> txs) {
-        this.futId = futId;
-        this.cntr = cntr;
-        this.txs = txs;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public long futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<MvccUpdateVersion> activeTransactions() {
-        return txs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long topologyVersion() {
-        return topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void topologyVersion(long topVer) {
-        assert topVer > 0;
-
-        this.topVer = topVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeCollection("txs", txs, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                topVer = reader.readLong("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                txs = reader.readCollection("txs", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorQueryVersionResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 136;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 4;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorQueryVersionResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
index 5c4108d..6256880 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
@@ -21,14 +21,13 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class CoordinatorTxAckRequest implements Message {
+public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -60,6 +59,11 @@ public class CoordinatorTxAckRequest implements Message {
         this.txId = txId;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
     /**
      * @return Future ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
index c48ba4b..059416c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckResponse.java
@@ -20,14 +20,13 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class CoordinatorTxAckResponse implements Message {
+public class CoordinatorTxAckResponse implements MvccCoordinatorMessage {
     /** */
     private long futId;
 
@@ -53,6 +52,11 @@ public class CoordinatorTxAckResponse implements Message {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
index 8d5f699..fe3c547 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterRequest.java
@@ -21,14 +21,13 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
  */
-public class CoordinatorTxCounterRequest implements Message {
+public class CoordinatorTxCounterRequest implements MvccCoordinatorMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -60,6 +59,11 @@ public class CoordinatorTxCounterRequest implements Message {
         this.topVer = topVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return true;
+    }
+
     public long topologyVersion() {
         return topVer;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
deleted file mode 100644
index 9a8064e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxCounterResponse.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class CoordinatorTxCounterResponse implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long cntr;
-
-    /** */
-    private long futId;
-
-    /**
-     * Required by {@link GridIoMessageFactory}.
-     */
-    public CoordinatorTxCounterResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param cntr Counter.
-     * @param futId Future ID.
-     */
-    CoordinatorTxCounterResponse(long cntr, long futId) {
-        this.cntr = cntr;
-        this.futId = futId;
-    }
-
-    /**
-     * @return Future ID.
-     */
-    public long futureId() {
-        return futId;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("futId", futId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                futId = reader.readLong("futId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(CoordinatorTxCounterResponse.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 130;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CoordinatorTxCounterResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorMessage.java
new file mode 100644
index 0000000..ed761ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ *
+ */
+public interface MvccCoordinatorMessage extends Message {
+    public boolean waitForCoordinatorInit();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
new file mode 100644
index 0000000..eb0768d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/**
+ *
+ */
+public interface MvccCoordinatorVersion extends Message {
+    /**
+     * @return Active transactions.
+     */
+    public GridLongList activeTransactions();
+
+    /**
+     * @return Coordinator version.
+     */
+    public long coordinatorVersion();
+
+    /**
+     * @return Cleanup version.
+     */
+    public long cleanupVersion();
+
+    /**
+     * @return Counter.
+     */
+    public long counter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
new file mode 100644
index 0000000..623f897
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, MvccCoordinatorVersion {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long futId;
+
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    /** */
+    private GridLongList txs; // TODO IGNITE-3478 (do not send on backups?)
+
+    /** */
+    private long cleanupVer;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public MvccCoordinatorVersionResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cntr Counter.
+     * @param futId Future ID.
+     */
+    MvccCoordinatorVersionResponse(long futId, long crdVer, long cntr, GridLongList txs, long cleanupVer) {
+        this.futId = futId;
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+        this.txs = txs;
+        this.cleanupVer = cleanupVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long cleanupVersion() {
+        return cleanupVer;
+    }
+
+    /** {@inheritDoc} */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridLongList activeTransactions() {
+        return txs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cleanupVer", cleanupVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("txs", txs))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cleanupVer = reader.readLong("cleanupVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                txs = reader.readMessage("txs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccCoordinatorVersionResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 136;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccCoordinatorVersionResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
new file mode 100644
index 0000000..161e8d4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class MvccCounter implements Comparable<MvccCounter>, Message {
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    /** */
+    private long cleanupCntr;
+
+    /**
+     *
+     */
+    public MvccCounter() {
+        // No-op.
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Coordinator counter.
+     */
+    public MvccCounter(long crdVer, long cntr, long cleanupCntr) {
+        assert crdVer > 0 : crdVer;
+        assert cntr != CacheCoordinatorsSharedManager.COUNTER_NA;
+
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull MvccCounter other) {
+        int cmp = Long.compare(crdVer, other.crdVer);
+
+        if (cmp != 0)
+            return cmp;
+
+        return Long.compare(cntr, other.cntr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MvccCounter that = (MvccCounter) o;
+
+        return crdVer == that.crdVer && cntr == that.cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int) (crdVer ^ (crdVer >>> 32));
+
+        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
+
+        return res;
+    }
+
+    /**
+     * @return Coordinator version.
+     */
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    /**
+     * @return Counters.
+     */
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("cntr", cntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("crdVer", crdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cntr = reader.readLong("cntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                crdVer = reader.readLong("crdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccCounter.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 135;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccCounter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
deleted file mode 100644
index 2c269dc..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryVersion.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.List;
-import org.apache.ignite.plugin.extensions.communication.Message;
-
-/**
- *
- */
-public interface MvccQueryVersion extends Message {
-    /**
-     * @return Active transactions.
-     */
-    public List<MvccUpdateVersion> activeTransactions();
-
-    /**
-     * @return Topology version.
-     */
-    public long topologyVersion();
-
-    /**
-     * @param topVer Topology version.
-     */
-    public void topologyVersion(long topVer);
-
-    /**
-     * @return Counter.
-     */
-    public long counter();}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
deleted file mode 100644
index d285782..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUpdateVersion.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-
-/**
- *
- */
-public class MvccUpdateVersion implements Comparable<MvccUpdateVersion>, Message {
-    /** */
-    public static final long COUNTER_NA = 0L;
-
-    /** */
-    private long topVer;
-
-    /** */
-    private long cntr;
-
-    /**
-     *
-     */
-    public MvccUpdateVersion() {
-        // No-op.
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param cntr Coordinator counter.
-     */
-    public MvccUpdateVersion(long topVer, long cntr) {
-        assert topVer > 0 : topVer;
-        assert cntr != COUNTER_NA;
-
-        this.topVer = topVer;
-        this.cntr = cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(@NotNull MvccUpdateVersion other) {
-        int cmp = Long.compare(topVer, other.topVer);
-
-        if (cmp != 0)
-            return cmp;
-
-        return Long.compare(cntr, other.cntr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        MvccUpdateVersion that = (MvccUpdateVersion) o;
-
-        return topVer == that.topVer && cntr == that.cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = (int) (topVer ^ (topVer >>> 32));
-
-        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
-
-        return res;
-    }
-
-    /**
-     * @return Coordinators topology version.
-     */
-    public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Counters.
-     */
-    public long counter() {
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeLong("cntr", cntr))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong("topVer", topVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cntr = reader.readLong("cntr");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                topVer = reader.readLong("topVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return reader.afterMessageRead(MvccUpdateVersion.class);
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 135;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 2;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(MvccUpdateVersion.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index c53aa25..5a88f9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -50,7 +50,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeListImpl;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -1251,11 +1251,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
-            long topVer,
-            long mvccCntr) throws IgniteCheckedException {
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            delegate.mvccUpdate(cctx, key, val, ver, topVer, mvccCntr);
+            delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
         }
 
         /** {@inheritDoc} */
@@ -1305,7 +1304,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccQueryVersion mvccVer)
+        @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccCoordinatorVersion mvccVer)
             throws IgniteCheckedException {
             CacheDataStore delegate = init0(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dfe0e06..6445304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -636,7 +637,7 @@ public interface IgniteInternalTx {
     public void commitError(Throwable e);
 
     /**
-     * @param mvccCrdCntr Update counter assigned by MVCC coordinator.
+     * @param mvccVer Version.
      */
-    public void mvccCoordinatorCounter(long mvccCrdCntr);
+    public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 05d2eb5..937785a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -56,7 +56,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -247,7 +247,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     protected boolean storeEnabled = true;
 
     /** */
-    private long mvccCrdCntr = MvccUpdateVersion.COUNTER_NA;
+    protected MvccCoordinatorVersion mvccVer;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -363,6 +363,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
             log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
+    public MvccCoordinatorVersion mvccCoordinatorVersion() {
+        return mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) {
+        this.mvccVer = mvccVer;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean localResult() {
         assert originatingNodeId() != null;
@@ -1530,27 +1539,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash));
     }
 
-    /** {@inheritDoc} */
-    public final void mvccCoordinatorCounter(long mvccCrdCntr) {
-        this.mvccCrdCntr = mvccCrdCntr;
-    }
-
-    /**
-     * @return Coordinator counter.
-     */
-    public final long mvccCoordinatorCounter() {
-        return mvccCrdCntr;
-    }
-
-    /**
-     * @return Mvcc version.
-     */
-    protected final long mvccCounterForCommit() {
-        assert !txState().mvccEnabled(cctx) || mvccCrdCntr != MvccUpdateVersion.COUNTER_NA : mvccCrdCntr;
-
-        return mvccCrdCntr;
-    }
-
     /**
      * Resolve DR conflict.
      *
@@ -1850,7 +1838,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
-        @Override public void mvccCoordinatorCounter(long mvccCrdCntr) {
+        @Override public void mvccCoordinatorVersion(MvccCoordinatorVersion mvccVer) {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 3a8d5ee..58aa555 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -860,7 +860,7 @@ public class IgniteTxHandler {
             tx = ctx.tm().tx(dhtVer);
 
         if (tx != null) {
-            tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
+            tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
 
             req.txState(tx.txState());
         }
@@ -1310,7 +1310,7 @@ public class IgniteTxHandler {
                 tx.commitVersion(req.commitVersion());
                 tx.invalidate(req.isInvalidate());
                 tx.systemInvalidate(req.isSystemInvalidate());
-                tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
+                tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
 
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1357,7 +1357,7 @@ public class IgniteTxHandler {
         try {
             tx.commitVersion(req.writeVersion());
             tx.invalidate(req.isInvalidate());
-            tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
+            tx.mvccCoordinatorVersion(req.mvccCoordinatorVersion());
 
             // Complete remote candidates.
             tx.doneRemote(req.version(), null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1eb70c2..f785e2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -507,7 +507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             try {
                 cctx.tm().txContext(this);
 
-                long mvccCntr = mvccCounterForCommit();
+                assert !txState.mvccEnabled(cctx) || mvccVer != null;
 
                 AffinityTopologyVersion topVer = topologyVersion();
 
@@ -687,7 +687,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccCntr);
+                                            mvccVer);
 
                                         if (updRes.success())
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -715,7 +715,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccCntr);
+                                                mvccVer);
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -737,7 +737,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             resolveTaskName(),
                                             dhtVer,
                                             null,
-                                            mvccCntr);
+                                            mvccVer);
 
                                         if (updRes.success())
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -761,7 +761,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 resolveTaskName(),
                                                 dhtVer,
                                                 null,
-                                                mvccCntr);
+                                                mvccVer);
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 4c7e431..a1dacd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -60,7 +60,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert row.mvccUpdateTopologyVersion() > 0 : row;
-            assert row.mvccUpdateCounter() != MvccUpdateVersion.COUNTER_NA : row;
+            assert row.mvccUpdateCounter() != CacheCoordinatorsSharedManager.COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccUpdateTopologyVersion());
             off += 8;
@@ -123,7 +123,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != MvccUpdateVersion.COUNTER_NA;
+            assert mvcCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index e10f753..bc27a21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -62,7 +62,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = row.mvccUpdateCounter();
 
             assert mvccUpdateTopVer > 0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != MvccUpdateVersion.COUNTER_NA;
+            assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccUpdateTopVer);
             off += 8;
@@ -98,7 +98,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != MvccUpdateVersion.COUNTER_NA;
+            assert mvccUpdateCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a365fc5..1fcf8dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -21,7 +21,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -167,7 +167,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
 
-        assert row.mvccUpdateCounter() != MvccUpdateVersion.COUNTER_NA;
+        assert row.mvccUpdateCounter() != CacheCoordinatorsSharedManager.COUNTER_NA;
 
         cmp = Long.compare(row.mvccUpdateCounter(), mvccCntr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index 45856b8..62a07b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 
 /**
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return MvccUpdateVersion.COUNTER_NA;
+        return CacheCoordinatorsSharedManager.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index 11d56e0..e22a2a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 
 /**
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return MvccUpdateVersion.COUNTER_NA;
+        return CacheCoordinatorsSharedManager.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 92371a1..b334e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.tree;
 
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return MvccUpdateVersion.COUNTER_NA;
+        return CacheCoordinatorsSharedManager.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index bcf733c..28460f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.tree;
 
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
@@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
-        return MvccUpdateVersion.COUNTER_NA;
+        return CacheCoordinatorsSharedManager.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/16261308/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index d0d6ee1..17cc9e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.tree;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -46,7 +46,7 @@ public class MvccDataRow extends DataRow {
         super(grp, hash, link, part, rowData);
 
         assert mvccTopVer > 0 : mvccTopVer;
-        assert mvccCntr != MvccUpdateVersion.COUNTER_NA;
+        assert mvccCntr != CacheCoordinatorsSharedManager.COUNTER_NA;
 
         this.mvccTopVer = mvccTopVer;
         this.mvccCntr = mvccCntr;


Mime
View raw message