ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [8/8] incubator-ignite git commit: # ignite-23 skip client nodes from partition exchange
Date Tue, 19 May 2015 15:00:44 GMT
# ignite-23 skip client nodes from partition exchange


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/72d6ea5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/72d6ea5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/72d6ea5d

Branch: refs/heads/ignite-23
Commit: 72d6ea5d1d772594d9ef4c567b2011024268475b
Parents: d6f5f15
Author: sboikov <sboikov@gridgain.com>
Authored: Tue May 19 14:49:37 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue May 19 17:47:15 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |   5 +
 .../ignite/internal/GridKernalContextImpl.java  |   5 +
 .../discovery/GridDiscoveryManager.java         |  64 ++-
 .../GridCachePartitionExchangeManager.java      |  76 ++-
 .../processors/cache/GridCachePreloader.java    |   4 +-
 .../cache/GridCachePreloaderAdapter.java        |   4 +-
 .../cache/GridCacheSharedContext.java           |   1 +
 .../processors/cache/GridCacheUtils.java        |  58 +--
 .../dht/GridDhtPartitionTopologyImpl.java       |   4 +-
 .../preloader/GridDhtPartitionDemandPool.java   |  20 +-
 .../preloader/GridDhtPartitionSupplyPool.java   |   6 +-
 .../GridDhtPartitionsExchangeFuture.java        | 223 +++++----
 .../preloader/GridDhtPartitionsFullMessage.java |   4 +-
 .../GridDhtPartitionsSingleMessage.java         |  33 +-
 .../dht/preloader/GridDhtPreloader.java         |  14 +-
 .../preloader/GridDhtPreloaderAssignments.java  |   3 +-
 .../GridCacheAbstractRemoveFailureTest.java     |  20 +
 ...niteCacheClientNodeChangingTopologyTest.java |  42 ++
 .../IgniteCacheClientNodeExchangeTest.java      | 184 -------
 ...teCacheClientNodePartitionsExchangeTest.java | 486 +++++++++++++++++++
 .../GridCacheDhtClientRemoveFailureTest.java    |  28 ++
 ...cClientInvalidPartitionHandlingSelfTest.java |  29 ++
 .../GridCacheAtomicClientRemoveFailureTest.java |  28 ++
 ...eAtomicInvalidPartitionHandlingSelfTest.java |  12 +
 .../IgniteCacheFailoverTestSuite.java           |   3 +
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +-
 26 files changed, 985 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ad7d562..d6542f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Marshaller context.
      */
     public MarshallerContextImpl marshallerContext();
+
+    /**
+     * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+     */
+    public boolean clientNode();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1ff483e..f921d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public boolean clientNode() {
+        return cfg.isClientMode();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridKernalContextImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 62548d8..7130421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1246,13 +1246,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Gets alive remote nodes with at least one cache configured.
+     * Gets alive remote server nodes with at least one cache configured.
      *
      * @param topVer Topology version (maximum allowed node order).
      * @return Collection of alive cache nodes.
      */
-    public Collection<ClusterNode> aliveRemoteNodesWithCaches(AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(null, topVer).aliveRemoteNodesWithCaches(topVer.topologyVersion());
+    public Collection<ClusterNode> aliveRemoteServerNodesWithCaches(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).aliveRemoteServerNodesWithCaches(topVer.topologyVersion());
+    }
+
+    /**
+     * Gets alive server nodes with at least one cache configured.
+     *
+     * @param topVer Topology version (maximum allowed node order).
+     * @return Collection of alive cache nodes.
+     */
+    public Collection<ClusterNode> aliveServerNodesWithCaches(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).aliveServerNodesWithCaches(topVer.topologyVersion());
     }
 
     /**
@@ -1350,7 +1360,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             // Find the eldest acceptable discovery cache.
             Map.Entry<AffinityTopologyVersion, DiscoCache> eldest = Collections.min(discoCacheHist.entrySet(), histCmp);
 
-            if (topVer.compareTo(eldest.getKey()) < 0)
+            if (topVer.compareTo(eldest.getKey()) <= 0)
                 cache = eldest.getValue();
         }
 
@@ -2094,9 +2104,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         private final Collection<ClusterNode> aliveNodesWithCaches;
 
         /**
-         * Cached alive remote nodes with caches.
+         * Cached alive server nodes with caches.
+         */
+        private final Collection<ClusterNode> aliveSrvNodesWithCaches;
+
+        /**
+         * Cached alive remote server nodes with caches.
          */
-        private final Collection<ClusterNode> aliveRmtNodesWithCaches;
+        private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches;
 
         /**
          * @param loc Local node.
@@ -2131,7 +2146,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f);
             aliveNodesWithCaches = new ConcurrentSkipListSet<>();
-            aliveRmtNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>();
+            aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>();
             nodesByVer = new TreeMap<>();
 
             long maxOrder0 = 0;
@@ -2183,8 +2199,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     if (alive(node.id())) {
                         aliveNodesWithCaches.add(node);
 
-                        if (!loc.id().equals(node.id()))
-                            aliveRmtNodesWithCaches.add(node);
+                        if (!CU.clientNode(node)) {
+                            aliveSrvNodesWithCaches.add(node);
+
+                            if (!loc.id().equals(node.id()))
+                                aliveRmtSrvNodesWithCaches.add(node);
+                        }
                     }
                 }
 
@@ -2269,13 +2289,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * @return All nodes with at least one cache configured.
-         */
-        Collection<ClusterNode> allNodesWithCaches() {
-            return allNodesWithCaches;
-        }
-
-        /**
          * Gets collection of nodes which have version equal or greater than {@code ver}.
          *
          * @param ver Version to check.
@@ -2374,13 +2387,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
 
         /**
-         * Gets all alive remote nodes with at least one cache configured.
+         * Gets all alive remote server nodes with at least one cache configured.
+         *
+         * @param topVer Topology version.
+         * @return Collection of nodes.
+         */
+        Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) {
+            return filter(topVer, aliveRmtSrvNodesWithCaches);
+        }
+
+        /**
+         * Gets all alive server nodes with at least one cache configured.
          *
          * @param topVer Topology version.
          * @return Collection of nodes.
          */
-        Collection<ClusterNode> aliveRemoteNodesWithCaches(final long topVer) {
-            return filter(topVer, aliveRmtNodesWithCaches);
+        Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) {
+            return filter(topVer, aliveSrvNodesWithCaches);
         }
 
         /**
@@ -2417,7 +2440,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             filterNodeMap(aliveRmtCacheNodes, leftNode);
 
             aliveNodesWithCaches.remove(leftNode);
-            aliveRmtNodesWithCaches.remove(leftNode);
+            aliveSrvNodesWithCaches.remove(leftNode);
+            aliveRmtSrvNodesWithCaches.remove(leftNode);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index c399c23..cc06d4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -554,7 +554,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Partition refresh callback.
      */
     void refreshPartitions() {
-        ClusterNode oldest = CU.oldest(cctx);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+        if (oldest == null) {
+            if (log.isDebugEnabled())
+                log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']');
+
+            return;
+        }
 
         if (log.isDebugEnabled())
             log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
@@ -641,7 +648,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
         throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+            cctx.kernalContext().clientNode(),
+            cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
@@ -687,6 +696,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
+     * @param reqs Cache change requests.
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@@ -827,7 +837,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param node Node ID.
      * @param msg Message.
      */
-    private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+    private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         if (!enterBusy())
             return;
 
@@ -858,8 +868,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (updated)
                     scheduleResendPartitions();
             }
-            else
-                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+            else {
+                if (msg.client()) {
+                    IgniteInternalFuture<?> fut = affinityReadyFuture(msg.exchangeId().topologyVersion());
+
+                    if (fut != null) {
+                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                processSinglePartitionClientUpdate(node, msg);
+                            }
+                        });
+                    }
+                    else
+                        processSinglePartitionClientUpdate(node, msg);
+                }
+                else
+                    exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+            }
         }
         finally {
             leaveBusy();
@@ -867,6 +892,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param node Node.
+     * @param msg Message.
+     */
+    private void processSinglePartitionClientUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
+        final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+            null,
+            null);
+
+        exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                // Finished future should reply only to sender client node.
+                exchFut.onReceive(node.id(), msg);
+            }
+        });
+    }
+
+    /**
      * @param node Node ID.
      * @param msg Message.
      */
@@ -982,7 +1024,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     busy = true;
 
-                    Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>();
+                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
 
                     boolean dummyReassign = exchFut.dummyReassign();
                     boolean forcePreload = exchFut.forcePreload();
@@ -1017,7 +1059,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 changed |= cacheCtx.topology().afterExchange(exchFut);
 
                                 // Preload event notification.
-                                if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
+                                if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
                                     if (!cacheCtx.isReplicated() || !startEvtFired) {
                                         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
@@ -1043,16 +1085,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                            long delay = cacheCtx.config().getRebalanceDelay();
+                        if (!exchFut.skipPreload()) {
+                            assignsMap = new HashMap<>();
 
-                            GridDhtPreloaderAssignments<K, V> assigns = null;
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                long delay = cacheCtx.config().getRebalanceDelay();
+
+                                GridDhtPreloaderAssignments assigns = null;
 
-                            // Don't delay for dummy reassigns to avoid infinite recursion.
-                            if (delay == 0 || forcePreload)
-                                assigns = cacheCtx.preloader().assign(exchFut);
+                                // Don't delay for dummy reassigns to avoid infinite recursion.
+                                if (delay == 0 || forcePreload)
+                                    assigns = cacheCtx.preloader().assign(exchFut);
 
-                            assignsMap.put(cacheCtx.cacheId(), assigns);
+                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                            }
                         }
                     }
                     finally {
@@ -1061,7 +1107,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
-                        for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
                             int cacheId = e.getKey();
 
                             GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 2e181f9..5a73843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> {
      * @param exchFut Exchange future to assign.
      * @return Assignments.
      */
-    public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut);
+    public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
@@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 80d3d6b..8cd5264 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -131,12 +131,12 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
         // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 294c2b0..4c08beb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -499,6 +499,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @param tx Transaction to rollback.
      * @throws IgniteCheckedException If failed.
+     * @return Rollback future.
      */
     public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
         Collection<Integer> cacheIds = tx.activeCacheIds();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index ef04ff4..b7bc115 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -527,8 +527,9 @@ public class GridCacheUtils {
      * @param topOrder Maximum allowed node order.
      * @return Affinity nodes.
      */
-    public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteNodesWithCaches(topOrder);
+    public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx,
+        AffinityTopologyVersion topOrder) {
+        return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder);
     }
 
     /**
@@ -607,26 +608,6 @@ public class GridCacheUtils {
      * Gets oldest alive node for specified topology version.
      *
      * @param cctx Cache context.
-     * @return Oldest node for the current topology version.
-     */
-    public static ClusterNode oldest(GridCacheContext cctx) {
-        return oldest(cctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node across nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @return Oldest node.
-     */
-    public static ClusterNode oldest(GridCacheSharedContext ctx) {
-        return oldest(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node for specified topology version.
-     *
-     * @param cctx Cache context.
      * @param topOrder Maximum allowed node order.
      * @return Oldest node for the given topology version.
      */
@@ -665,6 +646,23 @@ public class GridCacheUtils {
     }
 
     /**
+     * Gets oldest alive server node with at least one cache configured for specified topology version.
+     *
+     * @param ctx Context.
+     * @param topVer Maximum allowed topology version.
+     * @return Oldest alive cache server node.
+     */
+    @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
+        AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
+
+        if (nodes.isEmpty())
+            return null;
+
+        return oldest(nodes);
+    }
+
+    /**
      * @param nodes Nodes.
      * @return Oldest node for the given topology version.
      */
@@ -1802,16 +1800,22 @@ public class GridCacheUtils {
 
     /**
      * @param node Node.
-     * @param filter Node filter.
-     * @return {@code True} if node is not client node and pass given filter.
+     * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
      */
-    public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+    public static boolean clientNode(ClusterNode node) {
         Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
 
         assert clientModeAttr != null : node;
 
-        boolean clientMode = clientModeAttr != null && clientModeAttr;
+        return clientModeAttr != null && clientModeAttr;
+    }
 
-        return !clientMode && filter.apply(node);
+    /**
+     * @param node Node.
+     * @param filter Node filter.
+     * @return {@code True} if node is not a client node and passes the given filter.
+     */
+    public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+        return !clientNode(node) && filter.apply(node);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..56f6a62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx.shared(), topVer);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            assert oldest != null;
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..1071468 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
  * and populating local cache.
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
     /** Dummy message to wake up a blocking queue if a node leaves. */
     private final SupplyMessage DUMMY_TOP = new SupplyMessage();
 
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 
@@ -327,7 +327,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
-    void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -399,7 +399,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         private int id;
 
         /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
 
         /** Message queue. */
         private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +425,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         /**
          * @param assigns Assignments.
          */
-        void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+        void addAssignments(GridDhtPreloaderAssignments assigns) {
             assert assigns != null;
 
             assignQ.offer(assigns);
@@ -885,7 +885,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     }
 
                     // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments<K, V> assigns = null;
+                    GridDhtPreloaderAssignments assigns = null;
 
                     while (assigns == null)
                         assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +995,12 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param exchFut Exchange future.
      * @return Assignments of partitions to nodes.
      */
-    GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = cctx.dht().topology();
 
         if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         int partCnt = cctx.affinity().partitions();
 
@@ -1009,7 +1009,7 @@ public class GridDhtPartitionDemandPool<K, V> {
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
                 ", topVer=" + top.topologyVersion() + ']';
 
-        GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..84ac7c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Thread pool for supplying partitions to demanding nodes.
  */
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f4dcf3b..102176e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
@@ -118,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private GridFutureAdapter<Boolean> initFut;
 
     /** Topology snapshot. */
-    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
-        new AtomicReference<>();
+    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
 
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -150,6 +150,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Cache validation results. */
     private volatile Map<Integer, Boolean> cacheValidRes;
 
+    /** Skip preload flag. */
+    private boolean skipPreload;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -227,23 +230,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         initFut = new GridFutureAdapter<>();
 
         // Grab all nodes with order of equal or less than last joined node.
-        Collection<ClusterNode> nodes = CU.aliveCacheNodes(cctx, exchId.topologyVersion());
-
-        if (nodes.isEmpty()) {
-            initFut.onDone(true);
+        ClusterNode node = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
 
-            onDone(exchId.topologyVersion());
-
-            return;
-        }
-
-        oldestNode.set(CU.oldest(nodes));
-
-        assert oldestNode.get() != null;
+        oldestNode.set(node);
 
         if (log.isDebugEnabled())
-            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
-                ", fut=" + this + ']');
+            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
     }
 
     /** {@inheritDoc} */
@@ -263,6 +255,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @return Skip preload flag.
+     */
+    public boolean skipPreload() {
+        return skipPreload;
+    }
+
+    /**
      * @return Dummy flag.
      */
     public boolean dummy() {
@@ -415,13 +414,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Exchange id.
-     */
-    GridDhtPartitionExchangeId key() {
-        return exchId;
-    }
-
-    /**
      * @return Exchange ID.
      */
     public GridDhtPartitionExchangeId exchangeId() {
@@ -429,13 +421,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Init future.
-     */
-    IgniteInternalFuture<?> initFuture() {
-        return initFut;
-    }
-
-    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -464,8 +449,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (isDone())
             return;
 
-        assert oldestNode.get() != null;
-
         if (init.compareAndSet(false, true)) {
             if (isDone())
                 return;
@@ -475,63 +458,98 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
-                if (!dummy && !forcePreload && F.isEmpty(reqs)) { // If exchange initiated by node join or leave.
-                    assert discoEvt != null;
+                assert discoEvt != null : this;
+                assert !dummy && !forcePreload : this;
 
+                startCaches();
+
+                // True if the exchange was triggered by a client node joining, leaving or failing.
+                boolean clientNodeEvt;
+
+                if (F.isEmpty(reqs)) {
                     int type = discoEvt.type();
 
                     assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
 
-                    ClusterNode node = discoEvt.eventNode();
+                    clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+                }
+                else {
+                    assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+
+                    clientNodeEvt = false;
+                }
 
-                    if (!node.isLocal()) {
-                        boolean affNode = false;
+                if (clientNodeEvt) {
+                    ClusterNode node = discoEvt.eventNode();
 
+                    if (!node.isLocal()) {  // Client needs to initialize affinity for local join event.
                         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                             if (cacheCtx.isLocal())
                                 continue;
 
-                            if (CU.affinityNode(node, cacheCtx.config().getNodeFilter())) {
-                                affNode = true;
+                            cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
 
-                                break;
-                            }
+                            GridDhtPartitionTopology top = cacheCtx.topology();
+
+                            GridDhtPartitionMap parts = top.partitions(node.id());
+
+                            assert parts == null || parts.size() == 0 : parts;
+
+                            top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
                         }
 
-                        if (!affNode) {
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                if (cacheCtx.isLocal())
-                                    continue;
+                        if (exchId.isLeft())
+                            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
 
-                                cacheCtx.affinity().clientNodeTopologyChange(node, exchId.topologyVersion());
+                        onDone(exchId.topologyVersion());
 
-                                GridDhtPartitionTopology top = cacheCtx.topology();
+                        skipPreload = true;
 
-                                GridDhtPartitionMap parts = top.partitions(node.id());
+                        return;
+                    }
+                }
 
-                                assert parts == null || parts.size() == 0 : parts;
+                if (cctx.kernalContext().clientNode()) {
+                    skipPreload = true;
 
-                                top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
-                            }
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
 
-                            if (!exchId.isLeft()) {
-                                rmtNodes = new ConcurrentLinkedQueue<>(F.asList(node));
+                        GridDhtPartitionTopology top = cacheCtx.topology();
 
-                                rmtIds = F.asList(node.id());
-                            }
+                        top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+                    }
 
-                            ready.set(true);
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
 
-                            initFut.onDone(true);
+                        initTopology(cacheCtx);
+                    }
 
-                            onDone(exchId.topologyVersion());
+                    if (oldestNode.get() != null) {
+                        rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
+                            exchId.topologyVersion()));
 
-                            return;
-                        }
+                        rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+
+                        ready.set(true);
+
+                        initFut.onDone(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Initialized future: " + this);
+
+                        sendPartitions();
                     }
+                    else
+                        onDone(exchId.topologyVersion());
+
+                    return;
                 }
 
-                startCaches();
+                assert oldestNode.get() != null;
 
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (isCacheAdded(cacheCtx.cacheId())) {
@@ -614,7 +632,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
                     exchId.topologyVersion()));
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -821,7 +839,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+            cctx.kernalContext().clientNode(), cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())
@@ -1091,9 +1110,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
                     ", unexpectedNodeId=" + nodeId + ']');
 
-            ClusterNode sender = cctx.discovery().node(nodeId);
+            ClusterNode snd = cctx.discovery().node(nodeId);
 
-            if (sender == null) {
+            if (snd == null) {
                 if (log.isDebugEnabled())
                     log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
                         ", exchId=" + msg.exchangeId() + ']');
@@ -1102,7 +1121,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
 
             // Will process message later if sender node becomes oldest node.
-            if (sender.order() > curOldest.order())
+            if (snd.order() > curOldest.order())
                 fullMsgs.put(nodeId, msg);
 
             return;
@@ -1141,8 +1160,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue());
-            else if (CU.oldest(cctx).isLocal())
-                cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            else {
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+                if (oldest != null && oldest.isLocal())
+                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            }
         }
     }
 
@@ -1201,40 +1224,47 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             boolean set = false;
 
-                            ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
-
-                            // If local node is now oldest.
-                            if (newOldest.id().equals(cctx.localNodeId())) {
-                                synchronized (mux) {
-                                    if (oldestNode.compareAndSet(oldest, newOldest)) {
-                                        // If local node is just joining.
-                                        if (exchId.nodeId().equals(cctx.localNodeId())) {
-                                            try {
-                                                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                                    if (!cacheCtx.isLocal())
-                                                        cacheCtx.topology().beforeExchange(
-                                                            GridDhtPartitionsExchangeFuture.this);
+                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
+                                if (it.next().id().equals(nodeId))
+                                    it.remove();
+                            }
+
+                            ClusterNode newOldest = CU.oldest(rmtNodes);
+
+                            if (newOldest != null) {
+                                // If local node is now oldest.
+                                if (newOldest.id().equals(cctx.localNodeId())) {
+                                    synchronized (mux) {
+                                        if (oldestNode.compareAndSet(oldest, newOldest)) {
+                                            // If local node is just joining.
+                                            if (exchId.nodeId().equals(cctx.localNodeId())) {
+                                                try {
+                                                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                                        if (!cacheCtx.isLocal())
+                                                            cacheCtx.topology().beforeExchange(
+                                                                GridDhtPartitionsExchangeFuture.this);
+                                                    }
                                                 }
-                                            }
-                                            catch (IgniteCheckedException e) {
-                                                onDone(e);
+                                                catch (IgniteCheckedException e) {
+                                                    onDone(e);
 
-                                                return;
+                                                    return;
+                                                }
                                             }
-                                        }
 
-                                        set = true;
+                                            set = true;
+                                        }
                                     }
                                 }
-                            }
-                            else {
-                                synchronized (mux) {
-                                    set = oldestNode.compareAndSet(oldest, newOldest);
-                                }
+                                else {
+                                    synchronized (mux) {
+                                        set = oldestNode.compareAndSet(oldest, newOldest);
+                                    }
 
-                                if (set && log.isDebugEnabled())
-                                    log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
-                                        ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                    if (set && log.isDebugEnabled())
+                                        log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
+                                            ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                }
                             }
 
                             if (set) {
@@ -1256,9 +1286,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             assert rmtNodes != null;
 
-                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); )
+                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
                                 if (it.next().id().equals(nodeId))
                                     it.remove();
+                            }
 
                             if (allReceived() && ready.get() && replied.compareAndSet(false, true))
                                 if (spreadPartitions())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8256274..73794ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @param id Exchange ID.
      * @param lastVer Last version.
+     * @param topVer Topology version.
      */
-    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
+    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
+        @Nullable GridCacheVersion lastVer,
         @NotNull AffinityTopologyVersion topVer) {
         super(id, lastVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 66140cd..713a80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions. */
     private byte[] partsBytes;
 
+    /** Client message flag: {@code true} if sent from client node. */
+    private boolean client;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /**
      * @param exchId Exchange ID.
+     * @param client Client message flag.
      * @param lastVer Last version.
      */
-    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) {
+    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
+        boolean client,
+        @Nullable GridCacheVersion lastVer) {
         super(exchId, lastVer);
+
+        this.client = client;
+    }
+
+    /**
+     * @return {@code True} if sent from client node.
+     */
+    public boolean client() {
+        return client;
     }
 
     /**
@@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (writer.state()) {
             case 5:
+                if (!writer.writeBoolean("client", client))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (reader.state()) {
             case 5:
+                client = reader.readBoolean("client");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d6373f0..61ba8b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -60,10 +60,10 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool<K, V> supplyPool;
+    private GridDhtPartitionSupplyPool supplyPool;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool<K, V> demandPool;
+    private GridDhtPartitionDemandPool demandPool;
 
     /** Start future. */
     private final GridFutureAdapter<Object> startFut;
@@ -158,8 +158,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock);
+        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
+        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -253,12 +253,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return demandPool.assign(exchFut);
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
         demandPool.addAssignments(assignments, forcePreload);
     }
 
@@ -271,7 +271,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>() : demandPool.syncFuture();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 369fc68..2f6ef6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -27,8 +27,7 @@ import java.util.concurrent.*;
 /**
  * Partition to node assignments.
  */
-public class GridDhtPreloaderAssignments<K, V> extends
-    ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
+public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
index c6ede61..d5d80ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -71,6 +72,16 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
     private String sizePropVal;
 
     /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (testClientNode() && getTestGridName(0).equals(gridName))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
     @Override protected int gridCount() {
         return GRID_CNT;
     }
@@ -106,9 +117,18 @@ public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstra
     }
 
     /**
+     * @return {@code True} if the test performs updates from a client node.
+     */
+    protected boolean testClientNode() {
+        return false;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testPutAndRemove() throws Exception {
+        assertEquals(testClientNode(), (boolean)grid(0).configuration().isClientMode());
+
         final IgniteCache<Integer, Integer> sndCache0 = grid(0).cache(null);
 
         final AtomicBoolean stop = new AtomicBoolean();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
new file mode 100644
index 0000000..c233bb9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+/**
+ *
+ */
+public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstractTest {
+    /** */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
deleted file mode 100644
index 66db3c6..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeExchangeTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.communication.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.eclipse.jetty.util.*;
-
-import java.util.*;
-
-/**
- *
- */
-public class IgniteCacheClientNodeExchangeTest extends GridCommonAbstractTest {
-    /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private boolean client;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
-
-        cfg.setClientMode(client);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        cfg.setCacheConfiguration(ccfg);
-
-        cfg.setCommunicationSpi(new TestCommunicationSpi());
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
-
-        stopAllGrids();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNoPartitionExchangeForClient() throws Exception {
-        Ignite ignite0 = startGrid(0);
-
-        TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
-
-        Ignite ignite1 = startGrid(1);
-
-        TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
-
-        assertEquals(0, spi0.partitionsSingleMessages().size());
-        assertEquals(1, spi0.partitionsFullMessages().size());
-
-        assertEquals(1, spi1.partitionsSingleMessages().size());
-        assertEquals(0, spi1.partitionsFullMessages().size());
-
-        spi0.reset();
-        spi1.reset();
-
-        client = true;
-
-        for (int i = 0; i < 3; i++) {
-            log.info("Start client node: " + i);
-
-            Ignite ignite2 = startGrid(2);
-
-            TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
-
-            assertEquals(0, spi0.partitionsSingleMessages().size());
-            assertEquals(1, spi0.partitionsFullMessages().size());
-
-            assertEquals(0, spi1.partitionsSingleMessages().size());
-            assertEquals(0, spi1.partitionsFullMessages().size());
-
-            assertEquals(1, spi2.partitionsSingleMessages().size());
-            assertEquals(0, spi2.partitionsFullMessages().size());
-
-            spi0.reset();
-            spi1.reset();
-            spi2.reset();
-
-            log.info("Stop client node.");
-
-            ignite2.close();
-
-            assertEquals(0, spi0.partitionsSingleMessages().size());
-            assertEquals(0, spi0.partitionsFullMessages().size());
-
-            assertEquals(0, spi1.partitionsSingleMessages().size());
-            assertEquals(0, spi1.partitionsFullMessages().size());
-        }
-    }
-
-    /**
-     * Test communication SPI.
-     */
-    private static class TestCommunicationSpi extends TcpCommunicationSpi {
-        /** */
-        private ConcurrentHashSet<GridDhtPartitionsSingleMessage> partSingleMsgs = new ConcurrentHashSet<>();
-
-        /** */
-        private ConcurrentHashSet<GridDhtPartitionsFullMessage> partFullMsgs = new ConcurrentHashSet<>();
-
-        /** */
-        @LoggerResource
-        private IgniteLogger log;
-
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg) {
-            super.sendMessage(node, msg);
-
-            Object msg0 = ((GridIoMessage)msg).message();
-
-            if (msg0 instanceof GridDhtPartitionsSingleMessage) {
-                if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) {
-                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
-
-                    partSingleMsgs.add((GridDhtPartitionsSingleMessage) msg0);
-                }
-            }
-            else if (msg0 instanceof GridDhtPartitionsFullMessage) {
-                if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) {
-                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
-
-                    partFullMsgs.add((GridDhtPartitionsFullMessage) msg0);
-                }
-            }
-        }
-
-        /**
-         *
-         */
-        void reset() {
-            partSingleMsgs.clear();
-            partFullMsgs.clear();
-        }
-
-        /**
-         * @return Sent partitions single messages.
-         */
-        Collection<GridDhtPartitionsSingleMessage> partitionsSingleMessages() {
-            return partSingleMsgs;
-        }
-
-        /**
-         * @return Sent partitions full messages.
-         */
-        Collection<GridDhtPartitionsFullMessage> partitionsFullMessages() {
-            return partFullMsgs;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72d6ea5d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
new file mode 100644
index 0000000..3fac400
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.fair.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Checks partition exchange messages and rebalance events as client and server nodes join and leave.
+ */
+public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag applied in {@link #getConfiguration(String)} to each node being started. */
+    private boolean client;
+
+    /** If {@code true}, the cache is configured with {@link FairAffinityFunction}. */
+    private boolean fairAffinity;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        if (fairAffinity)
+            ccfg.setAffinity(new FairAffinityFunction());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeLeave() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        client = true;
+
+        final Ignite ignite1 = startGrid(1);
+
+        waitForTopologyUpdate(2, 2);
+
+        final Ignite ignite2 = startGrid(2);
+
+        waitForTopologyUpdate(3, 3);
+
+        ignite0.close();
+
+        waitForTopologyUpdate(2, 4);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite1.cache(null).get(1);
+
+                return null;
+            }
+        }, CacheServerNotFoundException.class, null);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite2.cache(null).get(1);
+
+                return null;
+            }
+        }, CacheServerNotFoundException.class, null);
+
+        ignite1.close();
+
+        waitForTopologyUpdate(1, 5);
+
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                ignite2.cache(null).get(1);
+
+                return null;
+            }
+        }, CacheServerNotFoundException.class, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSkipPreload() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        final CountDownLatch evtLatch0 = new CountDownLatch(1);
+
+        ignite0.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                log.info("Rebalance event: " + evt);
+
+                evtLatch0.countDown();
+
+                return true;
+            }
+        }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        client = true;
+
+        Ignite ignite1 = startGrid(1);
+
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+        ignite1.close();
+
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+        ignite1 = startGrid(1);
+
+        final CountDownLatch evtLatch1 = new CountDownLatch(1);
+
+        ignite1.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                log.info("Rebalance event: " + evt);
+
+                evtLatch1.countDown();
+
+                return true;
+            }
+        }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+
+        client = false;
+
+        startGrid(2);
+
+        assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+        assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionsExchange() throws Exception {
+        partitionsExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionsExchangeFairAffinity() throws Exception {
+        fairAffinity = true;
+
+        partitionsExchange();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void partitionsExchange() throws Exception {
+        Ignite ignite0 = startGrid(0);
+
+        TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+
+        Ignite ignite1 = startGrid(1);
+
+        waitForTopologyUpdate(2, 2);
+
+        TestCommunicationSpi spi1 = (TestCommunicationSpi)ignite1.configuration().getCommunicationSpi();
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(1, spi0.partitionsFullMessages());
+
+        assertEquals(1, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+
+        client = true;
+
+        log.info("Start client node1.");
+
+        Ignite ignite2 = startGrid(2);
+
+        waitForTopologyUpdate(3, 3);
+
+        TestCommunicationSpi spi2 = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(1, spi0.partitionsFullMessages());
+
+        assertEquals(0, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        assertEquals(1, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+        spi2.reset();
+
+        log.info("Start client node2.");
+
+        Ignite ignite3 = startGrid(3);
+
+        waitForTopologyUpdate(4, 4);
+
+        TestCommunicationSpi spi3 = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(1, spi0.partitionsFullMessages());
+
+        assertEquals(0, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        assertEquals(0, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+
+        assertEquals(1, spi3.partitionsSingleMessages());
+        assertEquals(0, spi3.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+        spi2.reset();
+        spi3.reset();
+
+        log.info("Start one more server node.");
+
+        client = false;
+
+        Ignite ignite4 = startGrid(4);
+
+        waitForTopologyUpdate(5, 5);
+
+        TestCommunicationSpi spi4 = (TestCommunicationSpi)ignite4.configuration().getCommunicationSpi();
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(4, spi0.partitionsFullMessages());
+
+        assertEquals(1, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        assertEquals(1, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+
+        assertEquals(1, spi3.partitionsSingleMessages());
+        assertEquals(0, spi3.partitionsFullMessages());
+
+        assertEquals(1, spi4.partitionsSingleMessages());
+        assertEquals(0, spi4.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+        spi2.reset();
+        spi3.reset();
+
+        log.info("Stop server node.");
+
+        ignite4.close();
+
+        waitForTopologyUpdate(4, 6);
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(3, spi0.partitionsFullMessages());
+
+        assertEquals(1, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        assertEquals(1, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+
+        assertEquals(1, spi3.partitionsSingleMessages());
+        assertEquals(0, spi3.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+        spi2.reset();
+
+        log.info("Stop client node2.");
+
+        ignite3.close();
+
+        waitForTopologyUpdate(3, 7);
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(0, spi0.partitionsFullMessages());
+
+        assertEquals(0, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        assertEquals(0, spi2.partitionsSingleMessages());
+        assertEquals(0, spi2.partitionsFullMessages());
+
+        spi0.reset();
+        spi1.reset();
+
+        log.info("Stop client node1.");
+
+        ignite2.close();
+
+        waitForTopologyUpdate(2, 8);
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(0, spi0.partitionsFullMessages());
+
+        assertEquals(0, spi1.partitionsSingleMessages());
+        assertEquals(0, spi1.partitionsFullMessages());
+
+        log.info("Stop server node.");
+
+        ignite1.close();
+
+        waitForTopologyUpdate(1, 9);
+
+        assertEquals(0, spi0.partitionsSingleMessages());
+        assertEquals(0, spi0.partitionsFullMessages());
+    }
+
+    /**
+     * @param expNodes Expected number of nodes.
+     * @param topVer Expected topology version.
+     * @throws Exception If failed.
+     */
+    private void waitForTopologyUpdate(int expNodes, int topVer) throws Exception {
+        List<Ignite> nodes = G.allGrids();
+
+        assertEquals(expNodes, nodes.size());
+
+        final AffinityTopologyVersion ver = new AffinityTopologyVersion(topVer, 0);
+
+        for (Ignite ignite : nodes) {
+            final IgniteKernal kernal = (IgniteKernal)ignite;
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ver.equals(kernal.context().cache().context().exchange().readyAffinityVersion());
+                }
+            }, 10_000);
+
+            assertEquals("Unexpected affinity version for " + ignite.name(),
+                ver,
+                kernal.context().cache().context().exchange().readyAffinityVersion());
+        }
+
+        Iterator<Ignite> it = nodes.iterator();
+
+        Ignite ignite0 = it.next();
+
+        Affinity<Integer> aff0 = ignite0.affinity(null);
+
+        while (it.hasNext()) {
+            Ignite ignite = it.next();
+
+            Affinity<Integer> aff = ignite.affinity(null);
+
+            assertEquals(aff0.partitions(), aff.partitions());
+
+            for (int part = 0; part < aff.partitions(); part++)
+                assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), aff.mapPartitionToPrimaryAndBackups(part));
+        }
+
+        for (Ignite ignite : nodes) {
+            final IgniteKernal kernal = (IgniteKernal)ignite;
+
+            for (IgniteInternalCache cache : kernal.context().cache().caches()) {
+                GridDhtPartitionTopology top = cache.context().topology();
+
+                assertEquals("Unexpected topology version [node=" + ignite.name() + ", cache=" + cache.name() + ']',
+                    ver,
+                    top.topologyVersion());
+            }
+        }
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Test communication SPI.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Number of sent {@link GridDhtPartitionsSingleMessage}s carrying a non-null exchange ID. */
+        private AtomicInteger partSingleMsgs = new AtomicInteger();
+
+        /** Number of sent {@link GridDhtPartitionsFullMessage}s carrying a non-null exchange ID. */
+        private AtomicInteger partFullMsgs = new AtomicInteger();
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) {
+            super.sendMessage(node, msg);
+
+            Object msg0 = ((GridIoMessage)msg).message();
+
+            if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+                if (((GridDhtPartitionsSingleMessage)msg0).exchangeId() != null) {
+                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+                    partSingleMsgs.incrementAndGet();
+                }
+            }
+            else if (msg0 instanceof GridDhtPartitionsFullMessage) {
+                if (((GridDhtPartitionsFullMessage)msg0).exchangeId() != null) {
+                    log.info("Partitions message: " + msg0.getClass().getSimpleName());
+
+                    partFullMsgs.incrementAndGet();
+                }
+            }
+        }
+
+        /**
+         * Resets both message counters to zero.
+         */
+        void reset() {
+            partSingleMsgs.set(0);
+            partFullMsgs.set(0);
+        }
+
+        /**
+         * @return Number of partitions single messages counted since creation or the last reset.
+         */
+        int partitionsSingleMessages() {
+            return partSingleMsgs.get();
+        }
+
+        /**
+         * @return Number of partitions full messages counted since creation or the last reset.
+         */
+        int partitionsFullMessages() {
+            return partFullMsgs.get();
+        }
+    }
+
+}



Mime
View raw message