ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4296
Date Thu, 24 Nov 2016 10:28:30 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4296 [created] d4477e845


ignite-4296


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4477e84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4477e84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4477e84

Branch: refs/heads/ignite-4296
Commit: d4477e8456731db6acb7e669dfc89de2de0341cc
Parents: 7a47a01
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Nov 24 12:19:59 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Nov 24 13:26:26 2016 +0300

----------------------------------------------------------------------
 .../rendezvous/RendezvousAffinityFunction.java  |  80 +++++++++----
 .../discovery/GridDiscoveryManager.java         | 114 ++-----------------
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../processors/cache/GridCacheUtils.java        |  17 ---
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +-
 .../dht/GridClientPartitionTopology.java        |  26 ++---
 .../dht/GridDhtPartitionTopology.java           |   8 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 105 +++++++++--------
 .../GridDhtPartitionsExchangeFuture.java        |  49 +++++++-
 .../service/GridServiceProcessor.java           |   4 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  41 +++++--
 .../AbstractAffinityFunctionSelfTest.java       |   2 +-
 .../GridDiscoveryManagerAliveCacheSelfTest.java |   2 +-
 .../GridCachePartitionedAffinitySpreadTest.java |   7 +-
 14 files changed, 227 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..c76aae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.cache.affinity.rendezvous;
 
-import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -354,46 +353,69 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
 
     /**
      * Returns collection of nodes (primary first) for specified partition.
+     *
+     * @param d Message digest.
+     * @param part Partition.
+     * @param nodes Nodes.
+     * @param nodesHash Serialized nodes hashes.
+     * @param backups Number of backups.
+     * @param neighborhoodCache Neighborhood.
+     * @return Assignment.
      */
-    public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+    public List<ClusterNode> assignPartition(MessageDigest d,
+        int part,
+        List<ClusterNode> nodes,
+        Map<ClusterNode, byte[]> nodesHash,
+        int backups,
         @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
         if (nodes.size() <= 1)
             return nodes;
 
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+        if (d == null)
+            d = digest.get();
 
-        MessageDigest d = digest.get();
+        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
 
-        for (ClusterNode node : nodes) {
-            Object nodeHash = resolveNodeHash(node);
+        try {
+            for (int i = 0; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
 
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                byte[] nodeHashBytes = nodesHash.get(node);
 
-                byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+                if (nodeHashBytes == null) {
+                    Object nodeHash = resolveNodeHash(node);
 
-                out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
-                out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+                    byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+
+                    // Add 4 bytes for partition bytes.
+                    nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+                    System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+                    nodesHash.put(node, nodeHashBytes0);
+                }
+
+                U.intToBytes(part, nodeHashBytes, 0);
 
                 d.reset();
 
-                byte[] bytes = d.digest(out.toByteArray());
+                byte[] bytes = d.digest(nodeHashBytes);
 
                 long hash =
-                      (bytes[0] & 0xFFL)
-                    | ((bytes[1] & 0xFFL) << 8)
-                    | ((bytes[2] & 0xFFL) << 16)
-                    | ((bytes[3] & 0xFFL) << 24)
-                    | ((bytes[4] & 0xFFL) << 32)
-                    | ((bytes[5] & 0xFFL) << 40)
-                    | ((bytes[6] & 0xFFL) << 48)
-                    | ((bytes[7] & 0xFFL) << 56);
+                    (bytes[0] & 0xFFL)
+                        | ((bytes[1] & 0xFFL) << 8)
+                        | ((bytes[2] & 0xFFL) << 16)
+                        | ((bytes[3] & 0xFFL) << 24)
+                        | ((bytes[4] & 0xFFL) << 32)
+                        | ((bytes[5] & 0xFFL) << 40)
+                        | ((bytes[6] & 0xFFL) << 48)
+                        | ((bytes[7] & 0xFFL) << 56);
 
                 lst.add(F.t(hash, node));
             }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
 
         Collections.sort(lst, COMPARATOR);
@@ -474,8 +496,18 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
+        MessageDigest d = digest.get();
+
+        List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+        Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+            List<ClusterNode> partAssignment = assignPartition(d,
+                i,
+                nodes,
+                nodesHash,
+                affCtx.backups(),
                 neighborhoodCache);
 
             assignments.add(partAssignment);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d24f900..488dabe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -42,7 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -1623,17 +1623,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Gets cache remote nodes for cache with given name.
      *
-     * @param cacheName Cache name.
-     * @param topVer Topology version.
-     * @return Collection of cache nodes.
-     */
-    public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
-    }
-
-    /**
-     * Gets cache remote nodes for cache with given name.
-     *
      * @param topVer Topology version.
      * @return Collection of cache nodes.
      */
@@ -1664,33 +1653,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Gets alive remote server nodes with at least one cache configured.
-     *
      * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
+     * @return Oldest alive server nodes with at least one cache configured.
      */
-    public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
-    }
+    @Nullable public ClusterNode oldestAliveCacheServerNode(AffinityTopologyVersion topVer) {
+        DiscoCache cache = resolveDiscoCache(null, topVer);
 
-    /**
-     * Gets alive server nodes with at least one cache configured.
-     *
-     * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
-     */
-    public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
-    }
+        Map.Entry<ClusterNode, Boolean> e = cache.aliveSrvNodesWithCaches.firstEntry();
 
-    /**
-     * Gets alive nodes with at least one cache configured.
-     *
-     * @param topVer Topology version (maximum allowed node order).
-     * @return Collection of alive cache nodes.
-     */
-    public Collection<ClusterNode> aliveNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveNodesWithCaches(topVer.topologyVersion());
+        return e != null ? e.getKey() : null;
     }
 
     /**
@@ -2580,19 +2551,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes;
 
         /**
-         * Cached alive remote nodes with caches.
-         */
-        private final Collection<ClusterNode> aliveNodesWithCaches;
-
-        /**
          * Cached alive server remote nodes with caches.
          */
-        private final Collection<ClusterNode> aliveSrvNodesWithCaches;
-
-        /**
-         * Cached alive remote server nodes with caches.
-         */
-        private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
+        private final ConcurrentSkipListMap<ClusterNode, Boolean> aliveSrvNodesWithCaches;
 
         /**
          * @param loc Local node.
@@ -2625,9 +2586,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
-            aliveNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveSrvNodesWithCaches = new ConcurrentSkipListMap<>(GridNodeOrderComparator.INSTANCE);
             nodesByVer = new TreeMap<>();
 
             long maxOrder0 = 0;
@@ -2681,18 +2640,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
-                if (hasCaches) {
-                    if (alive(node.id())) {
-                        aliveNodesWithCaches.add(node);
-
-                        if (!CU.clientNode(node)) {
-                            aliveSrvNodesWithCaches.add(node);
-
-                            if (!loc.id().equals(node.id()))
-                                aliveRmtSrvNodesWithCaches.add(node);
-                        }
-                    }
-                }
+                if (hasCaches && alive(node.id()) && !CU.clientNode(node))
+                    aliveSrvNodesWithCaches.put(node, Boolean.TRUE);
 
                 IgniteProductVersion nodeVer = U.productVersion(node);
 
@@ -2821,17 +2770,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all remote nodes that have cache with given name.
-         *
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
-            return filter(topVer, rmtCacheNodes.get(cacheName));
-        }
-
-        /**
          * Gets all remote nodes that have at least one cache configured.
          *
          * @param topVer Topology version.
@@ -2876,36 +2814,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all alive remote server nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveRmtSrvNodesWithCaches);
-        }
-
-        /**
-         * Gets all alive server nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveSrvNodesWithCaches);
-        }
-
-        /**
-         * Gets all alive remote nodes with at least one cache configured.
-         *
-         * @param topVer Topology version.
-         * @return Collection of nodes.
-         */
-        Collection<ClusterNode> aliveNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveNodesWithCaches);
-        }
-
-        /**
          * Checks if cache with given name has at least one node with near cache enabled.
          *
          * @param cacheName Cache name.
@@ -2928,9 +2836,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             filterNodeMap(aliveRmtCacheNodes, leftNode);
 
-            aliveNodesWithCaches.remove(leftNode);
             aliveSrvNodesWithCaches.remove(leftNode);
-            aliveRmtSrvNodesWithCaches.remove(leftNode);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..5651e58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -740,7 +740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Partition refresh callback.
      */
     private void refreshPartitions() {
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
             if (log.isDebugEnabled())
@@ -1224,7 +1224,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null);
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1273,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null) {
-                        updated |= top.update(null, entry.getValue(), null) != null;
+                        updated |= top.update(null, entry.getValue(), null);
 
                         cctx.affinity().checkRebalanceState(top, cacheId);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..d32f4c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -490,23 +490,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets oldest alive server node with at least one cache configured for specified topology version.
-     *
-     * @param ctx Context.
-     * @param topVer Maximum allowed topology version.
-     * @return Oldest alive cache server node.
-     */
-    @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
-        AffinityTopologyVersion topVer) {
-        Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
-
-        if (nodes.isEmpty())
-            return null;
-
-        return oldest(nodes);
-    }
-
-    /**
      * @param nodes Nodes.
      * @return Oldest node for the given topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 568a4da..1d60c42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -304,8 +304,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
             assert !metaDataCache.context().affinityNode();
 
             while (true) {
-                ClusterNode oldestSrvNode =
-                    CU.oldestAliveCacheServerNode(ctx.cache().context(), AffinityTopologyVersion.NONE);
+                ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldestSrvNode == null)
                     break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..f2ee758 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -271,7 +271,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             removeNode(exchId.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         assert oldest != null;
 
@@ -549,7 +549,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -563,7 +563,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +571,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             updateSeq.incrementAndGet();
@@ -634,7 +634,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return null;
+            return false;
         }
         finally {
             lock.writeLock().unlock();
@@ -643,7 +643,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -654,21 +654,21 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
@@ -688,7 +688,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -740,7 +740,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -761,7 +761,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -811,7 +811,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+        ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 0f75a5d..33a6fdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -203,9 +203,9 @@ public interface GridDhtPartitionTopology {
      * @param exchId Exchange ID.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
-     * @return Local partition map if there were evictions or {@code null} otherwise.
+     * @return {@code True} if topology state changed.
      */
-    public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap);
 
@@ -213,9 +213,9 @@ public interface GridDhtPartitionTopology {
      * @param exchId Exchange ID.
      * @param parts Partitions.
      * @param cntrMap Partition update counters.
-     * @return Local partition map if there were evictions or {@code null} otherwise.
+     * @return {@code True} if topology state changed.
      */
-    @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         @Nullable Map<Integer, Long> cntrMap);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ab573bd..24ff3ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -340,7 +340,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            initPartitions0(exchFut, updateSeq);
+            ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+            initPartitions0(oldest, exchFut, updateSeq);
 
             consistencyCheck();
         }
@@ -350,14 +352,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * @param oldest Oldest server node.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      */
-    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+    private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -397,12 +398,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (log.isDebugEnabled())
                             log.debug("Owned partition for oldest node: " + locPart);
 
-                        updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                        updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
             }
             else
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
         }
         else {
             // If preloader is disabled, then we simply clear out
@@ -419,7 +420,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state.active()) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             if (log.isDebugEnabled())
                                 log.debug("Evicting partition with rebalancing disabled " +
@@ -433,7 +434,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
 
         if (node2part != null && node2part.valid())
-            checkEvictions(updateSeq, aff);
+            checkEvictions(oldest, updateSeq, aff);
 
         updateRebalanceVersion(aff);
     }
@@ -442,7 +443,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param aff Affinity assignments.
      * @param updateSeq Update sequence.
      */
-    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+    private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
         int num = cctx.affinity().partitions();
@@ -454,7 +455,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     // will be created in MOVING state.
                     GridDhtLocalPartition locPart = createPartition(p);
 
-                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                    updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                 }
             }
             // If this node's map is empty, we pre-create local partitions,
@@ -486,7 +487,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+            ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
 
             assert oldest != null || cctx.kernalContext().clientNode();
 
@@ -523,11 +524,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             }
 
             if (affReady)
-                initPartitions0(exchFut, updateSeq);
+                initPartitions0(oldest, exchFut, updateSeq);
             else {
                 List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 
-                createPartitions(aff, updateSeq);
+                createPartitions(oldest, aff, updateSeq);
             }
 
             consistencyCheck();
@@ -574,6 +575,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
+            ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
             for (int p = 0; p < num; p++) {
                 GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
 
@@ -600,7 +603,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
                                     locPart + ']';
 
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                                updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                                 changed = true;
 
@@ -620,7 +623,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                     locPart + ", owners = " + owners + ']');
                         }
                         else
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
                     }
                 }
                 else {
@@ -630,7 +633,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state == MOVING) {
                             locPart.rent(false);
 
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                            updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
 
                             changed = true;
 
@@ -985,7 +988,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -997,7 +1000,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1025,7 +1028,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1033,7 +1036,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -1100,7 +1103,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed = checkEvictions(updateSeq, aff);
+                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+                changed = checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1110,7 +1115,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after full update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -1119,7 +1124,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
         @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
@@ -1130,33 +1135,28 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
                     ", parts=" + parts + ']');
 
-            return null;
+            return false;
         }
 
         lock.writeLock().lock();
 
         try {
             if (stopping)
-                return null;
+                return false;
 
             if (cntrMap != null) {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
-                    Long cntr = this.cntrMap.get(e.getKey());
+                    Integer p = e.getKey();
 
-                    if (cntr == null || cntr < e.getValue())
-                        this.cntrMap.put(e.getKey(), e.getValue());
-                }
+                    Long cntr = this.cntrMap.get(p);
 
-                for (int i = 0; i < locParts.length(); i++) {
-                    GridDhtLocalPartition part = locParts.get(i);
-
-                    if (part == null)
-                        continue;
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(p, e.getValue());
 
-                    Long cntr = cntrMap.get(part.id());
+                    GridDhtLocalPartition part = locParts.get(p);
 
-                    if (cntr != null)
-                        part.updateCounter(cntr);
+                    if (part != null)
+                        part.updateCounter(e.getValue());
                 }
             }
 
@@ -1165,7 +1165,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
                         lastExchangeId + ", exchId=" + exchId + ']');
 
-                return null;
+                return false;
             }
 
             if (exchId != null)
@@ -1182,7 +1182,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
                         ", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
 
-                return null;
+                return false;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
@@ -1225,7 +1225,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-                changed |= checkEvictions(updateSeq, aff);
+                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+                changed |= checkEvictions(oldest, updateSeq, aff);
 
                 updateRebalanceVersion(aff);
             }
@@ -1235,7 +1237,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (log.isDebugEnabled())
                 log.debug("Partition map after single update: " + fullMapString());
 
-            return changed ? localPartitionMap() : null;
+            return changed;
         }
         finally {
             lock.writeLock().unlock();
@@ -1243,11 +1245,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
+     * @param oldest Oldest server node.
      * @param updateSeq Update sequence.
      * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
+    private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1270,7 +1273,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     if (nodeIds.containsAll(F.nodeIds(affNodes))) {
                         part.rent(false);
 
-                        updateLocal(part.id(), locId, part.state(), updateSeq);
+                        updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                         changed = true;
 
@@ -1295,7 +1298,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 if (locId.equals(n.id())) {
                                     part.rent(false);
 
-                                    updateLocal(part.id(), locId, part.state(), updateSeq);
+                                    updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
 
                                     changed = true;
 
@@ -1318,18 +1321,16 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /**
      * Updates value for single partition.
      *
+     * @param oldest Oldest server node.
      * @param p Partition.
      * @param nodeId Node ID.
      * @param state State.
      * @param updateSeq Update sequence.
      */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
+    private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
         assert nodeId.equals(cctx.nodeId());
 
-        // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
         assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
@@ -1424,7 +1425,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         try {
             if (part.own()) {
-                updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+                updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
 
                 consistencyCheck();
 
@@ -1452,7 +1455,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+            ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(topVer);
+
+            updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq);
 
             consistencyCheck();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..704c654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -112,6 +112,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /** */
     @GridToStringExclude
+    private int pendingSingleUpdates;
+
+    /** */
+    @GridToStringExclude
     private List<ClusterNode> srvNodes;
 
     /** */
@@ -1162,13 +1166,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
         boolean allReceived = false;
+        boolean updateSingleMap = false;
 
         synchronized (mux) {
             assert crd != null;
 
             if (crd.isLocal()) {
                 if (remaining.remove(node.id())) {
-                    updatePartitionSingleMap(msg);
+                    updateSingleMap = true;
+
+                    pendingSingleUpdates++;
 
                     allReceived = remaining.isEmpty();
                 }
@@ -1177,8 +1184,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 singleMsgs.put(node, msg);
         }
 
-        if (allReceived)
+        if (updateSingleMap) {
+            try {
+                updatePartitionSingleMap(msg);
+            }
+            finally {
+                synchronized (mux) {
+                    assert pendingSingleUpdates > 0;
+
+                    pendingSingleUpdates--;
+
+                    if (pendingSingleUpdates == 0)
+                        mux.notifyAll();
+                }
+            }
+        }
+
+        if (allReceived) {
+            awaitSingleMapUpdates();
+
             onAllReceived(false);
+        }
+    }
+
+    /**
+     *
+     */
+    private void awaitSingleMapUpdates() {
+        synchronized (mux) {
+            try {
+                while (pendingSingleUpdates > 0)
+                    U.wait(mux);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+            }
+        }
     }
 
     /**
@@ -1374,7 +1415,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
             else {
-                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+                ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
                     cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
@@ -1557,6 +1598,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                         if (crd0.isLocal()) {
                             if (allReceived) {
+                                awaitSingleMapUpdates();
+
                                 onAllReceived(true);
 
                                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 6c26363..b9b92b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1265,7 +1265,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         try {
             if (!cache.context().affinityNode()) {
                 ClusterNode oldestSrvNode =
-                    CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+                    ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldestSrvNode == null)
                     return new GridEmptyIterator<>();
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
                 depExe.submit(new BusyRunnable() {
                     @Override public void run0() {
-                        ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+                        ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
 
                         if (oldest != null && oldest.isLocal()) {
                             final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..0e90418 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -136,6 +137,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -174,8 +176,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         IgniteProductVersion.fromString("1.5.0");
 
     /** */
-    private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>());
+    private IgniteThreadPoolExecutor utilityPool;
 
     /** Nodes ring. */
     @GridToStringExclude
@@ -297,6 +298,13 @@ class ServerImpl extends TcpDiscoveryImpl {
             spiState = DISCONNECTED;
         }
 
+        utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+            spi.ignite().name(),
+            0,
+            1,
+            2000,
+            new LinkedBlockingQueue<Runnable>());
+
         if (debugMode) {
             if (!log.isInfoEnabled())
                 throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -2403,9 +2411,12 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Connection check threshold. */
         private long connCheckThreshold;
 
+        /** */
+        private long lastRingMsgTime;
+
         /**
          */
-        protected RingMessageWorker() {
+        RingMessageWorker() {
             super("tcp-disco-msg-worker", 10);
 
             initConnectionCheckFrequency();
@@ -2500,6 +2511,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            sendHeartbeatMessage();
+
             DebugLogger log = messageLogger(msg);
 
             if (log.isDebugEnabled())
@@ -2508,6 +2521,11 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (debugMode)
                 debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
+            boolean ensured = spi.ensured(msg);
+
+            if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+                lastRingMsgTime = U.currentTimeMillis();
+
             if (locNode.internalOrder() == 0) {
                 boolean proc = false;
 
@@ -2564,7 +2582,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg) && redirectToClients(msg))
+            if (ensured && redirectToClients(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -5336,12 +5354,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          * Sends heartbeat message if needed.
          */
         private void sendHeartbeatMessage() {
-            if (!isLocalNodeCoordinator())
-                return;
-
             long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
 
-            if (elapsed > 0)
+            if (elapsed > 0 || !isLocalNodeCoordinator())
                 return;
 
             TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5376,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
                 lastTimeStatusMsgSent = locNode.lastUpdateTime();
 
-            long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+            long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+            long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
 
             if (elapsed > 0)
                 return;
@@ -6062,11 +6079,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             TcpDiscoverySpiState state = spiStateCopy();
 
-            long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+            long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
                 spi.getSocketTimeout();
 
             if (state == CONNECTED) {
-                spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6120,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(msg, sock, res, socketTimeout);
+                spi.writeToSocket(msg, sock, res, sockTimeout);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index 878d7d1..43017db 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@ -104,6 +104,7 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
     }
 
     /**
+     * @param backups Number of backups.
      * @throws Exception If failed.
      */
     protected void checkNodeRemoved(int backups) throws Exception {
@@ -247,7 +248,6 @@ public abstract class AbstractAffinityFunctionSelfTest extends GridCommonAbstrac
         }
     }
 
-
     /**
      * @param assignment Assignment to verify.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 390c83e..31b4bc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@ -239,7 +239,7 @@ public class GridDiscoveryManagerAliveCacheSelfTest extends GridCommonAbstractTe
                 GridCacheSharedContext<?, ?> ctx = k.context().cache().context();
 
                 ClusterNode oldest =
-                    GridCacheUtils.oldestAliveCacheServerNode(ctx, new AffinityTopologyVersion(currVer));
+                    ctx.discovery().oldestAliveCacheServerNode(new AffinityTopologyVersion(currVer));
 
                 assertNotNull(oldest);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4477e84/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe
         Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
 
         for (int part = 0; part < aff.getPartitions(); part++) {
-            Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+            Collection<ClusterNode> affNodes = aff.assignPartition(null,
+                part,
+                new ArrayList<>(nodes),
+                new HashMap<ClusterNode, byte[]>(),
+                0,
+                null);
 
             assertEquals(1, affNodes.size());
 


Mime
View raw message