ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-5272
Date Tue, 06 Jun 2017 15:02:44 GMT
ignite-5272


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27d9e4ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27d9e4ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27d9e4ae

Branch: refs/heads/ignite-5272
Commit: 27d9e4ae9e3ce01393b4aa91f75bc64e3053a63b
Parents: db85d16
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jun 6 14:02:51 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jun 6 18:02:17 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  18 +
 .../cache/CacheAffinitySharedManager.java       | 333 ++++++++++++++++---
 .../processors/cache/CacheGroupContext.java     |   3 +
 .../ClientCacheChangeDiscoveryMessage.java      |  98 ++++++
 .../processors/cache/ClusterCachesInfo.java     |  10 +-
 .../cache/DynamicCacheChangeRequest.java        |  30 --
 .../processors/cache/GridCacheContext.java      |   9 +-
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../processors/cache/GridCacheProcessor.java    | 206 ++++++++----
 .../dht/ClientCacheDhtTopologyFuture.java       |  58 ++++
 .../dht/GridClientPartitionTopology.java        |  41 +--
 .../dht/GridDhtAffinityAssignmentRequest.java   |  40 ++-
 .../dht/GridDhtAffinityAssignmentResponse.java  |  66 +++-
 .../dht/GridDhtAssignmentFetchFuture.java       |  22 +-
 .../dht/GridDhtPartitionTopology.java           |  12 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  49 +--
 .../GridDhtPartitionsExchangeFuture.java        |  33 +-
 .../IgniteDynamicClientCacheStartSelfTest.java  | 120 +++++++
 18 files changed, 910 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8c2bd8c..a671d4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -70,6 +70,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -1988,6 +1990,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param reqId Start request ID.
+     * @param startReqs Cache start requests.
+     * @param cachesToClose Caches to close.
+     */
+    public void clientCacheStartEvent(UUID reqId,
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
+        @Nullable Set<String> cachesToClose) {
+        discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+            AffinityTopologyVersion.NONE,
+            localNode(),
+            null,
+            Collections.<ClusterNode>emptyList(),
+            new ClientCacheChangeDiscoveryMessage(reqId, startReqs, cachesToClose));
+    }
+
+    /**
      * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}.
      *
      * @return Start time of the first grid node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index dc4a91f..b95ccfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -36,13 +37,16 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -81,7 +85,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private AffinityTopologyVersion lastAffVer;
 
     /** Registered caches (updated from exchange thread). */
-    private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>();
+    private final CachesInfo caches = new CachesInfo();
 
     /** */
     private WaitRebalanceInfo waitInfo;
@@ -126,14 +130,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
-            registeredGrps.clear();
+            caches.clear();
 
             affCalcVer = null;
 
             lastAffVer = null;
 
-            for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values())
-                registeredGrps.put(desc.groupId(), desc);
+            caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -322,20 +325,160 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param exchActions Cache change requests to execute on exchange.
      */
     private void updateCachesInfo(ExchangeActions exchActions) {
-        for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
-            CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+        caches.updateCachesInfo(exchActions);
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param startReqs Client cache start requests.
+     * @return Descriptors for caches to start.
+     */
+    @Nullable private List<DynamicCacheDescriptor> clientCachesToStart(UUID reqId,
+        Map<String, DynamicCacheChangeRequest> startReqs) {
+        List<DynamicCacheDescriptor> startDescs = new ArrayList<>(startReqs.size());
 
-            assert rmvd != null : stopDesc.cacheOrGroupName();
+        for (DynamicCacheChangeRequest startReq : startReqs.values()) {
+            DynamicCacheDescriptor desc = caches.cache(startReq.cacheName());
+
+            if (desc == null) {
+                CacheException err = new CacheException("Failed to start client cache " +
+                    "(a cache with the given name is not started): " + startReq.cacheName());
+
+                cctx.cache().completeClientCacheChangeFuture(reqId, err);
+
+                return null;
+            }
+
+            if (cctx.cacheContext(desc.cacheId()) != null)
+                continue;
+
+            startDescs.add(desc);
         }
 
-        for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
-            CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+        return startDescs;
+    }
+
+    /**
+     * @param msg Change request.
+     */
+    private void processClientCacheStartRequests(ClientCacheChangeDiscoveryMessage msg) {
+        Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests();
+
+        if (startReqs == null)
+            return;
+
+        List<DynamicCacheDescriptor> startDescs = clientCachesToStart(msg.requestId(), msg.startRequests());
+
+        if (startDescs == null || startDescs.isEmpty())
+            return;
+
+        AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion();
+
+        DiscoCache discoCache = cctx.discovery().discoCache(topVer);
+
+        Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
+
+        for (DynamicCacheDescriptor desc : startDescs) {
+            try {
+                DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
+
+                cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer);
+
+                CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                assert grp != null : desc.groupId();
+                assert !grp.affinityNode() : grp.cacheOrGroupName();
+
+                if (!grp.isLocal() &&
+                    grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) &&
+                    grp.localStartVersion().equals(topVer) &&
+                    !fetchFuts.containsKey(grp.groupId())) {
+                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                        grp.groupId(),
+                        topVer,
+                        discoCache);
+
+                    fetchFut.init(true);
+
+                    fetchFuts.put(grp.groupId(), fetchFut);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+                return;
+            }
+        }
+
+        for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) {
+            try {
+                CacheGroupContext grp = cctx.cache().cacheGroup(fetchFut.groupId());
+
+                assert grp != null;
+
+                GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    null,
+                    discoCache,
+                    grp.affinity(),
+                    fetchFut);
+
+                GridDhtPartitionFullMap partMap;
+
+                if (res != null) {
+                    partMap = res.partitionMap();
+
+                    assert partMap != null : res;
+
+                    ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
+
+                    grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+
+                    grp.topology().update(topVer, partMap, null);
+                }
+                else {
+                    cctx.cache().completeClientCacheChangeFuture(msg.requestId(), new IgniteCheckedException("test"));
+
+                    return;
+                    // TODO 5272: mark as 'no server nodes'
+                }
+            }
+            catch (IgniteCheckedException e) {
+                // TODO 5272: stop already started caches.
+                cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+                return;
+            }
+        }
+
+        cctx.cache().initCacheProxies(topVer, null);
+
+        cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
+    }
+
+    /**
+     * @param msg Change request.
+     */
+    private void processCacheCloseRequests(ClientCacheChangeDiscoveryMessage msg) {
+        Set<String> cachesToClose = msg.cachesToClose();
+
+        if (cachesToClose == null)
+            return;
+
+        for (String cacheName : cachesToClose) {
 
-            assert old == null : old;
         }
     }
 
     /**
+     * @param msg Change request.
+     */
+    void processClientCachesChanges(ClientCacheChangeDiscoveryMessage msg) {
+        processClientCacheStartRequests(msg);
+
+        processCacheCloseRequests(msg);
+    }
+
+    /**
      * Called on exchange initiated for cache start/stop request.
      *
      * @param fut Exchange future.
@@ -413,7 +556,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
                         assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
-                        initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
+                        initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
                     }
                 }
             }
@@ -459,7 +602,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         assert !grpHolder.client() : grpHolder;
 
                         grpHolder = CacheGroupHolder2.create(cctx,
-                            registeredGrps.get(grp.groupId()),
+                            caches.group(grp.groupId()),
                             fut,
                             grp.affinity());
 
@@ -542,7 +685,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void removeAllCacheInfo(){
         grpHolders.clear();
 
-        registeredGrps.clear();
+        caches.clear();
     }
 
     /**
@@ -632,7 +775,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                CacheGroupDescriptor desc = registeredGrps.get(aff.groupId());
+                CacheGroupDescriptor desc = caches.group(aff.groupId());
 
                 assert desc != null : aff.cacheOrGroupName();
 
@@ -762,7 +905,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
         assert lateAffAssign;
 
-        for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) {
+        for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
             if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
 
@@ -842,12 +985,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void initStartedCaches(boolean crd,
         final GridDhtPartitionsExchangeFuture fut,
         Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
-        for (DynamicCacheDescriptor desc : descs) {
-            CacheGroupDescriptor grpDesc = desc.groupDescriptor();
-
-            if (!registeredGrps.containsKey(grpDesc.groupId()))
-                registeredGrps.put(grpDesc.groupId(), grpDesc);
-        }
+        caches.initStartedCaches(descs);
 
         if (crd && lateAffAssign) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -867,7 +1005,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(registeredGrps.get(aff.groupId()), aff, fut);
+                        initAffinity(caches.group(aff.groupId()), aff, fut);
                 }
             });
         }
@@ -892,13 +1030,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                desc,
+                desc.groupId(),
                 fut.topologyVersion(),
                 fut.discoCache());
 
-            fetchFut.init();
+            fetchFut.init(false);
 
-            fetchAffinity(fut, aff, fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                aff, fetchFut);
         }
     }
 
@@ -989,7 +1130,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         StringBuilder names = new StringBuilder();
 
         for (Integer grpId : grpIds) {
-            String name = registeredGrps.get(grpId).cacheOrGroupName();
+            String name = caches.group(grpId).cacheOrGroupName();
 
             if (names.length() != 0)
                 names.append(", ");
@@ -1021,16 +1162,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 grp.affinity().initialize(fut.topologyVersion(), assignment);
             }
             else {
-                CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId());
+                CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
 
                 assert grpDesc != null : grp.cacheOrGroupName();
 
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    grpDesc,
+                    grpDesc.groupId(),
                     topVer,
                     fut.discoCache());
 
-                fetchFut.init();
+                fetchFut.init(false);
 
                 fetchFuts.add(fetchFut);
             }
@@ -1041,48 +1182,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             Integer grpId = fetchFut.groupId();
 
-            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                cctx.cache().cacheGroup(grpId).affinity(),
+                fetchFut);
         }
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param discoveryEvt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param affCache Affinity.
      * @param fetchFut Affinity fetch future.
      * @throws IgniteCheckedException If failed.
+     * @return Affinity assignment response.
      */
-    private void fetchAffinity(GridDhtPartitionsExchangeFuture fut,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer,
+        @Nullable DiscoveryEvent discoveryEvt,
+        DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
         GridDhtAssignmentFetchFuture fetchFut)
         throws IgniteCheckedException {
         assert affCache != null;
 
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache);
 
             affCache.initialize(topVer, aff);
         }
         else {
-            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache);
 
             if (idealAff != null)
                 affCache.idealAssignment(idealAff);
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                affCache.calculate(topVer, discoveryEvt, discoCache);
             }
 
-            List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> aff = res.affinityAssignment(discoCache);
 
             assert aff != null : res;
 
             affCache.initialize(topVer, aff);
         }
+
+        return res;
     }
 
     /**
@@ -1135,7 +1285,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (grp.isLocal())
                 continue;
 
-            initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
+            initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
         }
     }
 
@@ -1197,18 +1347,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        desc,
+                        desc.groupId(),
                         prev.topologyVersion(),
                         prev.discoCache());
 
-                    fetchFut.init();
+                    fetchFut.init(false);
 
                     final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
                     fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
                         @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                             throws IgniteCheckedException {
-                            fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
+                            fetchAffinity(prev.topologyVersion(),
+                                prev.discoveryEvent(),
+                                prev.discoCache(),
+                                aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
                             aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
@@ -1871,7 +2024,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (cacheWaitParts == null) {
                 waitGrps.put(grpId, cacheWaitParts = new HashMap<>());
 
-                deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId());
+                deploymentIds.put(grpId, caches.group(grpId).deploymentId());
             }
 
             cacheWaitParts.put(part, waitNode);
@@ -1890,4 +2043,98 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']';
         }
     }
+
+    /**
+     * Holds registered cache group descriptors (by group ID) and cache descriptors (by cache name).
+     */
+    static class CachesInfo {
+        /** Registered cache groups by group ID (updated from exchange thread). */
+        private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>();
+
+        /** Registered caches by cache name. */
+        private final ConcurrentHashMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+        /**
+         * @param grps Registered groups.
+         * @param caches Registered caches.
+         */
+        void init(Map<Integer, CacheGroupDescriptor> grps, Map<String, DynamicCacheDescriptor> caches) {
+            for (CacheGroupDescriptor grp : grps.values())
+                registeredGrps.put(grp.groupId(), grp);
+
+            for (DynamicCacheDescriptor cache : caches.values())
+                registeredCaches.put(cache.cacheName(), cache);
+        }
+
+        /**
+         * @return All registered groups.
+         */
+        Collection<CacheGroupDescriptor> allGroups() {
+            return registeredGrps.values();
+        }
+
+        /**
+         * @param grpId Group ID.
+         * @return Group descriptor.
+         */
+        CacheGroupDescriptor group(int grpId) {
+            CacheGroupDescriptor desc = registeredGrps.get(grpId);
+
+            assert desc != null : grpId;
+
+            return desc;
+        }
+
+        /**
+         * @param descs Cache descriptors.
+         */
+        void initStartedCaches(Collection<DynamicCacheDescriptor> descs) {
+            for (DynamicCacheDescriptor desc : descs) {
+                CacheGroupDescriptor grpDesc = desc.groupDescriptor();
+
+                if (!registeredGrps.containsKey(grpDesc.groupId()))
+                    registeredGrps.put(grpDesc.groupId(), grpDesc);
+            }
+        }
+
+        /**
+         * @param exchActions Exchange actions.
+         */
+        void updateCachesInfo(ExchangeActions exchActions) {
+            for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
+                CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+
+                assert rmvd != null : stopDesc.cacheOrGroupName();
+            }
+
+            for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
+                CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+
+                assert old == null : old;
+            }
+
+            for (ExchangeActions.ActionData req : exchActions.cacheStopRequests())
+                registeredCaches.remove(req.descriptor().cacheName());
+
+            for (ExchangeActions.ActionData req : exchActions.cacheStartRequests())
+                registeredCaches.put(req.descriptor().cacheName(), req.descriptor());
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @return Cache descriptor if cache found.
+         */
+        @Nullable DynamicCacheDescriptor cache(String cacheName) {
+            return registeredCaches.get(cacheName);
+        }
+
+        /**
+         * Removes all registered groups and caches.
+         */
+        void clear() {
+            registeredGrps.clear();
+
+            registeredCaches.clear();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 4844a55..196df57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -898,6 +898,9 @@ public class CacheGroupContext {
             res.idealAffinityAssignment(assignment.idealAssignment());
         }
 
+        if (req.sendPartitionsState())
+            res.partitionMap(top.partitionMap(true));
+
         try {
             ctx.io().send(nodeId, res, AFFINITY_POOL);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
new file mode 100644
index 0000000..1f6f375
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy discovery message which is not really sent via the ring; it is just added to the local discovery worker queue.
+ */
+public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage, CachePartitionExchangeWorkerTask {
+    /** Start request ID. */
+    private final UUID reqId;
+
+    /** Cache start requests by cache name. */
+    private final Map<String, DynamicCacheChangeRequest> startReqs;
+
+    /** Names of client caches to close. */
+    private final Set<String> cachesToClose;
+
+    /**
+     * @param reqId Start request ID.
+     * @param startReqs Caches start requests.
+     * @param cachesToClose Caches to close.
+     */
+    public ClientCacheChangeDiscoveryMessage(UUID reqId,
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
+        @Nullable Set<String> cachesToClose) {
+        assert reqId != null;
+        assert startReqs != null ^ cachesToClose != null;
+
+        this.reqId = reqId;
+        this.startReqs = startReqs;
+        this.cachesToClose = cachesToClose;
+    }
+
+    /**
+     * @return Start request ID.
+     */
+    UUID requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Cache start requests.
+     */
+    @Nullable Map<String, DynamicCacheChangeRequest> startRequests() {
+        return startReqs;
+    }
+
+    /**
+     * @return Client caches to close.
+     */
+    Set<String> cachesToClose() {
+        return cachesToClose;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClientCacheChangeDiscoveryMessage.class, this,
+            "startCaches", (startReqs != null ? startReqs.keySet() : ""));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index 0fcc740..3ca04a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -317,6 +317,8 @@ class ClusterCachesInfo {
                 continue;
             }
 
+            assert !req.clientStartOnly() : req;
+
             DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
 
             boolean needExchange = false;
@@ -500,14 +502,6 @@ class ClusterCachesInfo {
                     }
                 }
             }
-            else if (req.close()) {
-                if (desc != null) {
-                    needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
-
-                    if (needExchange)
-                        exchangeActions.addCacheToClose(req, desc);
-                }
-            }
             else
                 assert false : req;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 5434061..c6da43f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -67,9 +67,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Destroy. */
     private boolean destroy;
 
-    /** Close flag. */
-    private boolean close;
-
     /** Whether cache was created through SQL. */
     private boolean sql;
 
@@ -155,19 +152,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     /**
      * @param ctx Context.
      * @param cacheName Cache name.
-     * @return Request to close client cache.
-     */
-    static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) {
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-        req.close(true);
-
-        return req;
-    }
-
-    /**
-     * @param ctx Context.
-     * @param cacheName Cache name.
      * @param sql {@code true} if the cache must be stopped only if it was created by SQL command {@code CREATE TABLE}.
      * @param destroy Destroy flag.
      * @return Cache stop request.
@@ -372,20 +356,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
-     * @return Close flag.
-     */
-    public boolean close() {
-        return close;
-    }
-
-    /**
-     * @param close New close flag.
-     */
-    public void close(boolean close) {
-        this.close = close;
-    }
-
-    /**
      * @return SQL flag.
      */
     public boolean sql() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 839ddbd..d753f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -829,14 +829,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Partition topology.
      */
     public GridDhtPartitionTopology topology() {
-        GridCacheAdapter<K, V> cache = this.cache;
-
-        if (cache == null)
-            throw new IllegalStateException("Cache stopped: " + cacheName);
-
-        assert cache.isNear() || cache.isDht() || cache.isColocated() || cache.isDhtAtomic() : cache;
-
-        return topology(cache);
+        return grp.topology();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2d1aa05..c8d760f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1669,7 +1669,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 cctx.cache().processCustomExchangeTask(task);
             }
             catch (Exception e) {
-                U.warn(log, "Failed to process custom exchange task: " + task, e);
+                U.error(log, "Failed to process custom exchange task: " + task, e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ad83b14..21c52b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
@@ -184,7 +185,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();
 
     /** Map of proxies. */
-    private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
+    private final ConcurrentHashMap<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
@@ -363,6 +364,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (msg0.exchange())
                 return new SchemaExchangeWorkerTask(msg0);
         }
+        else if (msg instanceof ClientCacheChangeDiscoveryMessage) {
+            ClientCacheChangeDiscoveryMessage msg0 = (ClientCacheChangeDiscoveryMessage)msg;
+
+            return msg0;
+        }
 
         return null;
     }
@@ -389,6 +395,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ctx.query().onNodeLeave(task0.node());
         }
+        else if (task instanceof ClientCacheChangeDiscoveryMessage) {
+            ClientCacheChangeDiscoveryMessage task0 = (ClientCacheChangeDiscoveryMessage)task;
+
+            sharedCtx.affinity().processClientCachesChanges(task0);
+        }
         else
             U.warn(log, "Unsupported custom exchange task: " + task);
     }
@@ -2001,9 +2012,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param req Stop request.
      */
     void blockGateway(DynamicCacheChangeRequest req) {
-        assert req.stop() || req.close();
+        assert req.stop();
 
-        if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
+        if (req.stop()) {
             // Break the proxy before exchange future is done.
             IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(req.cacheName());
 
@@ -2034,7 +2045,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache group for stopped cache.
      */
     private CacheGroupContext prepareCacheStop(DynamicCacheChangeRequest req, boolean forceClose) {
-        assert req.stop() || req.close() || forceClose : req;
+        assert req.stop() || forceClose : req;
 
         GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName());
 
@@ -2075,6 +2086,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param startTopVer Cache start version.
+     * @param err Cache start error if any.
+     */
+    void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheContext<?, ?> cacheCtx = cache.context();
+
+            if (cacheCtx.startTopologyVersion().equals(startTopVer) && !jCacheProxies.containsKey(cacheCtx.name())) {
+                jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+
+                if (cacheCtx.preloader() != null)
+                    cacheCtx.preloader().onInitialExchangeComplete(err);
+            }
+        }
+    }
+
+    /**
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
      * @param topVer Completed topology version.
@@ -2088,16 +2116,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Throwable err,
         boolean forceClose
     ) {
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            GridCacheContext<?, ?> cacheCtx = cache.context();
-
-            if (cacheCtx.startTopologyVersion().equals(topVer)) {
-                jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
-
-                if (cacheCtx.preloader() != null)
-                    cacheCtx.preloader().onInitialExchangeComplete(err);
-            }
-        }
+        initCacheProxies(topVer, err);
 
         if (exchActions != null && (err == null || forceClose)) {
             for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
@@ -2187,6 +2206,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param reqId Request ID.
+     * @param err Error if any.
+     */
+    void completeClientCacheChangeFuture(UUID reqId, @Nullable Exception err) {
+        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId);
+
+        if (fut != null)
+            fut.onDone(false, err);
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -2486,8 +2516,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 failIfExists,
                 failIfNotStarted);
 
-            if (req != null)
-                return F.first(initiateCacheChanges(F.asList(req), failIfExists));
+            if (req != null) {
+                if (req.clientStartOnly())
+                    return startClientCacheChange(F.asMap(req.cacheName(), req), null);
+
+                return F.first(initiateCacheChanges(F.asList(req)));
+            }
             else
                 return new GridFinishedFuture<>();
         }
@@ -2497,6 +2531,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param startReqs Start requests.
+     * @param cachesToClose Caches to close.
+     * @return Future for cache change operation.
+     */
+    private IgniteInternalFuture<Boolean> startClientCacheChange(
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) {
+        assert startReqs != null ^ cachesToClose != null;
+
+        DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID());
+
+        IgniteInternalFuture old = pendingFuts.put(fut.id, fut);
+
+        assert old == null : old;
+
+        ctx.discovery().clientCacheStartEvent(fut.id, startReqs, cachesToClose);
+
+        IgniteCheckedException err = checkNodeState();
+
+        if (err != null)
+            fut.onDone(err);
+
+        return fut;
+    }
+
+    /**
      * Dynamically starts multiple caches.
      *
      * @param ccfgList Collection of cache configuration.
@@ -2527,7 +2586,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (checkThreadTx)
             checkEmptyTransactions();
 
-        List<DynamicCacheChangeRequest> reqList = new ArrayList<>(ccfgList.size());
+        List<DynamicCacheChangeRequest> srvReqs = null;
+        Map<String, DynamicCacheChangeRequest> clientReqs = null;
 
         try {
             for (CacheConfiguration ccfg : ccfgList) {
@@ -2541,20 +2601,41 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     true
                 );
 
-                if (req != null)
-                    reqList.add(req);
+                if (req != null) {
+                    if (req.clientStartOnly()) {
+                        if (clientReqs == null)
+                            clientReqs = U.newLinkedHashMap(ccfgList.size());
+
+                        clientReqs.put(req.cacheName(), req);
+                    }
+                    else {
+                        if (srvReqs == null)
+                            srvReqs = new ArrayList<>(ccfgList.size());
+
+                        srvReqs.add(req);
+                    }
+                }
             }
         }
         catch (Exception e) {
             return new GridFinishedFuture<>(e);
         }
 
-        if (!reqList.isEmpty()) {
+        if (srvReqs != null || clientReqs != null) {
+            if (clientReqs != null && srvReqs == null)
+                return startClientCacheChange(clientReqs, null);
+
             GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
 
-            for (DynamicCacheStartFuture fut : initiateCacheChanges(reqList, failIfExists))
+            for (DynamicCacheStartFuture fut : initiateCacheChanges(srvReqs))
                 compoundFut.add((IgniteInternalFuture)fut);
 
+            if (clientReqs != null) {
+                IgniteInternalFuture<Boolean> clientStartFut = startClientCacheChange(clientReqs, null);
+
+                compoundFut.add((IgniteInternalFuture)clientStartFut);
+            }
+
             compoundFut.markInitialized();
 
             return compoundFut;
@@ -2578,7 +2659,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, sql, true);
 
-        return F.first(initiateCacheChanges(F.asList(req), false));
+        return F.first(initiateCacheChanges(F.asList(req)));
     }
 
     /**
@@ -2600,7 +2681,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
 
-        for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs, false))
+        for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs))
             compoundFut.add((IgniteInternalFuture)fut);
 
         compoundFut.markInitialized();
@@ -2622,9 +2703,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         checkEmptyTransactions();
 
-        DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName);
+        if (proxy.context().isLocal())
+            return dynamicDestroyCache(cacheName, false, true);
 
-        return F.first(initiateCacheChanges(F.asList(t), false));
+        return startClientCacheChange(null, Collections.singleton(cacheName));
     }
 
     /**
@@ -2657,7 +2739,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture fut = new GridCompoundFuture();
 
-        for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false))
+        for (DynamicCacheStartFuture f : initiateCacheChanges(reqs))
             fut.add(f);
 
         fut.markInitialized();
@@ -2751,38 +2833,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param reqs Requests.
-     * @param failIfExists Fail if exists flag.
      * @return Collection of futures.
      */
     @SuppressWarnings("TypeMayBeWeakened")
     private Collection<DynamicCacheStartFuture> initiateCacheChanges(
-        Collection<DynamicCacheChangeRequest> reqs,
-        boolean failIfExists
+        Collection<DynamicCacheChangeRequest> reqs
     ) {
         Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
 
         Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
 
         for (DynamicCacheChangeRequest req : reqs) {
-            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req);
+            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId());
 
             try {
-                if (req.stop() || req.close()) {
+                if (req.stop()) {
                     DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
 
                     if (desc == null)
                         // No-op.
                         fut.onDone(false);
-                    else {
-                        assert desc.cacheConfiguration() != null : desc;
-
-                        if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
-                            req.close(false);
-
-                            req.stop(true);
-                        }
-                    }
                 }
+
                 if (req.start() && req.startCacheConfiguration() != null) {
                     CacheConfiguration ccfg = req.startCacheConfiguration();
 
@@ -2821,14 +2893,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             try {
                 ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
 
-                if (ctx.isStopping()) {
-                    err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
-                        "node is stopping.");
-                }
-                else if (ctx.clientDisconnected()) {
-                    err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Failed to execute dynamic cache change request, client node disconnected.");
-                }
+                err = checkNodeState();
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -2844,6 +2909,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Non-null exception if node is stopping or disconnected, {@code null} otherwise.
+     */
+    @Nullable private IgniteCheckedException checkNodeState() {
+        if (ctx.isStopping()) {
+            return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+                "node is stopping.");
+        }
+        else if (ctx.clientDisconnected()) {
+            return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to execute dynamic cache change request, client node disconnected.");
+        }
+
+        return null;
+    }
+
+    /**
      * @param type Event type.
      * @param node Event node.
      * @param topVer Topology version.
@@ -3156,13 +3237,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting public cache for name: " + cacheName);
 
-        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
-
         DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
+        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
+
         if (cache == null) {
             dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
 
@@ -3731,33 +3812,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
     private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
-        /** Cache name. */
-        private String cacheName;
-
-        /** Change request. */
-        @GridToStringInclude
-        private DynamicCacheChangeRequest req;
-
-        /**
-         * @param cacheName Cache name.
-         * @param req Cache start request.
-         */
-        private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) {
-            this.cacheName = cacheName;
-            this.req = req;
-        }
+        /** */
+        private UUID id;
 
         /**
-         * @return Request.
+         * @param id Future ID.
          */
-        public DynamicCacheChangeRequest request() {
-            return req;
+        private DynamicCacheStartFuture(UUID id) {
+            this.id = id;
         }
 
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
             // Make sure to remove future before completion.
-            pendingFuts.remove(req.requestId(), this);
+            pendingFuts.remove(id, this);
 
             return super.onDone(res, err);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
new file mode 100644
index 0000000..d374f29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ClientCacheDhtTopologyFuture extends GridFinishedFuture<AffinityTopologyVersion>
+    implements GridDhtTopologyFuture {
+    /**
+     * @param topVer Exchange topology version.
+     */
+    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) {
+        super(topVer);
+
+        assert topVer != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return result();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Throwable validateCache(GridCacheContext cctx,
+        boolean recovery,
+        boolean read,
+        @Nullable Object key,
+        @Nullable Collection<?> keys) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 619630f..2d9e4f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -83,7 +83,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private Map<Integer, Set<UUID>> part2node = new HashMap<>();
 
     /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
+    private AffinityTopologyVersion lastExchangeVer;
 
     /** */
     private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
@@ -183,21 +183,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
-            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId + ']';
+            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+                ", exchVer=" + exchTopVer + ']';
 
             this.stopping = stopping;
 
-            topVer = exchId.topologyVersion();
-            discoCache = exchFut.discoCache();
+            topVer = exchTopVer;
+            this.discoCache = discoCache;
 
             updateSeq.setIfGreater(updSeq);
 
@@ -560,19 +562,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public GridDhtPartitionMap update(
+        @Nullable AffinityTopologyVersion exchVer,
         GridDhtPartitionFullMap partMap,
         Map<Integer, T2<Long, Long>> cntrMap) {
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']');
 
         lock.writeLock().lock();
 
         try {
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
+            if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                        lastExchangeVer + ", exchVer=" + exchVer + ']');
 
                 return null;
             }
@@ -580,15 +583,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                        lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
                 return null;
             }
 
             updateSeq.incrementAndGet();
 
-            if (exchId != null)
-                lastExchangeId = exchId;
+            if (exchVer != null)
+                lastExchangeVer = exchVer;
 
             if (node2part != null) {
                 for (GridDhtPartitionMap part : node2part.values()) {
@@ -598,7 +601,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null && newPart.updateSequence() < part.updateSequence()) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
+                            log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" +
                                 mapString(part) + ", newPart=" + mapString(newPart) + ']');
 
                         partMap.put(part.nodeId(), part);
@@ -694,16 +697,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
+            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
+                        lastExchangeVer + ", exchId=" + exchId + ']');
 
                 return null;
             }
 
             if (exchId != null)
-                lastExchangeId = exchId;
+                lastExchangeVer = exchId.topologyVersion();
 
             if (node2part == null)
                 // Create invalid partition map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index d9d642a..44c7b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -32,6 +32,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final int SND_PART_STATE_MASK = 0x01;
+
+    /** */
+    private byte flags;
+
+    /** */
     private long futId;
 
     /** Topology version being queried. */
@@ -48,16 +54,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
      * @param futId Future ID.
      * @param grpId Cache group ID.
      * @param topVer Topology version.
+     * @param sndPartMap {@code True} if need send in response cache partitions state.
      */
     public GridDhtAffinityAssignmentRequest(
         long futId,
         int grpId,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer,
+        boolean sndPartMap) {
         assert topVer != null;
 
         this.futId = futId;
         this.grpId = grpId;
         this.topVer = topVer;
+
+        if (sndPartMap)
+            flags |= SND_PART_STATE_MASK;
+    }
+
+    /**
+     * @return {@code True} if need send in response cache partitions state.
+     */
+    public boolean sendPartitionsState() {
+        return (flags & SND_PART_STATE_MASK) != 0;
     }
 
     /**
@@ -91,7 +109,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
@@ -110,12 +128,18 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -138,7 +162,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readLong("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -146,6 +170,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 4df3fc1..5b0de08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -21,19 +21,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Affinity assignment response.
@@ -62,6 +63,13 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     /** Affinity assignment bytes. */
     private byte[] idealAffAssignmentBytes;
 
+    /** */
+    @GridDirectTransient
+    private GridDhtPartitionFullMap partMap;
+
+    /** */
+    private byte[] partBytes;
+
     /**
      * Empty constructor.
      */
@@ -107,30 +115,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Affinity assignment.
      */
-    public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) {
+    public List<List<ClusterNode>> affinityAssignment(DiscoCache discoCache) {
         if (affAssignmentIds != null)
-            return nodes(disco, affAssignmentIds);
+            return nodes(discoCache, affAssignmentIds);
 
         return null;
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Ideal affinity assignment.
      */
-    public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) {
-        return nodes(disco, idealAffAssignment);
+    public List<List<ClusterNode>> idealAffinityAssignment(DiscoCache discoCache) {
+        return nodes(discoCache, idealAffAssignment);
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @param assignmentIds Assignment node IDs.
      * @return Assignment nodes.
      */
-    private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) {
+    private List<List<ClusterNode>> nodes(DiscoCache discoCache, List<List<UUID>> assignmentIds) {
         if (assignmentIds != null) {
             List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size());
 
@@ -139,7 +147,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 List<ClusterNode> nodes = new ArrayList<>(ids.size());
 
                 for (int j = 0; j < ids.size(); j++) {
-                    ClusterNode node = disco.node(topVer, ids.get(j));
+                    ClusterNode node = discoCache.node(ids.get(j));
 
                     assert node != null;
 
@@ -163,6 +171,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
+     * @param partMap Partition map.
+     */
+    public void partitionMap(GridDhtPartitionFullMap partMap) {
+        this.partMap = partMap;
+    }
+
+    /**
+     * @return Partition map.
+     */
+    @Nullable public GridDhtPartitionFullMap partitionMap() {
+        return partMap;
+    }
+
+    /**
      * @param assignments Assignment.
      * @return Assignment where cluster nodes are converted to their ids.
      */
@@ -193,7 +215,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /**
@@ -208,6 +230,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignment != null && idealAffAssignmentBytes == null)
             idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment);
+
+        if (partMap != null && partBytes == null)
+            partBytes = U.zip(U.marshal(ctx.marshaller(), partMap));
     }
 
     /** {@inheritDoc} */
@@ -222,6 +247,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignmentBytes != null && idealAffAssignment == null)
             idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr);
+
+        if (partBytes != null && partMap == null)
+            partMap = U.unmarshalZip(ctx.marshaller(), partBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
     }
 
     /** {@inheritDoc} */
@@ -263,6 +291,12 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partBytes", partBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -309,6 +343,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 6:
+                partBytes = reader.readByteArray("partBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 8746320..dcc08d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -27,13 +27,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -76,25 +75,28 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     /** */
     private final int grpId;
 
+    /** */
+    private boolean needPartState;
+
     /**
      * @param ctx Context.
-     * @param grpDesc Group descriptor.
+     * @param grpId Group ID.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        CacheGroupDescriptor grpDesc,
+        int grpId,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
         this.topVer = topVer;
-        this.grpId = grpDesc.groupId();
+        this.grpId = grpId;
         this.ctx = ctx;
 
         id = idGen.getAndIncrement();
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
+        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -127,8 +129,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
 
     /**
      * Initializes fetch future.
+     *
+     * @param needPartState {@code True} if also need fetch partitions state.
      */
-    public void init() {
+    public void init(boolean needPartState) {
+        this.needPartState = needPartState;
+
         ctx.affinity().addDhtAssignmentFetchFuture(this);
 
         requestFromNextNode();
@@ -195,7 +201,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
+                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer, needPartState),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d365a8e..92a2981 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -54,15 +55,15 @@ public interface GridDhtPartitionTopology {
     /**
      * Updates topology version.
      *
-     * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @param discoCache Discovery data cache.
      * @param updateSeq Update sequence.
      * @param stopping Stopping flag.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updateSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException;
@@ -221,12 +222,13 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchId Exchange ID.
+     * @param exchangeVer Exchange version.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+    public GridDhtPartitionMap update(
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/27d9e4ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 248d44e..494345b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -101,7 +101,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private Map<Integer, Set<UUID>> part2node = new HashMap<>();
 
     /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
+    private AffinityTopologyVersion lastExchangeVer;
 
     /** */
     private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
@@ -163,7 +163,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node = new HashMap<>();
 
-            lastExchangeId = null;
+            lastExchangeVer = null;
 
             updateSeq.set(1);
 
@@ -297,16 +297,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
-            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId +
+            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+                ", exchTopVer=" + exchTopVer +
                 ", fut=" + exchFut + ']';
 
             this.stopping = stopping;
@@ -317,9 +319,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-            topVer = exchId.topologyVersion();
+            topVer = exchTopVer;
 
-            discoCache = exchFut.discoCache();
+            this.discoCache = discoCache;
         }
         finally {
             lock.writeLock().unlock();
@@ -1105,12 +1107,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public GridDhtPartitionMap update(
-        @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap
     ) {
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
 
         assert partMap != null;
 
@@ -1143,27 +1145,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            //if need skip
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
+            if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ']');
 
                 return null;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                    log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
                 return null;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            if (exchId != null)
-                lastExchangeId = exchId;
+            if (exchangeVer != null)
+                lastExchangeVer = exchangeVer;
 
             if (node2part != null) {
                 for (GridDhtPartitionMap part : node2part.values()) {
@@ -1176,8 +1177,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
-                                mapString(part) + ", newPart=" + mapString(newPart) + ']');
+                            log.debug("Overriding partition map in full update map [exch=" + exchangeVer +
+                                ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']');
 
                         partMap.put(part.nodeId(), part);
                     }
@@ -1329,16 +1330,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
+            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']');
 
                 return null;
             }
 
             if (exchId != null)
-                lastExchangeId = exchId;
+                lastExchangeVer = exchId.topologyVersion();
 
             if (node2part == null)
                 // Create invalid partition map.


Mime
View raw message