ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/17] ignite git commit: ignite-3479
Date Tue, 26 Sep 2017 14:25:08 GMT
ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/331a255c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/331a255c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/331a255c

Branch: refs/heads/ignite-3479
Commit: 331a255cd117d534f2e25bbc0b0db21c40089ea4
Parents: f3229af
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Sep 26 15:16:10 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Sep 26 17:24:40 2017 +0300

----------------------------------------------------------------------
 .../processors/affinity/AffinityAssignment.java |   9 +-
 .../affinity/GridAffinityAssignment.java        |   7 +-
 .../affinity/GridAffinityAssignmentCache.java   |   5 +-
 .../affinity/HistoryAffinityAssignment.java     |   5 +-
 .../cache/CacheAffinitySharedManager.java       |  12 +-
 .../processors/cache/ExchangeContext.java       |  43 +++++-
 .../cache/GridCacheAffinityManager.java         |   3 +-
 .../dht/GridClientPartitionTopology.java        |   7 +
 .../dht/GridDhtPartitionTopology.java           |   4 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  11 ++
 .../distributed/dht/GridDhtTxPrepareFuture.java |  14 +-
 .../dht/GridPartitionedGetFuture.java           | 145 +++++++++++++++----
 .../GridDhtPartitionsExchangeFuture.java        |  81 +++++++++--
 .../GridDhtPartitionsSingleMessage.java         |  64 ++++++--
 .../GridNearPessimisticTxPrepareFuture.java     |   7 +-
 .../mvcc/CacheCoordinatorsSharedManager.java    |  90 ++++++------
 .../processors/cache/mvcc/MvccCoordinator.java  |  58 ++++++++
 .../processors/cache/mvcc/MvccCounter.java      | 100 +++++++++++++
 .../processors/cache/mvcc/MvccQueryFuture.java  |   3 +-
 .../query/GridCacheDistributedQueryManager.java |   2 +-
 .../cache/query/GridCacheQueryManager.java      |   2 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +
 .../util/future/GridCompoundFuture.java         |   4 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  78 +++++++++-
 24 files changed, 619 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
index acb9213..28dec1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.processors.affinity;
 
-import org.apache.ignite.cluster.ClusterNode;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 
 /**
  * Cached affinity calculations.
@@ -86,5 +86,8 @@ public interface AffinityAssignment {
      */
     public Set<Integer> backupPartitions(UUID nodeId);
 
-    public ClusterNode mvccCoordinator();
+    /**
+     * @return Mvcc coordinator.
+     */
+    public MvccCoordinator mvccCoordinator();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 2913930..a7549cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -40,7 +41,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     private final AffinityTopologyVersion topVer;
 
     /** */
-    private final ClusterNode mvccCrd;
+    private final MvccCoordinator mvccCrd;
 
     /** Collection of calculated affinity nodes. */
     private List<List<ClusterNode>> assignment;
@@ -84,7 +85,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     GridAffinityAssignment(AffinityTopologyVersion topVer,
         List<List<ClusterNode>> assignment,
         List<List<ClusterNode>> idealAssignment,
-        ClusterNode mvccCrd) {
+        MvccCoordinator mvccCrd) {
         assert topVer != null;
         assert assignment != null;
         assert idealAssignment != null;
@@ -271,7 +272,7 @@ public class GridAffinityAssignment implements AffinityAssignment, Serializable
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode mvccCoordinator() {
+    @Override public MvccCoordinator mvccCoordinator() {
         return mvccCrd;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 4b0659c..83837b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -184,7 +185,7 @@ public class GridAffinityAssignmentCache {
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
-        ClusterNode mvccCrd = ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer);
+        MvccCoordinator mvccCrd = ctx.cache().context().coordinators().currentCoordinatorForCacheAffinity(topVer);
 
         initialize(topVer, affAssignment, mvccCrd);
     }
@@ -195,7 +196,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment for topology version.
      */
-    private void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, ClusterNode mvccCrd) {
+    private void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment, MvccCoordinator mvccCrd) {
         assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
         assert idealAssignment != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
index cae3611..d9c03e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,7 +45,7 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
     private final boolean clientEvtChange;
 
     /** */
-    private final ClusterNode mvccCrd;
+    private final MvccCoordinator mvccCrd;
 
     /**
      * @param assign Assignment.
@@ -58,7 +59,7 @@ public class HistoryAffinityAssignment implements AffinityAssignment {
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode mvccCoordinator() {
+    @Override public MvccCoordinator mvccCoordinator() {
         return mvccCrd;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 741e204..99727e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -448,7 +448,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (grpHolder.client()) {
                             ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
 
-                            grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+                            grp.topology().updateTopologyVersion(topFut,
+                                discoCache,
+                                cctx.coordinators().currentCoordinator(),
+                                -1,
+                                false);
 
                             grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
 
@@ -517,7 +521,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         new ClusterTopologyServerNotFoundException("All server nodes left grid."));
                 }
 
-                grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+                grp.topology().updateTopologyVersion(topFut,
+                    discoCache,
+                    cctx.coordinators().currentCoordinator(),
+                    -1,
+                    false);
 
                 grp.topology().update(topVer, partMap, null, Collections.<Integer>emptySet(), null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index c9f0744..36ce6ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -52,21 +54,24 @@ public class ExchangeContext {
     private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
     /** */
-    private final boolean mvccCrdChange;
+    private final boolean newMvccCrd;
+
+    /** */
+    private Map<MvccCounter, Integer> activeQrys;
 
     /**
      * @param crd Coordinator flag.
      * @param fut Exchange future.
      */
-    public ExchangeContext(boolean crd, boolean mvccCrdChange, GridDhtPartitionsExchangeFuture fut) {
-        this.mvccCrdChange = mvccCrdChange;
+    public ExchangeContext(boolean crd, boolean newMvccCrd, GridDhtPartitionsExchangeFuture fut) {
+        this.newMvccCrd = newMvccCrd;
 
         int protocolVer = exchangeProtocolVersion(fut.firstEventCache().minimumNodeVersion());
 
         if (compatibilityNode || (crd && fut.localJoinExchange())) {
             fetchAffOnJoin = true;
 
-            merge = !mvccCrdChange;
+            merge = !newMvccCrd;
         }
         else {
             boolean startCaches = fut.exchangeId().isJoined() &&
@@ -74,7 +79,7 @@ public class ExchangeContext {
 
             fetchAffOnJoin = protocolVer == 1;
 
-            merge = !mvccCrdChange &&
+            merge = !newMvccCrd &&
                 !startCaches &&
                 protocolVer > 1 &&
                 fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
@@ -131,10 +136,32 @@ public class ExchangeContext {
     }
 
     /**
-     * @return {@code True} if mvcc coordinator node is changed during this exchange.
+     * @return {@code True} if new node assigned as mvcc coordinator node during this exchange.
      */
-    public boolean mvccCoordinatorChange() {
-        return mvccCrdChange;
+    public boolean newMvccCoordinator() {
+        return newMvccCrd;
+    }
+
+    public Map<MvccCounter, Integer> activeQueries() {
+        return activeQrys;
+    }
+
+    public void addActiveQueries(Map<MvccCounter, Integer> activeQrys0) {
+        if (activeQrys0 == null)
+            return;
+
+        if (activeQrys != null) {
+            for (Map.Entry<MvccCounter, Integer> e : activeQrys0.entrySet()) {
+                Integer cnt = activeQrys.get(e.getKey());
+
+                if (cnt == null)
+                    activeQrys.put(e.getKey(), e.getValue());
+                else
+                    activeQrys.put(e.getKey(), cnt + e.getValue());
+            }
+        }
+        else
+            activeQrys = activeQrys0;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index a2407e5..91e4505 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
@@ -238,7 +239,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         return aff0.cachedAffinity(topVer);
     }
 
-    public ClusterNode mvccCoordinator(AffinityTopologyVersion topVer) {
+    public MvccCoordinator mvccCoordinator(AffinityTopologyVersion topVer) {
         return assignment(topVer).mvccCoordinator();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e994113..e328c25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -195,9 +196,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae68ef..cf6554a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.jetbrains.annotations.Nullable;
 
@@ -69,6 +70,7 @@ public interface GridDhtPartitionTopology {
     public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updateSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException;
@@ -379,4 +381,6 @@ public interface GridDhtPartitionTopology {
      * @param updateRebalanceVer {@code True} if need check rebalance state.
      */
     public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer);
+
+    public MvccCoordinator mvccCoordinator();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 380066a..1f3d00d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
@@ -137,6 +138,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** */
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
 
+    /** */
+    private volatile MvccCoordinator mvccCrd;
+
     /**
      * @param ctx Cache shared context.
      * @param grp Cache group.
@@ -229,9 +233,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public MvccCoordinator mvccCoordinator() {
+        return mvccCrd;
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
         GridDhtTopologyFuture exchFut,
         DiscoCache discoCache,
+        MvccCoordinator mvccCrd,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
@@ -255,6 +265,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             lastTopChangeVer = exchTopVer;
 
             this.discoCache = discoCache;
+            this.mvccCrd = mvccCrd;
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 8795007..2af2d51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
@@ -259,6 +260,11 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteLogger logger() {
+        return log;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futId;
     }
@@ -1235,14 +1241,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             if (req.requestMvccCounter()) {
                 assert tx.txState().mvccEnabled(cctx);
 
-                ClusterNode crd = cctx.coordinators().currentCoordinator();
+                MvccCoordinator crd = cctx.coordinators().currentCoordinator();
 
                 assert crd != null : tx.topologyVersion();
 
-                if (crd.isLocal())
+                if (crd.node().isLocal())
                     onMvccResponse(cctx.localNodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
                 else {
-                    IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+                    IgniteInternalFuture<Long> crdCntrFut = cctx.coordinators().requestTxCounter(crd.node(),
+                        this,
+                        tx.nearXidVersion());
 
                     if (tx.onePhaseCommit())
                         waitCrdCntrFut = crdCntrFut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 7c2e0a5..1476b2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
@@ -77,7 +78,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
     private static IgniteLogger log;
 
     /** */
-    private ClusterNode mvccCrd;
+    private MvccCoordinator mvccCrd;
 
     /** */
     private MvccCoordinatorVersion mvccVer;
@@ -128,11 +129,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             log = U.logger(cctx.kernalContext(), logRef, GridPartitionedGetFuture.class);
     }
 
-    /** {@inheritDoc} */
-    @Nullable @Override public MvccCoordinatorVersion onCoordinatorChange(ClusterNode oldCrd, ClusterNode newCrd) {
-        return null;
-    }
-
     /**
      * Initializes future.
      *
@@ -151,22 +147,51 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
         }
 
-        // TODO IGNITE-3478 (correct failover and remap).
         if (cctx.mvccEnabled()) {
+            trackable = true;
+
+            cctx.mvcc().addFuture(this, futId);
+
+            requestMvccVersionAndMap(topVer);
 
             return;
         }
 
+        initialMap(topVer);
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    private void initialMap(AffinityTopologyVersion topVer) {
         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
         markInitialized();
     }
 
+    /** {@inheritDoc} */
+    @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        if (!cctx.mvccEnabled())
+            return null;
+
+        synchronized (this) {
+            if (mvccVer != null) {
+                mvccCrd = newCrd;
+
+                return mvccVer;
+            }
+            else if (mvccCrd != null)
+                mvccCrd = null;
+
+            return null;
+        }
+    }
+
     /**
-     *
+     * @param topVer Topology version.
      */
-    private void requestMvccVersion(final AffinityTopologyVersion topVer) {
-        mvccCrd = cctx.affinity().mvccCoordinator(topVer);
+    private void requestMvccVersionAndMap(final AffinityTopologyVersion topVer) {
+        MvccCoordinator mvccCrd = cctx.affinity().mvccCoordinator(topVer);
 
         if (mvccCrd == null) {
             onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
@@ -174,28 +199,64 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             return;
         }
 
-            final AffinityTopologyVersion topVer0 = topVer;IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
+        synchronized (this) {
+            this.mvccCrd = mvccCrd;
+        }
+
+        MvccCoordinator curCrd = cctx.topology().mvccCoordinator();
+
+        if (!mvccCrd.equals(curCrd)) {
+            assert cctx.topology().topologyVersionFuture().initialVersion().compareTo(topVer) > 0;
+
+            // TODO IGNITE-3479.
+        }
+
+        IgniteInternalFuture<MvccCoordinatorVersion> cntrFut = cctx.shared().coordinators().requestQueryCounter(mvccCrd.node());
 
         cntrFut.listen(new IgniteInClosure<IgniteInternalFuture<MvccCoordinatorVersion>>() {
             @Override public void apply(IgniteInternalFuture<MvccCoordinatorVersion> fut) {
+                boolean needRemap = false;
+
                 try {
-                    mvccVer = fut.get();
+                    MvccCoordinatorVersion rcvdVer = fut.get();
 
-                    map(keys,
-                        Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(),
-                        topVer);
+                    synchronized (GridPartitionedGetFuture.class) {
+                        if (GridPartitionedGetFuture.this.mvccCrd != null) {
+                            mvccVer = rcvdVer;
+                        }
+                        else
+                            needRemap = true;
+                    }
 
-                    markInitialized();
+                    if (!needRemap)
+                        initialMap(topVer);
                 }
                 catch (ClusterTopologyCheckedException e) {
-                    if (canRemap) {
-
-                    }
-                    else
-                        onDone(new ClusterTopologyCheckedException("Failed to request mvcc version, coordinator failed: " + mvccCrd));
+                    needRemap = true;
                 }
                 catch (IgniteCheckedException e) {
-                    onDone(e);
+                    GridPartitionedGetFuture.this.onDone(e);
+                }
+
+                if (needRemap) {
+                    if (canRemap) {
+                        IgniteInternalFuture<AffinityTopologyVersion> waitFut = waitRemapFuture(topVer);
+
+                        waitFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                                try {
+                                    requestMvccVersionAndMap(fut.get());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    GridPartitionedGetFuture.this.onDone(e);
+                                }
+                            }
+                        });
+                    }
+                    else {
+                        GridPartitionedGetFuture.this.onDone(new ClusterTopologyCheckedException("Failed to " +
+                            "request mvcc version, coordinator failed"));
+                    }
                 }
             }
         });
@@ -258,10 +319,23 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
             if (trackable)
                 cctx.mvcc().removeFuture(futId);
 
-            if (mvccVer != null) {
-                assert mvccCrd != null;
+            if (cctx.mvccEnabled()) {
+                MvccCoordinator mvccCrd0 = null;
+                MvccCoordinatorVersion mvccVer0 = null;
+
+                synchronized (this) {
+                    if (mvccVer != null) {
+                        assert mvccCrd != null;
 
-                cctx.shared().coordinators().ackQueryDone(mvccCrd, mvccVer);
+                        mvccCrd0 = mvccCrd;
+                        mvccVer0 = mvccVer;
+
+                        mvccVer = null;
+                    }
+                }
+
+                if (mvccVer0 != null)
+                    cctx.shared().coordinators().ackQueryDone(mvccCrd0.node(), mvccVer0);
             }
 
             cache().sendTtlUpdateRequest(expiryPlc);
@@ -679,6 +753,17 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
         return Collections.emptyMap();
     }
 
+    /**
+     * @param curTopVer Current topology version.
+     * @return Future to wait for before remapping.
+     */
+    private IgniteInternalFuture<AffinityTopologyVersion> waitRemapFuture(AffinityTopologyVersion curTopVer) {
+        AffinityTopologyVersion updTopVer =
+            new AffinityTopologyVersion(Math.max(curTopVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+        return cctx.affinity().affinityReadyFuture(updTopVer);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@@ -783,17 +868,15 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 onDone(Collections.<K, V>emptyMap());
             }
             else {
-                final AffinityTopologyVersion updTopVer =
-                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+                IgniteInternalFuture<AffinityTopologyVersion> waitFut = waitRemapFuture(topVer);
 
-                cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                waitFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             try {
-                                fut.get();
+                                AffinityTopologyVersion topVer = fut.get();
 
                                 // Remap.
-                                map(keys.keySet(), F.t(node, keys), updTopVer);
+                                map(keys.keySet(), F.t(node, keys), topVer);
 
                                 onDone(Collections.<K, V>emptyMap());
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 30e5621..1642263 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.ExchangeContext;
 import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
@@ -76,6 +77,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryFuture;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -553,25 +558,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             boolean crdNode = crd != null && crd.isLocal();
 
-            boolean mvccCrdChange = false;
+            boolean newMvccCrd = false;
 
-            if (localJoinExchange())
-                cctx.coordinators().reassignCoordinator(firstEvtDiscoCache);
-            else if (exchId.isLeft()){
-                ClusterNode mvccCrd = cctx.coordinators().currentCoordinator();
-
-                if (mvccCrd != null && mvccCrd.equals(exchId.eventNode())) {
-                    assert !CU.clientNode(mvccCrd) : mvccCrd;
+            if (localJoinExchange()) {
+                MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator();
 
-                    ClusterNode newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache);
+                if (mvccCrd == null) {
+                    newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null &&
+                        srvNodes.size() == 1;
+                }
+            }
+            else if (exchId.isLeft()){
+                MvccCoordinator mvccCrd = cctx.coordinators().currentCoordinator();
 
-                    assert !F.eq(mvccCrd, newMvccCrd);
+                if (mvccCrd != null && mvccCrd.node().equals(exchId.eventNode())) {
+                    assert !CU.clientNode(mvccCrd.node()) : mvccCrd;
 
-                    mvccCrdChange = newMvccCrd != null;
+                    newMvccCrd = cctx.coordinators().reassignCoordinator(firstEvtDiscoCache) != null;
                 }
             }
 
-            exchCtx = new ExchangeContext(crdNode, mvccCrdChange, this);
+            exchCtx = new ExchangeContext(crdNode, newMvccCrd, this);
 
             assert state == null : state;
 
@@ -663,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
             }
 
-            updateTopologies(crdNode);
+            updateTopologies(crdNode, cctx.coordinators().currentCoordinator());
 
             switch (exchange) {
                 case ALL: {
@@ -767,9 +774,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param crd Coordinator flag.
+     * @param mvccCrd Mvcc coordinator.
      * @throws IgniteCheckedException If failed.
      */
-    private void updateTopologies(boolean crd) throws IgniteCheckedException {
+    private void updateTopologies(boolean crd, MvccCoordinator mvccCrd) throws IgniteCheckedException {
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
             if (grp.isLocal())
                 continue;
@@ -795,12 +803,46 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             top.updateTopologyVersion(
                 this,
                 events().discoveryCache(),
+                mvccCrd,
                 updSeq,
                 cacheGroupStopping(grp.groupId()));
         }
 
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            top.updateTopologyVersion(this, events().discoveryCache(), -1, cacheGroupStopping(top.groupId()));
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+            top.updateTopologyVersion(this,
+                events().discoveryCache(),
+                mvccCrd,
+                -1,
+                cacheGroupStopping(top.groupId()));
+        }
+
+        if (exchCtx.newMvccCoordinator()) {
+            assert mvccCrd != null;
+
+            Map<MvccCounter, Integer> activeQrys = null;
+
+            for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) {
+                if (fut instanceof MvccQueryFuture) {
+                    MvccCoordinatorVersion ver = ((MvccQueryFuture)fut).onMvccCoordinatorChange(mvccCrd);
+
+                    if (ver != null ) {
+                        MvccCounter cntr = new MvccCounter(ver.coordinatorVersion(), ver.counter());
+
+                        if (activeQrys == null)
+                            activeQrys = new HashMap<>();
+
+                        Integer cnt = activeQrys.get(cntr);
+
+                        if (cnt == null)
+                            activeQrys.put(cntr, 1);
+                        else
+                            activeQrys.put(cntr, cnt + 1);
+                    }
+                }
+            }
+
+            exchCtx.addActiveQueries(activeQrys);
+        }
     }
 
     /**
@@ -1262,6 +1304,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 msg.partitionHistoryCounters(partHistReserved0);
         }
 
+        msg.activeQueries(exchCtx.activeQueries());
+
         if (stateChangeExchange() && changeGlobalStateE != null)
             msg.setError(changeGlobalStateE);
         else if (localJoinExchange())
@@ -1437,6 +1481,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         }
 
         if (err == null) {
+            if (exchCtx.newMvccCoordinator() && cctx.localNode().equals(cctx.coordinators().currentCoordinatorNode()))
+                cctx.coordinators().initCoordinator(res, exchCtx.activeQueries());
+
             if (centralizedAff) {
                 assert !exchCtx.mergeExchanges();
 
@@ -2277,6 +2324,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
                 GridDhtPartitionsSingleMessage msg = e.getValue();
 
+                exchCtx.addActiveQueries(msg.activeQueries());
+
                 // Apply update counters after all single messages are received.
                 for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
                     Integer grpId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 215152d..c461e4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -29,12 +29,14 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -100,6 +102,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      */
     private GridDhtPartitionsFullMessage finishMsg;
 
+    /** */
+    @GridDirectMap(keyType = Message.class, valueType = Integer.class)
+    private Map<MvccCounter, Integer> activeQrys;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -124,6 +130,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * @return Active queries started with previous coordinator.
+     */
+    Map<MvccCounter, Integer> activeQueries() {
+        return activeQrys;
+    }
+
+    /**
+     * @param activeQrys Active queries started with previous coordinator.
+     */
+    void activeQueries(Map<MvccCounter, Integer> activeQrys) {
+        this.activeQrys = activeQrys;
+    }
+
+    /**
      * @param finishMsg Exchange finish message (used to restore exchange state on new coordinator).
      */
     void finishMessage(GridDhtPartitionsFullMessage finishMsg) {
@@ -404,48 +424,54 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (writer.state()) {
             case 5:
-                if (!writer.writeBoolean("client", client))
+                if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
+                if (!writer.writeBoolean("client", client))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("errBytes", errBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("finishMsg", finishMsg))
+                if (!writer.writeByteArray("errBytes", errBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
+                if (!writer.writeMessage("finishMsg", finishMsg))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -468,7 +494,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (reader.state()) {
             case 5:
-                client = reader.readBoolean("client");
+                activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -476,7 +502,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 6:
-                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
+                client = reader.readBoolean("client");
 
                 if (!reader.isLastRead())
                     return false;
@@ -484,7 +510,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 7:
-                errBytes = reader.readByteArray("errBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -492,7 +518,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
-                finishMsg = reader.readMessage("finishMsg");
+                errBytes = reader.readByteArray("errBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -500,7 +526,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 9:
-                grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
+                finishMsg = reader.readMessage("finishMsg");
 
                 if (!reader.isLastRead())
                     return false;
@@ -508,7 +534,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -516,7 +542,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 11:
-                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -524,6 +550,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 12:
+                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -543,7 +577,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 75a2b87..80fd326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
 import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
@@ -295,13 +296,15 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
                 nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer);
 
             if (mvccCrd == null && cacheCtx.mvccEnabled()) {
-                mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer);
+                MvccCoordinator mvccCrd0 = cacheCtx.affinity().mvccCoordinator(topVer);
 
-                if (mvccCrd == null) {
+                if (mvccCrd0 == null) {
                     onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
 
                     return;
                 }
+                else
+                    mvccCrd = mvccCrd0.node();
             }
 
             if (F.isEmpty(nodes)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
index dff1b53..2bf653c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -71,7 +72,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     private static final byte MSG_POLICY = SYSTEM_POOL;
     
     /** */
-    private volatile Coordinator curCrd;
+    private volatile MvccCoordinator curCrd;
 
     /** */
     private final AtomicLong mvccCntr = new AtomicLong(1L);
@@ -106,6 +107,23 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
     /** */
     private StatCounter[] statCntrs;
 
+    /**
+     * @param ver1 First version.
+     * @param ver2 Second version.
+     * @return
+     */
+    public static int compareVersions(MvccCoordinatorVersion ver1, MvccCoordinatorVersion ver2) {
+        assert ver1 != null;
+        assert ver2 != null;
+
+        int cmp = Long.compare(ver1.coordinatorVersion(), ver2.coordinatorVersion());
+
+        if (cmp != 0)
+            return cmp;
+
+        return Long.compare(ver1.counter(), ver2.counter());
+    }
+
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
@@ -143,7 +161,7 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
      * @return Counter.
      */
     public MvccCoordinatorVersion requestTxCounterOnCoordinator(IgniteInternalTx tx) {
-        assert cctx.localNode().equals(currentCoordinator());
+        assert cctx.localNode().equals(currentCoordinatorNode());
 
         return assignTxCounter(tx.nearXidVersion(), 0L);
     }
@@ -681,48 +699,52 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
         }
     }
 
-    public ClusterNode currentCoordinator() {
-        Coordinator crd = curCrd;
+    public MvccCoordinator currentCoordinator() {
+        return curCrd;
+    }
 
-        return crd != null ? crd.crd : null;
+    public ClusterNode currentCoordinatorNode() {
+        MvccCoordinator curCrd = this.curCrd;
+
+        return curCrd != null ? curCrd.node() : null;
     }
 
-    public ClusterNode currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
-        Coordinator crd = curCrd;
+    public MvccCoordinator currentCoordinatorForCacheAffinity(AffinityTopologyVersion topVer) {
+        MvccCoordinator crd = curCrd;
 
-        assert crd == null || crd.topVer.compareTo(topVer) <= 0 : crd;
+        // Assert coordinator did not already change.
+        assert crd == null || crd.topologyVersion().compareTo(topVer) <= 0 : crd;
 
-        return crd != null ? crd.crd : null;
+        return crd;
     }
 
     /**
      * @param discoCache Discovery snapshot.
      */
-    public ClusterNode reassignCoordinator(DiscoCache discoCache) {
-        ClusterNode curCrd = currentCoordinator();
-
-        assert curCrd == null || !discoCache.allNodes().contains(curCrd) : curCrd;
-
-        ClusterNode newCrd;
+    public MvccCoordinator reassignCoordinator(DiscoCache discoCache) {
+        assert curCrd == null || !discoCache.allNodes().contains(curCrd.node()) : curCrd;
 
         if (!discoCache.serverNodes().isEmpty()) {
-            newCrd = discoCache.serverNodes().get(0);
-
-            if (cctx.localNode().equals(newCrd)) {
-                crdVer = discoCache.version().topologyVersion();
-
-                crdLatch.countDown();
-            }
+            curCrd = new MvccCoordinator(discoCache.serverNodes().get(0), discoCache.version());
 
             log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() +
-                ", crd=" + newCrd + ']');
+                ", crd=" + curCrd.node().id() + ']');
         }
         else
-            newCrd = null;
+            curCrd = null;
 
-        this.curCrd = new Coordinator(newCrd, discoCache.version());
+        return curCrd;
+    }
 
-        return newCrd;
+    public void initCoordinator(AffinityTopologyVersion topVer, @Nullable Map<MvccCounter, Integer> activeQrys) {
+        assert cctx.localNode().equals(curCrd.node());
+
+        log.info("Initialize local node as mvcc coordinator [node=" + cctx.localNodeId() +
+            ", topVer=" + topVer + ']');
+
+        crdVer = topVer.topologyVersion();
+
+        crdLatch.countDown();
     }
 
     /**
@@ -1000,20 +1022,4 @@ public class CacheCoordinatorsSharedManager<K, V> extends GridCacheSharedManager
             this.txId = txId;
         }
     }
-
-    /**
-     *
-     */
-    static class Coordinator {
-        /** */
-        final ClusterNode crd;
-
-        /** */
-        final AffinityTopologyVersion topVer;
-
-        Coordinator(ClusterNode crd, AffinityTopologyVersion topVer) {
-            this.crd = crd;
-            this.topVer = topVer;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
index cc92cc3..2affc5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinator.java
@@ -1,7 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+
 /**
  *
  */
 public class MvccCoordinator {
+    /** */
+    private final ClusterNode crd;
+
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    public MvccCoordinator(ClusterNode crd, final AffinityTopologyVersion topVer) {
+        this.crd = crd;
+        this.topVer = topVer;
+    }
+
+    public ClusterNode node() {
+        return crd;
+    }
+
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    @Override public boolean equals(Object other) {
+        if (this == other)
+            return true;
+
+        if (other == null || getClass() != other.getClass())
+            return false;
+
+        MvccCoordinator that = (MvccCoordinator)other;
+
+        return topVer.equals(topVer) && crd.equals(that.crd);
+    }
+
+    @Override public int hashCode() {
+        int res = crd.hashCode();
+
+        res = 31 * res + topVer.hashCode();
+
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
new file mode 100644
index 0000000..847822e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCounter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class MvccCounter implements Message {
+    /** */
+    private long crdVer;
+
+    /** */
+    private long cntr;
+
+    public MvccCounter() {
+        // No-po.
+    }
+
+    public MvccCounter(long crdVer, long cntr) {
+        this.crdVer = crdVer;
+        this.cntr = cntr;
+    }
+
+    public long coordinatorVersion() {
+        return crdVer;
+    }
+
+    public long counter() {
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MvccCounter that = (MvccCounter) o;
+
+        return crdVer == that.crdVer && cntr == that.cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = (int) (crdVer ^ (crdVer >>> 32));
+        res = 31 * res + (int) (cntr ^ (cntr >>> 32));
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    @Override public String toString() {
+        return super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
index 1463577..4d66437 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryFuture.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import org.apache.ignite.cluster.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
 public interface MvccQueryFuture {
-    @Nullable public MvccCoordinatorVersion onCoordinatorChange(ClusterNode oldCrd, ClusterNode newCrd);
+    @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 3c43768..f5df18b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -539,7 +539,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
             final MvccCoordinatorVersion mvccVer;
 
             if (cctx.mvccEnabled()) {
-                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node();
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f46f8d2..67bd6e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1465,7 +1465,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             // TODO IGNITE-3478.
             if (cctx.mvccEnabled()) {
-                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion());
+                mvccCrd = cctx.affinity().mvccCoordinator(cctx.shared().exchange().readyAffinityVersion()).node();
 
                 IgniteInternalFuture<MvccCoordinatorVersion> fut0 = cctx.shared().coordinators().requestQueryCounter(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 0081ec2..92e6785 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -358,6 +358,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
      * @param ret Result.
      */
     public void implicitSingleResult(GridCacheReturn ret) {
+        assert ret != null;
+
         if (ret.invokeResult())
             implicitRes.mergeEntryProcessResults(ret);
         else

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index a724060..87f5882 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -285,12 +285,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
                 onDone(rdc != null ? rdc.reduce() : null);
             }
             catch (RuntimeException e) {
-                logError(null, "Failed to execute compound future reducer: " + this, e);
+                logError(logger(), "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
             }
             catch (AssertionError e) {
-                logError(null, "Failed to execute compound future reducer: " + this, e);
+                logError(logger(), "Failed to execute compound future reducer: " + this, e);
 
                 onDone(e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/331a255c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 6b01aef..1b8a509 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1675,7 +1675,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      *
      * @throws Exception If failed.
      */
-    public void _testReadInProgressCoordinatorFails() throws Exception {
+    public void testReadInProgressCoordinatorFails() throws Exception {
         testSpi = true;
 
         startGrids(4);
@@ -1766,6 +1766,82 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testMvccCoordinatorChangeSimple() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        for (CacheConfiguration ccfg : cacheConfigurations()) {
+            ccfg.setName("cache-" + cacheNames.size());
+
+            cacheNames.add(ccfg.getName());
+
+            srv0.createCache(ccfg);
+        }
+
+        checkPutGet(cacheNames);
+
+        for (int i = 0; i < 3; i++) {
+            startGrid(i + 1);
+
+            checkPutGet(cacheNames);
+        }
+
+        client = true;
+
+        for (int i = 0; i < 3; i++) {
+            Ignite node = startGrid(i + 4);
+
+            // Init client caches outside of transactions.
+            for (String cacheName : cacheNames)
+                node.cache(cacheName);
+
+            checkPutGet(cacheNames);
+        }
+
+        for (int i = 0; i < 3; i++) {
+            stopGrid(i);
+
+            checkPutGet(cacheNames);
+        }
+    }
+
+    /**
+     * @param cacheNames Cache names.
+     */
+    private void checkPutGet(List<String> cacheNames) {
+        List<Ignite> nodes = G.allGrids();
+
+        assertFalse(nodes.isEmpty());
+
+        Ignite putNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+
+        Map<Integer, Integer> vals = new HashMap();
+
+        Integer val = ThreadLocalRandom.current().nextInt();
+
+        for (int i = 0; i < 10; i++)
+            vals.put(i, val);
+
+        try (Transaction tx = putNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (String cacheName : cacheNames)
+                putNode.cache(cacheName).putAll(vals);
+
+            tx.commit();
+        }
+
+        for (Ignite node : nodes) {
+            for (String cacheName : cacheNames) {
+                Map<Object, Object> res = node.cache(cacheName).getAll(vals.keySet());
+
+                assertEquals(vals, res);
+            }
+        }
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.


Mime
View raw message