ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [13/23] ignite git commit: Cleanup DiscoCache.
Date Wed, 06 Sep 2017 14:28:29 GMT
Cleanup DiscoCache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/788adc0d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/788adc0d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/788adc0d

Branch: refs/heads/ignite-5896
Commit: 788adc0d00e0455e06cc9acabfe9ad425fdcd65b
Parents: 1202f3f
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 6 11:14:21 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 6 11:14:21 2017 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |  62 +--
 .../discovery/GridDiscoveryManager.java         |  38 +-
 .../cache/CacheAffinitySharedManager.java       |  17 +-
 .../GridCachePartitionExchangeManager.java      |   9 +-
 .../processors/cache/GridCacheUtils.java        |  53 +--
 .../dht/GridClientPartitionTopology.java        |  11 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   6 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  48 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 440 +++++++++----------
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |  12 +-
 .../near/GridNearTransactionalCache.java        |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java |  20 +-
 .../cache/query/GridCacheQueryAdapter.java      |   2 +-
 .../cache/query/GridCacheQueryManager.java      |   6 +-
 .../cache/transactions/IgniteTxHandler.java     |  15 +-
 .../service/GridServiceProcessor.java           |   4 +-
 .../CacheLateAffinityAssignmentTest.java        |   2 +-
 19 files changed, 302 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 5ac99f1..4b57eb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -58,13 +58,6 @@ public class DiscoCache {
     /** Daemon nodes. */
     private final List<ClusterNode> daemonNodes;
 
-    /** All server nodes. */
-    private final List<ClusterNode> srvNodesWithCaches;
-
-    /** All nodes with at least one cache configured. */
-    @GridToStringInclude
-    private final List<ClusterNode> allNodesWithCaches;
-
     /** All remote nodes with at least one cache configured. */
     @GridToStringInclude
     private final List<ClusterNode> rmtNodesWithCaches;
@@ -97,8 +90,6 @@ public class DiscoCache {
      * @param allNodes All nodes.
      * @param srvNodes Server nodes.
      * @param daemonNodes Daemon nodes.
-     * @param srvNodesWithCaches Server nodes with at least one cache configured.
-     * @param allNodesWithCaches All nodes with at least one cache configured.
      * @param rmtNodesWithCaches Remote nodes with at least one cache configured.
      * @param allCacheNodes Cache nodes by cache name.
      * @param cacheGrpAffNodes Affinity nodes by cache group ID.
@@ -113,8 +104,6 @@ public class DiscoCache {
         List<ClusterNode> allNodes,
         List<ClusterNode> srvNodes,
         List<ClusterNode> daemonNodes,
-        List<ClusterNode> srvNodesWithCaches,
-        List<ClusterNode> allNodesWithCaches,
         List<ClusterNode> rmtNodesWithCaches,
         Map<Integer, List<ClusterNode>> allCacheNodes,
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
@@ -127,8 +116,6 @@ public class DiscoCache {
         this.allNodes = allNodes;
         this.srvNodes = srvNodes;
         this.daemonNodes = daemonNodes;
-        this.srvNodesWithCaches = srvNodesWithCaches;
-        this.allNodesWithCaches = allNodesWithCaches;
         this.rmtNodesWithCaches = rmtNodesWithCaches;
         this.allCacheNodes = allCacheNodes;
         this.cacheGrpAffNodes = cacheGrpAffNodes;
@@ -195,36 +182,13 @@ public class DiscoCache {
         return daemonNodes;
     }
 
-    /** @return Server nodes with at least one cache configured. */
-    public List<ClusterNode> serverNodesWithCaches() {
-        return srvNodesWithCaches;
-    }
-
     /**
-     * Gets all remote nodes that have at least one cache configured.
+     * Gets all alive remote nodes that have at least one cache configured.
      *
      * @return Collection of nodes.
      */
-    public List<ClusterNode> remoteNodesWithCaches() {
-        return rmtNodesWithCaches;
-    }
-
-    /**
-     * Gets collection of nodes with at least one cache configured.
-     *
-     * @return Collection of nodes.
-     */
-    public List<ClusterNode> allNodesWithCaches() {
-        return allNodesWithCaches;
-    }
-
-    /**
-     * Gets collection of server nodes with at least one cache configured.
-     *
-     * @return Collection of nodes.
-     */
-    public Collection<ClusterNode> aliveServerNodes() {
-        return F.view(serverNodes(), new P1<ClusterNode>() {
+    public Collection<ClusterNode> remoteAliveNodesWithCaches() {
+        return F.view(rmtNodesWithCaches, new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode node) {
                 return alives.contains(node.id());
             }
@@ -236,8 +200,8 @@ public class DiscoCache {
      *
      * @return Collection of nodes.
      */
-    public Collection<ClusterNode> aliveServerNodesWithCaches() {
-        return F.view(serverNodesWithCaches(), new P1<ClusterNode>() {
+    public Collection<ClusterNode> aliveServerNodes() {
+        return F.view(serverNodes(), new P1<ClusterNode>() {
             @Override public boolean apply(ClusterNode node) {
                 return alives.contains(node.id());
             }
@@ -248,16 +212,14 @@ public class DiscoCache {
      * @return Oldest alive server node.
      */
     public @Nullable ClusterNode oldestAliveServerNode(){
-        Iterator<ClusterNode> it = aliveServerNodes().iterator();
-        return it.hasNext() ? it.next() : null;
-    }
+        for (int i = 0; i < srvNodes.size(); i++) {
+            ClusterNode srv = srvNodes.get(i);
 
-    /**
-     * @return Oldest alive server node with at least one cache configured.
-     */
-    public @Nullable ClusterNode oldestAliveServerNodeWithCache(){
-        Iterator<ClusterNode> it = aliveServerNodesWithCaches().iterator();
-        return it.hasNext() ? it.next() : null;
+            if (alives.contains(srv.id()))
+                return srv;
+        }
+
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 56af9bf..fa5d053 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1845,42 +1845,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Gets cache nodes for cache with given ID.
-     *
-     * @param cacheId Cache ID.
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    public Collection<ClusterNode> cacheNodes(int cacheId, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheId, topVer).cacheNodes(cacheId);
-    }
-
-    /**
-     * Gets all nodes with at least one cache configured.
-     *
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    public Collection<ClusterNode> cacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(CU.cacheId(null), topVer).allNodesWithCaches();
-    }
-
-    /**
      * Gets cache remote nodes for cache with given name.
      *
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
-    public Collection<ClusterNode> remoteCacheNodes(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(CU.cacheId(null), topVer).remoteNodesWithCaches();
+    public Collection<ClusterNode> remoteAliveNodesWithCaches(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(CU.cacheId(null), topVer).remoteAliveNodesWithCaches();
     }
 
     /**
      * @param topVer Topology version (maximum allowed node order).
      * @return Oldest alive server nodes with at least one cache configured.
      */
-    @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNodeWithCache();
+    @Nullable public ClusterNode oldestAliveServerNode(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(CU.cacheId(null), topVer).oldestAliveServerNode();
     }
 
     /**
@@ -2243,9 +2222,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
 
-        Set<ClusterNode> allNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
         Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
-        Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 
         for (ClusterNode node : allNodes) {
             assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
@@ -2270,11 +2247,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 CachePredicate filter = entry.getValue();
 
                 if (filter.cacheNode(node)) {
-                    allNodesWithCaches.add(node);
-
-                    if(!CU.clientNode(node))
-                        srvNodesWithCaches.add(node);
-
                     if (!node.isLocal())
                         rmtNodesWithCaches.add(node);
 
@@ -2291,8 +2263,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableList(allNodes),
             Collections.unmodifiableList(srvNodes),
             Collections.unmodifiableList(daemonNodes),
-            U.sealList(srvNodesWithCaches),
-            U.sealList(allNodesWithCaches),
             U.sealList(rmtNodesWithCaches),
             Collections.unmodifiableMap(allCacheNodes),
             Collections.unmodifiableMap(cacheGrpAffNodes),

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 39a5ea8..beb7a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1659,7 +1659,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (!aff.lastVersion().equals(topVer))
                             calculateAndInit(fut.events(), aff, topVer);
 
-                        grpHolder.topology().beforeExchange(fut, true, false);
+                        grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false);
                     }
                     else {
                         List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
@@ -1715,7 +1715,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (!aff.lastVersion().equals(topVer))
                             calculateAndInit(fut.events(), aff, topVer);
 
-                        grpHolder.topology().beforeExchange(fut, true, false);
+                        grpHolder.topology(fut.context().events().discoveryCache()).beforeExchange(fut, true, false);
                     }
                 }
 
@@ -1833,7 +1833,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         Map<UUID, GridDhtPartitionMap> map = affinityFullMap(aff);
 
                         for (GridDhtPartitionMap map0 : map.values())
-                            cache.topology().update(fut.exchangeId(), map0, true);
+                            cache.topology(fut.context().events().discoveryCache()).update(fut.exchangeId(), map0, true);
                     }
                 }
             });
@@ -2059,7 +2059,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null;
 
-                GridDhtPartitionTopology top = grpHolder.topology();
+                GridDhtPartitionTopology top = grpHolder.topology(fut.context().events().discoveryCache());
 
                 Map<Integer, List<T>> cacheAssignment = null;
 
@@ -2277,9 +2277,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
+         * @param discoCache Discovery data cache.
          * @return Cache topology.
          */
-        abstract GridDhtPartitionTopology topology();
+        abstract GridDhtPartitionTopology topology(DiscoCache discoCache);
 
         /**
          * @return Affinity.
@@ -2314,7 +2315,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology() {
+        @Override public GridDhtPartitionTopology topology(DiscoCache discoCache) {
             return grp.topology();
         }
     }
@@ -2390,8 +2391,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology() {
-            return cctx.exchange().clientTopology(groupId());
+        @Override public GridDhtPartitionTopology topology(DiscoCache discoCache) {
+            return cctx.exchange().clientTopology(groupId(), discoCache);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index bd34a5f..fe9ed29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -719,9 +719,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param grpId Cache group ID.
+     * @param discoCache Discovery data cache.
      * @return Topology.
      */
-    public GridDhtPartitionTopology clientTopology(int grpId) {
+    public GridDhtPartitionTopology clientTopology(int grpId, DiscoCache discoCache) {
         GridClientPartitionTopology top = clientTops.get(grpId);
 
         if (top != null)
@@ -741,7 +742,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             aff.partitions());
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-            top = new GridClientPartitionTopology(cctx, grpId, aff.partitions(), affKey));
+            top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), affKey));
 
         return old != null ? old : top;
     }
@@ -925,7 +926,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * for non coordinator -  {@link GridDhtPartitionsSingleMessage SingleMessages} send
      */
     private void refreshPartitions() {
-        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+        ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
             if (log.isDebugEnabled())
@@ -955,7 +956,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             AffinityTopologyVersion rmtTopVer =
                 lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE;
 
-            Collection<ClusterNode> rmts = CU.remoteNodes(cctx, rmtTopVer);
+            Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer);
 
             if (log.isDebugEnabled())
                 log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 070fc81..2018a64 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -441,59 +441,14 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets all nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return All nodes on which cache with the same name is started (including nodes
-     *      that may have already left).
-     */
-    public static Collection<ClusterNode> allNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().cacheNodes(ctx.cacheId(), topOrder);
-    }
-
-    /**
-     * Gets all nodes with at least one cache configured.
-     *
-     * @param ctx Shared cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return All nodes on which cache with the same name is started (including nodes
-     *      that may have already left).
-     */
-    public static Collection<ClusterNode> allNodes(GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().cacheNodes(topOrder);
-    }
-
-    /**
-     * Gets remote nodes with at least one cache configured.
-     *
-     * @param ctx Cache shared context.
-     * @param topVer Topology version.
-     * @return Collection of remote nodes with at least one cache configured.
-     */
-    public static Collection<ClusterNode> remoteNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
-        return ctx.discovery().remoteCacheNodes(topVer);
-    }
-
-    /**
-     * Gets all nodes on which cache with the same name is started and the local DHT storage is enabled.
-     *
-     * @param ctx Cache context.
-     * @return All nodes on which cache with the same name is started.
-     */
-    public static Collection<ClusterNode> affinityNodes(final GridCacheContext ctx) {
-        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), AffinityTopologyVersion.NONE);
-    }
-
-    /**
      * Gets DHT affinity nodes.
      *
      * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
+     * @param topVer Topology version.
+     * @return Cache affinity nodes for given topology version.
      */
-    public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topOrder);
+    public static Collection<ClusterNode> affinityNodes(GridCacheContext ctx, AffinityTopologyVersion topVer) {
+        return ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 299394f..e994113 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -120,17 +120,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /**
      * @param cctx Context.
+     * @param discoCache Discovery data cache.
      * @param grpId Group ID.
      * @param parts Number of partitions in the group.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext<?, ?> cctx,
+        DiscoCache discoCache,
         int grpId,
         int parts,
         Object similarAffKey
     ) {
         this.cctx = cctx;
+        this.discoCache = discoCache;
         this.grpId = grpId;
         this.similarAffKey = similarAffKey;
         this.parts = parts;
@@ -338,7 +341,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+        ClusterNode oldest = discoCache.oldestAliveServerNode();
 
         assert oldest != null;
 
@@ -535,7 +538,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
      * @return List of nodes for the partition.
      */
     private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.allNodesWithCaches()) : null;
+        Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grpId));
 
         lock.readLock().lock();
 
@@ -961,10 +964,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+        ClusterNode oldest = discoCache.oldestAliveServerNode();
 
         // If this node became the oldest node.
-        if (oldest.id().equals(cctx.localNodeId())) {
+        if (cctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f7f71a1..01d972d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -326,7 +326,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (grp.affinityNode()) {
             ClusterNode loc = ctx.localNode();
 
-            ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+            ClusterNode oldest = discoCache.oldestAliveServerNode();
 
             GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
@@ -466,7 +466,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         lastTopChangeVer = readyTopVer = evts.topologyVersion();
                     }
 
-                    ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+                    ClusterNode oldest = discoCache.oldestAliveServerNode();
 
                     if (log.isDebugEnabled()) {
                         log.debug("Partition map beforeExchange [exchId=" + exchFut.exchangeId() +
@@ -2047,7 +2047,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) {
         assert lock.isWriteLockedByCurrentThread();
 
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+        ClusterNode oldest = discoCache.oldestAliveServerNode();
 
         assert oldest != null || ctx.kernalContext().clientNode();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 5b8a7b5..ab5631e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -34,10 +34,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -313,24 +313,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param verMap Version map.
-     * @param msgId Message ID.
-     * @param nearMiniId Near mini future ID.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
-    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<IgniteTxKey, GridCacheVersion> verMap,
-        long msgId,
-        int nearMiniId,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(GridNearTxPrepareRequest req) {
         // In optimistic mode prepare still can be called explicitly from salvageTx.
         GridDhtTxPrepareFuture fut = prepFut;
 
@@ -344,14 +330,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
                 cctx,
                 this,
                 timeout,
-                nearMiniId,
-                verMap,
-                last,
+                req.miniId(),
+                req.dhtVersions(),
+                req.last(),
                 needReturnValue()))) {
                 GridDhtTxPrepareFuture f = prepFut;
 
-                assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']';
+                assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                    "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']';
 
                 if (timeout == -1)
                     f.onError(timeoutException());
@@ -360,8 +346,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             }
         }
         else {
-            assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " +
-                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']';
+            assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " +
+                "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']';
 
             // Prepare was called explicitly.
             return chainOnePhasePrepare(fut);
@@ -389,14 +375,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         }
 
         try {
-            if (reads != null) {
-                for (IgniteTxEntry e : reads)
-                    addEntry(msgId, e);
+            if (req.reads() != null) {
+                for (IgniteTxEntry e : req.reads())
+                    addEntry(req.messageId(), e);
             }
 
-            if (writes != null) {
-                for (IgniteTxEntry e : writes)
-                    addEntry(msgId, e);
+            if (req.writes() != null) {
+                for (IgniteTxEntry e : req.writes())
+                    addEntry(req.messageId(), e);
             }
 
             userPrepare(null);
@@ -407,7 +393,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ce3c290..1c5e1a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -168,14 +169,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     @SuppressWarnings("UnusedDeclaration")
     private volatile int mapped;
 
-    /** Prepare reads. */
-    private Iterable<IgniteTxEntry> reads;
-
-    /** Prepare writes. */
-    private Iterable<IgniteTxEntry> writes;
-
-    /** Tx nodes. */
-    private Map<UUID, Collection<UUID>> txNodes;
+    /** Prepare request. */
+    private GridNearTxPrepareRequest req;
 
     /** Trackable flag. */
     private boolean trackable = true;
@@ -341,7 +336,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     private void onEntriesLocked() {
         ret = new GridCacheReturn(null, tx.localResult(), true, null, true);
 
-        for (IgniteTxEntry writeEntry : writes) {
+        for (IgniteTxEntry writeEntry : req.writes()) {
             IgniteTxEntry txEntry = tx.entry(writeEntry.txKey());
 
             assert txEntry != null : writeEntry;
@@ -597,10 +592,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
         if (log.isDebugEnabled())
             log.debug("Marking all local candidates as ready: " + this);
 
-        readyLocks(writes);
+        readyLocks(req.writes());
 
         if (tx.serializable() && tx.optimistic())
-            readyLocks(reads);
+            readyLocks(req.reads());
 
         locksReady = true;
     }
@@ -896,8 +891,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
      */
     private void addDhtValues(GridNearTxPrepareResponse res) {
         // Interceptor on near node needs old values to execute callbacks.
-        if (!F.isEmpty(writes)) {
-            for (IgniteTxEntry e : writes) {
+        if (!F.isEmpty(req.writes())) {
+            for (IgniteTxEntry e : req.writes()) {
                 IgniteTxEntry txEntry = tx.entry(e.txKey());
 
                 assert txEntry != null : "Missing tx entry for key [tx=" + tx + ", key=" + e.txKey() + ']';
@@ -1002,33 +997,30 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     /**
      * Initializes future.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
+     * @param req Prepare request.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes) {
+    public void prepare(GridNearTxPrepareRequest req) {
+        assert req != null;
+
         if (tx.empty()) {
             tx.setRollbackOnly();
 
             onDone((GridNearTxPrepareResponse)null);
         }
 
-        this.reads = reads;
-        this.writes = writes;
-        this.txNodes = txNodes;
+        this.req = req;
 
         boolean ser = tx.serializable() && tx.optimistic();
 
-        if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
+        if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
-            for (IgniteTxEntry entry : writes)
+            for (IgniteTxEntry entry : req.writes())
                 forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
 
             if (ser) {
-                for (IgniteTxEntry entry : reads)
+                for (IgniteTxEntry entry : req.reads())
                     forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
             }
 
@@ -1196,10 +1188,10 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 IgniteCheckedException err0;
 
                 try {
-                    err0 = checkReadConflict(writes);
+                    err0 = checkReadConflict(req.writes());
 
                     if (err0 == null)
-                        err0 = checkReadConflict(reads);
+                        err0 = checkReadConflict(req.reads());
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to check entry version: " + e, e);
@@ -1230,258 +1222,262 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             // We are holding transaction-level locks for entries here, so we can get next write version.
             tx.writeVersion(cctx.versions().next(tx.topologyVersion()));
 
-            {
-                // Assign keys to primary nodes.
-                if (!F.isEmpty(writes)) {
-                    for (IgniteTxEntry write : writes)
-                        map(tx.entry(write.txKey()));
-                }
+            // Assign keys to primary nodes.
+            if (!F.isEmpty(req.writes())) {
+                for (IgniteTxEntry write : req.writes())
+                    map(tx.entry(write.txKey()));
+            }
 
-                if (!F.isEmpty(reads)) {
-                    for (IgniteTxEntry read : reads)
-                        map(tx.entry(read.txKey()));
-                }
+            if (!F.isEmpty(req.reads())) {
+                for (IgniteTxEntry read : req.reads())
+                    map(tx.entry(read.txKey()));
             }
 
             if (isDone())
                 return;
 
-            if (last) {
-                if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
-                    for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                        if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                            tx.onePhaseCommit(false);
+            if (last)
+                sendPrepareRequests();
+        }
+        finally {
+            markInitialized();
+        }
+    }
 
-                            break;
-                        }
-                    }
-                }
+    /**
+     *
+     */
+    private void sendPrepareRequests() {
+        if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+            for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                    tx.onePhaseCommit(false);
 
-                int miniId = 0;
+                    break;
+                }
+            }
+        }
 
-                assert tx.transactionNodes() != null;
+        int miniId = 0;
 
-                final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
+        assert tx.transactionNodes() != null;
 
-                // Create mini futures.
-                for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
-                    assert !dhtMapping.empty();
+        final long timeout = timeoutObj != null ? timeoutObj.timeout : 0;
 
-                    ClusterNode n = dhtMapping.primary();
+        // Create mini futures.
+        for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) {
+            assert !dhtMapping.empty();
 
-                    assert !n.isLocal();
+            ClusterNode n = dhtMapping.primary();
 
-                    GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
+            assert !n.isLocal();
 
-                    Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
+            GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id());
 
-                    Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
+            Collection<IgniteTxEntry> nearWrites = nearMapping == null ? null : nearMapping.writes();
 
-                    if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
-                        continue;
+            Collection<IgniteTxEntry> dhtWrites = dhtMapping.writes();
 
-                    if (tx.remainingTime() == -1)
-                        return;
+            if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites))
+                continue;
 
-                    MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+            if (tx.remainingTime() == -1)
+                return;
 
-                    add(fut); // Append new future.
+            MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping);
+
+            add(fut); // Append new future.
+
+            assert req.transactionNodes() != null;
+
+            GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                futId,
+                fut.futureId(),
+                tx.topologyVersion(),
+                tx,
+                timeout,
+                dhtWrites,
+                nearWrites,
+                this.req.transactionNodes(),
+                tx.nearXidVersion(),
+                true,
+                tx.onePhaseCommit(),
+                tx.subjectId(),
+                tx.taskNameHash(),
+                tx.activeCachesDeploymentEnabled(),
+                tx.storeWriteThrough(),
+                retVal);
+
+            int idx = 0;
+
+            for (IgniteTxEntry entry : dhtWrites) {
+                try {
+                    GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
 
-                    assert txNodes != null;
+                    GridCacheContext<?, ?> cacheCtx = cached.context();
 
-                    GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                        futId,
-                        fut.futureId(),
-                        tx.topologyVersion(),
-                        tx,
-                        timeout,
-                        dhtWrites,
-                        nearWrites,
-                        txNodes,
-                        tx.nearXidVersion(),
-                        true,
-                        tx.onePhaseCommit(),
-                        tx.subjectId(),
-                        tx.taskNameHash(),
-                        tx.activeCachesDeploymentEnabled(),
-                        tx.storeWriteThrough(),
-                        retVal);
+                    // Do not invalidate near entry on originating transaction node.
+                    req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
+                        cached.readerId(n.id()) != null);
 
-                    int idx = 0;
+                    if (cached.isNewLocked()) {
+                        List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
+                            tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
 
-                    for (IgniteTxEntry entry : dhtWrites) {
-                        try {
-                            GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached();
+                        // Do not preload if local node is a partition owner.
+                        if (!owners.contains(cctx.localNode()))
+                            req.markKeyForPreload(idx);
+                    }
 
-                            GridCacheContext<?, ?> cacheCtx = cached.context();
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                }
 
-                            // Do not invalidate near entry on originating transaction node.
-                            req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
-                                cached.readerId(n.id()) != null);
+                idx++;
+            }
 
-                            if (cached.isNewLocked()) {
-                                List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
-                                    tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
+            if (!F.isEmpty(nearWrites)) {
+                for (IgniteTxEntry entry : nearWrites) {
+                    try {
+                        if (entry.explicitVersion() == null) {
+                            GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                // Do not preload if local node is a partition owner.
-                                if (!owners.contains(cctx.localNode()))
-                                    req.markKeyForPreload(idx);
-                            }
+                            assert added != null : "Missing candidate for cache entry:" + entry;
+                            assert added.dhtLocal();
 
-                            break;
-                        }
-                        catch (GridCacheEntryRemovedException ignore) {
-                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
+                            if (added.ownerVersion() != null)
+                                req.owned(entry.txKey(), added.ownerVersion());
                         }
 
-                        idx++;
+                        break;
                     }
-
-                    if (!F.isEmpty(nearWrites)) {
-                        for (IgniteTxEntry entry : nearWrites) {
-                            try {
-                                if (entry.explicitVersion() == null) {
-                                    GridCacheMvccCandidate added = entry.cached().candidate(version());
-
-                                    assert added != null : "Missing candidate for cache entry:" + entry;
-                                    assert added.dhtLocal();
-
-                                    if (added.ownerVersion() != null)
-                                        req.owned(entry.txKey(), added.ownerVersion());
-                                }
-
-                                break;
-                            }
-                            catch (GridCacheEntryRemovedException ignore) {
-                                assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                            }
-                        }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        assert false : "Got removed exception on entry with dht local candidate: " + entry;
                     }
+                }
+            }
 
-                    assert req.transactionNodes() != null;
+            assert req.transactionNodes() != null;
 
-                    try {
-                        cctx.io().send(n, req, tx.ioPolicy());
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
 
-                        if (msgLog.isDebugEnabled()) {
-                            msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
-                                ", dhtTxId=" + tx.xidVersion() +
-                                ", node=" + n.id() + ']');
-                        }
-                    }
-                    catch (ClusterTopologyCheckedException ignored) {
-                        fut.onNodeLeft();
+                if (msgLog.isDebugEnabled()) {
+                    msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() +
+                        ", dhtTxId=" + tx.xidVersion() +
+                        ", node=" + n.id() + ']');
+                }
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                fut.onNodeLeft();
+            }
+            catch (IgniteCheckedException e) {
+                if (!cctx.kernalContext().isStopping()) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() + ']');
                     }
-                    catch (IgniteCheckedException e) {
-                        if (!cctx.kernalContext().isStopping()) {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() + ']');
-                            }
 
-                            fut.onResult(e);
-                        }
-                        else {
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + n.id() +
-                                    ", err=" + e + ']');
-                            }
-                        }
+                    fut.onResult(e);
+                }
+                else {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + n.id() +
+                            ", err=" + e + ']');
                     }
                 }
+            }
+        }
 
-                for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
-                    if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
-                        if (tx.remainingTime() == -1)
-                            return;
-
-                        MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
-
-                        add(fut); // Append new future.
-
-                        GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
-                            futId,
-                            fut.futureId(),
-                            tx.topologyVersion(),
-                            tx,
-                            timeout,
-                            null,
-                            nearMapping.writes(),
-                            tx.transactionNodes(),
-                            tx.nearXidVersion(),
-                            true,
-                            tx.onePhaseCommit(),
-                            tx.subjectId(),
-                            tx.taskNameHash(),
-                            tx.activeCachesDeploymentEnabled(),
-                            tx.storeWriteThrough(),
-                            retVal);
-
-                        for (IgniteTxEntry entry : nearMapping.entries()) {
-                            if (CU.writes().apply(entry)) {
-                                try {
-                                    if (entry.explicitVersion() == null) {
-                                        GridCacheMvccCandidate added = entry.cached().candidate(version());
+        for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+            if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                if (tx.remainingTime() == -1)
+                    return;
 
-                                        assert added != null : "Null candidate for non-group-lock entry " +
-                                            "[added=" + added + ", entry=" + entry + ']';
-                                        assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
-                                            "[added=" + added + ", entry=" + entry + ']';
+                MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping);
+
+                add(fut); // Append new future.
+
+                GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest(
+                    futId,
+                    fut.futureId(),
+                    tx.topologyVersion(),
+                    tx,
+                    timeout,
+                    null,
+                    nearMapping.writes(),
+                    tx.transactionNodes(),
+                    tx.nearXidVersion(),
+                    true,
+                    tx.onePhaseCommit(),
+                    tx.subjectId(),
+                    tx.taskNameHash(),
+                    tx.activeCachesDeploymentEnabled(),
+                    tx.storeWriteThrough(),
+                    retVal);
+
+                for (IgniteTxEntry entry : nearMapping.entries()) {
+                    if (CU.writes().apply(entry)) {
+                        try {
+                            if (entry.explicitVersion() == null) {
+                                GridCacheMvccCandidate added = entry.cached().candidate(version());
 
-                                        if (added != null && added.ownerVersion() != null)
-                                            req.owned(entry.txKey(), added.ownerVersion());
-                                    }
+                                assert added != null : "Null candidate for non-group-lock entry " +
+                                    "[added=" + added + ", entry=" + entry + ']';
+                                assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
+                                    "[added=" + added + ", entry=" + entry + ']';
 
-                                    break;
-                                } catch (GridCacheEntryRemovedException ignore) {
-                                    assert false : "Got removed exception on entry with dht local candidate: " + entry;
-                                }
+                                if (added != null && added.ownerVersion() != null)
+                                    req.owned(entry.txKey(), added.ownerVersion());
                             }
+
+                            break;
+                        } catch (GridCacheEntryRemovedException ignore) {
+                            assert false : "Got removed exception on entry with dht local candidate: " + entry;
                         }
+                    }
+                }
 
-                        assert req.transactionNodes() != null;
+                assert req.transactionNodes() != null;
 
-                        try {
-                            cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
+                try {
+                    cctx.io().send(nearMapping.primary(), req, tx.ioPolicy());
 
-                            if (msgLog.isDebugEnabled()) {
-                                msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
-                                    ", dhtTxId=" + tx.xidVersion() +
-                                    ", node=" + nearMapping.primary().id() + ']');
-                            }
-                        }
-                        catch (ClusterTopologyCheckedException ignored) {
-                            fut.onNodeLeft();
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() +
+                            ", dhtTxId=" + tx.xidVersion() +
+                            ", node=" + nearMapping.primary().id() + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    fut.onNodeLeft();
+                }
+                catch (IgniteCheckedException e) {
+                    if (!cctx.kernalContext().isStopping()) {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() + ']');
                         }
-                        catch (IgniteCheckedException e) {
-                            if (!cctx.kernalContext().isStopping()) {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() + ']');
-                                }
 
-                                fut.onResult(e);
-                            }
-                            else {
-                                if (msgLog.isDebugEnabled()) {
-                                    msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
-                                        ", dhtTxId=" + tx.xidVersion() +
-                                        ", node=" + nearMapping.primary().id() +
-                                        ", err=" + e + ']');
-                                }
-                            }
+                        fut.onResult(e);
+                    }
+                    else {
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() +
+                                ", dhtTxId=" + tx.xidVersion() +
+                                ", node=" + nearMapping.primary().id() +
+                                ", err=" + e + ']');
                         }
                     }
                 }
             }
         }
-        finally {
-            markInitialized();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1467bfa..7d0f747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1965,7 +1965,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (dhtFut != null) {
             if (req.writeSynchronizationMode() == PRIMARY_SYNC
                 // To avoid deadlock disable back-pressure for sender data node.
-                && !ctx.discovery().cacheAffinityNode(node, ctx.name())
+                && !ctx.discovery().cacheGroupAffinityNode(node, ctx.groupId())
                 && !dhtFut.isDone()) {
                 final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c72f53e..053bbe5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -712,7 +712,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                     }
 
                     if (map == null) {
-                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+                        Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer);
 
                         keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
 
@@ -815,7 +815,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
                     AffinityTopologyVersion topVer = lock.topologyVersion();
 
                     if (map == null) {
-                        Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer);
+                        Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer);
 
                         keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 240b5f0..9589f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2247,7 +2247,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(desc.groupId());
+                        cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
 
                     top.beforeExchange(this, true, true);
                 }
@@ -2265,7 +2265,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(grpId);
+                        cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
                     CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
                         top.partitions());
@@ -2448,7 +2448,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (grpCtx != null)
                     top = grpCtx.topology();
                 else
-                    top = cctx.exchange().clientTopology(e.getKey());
+                    top = cctx.exchange().clientTopology(e.getKey(), events().discoveryCache());
 
                 assignPartitionStates(top);
             }
@@ -2826,10 +2826,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     null);
             }
             else {
-                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+                ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal()) {
-                    GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId);
+                    GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
                     CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
                         top.partitions());
@@ -2858,7 +2858,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
             GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                cctx.exchange().clientTopology(grpId);
+                cctx.exchange().clientTopology(grpId, events().discoveryCache());
 
             top.update(exchId, entry.getValue(), false);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a691cbc..973a199 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -540,7 +540,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
                             ver = cand.version();
 
                             if (map == null) {
-                                Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+                                Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion());
 
                                 if (F.isEmpty(affNodes))
                                     return;
@@ -663,7 +663,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
 
                             if (cand != null) {
                                 if (map == null) {
-                                    Collection<ClusterNode> affNodes = CU.allNodes(ctx, cand.topologyVersion());
+                                    Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, cand.topologyVersion());
 
                                     if (F.isEmpty(affNodes))
                                         return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 55d6bdd..8ecf21f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
-import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
 import static org.apache.ignite.transactions.TransactionState.PREPARING;
 import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
 import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
-import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
 import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
 
 /**
@@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
     /**
      * Prepares next batch of entries in dht transaction.
      *
-     * @param reads Read entries.
-     * @param writes Write entries.
-     * @param txNodes Transaction nodes mapping.
-     * @param last {@code True} if this is last prepare request.
+     * @param req Prepare request.
      * @return Future that will be completed when locks are acquired.
      */
     @SuppressWarnings("TypeMayBeWeakened")
-    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
-        @Nullable Collection<IgniteTxEntry> reads,
-        @Nullable Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes,
-        boolean last
-    ) {
+    public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(GridNearTxPrepareRequest req) {
         long timeout = remainingTime();
 
         if (state() != PREPARING) {
@@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             timeout,
             0,
             Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
-            last,
+            req.last(),
             needReturnValue() && implicit());
 
         try {
-            userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes);
+            userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes());
 
             // Make sure to add future before calling prepare on it.
             cctx.mvcc().addFuture(fut);
@@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
             if (isSystemInvalidate())
                 fut.complete();
             else
-                fut.prepare(reads, writes, txNodes);
+                fut.prepare(req);
         }
         catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) {
             fut.onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index c4eae8c..b5fdd23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -600,7 +600,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
 
-        Collection<ClusterNode> affNodes = CU.affinityNodes(cctx);
+        Collection<ClusterNode> affNodes = CU.affinityNodes(cctx, topVer);
 
         if (prj == null && part == null)
             return affNodes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3e27720..f873461 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1859,11 +1859,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             Callable<Collection<CacheSqlMetadata>> job = new MetadataJob();
 
             // Remote nodes that have current cache.
-            Collection<ClusterNode> nodes = F.view(cctx.discovery().remoteNodes(), new P1<ClusterNode>() {
-                @Override public boolean apply(ClusterNode n) {
-                    return cctx.kernalContext().discovery().cacheAffinityNode(n, cacheName);
-                }
-            });
+            Collection<ClusterNode> nodes = CU.affinityNodes(cctx, AffinityTopologyVersion.NONE);
 
             Collection<Collection<CacheSqlMetadata>> res = new ArrayList<>(nodes.size() + 1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index beeb184..362eaac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -254,11 +254,7 @@ public class IgniteTxHandler {
     ) {
         req.txState(locTx.txState());
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
-            req.reads(),
-            req.writes(),
-            req.transactionNodes(),
-            req.last());
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
 
         if (locTx.isRollbackOnly())
             locTx.rollbackNearTxLocalAsync();
@@ -520,14 +516,7 @@ public class IgniteTxHandler {
             if (req.needReturnValue())
                 tx.needReturnValue(true);
 
-            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
-                req.reads(),
-                req.writes(),
-                req.dhtVersions(),
-                req.messageId(),
-                req.miniId(),
-                req.transactionNodes(),
-                req.last());
+            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req);
 
             if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
                 if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 2b3a882..f750053 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1489,7 +1489,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
 
             if (!cache.context().affinityNode()) {
                 ClusterNode oldestSrvNode =
-                    ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
+                    ctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldestSrvNode == null)
                     return new GridEmptyIterator<>();
@@ -1730,7 +1730,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
                         // In case the cache instance isn't tracked by DiscoveryManager anymore.
                         discoCache.updateAlives(ctx.discovery());
 
-                        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
+                        ClusterNode oldest = discoCache.oldestAliveServerNode();
 
                         if (oldest != null && oldest.isLocal()) {
                             final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/788adc0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 695d8a6..95e9479 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2826,7 +2826,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
 
         assertNotNull("Failed to find exchange future:", evt);
 
-        Collection<ClusterNode> allNodes = ctx.discovery().cacheNodes(topVer0);
+        Collection<ClusterNode> allNodes = ctx.discovery().serverNodes(topVer0);
 
         for (DynamicCacheDescriptor cacheDesc : ctx.cache().cacheDescriptors().values()) {
             if (assignments.get(cacheDesc.cacheId()) != null)


Mime
View raw message