ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3479
Date Wed, 27 Sep 2017 14:50:52 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 537a3ecb2 -> 4e7f19ede


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e7f19ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e7f19ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e7f19ed

Branch: refs/heads/ignite-3479
Commit: 4e7f19ede40f35f1d9657f5dcb1b2ea8aeeb42d3
Parents: 537a3ec
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 27 16:31:54 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 27 17:50:37 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |   5 +-
 .../ignite/internal/GridKernalContext.java      |   6 +
 .../ignite/internal/GridKernalContextImpl.java  |  14 +-
 .../apache/ignite/internal/IgniteKernal.java    |   4 +-
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../processors/cache/GridCacheProcessor.java    |   2 -
 .../cache/GridCacheSharedContext.java           |  16 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  10 +-
 .../dht/GridPartitionedGetFuture.java           | 162 ++++------------
 .../GridDhtPartitionsExchangeFuture.java        |  14 +-
 .../GridNearPessimisticTxPrepareFuture.java     |  17 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    |  97 ++++++----
 .../processors/cache/mvcc/MvccCoordinator.java  |  28 ++-
 .../processors/cache/mvcc/MvccQueryAware.java   |  43 +++++
 .../processors/cache/mvcc/MvccQueryFuture.java  |  27 ---
 .../processors/cache/mvcc/MvccQueryTracker.java | 192 +++++++++++++++++++
 .../wal/reader/IgniteWalIteratorFactory.java    |   2 +-
 .../wal/reader/StandaloneGridKernalContext.java |   6 +
 .../query/GridCacheDistributedQueryManager.java |   2 +-
 .../cache/query/GridCacheQueryManager.java      |   2 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 113 ++++++++++-
 .../pagemem/BPlusTreePageMemoryImplTest.java    |   1 -
 .../BPlusTreeReuseListPageMemoryImplTest.java   |   1 -
 .../MetadataStoragePageMemoryImplTest.java      |   1 -
 .../pagemem/PageMemoryImplNoLoadTest.java       |   1 -
 .../persistence/pagemem/PageMemoryImplTest.java |   1 -
 .../loadtests/hashmap/GridCacheTestContext.java |   2 -
 27 files changed, 510 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 93ffe95..c3a8127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -61,7 +61,10 @@ public interface GridComponent {
         BINARY_PROC,
 
         /** Query processor. */
-        QUERY_PROC
+        QUERY_PROC,
+
+        /** */
+        CACHE_CRD_PROC
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 99c7cce..971be7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
 import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -643,4 +644,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Platform processor.
      */
     public PlatformProcessor platform();
+
+    /**
+     * @return Cache mvcc coordinator processor.
+     */
+    public CacheCoordinatorsSharedManager coordinators();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 07e5970..1715887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -282,6 +283,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     @GridToStringExclude
     private DataStructuresProcessor dataStructuresProc;
 
+    /** Cache mvcc coordinators. */
+    @GridToStringExclude
+    private CacheCoordinatorsSharedManager coordProc;
+
     /** */
     @GridToStringExclude
     private List<GridComponent> comps = new LinkedList<>();
@@ -344,7 +349,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
-    Map<String, ? extends ExecutorService> customExecSvcs;
+    private Map<String, ? extends ExecutorService> customExecSvcs;
 
     /** */
     @GridToStringExclude
@@ -579,6 +584,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             poolProc = (PoolProcessor) comp;
         else if (comp instanceof GridMarshallerMappingProcessor)
             mappingProc = (GridMarshallerMappingProcessor)comp;
+        else if (comp instanceof CacheCoordinatorsSharedManager)
+            coordProc = (CacheCoordinatorsSharedManager)comp;
         else if (!(comp instanceof DiscoveryNodeValidationProcessor
                 || comp instanceof PlatformPluginProcessor))
             assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -834,6 +841,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public CacheCoordinatorsSharedManager coordinators() {
+        return coordProc;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteLogger log(String ctgr) {
         return config().getGridLogger().getLogger(ctgr);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index fabbeed..7b833bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -114,6 +114,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
@@ -937,8 +938,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             // Start processors before discovery manager, so they will
             // be able to start receiving messages once discovery completes.
             try {
+                startProcessor(new CacheCoordinatorsSharedManager(ctx));
                 startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx));
-                startProcessor(new  GridAffinityProcessor(ctx));
+                startProcessor(new GridAffinityProcessor(ctx));
                 startProcessor(createComponent(GridSegmentationProcessor.class, ctx));
                 startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
                 startProcessor(new GridClusterStateProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b576789..f850ad3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -805,7 +805,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
-    @Nullable public IgniteInternalFuture<?> affinityReadyFuture(AffinityTopologyVersion ver) {
+    @Nullable public IgniteInternalFuture<AffinityTopologyVersion> affinityReadyFuture(AffinityTopologyVersion ver) {
         GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut;
 
         if (lastInitializedFut0 != null && lastInitializedFut0.initialVersion().compareTo(ver) == 0) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index dc24586..e52c56c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2176,7 +2176,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     @SuppressWarnings("unchecked")
     private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
         Collection<CacheStoreSessionListener> storeSesLsnrs) throws IgniteCheckedException {
-        CacheCoordinatorsSharedManager coord = new CacheCoordinatorsSharedManager();
         IgniteTxManager tm = new IgniteTxManager();
         GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
         GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -2215,7 +2214,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         return new GridCacheSharedContext(
             kernalCtx,
-            coord,
             tm,
             verMgr,
             mvccMgr,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index bf5b999..1cdee39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,14 +36,12 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
@@ -123,9 +121,6 @@ public class GridCacheSharedContext<K, V> {
     /** Ttl cleanup manager. */
     private GridCacheSharedTtlCleanupManager ttlMgr;
 
-    /** Cache mvcc coordinator. */
-    private CacheCoordinatorsSharedManager crd;
-
     /** Cache contexts map. */
     private ConcurrentHashMap8<Integer, GridCacheContext<K, V>> ctxMap;
 
@@ -170,7 +165,6 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param kernalCtx  Context.
-     * @param crd Cache mvcc coordinator manager.
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
      * @param mvccMgr MVCC manager.
@@ -184,7 +178,6 @@ public class GridCacheSharedContext<K, V> {
      */
     public GridCacheSharedContext(
         GridKernalContext kernalCtx,
-        CacheCoordinatorsSharedManager crd,
         IgniteTxManager txMgr,
         GridCacheVersionManager verMgr,
         GridCacheMvccManager mvccMgr,
@@ -203,7 +196,6 @@ public class GridCacheSharedContext<K, V> {
         this.kernalCtx = kernalCtx;
 
         setManagers(mgrs,
-            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -376,7 +368,6 @@ public class GridCacheSharedContext<K, V> {
         List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
 
         setManagers(mgrs,
-            crd,
             txMgr,
             jtaMgr,
             verMgr,
@@ -416,7 +407,6 @@ public class GridCacheSharedContext<K, V> {
 
     /**
      * @param mgrs Managers list.
-     * @param coord Cache mvcc coordinator manager.
      * @param txMgr Transaction manager.
      * @param jtaMgr JTA manager.
      * @param verMgr Version manager.
@@ -428,7 +418,6 @@ public class GridCacheSharedContext<K, V> {
      * @param ttlMgr Ttl cleanup manager.
      */
     private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
-        CacheCoordinatorsSharedManager coord,
         IgniteTxManager txMgr,
         CacheJtaManagerAdapter jtaMgr,
         GridCacheVersionManager verMgr,
@@ -442,7 +431,6 @@ public class GridCacheSharedContext<K, V> {
         CacheAffinitySharedManager affMgr,
         GridCacheIoManager ioMgr,
         GridCacheSharedTtlCleanupManager ttlMgr) {
-        this.crd = add(mgrs, coord);
         this.mvccMgr = add(mgrs, mvccMgr);
         this.verMgr = add(mgrs, verMgr);
         this.txMgr = add(mgrs, txMgr);
@@ -786,7 +774,7 @@ public class GridCacheSharedContext<K, V> {
      * @return Cache mvcc coordinator manager.
      */
     public CacheCoordinatorsSharedManager coordinators() {
-        return crd;
+        return kernalCtx.coordinators();
     }
 
     /**
@@ -844,7 +832,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * Captures all ongoing operations that we need to wait before we can proceed to the next topology version.
      * This method must be called only after
-     * {@link GridDhtPartitionTopology#updateTopologyVersion(GridDhtTopologyFuture, DiscoCache, long, boolean)}
+     * {@link GridDhtPartitionTopology#updateTopologyVersion}
      * method is called so that all new updates will wait to switch to the new version.
      * This method will capture:
      * <ul>

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2af2d51..4eca4e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1236,7 +1236,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 }
             }
 
-            IgniteInternalFuture<Long> waitCrdCntrFut = null;
+            IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null;
 
             if (req.requestMvccCounter()) {
                 assert tx.txState().mvccEnabled(cctx);
@@ -1245,10 +1245,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
 
                 assert crd != null : tx.topologyVersion();
 
-                if (crd.node().isLocal())
+                if (crd.nodeId().equals(cctx.localNodeId()))
                     onMvccResponse(cctx.localNodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
                 else {
-                    IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd.node(),
+                    IgniteInternalFuture<MvccCoordinatorVersion> crdCntrFut = cctx.coordinators().requestTxCounter(crd,
                         this,
                         tx.nearXidVersion());
 
@@ -1280,8 +1280,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 if (waitCrdCntrFut != null) {
                     skipInit = true;
 
-                    waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> fut) {
+                    waitCrdCntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
                             try {
                                 fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 37e9feb6..4bfd0fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,12 +41,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -60,14 +61,13 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Colocated get future.
  */
-public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryFuture {
+public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> implements MvccQueryAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -78,10 +78,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     private static IgniteLogger log;
 
     /** */
-    private MvccCoordinator mvccCrd;
-
-    /** */
-    private MvccCoordinatorVersion mvccVer;
+    private MvccQueryTracker mvccTracker;
 
     /**
      * @param cctx Context.
@@ -130,6 +127,18 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     }
 
     /**
+     * @return Mvcc version if mvcc is enabled for cache.
+     */
+    @Nullable private MvccCoordinatorVersion mvccVersion() {
+        if (!cctx.mvccEnabled())
+            return null;
+
+        assert mvccTracker != null;
+
+        return mvccTracker.mvccVersion();
+    }
+
+    /**
      * Initializes future.
      *
      * @param topVer Topology version.
@@ -148,11 +157,13 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         }
 
         if (cctx.mvccEnabled()) {
+            mvccTracker = new MvccQueryTracker(cctx, canRemap, this);
+
             trackable = true;
 
             cctx.mvcc().addFuture(this, futId);
 
-            requestMvccVersionAndMap(topVer);
+            mvccTracker.requestVersion(topVer);
 
             return;
         }
@@ -160,106 +171,31 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         initialMap(topVer);
     }
 
-    /**
-     * @param topVer Topology version.
-     */
-    private void initialMap(AffinityTopologyVersion topVer) {
-        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+    /** {@inheritDoc} */
+    @Override public void onMvccVersionReceived(AffinityTopologyVersion topVer) {
+        initialMap(topVer);
+    }
 
-        markInitialized();
+    /** {@inheritDoc} */
+    @Override public void onMvccVersionError(IgniteCheckedException e) {
+        onDone(e);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
-        if (!cctx.mvccEnabled())
-            return null;
+        if (mvccTracker != null)
+            return mvccTracker.onMvccCoordinatorChange(newCrd);
 
-        synchronized (this) {
-            if (mvccVer != null) {
-                mvccCrd = newCrd;
-
-                return mvccVer;
-            }
-            else if (mvccCrd != null)
-                mvccCrd = null;
-
-            return null;
-        }
+        return null;
     }
 
     /**
      * @param topVer Topology version.
      */
-    private void requestMvccVersionAndMap(final AffinityTopologyVersion topVer) {
-        MvccCoordinator mvccCrd = cctx.affinity().mvccCoordinator(topVer);
-
-        if (mvccCrd == null) {
-            onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
-
-            return;
-        }
-
-        synchronized (this) {
-            this.mvccCrd = mvccCrd;
-        }
-
-        MvccCoordinator curCrd = cctx.topology().mvccCoordinator();
-
-        if (!mvccCrd.equals(curCrd)) {
-            assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
-
-            // TODO IGNITE-3479.
-        }
-
-        IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
-
-        cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
-            @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
-                boolean needRemap = false;
-
-                try {
-                    MvccCoordinatorVersion rcvdVer = fut.get();
-
-                    synchronized (GridPartitionedGetFuture.class) {
-                        if (GridPartitionedGetFuture.this.mvccCrd != null) {
-                            mvccVer = rcvdVer;
-                        }
-                        else
-                            needRemap = true;
-                    }
-
-                    if (!needRemap)
-                        initialMap(topVer);
-                }
-                catch (ClusterTopologyCheckedException e) {
-                    needRemap = true;
-                }
-                catch (IgniteCheckedException e) {
-                    GridPartitionedGetFuture.this.onDone(e);
-                }
+    private void initialMap(AffinityTopologyVersion topVer) {
+        map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
-                if (needRemap) {
-                    if (canRemap) {
-                        IgniteInternalFuture<AffinityTopologyVersion> waitFut = waitRemapFuture(topVer);
-
-                        waitFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                                try {
-                                    requestMvccVersionAndMap(fut.get());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    GridPartitionedGetFuture.this.onDone(e);
-                                }
-                            }
-                        });
-                    }
-                    else {
-                        GridPartitionedGetFuture.this.onDone(new ClusterTopologyCheckedException("Failed to " +
-                            "request mvcc version, coordinator failed"));
-                    }
-                }
-            }
-        });
+        markInitialized();
     }
 
     /** {@inheritDoc} */
@@ -319,24 +255,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             if (trackable)
                 cctx.mvcc().removeFuture(futId);
 
-            if (cctx.mvccEnabled()) {
-                MvccCoordinator mvccCrd0 = null;
-                MvccCoordinatorVersion mvccVer0 = null;
-
-                synchronized (this) {
-                    if (mvccVer != null) {
-                        assert mvccCrd != null;
-
-                        mvccCrd0 = mvccCrd;
-                        mvccVer0 = mvccVer;
-
-                        mvccVer = null;
-                    }
-                }
-
-                if (mvccVer0 != null)
-                    cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
-            }
+            if (mvccTracker != null)
+                mvccTracker.onQueryDone();
 
             cache().sendTtlUpdateRequest(expiryPlc);
 
@@ -431,7 +351,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                         expiryPlc,
                         skipVals,
                         recovery,
-                        mvccVer);
+                        mvccVersion());
 
                 final Collection<Integer> invalidParts = fut.invalidPartitions();
 
@@ -488,7 +408,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     skipVals,
                     cctx.deploymentEnabled(),
                     recovery,
-                    mvccVer);
+                    mvccVersion());
 
                 add(fut); // Append new future.
 
@@ -595,7 +515,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
 
                 if (readNoEntry) {
                     CacheDataRow row = cctx.mvccEnabled() ?
-                        cctx.offheap().mvccRead(cctx, key, mvccVer) :
+                        cctx.offheap().mvccRead(cctx, key, mvccVersion()) :
                         cctx.offheap().read(cctx, key);
 
                     if (row != null) {
@@ -639,7 +559,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                                 taskName,
                                 expiryPlc,
                                 !deserializeBinary,
-                                mvccVer,
+                                mvccVersion(),
                                 null);
 
                             if (getRes != null) {
@@ -659,7 +579,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                                 taskName,
                                 expiryPlc,
                                 !deserializeBinary,
-                                mvccVer);
+                                mvccVersion());
                         }
 
                         cache.context().evicts().touch(entry, topVer);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 51da7a0..d93b359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -26,7 +26,6 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -80,7 +79,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryAware;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -571,11 +570,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             else if (exchId.isLeft()){
                 MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator();
 
-                if (mvccCrd != null && mvccCrd.node().equals(exchId.eventNode())) {
-                    assert !CU.clientNode(mvccCrd.node()) : mvccCrd;
-
+                if (mvccCrd != null && mvccCrd.nodeId().equals(exchId.eventNode().id()))
                     newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null;
-                }
             }
 
             exchCtx = new ExchangeContext(crdNode, newMvccCrd, this);
@@ -822,8 +818,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             Map<MvccCounter, Integer> activeQrys = null;
 
             for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) {
-                if (fut instanceof MvccQueryFuture) {
-                    MvccCoordinatorVersion ver = ((MvccQueryFuture)fut).onMvccCoordinatorChange(mvccCrd);
+                if (fut instanceof MvccQueryAware) {
+                    MvccCoordinatorVersion ver = ((MvccQueryAware)fut).onMvccCoordinatorChange(mvccCrd);
 
                     if (ver != null ) {
                         MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter());
@@ -1483,7 +1479,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         if (err == null) {
-            if (exchCtx.newMvccCoordinator() && cctx.localNode().equals(cctx.coordinators().currentCoordinatorNode()))
+            if (exchCtx.newMvccCoordinator() && cctx.localNodeId().equals(cctx.coordinators().currentCoordinatorId()))
                 cctx.coordinators().initCoordinator(res, exchCtx.events().discoveryCache(), exchCtx.activeQueries());
 
             if (centralizedAff) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 80fd326..c6192d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -271,7 +271,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         AffinityTopologyVersion topVer = tx.topologyVersion();
 
-        ClusterNode mvccCrd = null;
+        MvccCoordinator mvccCrd = null;
 
         GridDhtTxMapping txMapping = new GridDhtTxMapping();
 
@@ -296,15 +296,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
 
             if (mvccCrd == null && cacheCtx.mvccEnabled()) {
-                MvccCoordinator mvccCrd0 = cacheCtx.affinity().mvccCoordinator(topVer);
+                mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer);
 
-                if (mvccCrd0 == null) {
+                if (mvccCrd == null) {
                     onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
 
                     return;
                 }
-                else
-                    mvccCrd = mvccCrd0.node();
             }
 
             if (F.isEmpty(nodes)) {
@@ -433,13 +431,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
         if (mvccCrd != null) {
             assert !tx.onePhaseCommit();
 
-            if (mvccCrd.isLocal()) {
+            if (mvccCrd.equals(cctx.localNodeId())) {
                 MvccCoordinatorVersion mvccVer = cctx.coordinators().requestTxCounterOnCoordinator(tx);
 
                 onMvccResponse(cctx.localNodeId(), mvccVer);
             }
             else {
-                IgniteInternalFuture<Long> cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion());
+                IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
+                    cctx.coordinators().requestTxCounter(mvccCrd, this, tx.nearXidVersion());
 
                 add((IgniteInternalFuture)cntrFut);
             }
@@ -495,8 +494,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                     CacheCoordinatorsSharedManager.MvccVersionFuture crdFut =
                         (CacheCoordinatorsSharedManager.MvccVersionFuture)f;
 
-                    return "[mvccCrdNode=" + crdFut.crd.id() +
-                        ", loc=" + crdFut.crd.isLocal() +
+                    return "[mvccCrdNode=" + crdFut.crdId +
+                        ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) +
                         ", done=" + f.isDone() + "]";
                 }
                 else

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index f144437..73febc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -30,6 +30,8 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -37,6 +39,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -45,6 +48,7 @@ import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -59,7 +63,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
 /**
  *
  */
-public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+public class CacheCoordinatorsSharedManager extends GridProcessorAdapter {
     /** */
     public static final long COUNTER_NA = 0L;
 
@@ -128,10 +132,17 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         return Long.compare(ver1.counter(), ver2.counter());
     }
 
+    public CacheCoordinatorsSharedManager(GridKernalContext ctx) {
+        super(ctx);
+    }
+
     /** {@inheritDoc} */
-    @Override protected void start0() throws IgniteCheckedException {
-        super.start0();
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return DiscoveryDataExchangeType.CACHE_CRD_PROC;
+    }
 
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
         statCntrs = new StatCounter[7];
 
         statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
@@ -142,12 +153,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
         statCntrs[6] = new CounterWithAvg("CoordinatorWaitTxsResponse", "avgFutTime");
 
-        cctx.gridEvents().addLocalEventListener(new CacheCoordinatorNodeFailListener(),
+        ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(),
             EVT_NODE_FAILED, EVT_NODE_LEFT);
 
-        cctx.gridIO().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener());
+        ctx.io().addMessageListener(MSG_TOPIC, new CoordinatorMessageListener());
     }
-
+    
     /**
      * @param log Logger.
      */
@@ -165,7 +176,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @return Counter.
      */
     public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
-        assert cctx.localNode().equals(currentCoordinatorNode());
+        assert ctx.localNodeId().equals(currentCoordinatorId());
 
         return assignTxCounter(tx.nearXidVersion(), 0L);
     }
@@ -176,19 +187,19 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param txVer Transaction version.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(ClusterNode crd,
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestTxCounter(MvccCoordinator crd,
         MvccResponseListener lsnr,
         GridCacheVersion txVer) {
-        assert !crd.isLocal() : crd;
+        assert !ctx.localNodeId().equals(crd.nodeId());
 
         MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
-            crd,
+            crd.nodeId(),
             lsnr);
 
         verFuts.put(fut.id, fut);
 
         try {
-            cctx.gridIO().sendToGridTopic(crd,
+            ctx.io().sendToGridTopic(crd.nodeId(),
                 MSG_TOPIC,
                 new CoordinatorTxCounterRequest(fut.id, txVer),
                 MSG_POLICY);
@@ -224,7 +235,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
 
         try {
-            cctx.gridIO().sendToGridTopic(crd.node(),
+            ctx.io().sendToGridTopic(crd.nodeId(),
                 MSG_TOPIC,
                 msg,
                 MSG_POLICY);
@@ -242,16 +253,16 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param crd Coordinator.
      * @return Counter request future.
      */
-    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(ClusterNode crd) {
+    public IgniteInternalFuture<MvccCoordinatorVersion> requestQueryCounter(MvccCoordinator crd) {
         assert crd != null;
 
         // TODO IGNITE-3478: special case for local?
-        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
+        MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
 
         verFuts.put(fut.id, fut);
 
         try {
-            cctx.gridIO().sendToGridTopic(crd,
+            ctx.io().sendToGridTopic(crd.nodeId(),
                 MSG_TOPIC,
                 new CoordinatorQueryVersionRequest(fut.id),
                 MSG_POLICY);
@@ -280,7 +291,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         ackFuts.put(fut.id, fut);
 
         try {
-            cctx.gridIO().sendToGridTopic(crdId,
+            ctx.io().sendToGridTopic(crdId,
                 MSG_TOPIC,
                 new CoordinatorWaitTxsRequest(fut.id, txs),
                 MSG_POLICY);
@@ -311,7 +322,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         ackFuts.put(fut.id, fut);
 
         try {
-            cctx.gridIO().sendToGridTopic(crd,
+            ctx.io().sendToGridTopic(crd,
                 MSG_TOPIC,
                 new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
                 MSG_POLICY);
@@ -338,7 +349,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         msg.skipResponse(true);
 
         try {
-            cctx.gridIO().sendToGridTopic(crdId,
+            ctx.io().sendToGridTopic(crdId,
                 MSG_TOPIC,
                 msg,
                 MSG_POLICY);
@@ -357,7 +368,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param msg Message.
      */
     private void processCoordinatorTxCounterRequest(UUID nodeId, CoordinatorTxCounterRequest msg) {
-        ClusterNode node = cctx.discovery().node(nodeId);
+        ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null) {
             if (log.isDebugEnabled())
@@ -372,7 +383,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             statCntrs[0].update(res.size());
 
         try {
-            cctx.gridIO().sendToGridTopic(node,
+            ctx.io().sendToGridTopic(node,
                 MSG_TOPIC,
                 res,
                 MSG_POLICY);
@@ -392,7 +403,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @param msg Message.
      */
     private void processCoordinatorQueryVersionRequest(UUID nodeId, CoordinatorQueryVersionRequest msg) {
-        ClusterNode node = cctx.discovery().node(nodeId);
+        ClusterNode node = ctx.discovery().node(nodeId);
 
         if (node == null) {
             if (log.isDebugEnabled())
@@ -404,7 +415,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         MvccCoordinatorVersionResponse res = assignQueryCounter(nodeId, msg.futureId());
 
         try {
-            cctx.gridIO().sendToGridTopic(node,
+            ctx.io().sendToGridTopic(node,
                 MSG_TOPIC,
                 res,
                 MSG_POLICY);
@@ -436,7 +447,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             fut.onResponse(msg);
         }
         else {
-            if (cctx.discovery().alive(nodeId))
+            if (ctx.discovery().alive(nodeId))
                 U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
             else if (log.isDebugEnabled())
                 log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
@@ -470,7 +481,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         if (!msg.skipResponse()) {
             try {
-                cctx.gridIO().sendToGridTopic(nodeId,
+                ctx.io().sendToGridTopic(nodeId,
                     MSG_TOPIC,
                     new CoordinatorFutureResponse(msg.futureId()),
                     MSG_POLICY);
@@ -502,7 +513,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             fut.onResponse();
         }
         else {
-            if (cctx.discovery().alive(nodeId))
+            if (ctx.discovery().alive(nodeId))
                 U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
             else if (log.isDebugEnabled())
                 log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
@@ -715,7 +726,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      */
     private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
         try {
-            cctx.gridIO().sendToGridTopic(nodeId,
+            ctx.io().sendToGridTopic(nodeId,
                 MSG_TOPIC,
                 new CoordinatorFutureResponse(msg.futureId()),
                 MSG_POLICY);
@@ -733,10 +744,10 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         return curCrd;
     }
 
-    public ClusterNode currentCoordinatorNode() {
+    public UUID currentCoordinatorId() {
         MvccCoordinator curCrd = this.curCrd;
 
-        return curCrd != null ? curCrd.node() : null;
+        return curCrd != null ? curCrd.nodeId() : null;
     }
 
     /**
@@ -758,10 +769,12 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @return New coordinator.
      */
     public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
-        assert curCrd == null || !discoCache.allNodes().contains(curCrd.node()) : curCrd;
+        assert curCrd == null || !F.nodeIds(discoCache.allNodes()).contains(curCrd.nodeId()) : curCrd;
 
         if (!discoCache.serverNodes().isEmpty()) {
-            curCrd = new MvccCoordinator(discoCache.serverNodes().get(0),
+            ClusterNode node = discoCache.serverNodes().get(0);
+
+            curCrd = new MvccCoordinator(node.id(),
                 discoCache.version().topologyVersion(),
                 discoCache.version());
 
@@ -777,8 +790,8 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     }
 
     /**
-     * @param nodeId Node ID.
-     * @param activeQueries Active queries.
+     * @param nodeId Node ID
+     * @param activeQueries
      */
     public void processClientActiveQueries(UUID nodeId,
         @Nullable Map<MvccCounter, Integer> activeQueries) {
@@ -793,14 +806,14 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         DiscoCache discoCache,
         Map<UUID, Map<MvccCounter, Integer>> activeQueries)
     {
-        assert cctx.localNode().equals(curCrd.node());
+        assert ctx.localNodeId().equals(curCrd.nodeId());
 
-        log.info("Initialize local node as mvcc coordinator [node=" + cctx.localNodeId() +
+        log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
             ", topVer=" + topVer + ']');
 
         crdVer = topVer.topologyVersion();
 
-        prevCrdQueries.init(activeQueries, discoCache, cctx.discovery());
+        prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
 
         crdLatch.countDown();
     }
@@ -816,18 +829,18 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         private MvccResponseListener lsnr;
 
         /** */
-        public final ClusterNode crd;
+        public final UUID crdId;
 
         /** */
         long startTime;
 
         /**
          * @param id Future ID.
-         * @param crd Coordinator.
+         * @param crdId Coordinator node ID.
          */
-        MvccVersionFuture(Long id, ClusterNode crd, @Nullable MvccResponseListener lsnr) {
+        MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
             this.id = id;
-            this.crd = crd;
+            this.crdId = crdId;
             this.lsnr = lsnr;
 
             if (STAT_CNTRS)
@@ -841,7 +854,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             assert res.counter() != COUNTER_NA;
 
             if (lsnr != null)
-                lsnr.onMvccResponse(crd.id(), res);
+                lsnr.onMvccResponse(crdId, res);
 
             onDone(res);
         }
@@ -859,7 +872,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            if (crd.id().equals(nodeId)) {
+            if (crdId.equals(nodeId)) {
                 ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " +
                     "coordinator failed: " + nodeId);
 
@@ -869,7 +882,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return "MvccVersionFuture [crd=" + crd + ", id=" + id + ']';
+            return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
index 24ff354..0b449d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -17,15 +17,19 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import org.apache.ignite.cluster.ClusterNode;
+import java.io.Serializable;
+import java.util.UUID;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 
 /**
  *
  */
-public class MvccCoordinator {
+public class MvccCoordinator implements Serializable {
     /** */
-    private final ClusterNode crd;
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final UUID nodeId;
 
     /**
      * Unique coordinator version, increases when new coordinator is assigned,
@@ -37,12 +41,16 @@ public class MvccCoordinator {
     private final AffinityTopologyVersion topVer;
 
     /**
-     * @param crd Coordinator nde.
+     * @param nodeId Coordinator node ID.
      * @param crdVer Coordinator version.
      * @param topVer Topology version when coordinator was assigned.
      */
-    public MvccCoordinator(ClusterNode crd, long crdVer, AffinityTopologyVersion topVer) {
-        this.crd = crd;
+    public MvccCoordinator(UUID nodeId, long crdVer, AffinityTopologyVersion topVer) {
+        assert nodeId != null;
+        assert crdVer > 0 : crdVer;
+        assert topVer != null;
+
+        this.nodeId = nodeId;
         this.crdVer = crdVer;
         this.topVer = topVer;
     }
@@ -55,10 +63,10 @@ public class MvccCoordinator {
     }
 
     /**
-     * @return Coordinator node.
+     * @return Coordinator node ID.
      */
-    public ClusterNode node() {
-        return crd;
+    public UUID nodeId() {
+        return nodeId;
     }
 
     /**
@@ -88,6 +96,6 @@ public class MvccCoordinator {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return "MvccCoordinator [node=" + crd.id() + ", ver=" + crdVer + ", topVer=" + topVer + ']';
+        return "MvccCoordinator [node=" + nodeId + ", ver=" + crdVer + ", topVer=" + topVer + ']';
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
new file mode 100644
index 0000000..d5172c6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryAware.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface MvccQueryAware {
+    /**
+     * @param newCrd New coordinator.
+     * @return Version used by this query.
+     */
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
+
+    /**
+     * @param topVer Topology version when version was requested.
+     */
+    public void onMvccVersionReceived(AffinityTopologyVersion topVer);
+
+    /**
+     * @param e Error.
+     */
+    public void onMvccVersionError(IgniteCheckedException e);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
deleted file mode 100644
index 4d66437..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public interface MvccQueryFuture {
-    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
new file mode 100644
index 0000000..095f630
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccQueryTracker {
+    /** */
+    private MvccCoordinator mvccCrd;
+
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
+    /** */
+    private final GridCacheContext cctx;
+
+    /** */
+    private final boolean canRemap;
+
+    /** */
+    private final MvccQueryAware lsnr;
+
+    /**
+     * @param cctx
+     * @param canRemap
+     * @param lsnr
+     */
+    public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) {
+        assert cctx.mvccEnabled() : cctx.name();
+
+        this.cctx = cctx;
+        this.canRemap = canRemap;
+        this.lsnr = lsnr;
+    }
+
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        synchronized (this) {
+            if (mvccVer != null) {
+                mvccCrd = newCrd;
+
+                return mvccVer;
+            }
+            else if (mvccCrd != null)
+                mvccCrd = null;
+
+            return null;
+        }
+    }
+
+    public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    public void onQueryDone() {
+        MvccCoordinator mvccCrd0 = null;
+        MvccCoordinatorVersion mvccVer0 = null;
+
+        synchronized (this) {
+            if (mvccVer != null) {
+                assert mvccCrd != null;
+
+                mvccCrd0 = mvccCrd;
+                mvccVer0 = mvccVer;
+
+                mvccVer = null;
+            }
+        }
+
+        if (mvccVer0 != null)
+            cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
+    }
+
+    public void requestVersion(final AffinityTopologyVersion topVer) {
+        MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);
+
+        if (mvccCrd0 == null) {
+            lsnr.onMvccVersionError(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+
+            return;
+        }
+
+        synchronized (this) {
+            this.mvccCrd = mvccCrd0;
+        }
+
+        MvccCoordinator curCrd = cctx.topology().mvccCoordinator();
+
+        if (!mvccCrd0.equals(curCrd)) {
+            assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
+
+            if (!canRemap) {
+                lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator changed."));
+
+                return;
+            }
+            else
+                waitNextTopology(topVer);
+        }
+
+        IgniteInternalFuture<MvccCoordinatorVersion> cntrFut =
+            cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+
+        cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
+            @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
+                try {
+                    MvccCoordinatorVersion rcvdVer = fut.get();
+
+                    boolean needRemap = false;
+
+                    synchronized (MvccQueryTracker.this) {
+                        if (mvccCrd != null)
+                            mvccVer = rcvdVer;
+                        else
+                            needRemap = true;
+                    }
+
+                    if (!needRemap) {
+                        lsnr.onMvccVersionReceived(topVer);
+
+                        return;
+                    }
+                }
+                catch (ClusterTopologyCheckedException e) {
+                    IgniteLogger log = cctx.logger(MvccQueryTracker.class);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Mvcc coordinator failed: " + e);
+                }
+                catch (IgniteCheckedException e) {
+                    lsnr.onMvccVersionError(e);
+
+                    return;
+                }
+
+                // Coordinator failed on reassigned, need remap.
+                if (canRemap)
+                    waitNextTopology(topVer);
+                else {
+                    lsnr.onMvccVersionError(new ClusterTopologyCheckedException("Failed to " +
+                        "request mvcc version, coordinator failed."));
+                }
+            }
+        });
+    }
+
+    private void waitNextTopology(AffinityTopologyVersion topVer) {
+        assert canRemap;
+
+        IgniteInternalFuture<AffinityTopologyVersion> waitFut =
+            cctx.shared().exchange().affinityReadyFuture(topVer.nextMinorVersion());
+
+        if (waitFut == null)
+            requestVersion(cctx.shared().exchange().readyAffinityVersion());
+        else {
+            waitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    try {
+                        requestVersion(fut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        lsnr.onMvccVersionError(e);
+                    }
+                }
+            });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index ed9848c..3a34e28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -198,7 +198,7 @@ public class IgniteWalIteratorFactory {
         dbMgr.setPageSize(pageSize);
 
         return new GridCacheSharedContext<>(
-            kernalCtx, null, null, null, null,
+            kernalCtx, null, null, null,
             null, null, dbMgr, null,
             null, null, null, null,
             null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 07be8b4..0b507a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
 import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
@@ -440,6 +441,11 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     }
 
     /** {@inheritDoc} */
+    @Override public CacheCoordinatorsSharedManager coordinators() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void markSegmented() {
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 83e846f..3a269db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -542,7 +542,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             if (cctx.mvccEnabled()) {
                 mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
-                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
+                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 
                 mvccVer = fut0.get();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index dda1e69..3ddee2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1468,7 +1468,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (cctx.mvccEnabled()) {
                 mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
 
-                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
+                IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 
                 qry.mvccVersion(fut0.get());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 5c11a4b..2f1a0d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1860,6 +1860,97 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testGetVersionRequestFailover() throws Exception {
+        final int NODES = 5;
+
+        testSpi = true;
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        Ignite client = startGrid(NODES - 1);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        for (int i = 0; i < 100; i++)
+            vals.put(i, i);
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0)));
+
+            cacheNames.add(ccfg.getName());
+
+            IgniteCache cache = client.createCache(ccfg);
+
+            try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.putAll(vals);
+
+                tx.commit();
+            }
+        }
+
+        final AtomicInteger nodeIdx = new AtomicInteger(1);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        try {
+            IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(nodeIdx.getAndIncrement());
+
+                    int cnt = 0;
+
+                    while (!done.get()) {
+                        for (String cacheName : cacheNames) {
+                            IgniteCache cache = node.cache(cacheName);
+
+                            Map<Integer, Integer> res = cache.getAll(vals.keySet());
+
+                            assertEquals(vals, res);
+                        }
+
+                        cnt++;
+                    }
+
+                    log.info("Finished [node=" + node.name() + ", cnt=" + cnt + ']');
+
+                    return null;
+                }
+            }, NODES - 1, "get-thread");
+
+            doSleep(1000);
+
+            TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(ignite(0));
+
+            crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return msg instanceof MvccCoordinatorVersionResponse;
+                }
+            });
+
+            crdSpi.waitForBlocked();
+
+            stopGrid(0);
+
+            doSleep(1000);
+
+            done.set(true);
+
+            getFut.get();
+        }
+        finally {
+            done.set(true);
+        }
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
@@ -2218,29 +2309,35 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         assertTrue("Active queries not empty", GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Map activeQrys = GridTestUtils.getFieldValue(crd, "activeQueries");
+                    Map queries = GridTestUtils.getFieldValue(crd, "activeQueries");
+
+                    if (!queries.isEmpty())
+                        log.info("Active queries: " + queries);
 
-                    return activeQrys.isEmpty();
+                    return queries.isEmpty();
                 }
-            }, 5_000)
+            }, 8_000)
         );
         assertTrue("Previous coordinator queries not empty", GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Map prevCrdQueries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
+                    Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
+
+                    if (!queries.isEmpty())
+                        log.info("Previous coordinator queries: " + queries);
 
-                    return prevCrdQueries.isEmpty();
+                    return queries.isEmpty();
                 }
-            }, 5_000)
+            }, 8_000)
         );
 
-        if (crd.currentCoordinatorNode().equals(node.cluster().localNode())) {
+        if (node.cluster().localNode().id().equals(crd.currentCoordinatorId())) {
             assertTrue("prevQueriesDone flag is not set", GridTestUtils.waitForCondition(
                 new GridAbsPredicate() {
                     @Override public boolean apply() {
                         return GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone");
                     }
-                }, 5_000)
+                }, 8_000)
             );
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
index 64070d1..56d09f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
@@ -51,7 +51,6 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
index 5bbf575..39183b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
@@ -52,7 +52,6 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
index d16e525..a427c63 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/MetadataStoragePageMemoryImplTest.java
@@ -67,7 +67,6 @@ public class MetadataStoragePageMemoryImplTest extends MetadataStorageSelfTest{
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
index bd849b1..467ede4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
@@ -56,7 +56,6 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 37422fb..c5997fa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -79,7 +79,6 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
             null,
             null,
             null,
-            null,
             new NoOpPageStoreManager(),
             new NoOpWALManager(),
             new IgniteCacheDatabaseSharedManager(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e7f19ed/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index ee43309..6a1d4f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.dr.GridOsCacheDrManager;
 import org.apache.ignite.internal.processors.cache.jta.CacheNoopJtaManager;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.query.GridCacheLocalQueryManager;
@@ -65,7 +64,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             ctx,
             new GridCacheSharedContext<>(
                 ctx,
-                new CacheCoordinatorsSharedManager(),
                 new IgniteTxManager(),
                 new GridCacheVersionManager(),
                 new GridCacheMvccManager(),


Mime
View raw message