ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5272
Date Mon, 05 Jun 2017 14:57:15 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5272 [created] 3f6cc1432


ignite-5272


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3f6cc143
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3f6cc143
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3f6cc143

Branch: refs/heads/ignite-5272
Commit: 3f6cc143295f1c3425812749f505c78902ffbb20
Parents: 0ce28c6
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jun 5 16:46:38 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jun 5 17:57:09 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  15 +
 .../cache/CacheAffinitySharedManager.java       | 297 +++++++++++++++----
 .../processors/cache/CacheGroupContext.java     |   3 +
 .../ClientCacheChangeDiscoveryMessage.java      |  89 ++++++
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../processors/cache/GridCacheProcessor.java    | 135 ++++++---
 .../dht/ClientCacheDhtTopologyFuture.java       |  45 +++
 .../dht/GridDhtAffinityAssignmentRequest.java   |  40 ++-
 .../dht/GridDhtAffinityAssignmentResponse.java  |  66 ++++-
 .../dht/GridDhtAssignmentFetchFuture.java       |  19 +-
 .../dht/GridDhtPartitionTopology.java           |  12 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  50 ++--
 12 files changed, 626 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index e144d9a..cf9abfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -70,6 +70,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -2001,6 +2003,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param reqId Start request ID.
+     * @param reqs Cache start requests.
+     */
+    public void clientCacheStartEvent(UUID reqId, Map<String, DynamicCacheChangeRequest> reqs) {
+        discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+            AffinityTopologyVersion.NONE,
+            localNode(),
+            null,
+            Collections.<ClusterNode>emptyList(),
+            new ClientCacheChangeDiscoveryMessage(reqId, reqs));
+    }
+
+    /**
      * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}.
      *
      * @return Start time of the first grid node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 44bb04f..a4afbe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
@@ -36,13 +37,16 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -81,7 +85,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private AffinityTopologyVersion lastAffVer;
 
     /** Registered caches (updated from exchange thread). */
-    private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>();
+    private final CachesInfo caches = new CachesInfo();
 
     /** */
     private WaitRebalanceInfo waitInfo;
@@ -131,14 +135,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
-            registeredGrps.clear();
+            caches.clear();
 
             affCalcVer = null;
 
             lastAffVer = null;
 
-            for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values())
-                registeredGrps.put(desc.groupId(), desc);
+            caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -327,17 +330,110 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param exchActions Cache change requests to execute on exchange.
      */
     private void updateCachesInfo(ExchangeActions exchActions) {
-        for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
-            CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+        caches.updateCachesInfo(exchActions);
+    }
+
+    /**
+     * @param msg Message.
+     */
+    void startClientCaches(ClientCacheChangeDiscoveryMessage msg) {
+        List<DynamicCacheDescriptor> startDescs = new ArrayList<>(msg.requests().size());
+
+        for (DynamicCacheChangeRequest startReq : msg.requests().values()) {
+            DynamicCacheDescriptor desc = caches.cache(startReq.cacheName());
+
+            if (desc == null) {
+                CacheException err = new CacheException("Failed to start client cache " +
+                    "(a cache with the given name is not started): " + startReq.cacheName());
+
+                cctx.cache().completeClientCacheStartFuture(msg.requestId(), err);
+
+                return;
+            }
+
+            if (cctx.cacheContext(desc.cacheId()) != null)
+                continue;
 
-            assert rmvd != null : stopDesc.cacheOrGroupName();
+            startDescs.add(desc);
         }
 
-        for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
-            CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+        AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion();
 
-            assert old == null : old;
+        Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
+
+        for (DynamicCacheDescriptor desc : startDescs) {
+            try {
+                DynamicCacheChangeRequest startReq = msg.requests().get(desc.cacheName());
+
+                cctx.cache().prepareCacheStart(desc,
+                    startReq.nearCacheConfiguration(),
+                    topVer);
+
+                CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                assert grp != null : desc.groupId();
+                assert !grp.affinityNode() : grp.cacheOrGroupName();
+
+                if (!grp.isLocal() &&
+                    grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) &&
+                    grp.localStartVersion().equals(topVer) &&
+                    !fetchFuts.containsKey(grp.groupId())) {
+                    GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                        grp.groupId(),
+                        topVer,
+                        msg.discoCache());
+
+                    fetchFut.init(true);
+
+                    fetchFuts.put(grp.groupId(), fetchFut);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                cctx.cache().completeClientCacheStartFuture(msg.requestId(), e);
+
+                return;
+            }
         }
+
+        for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) {
+            try {
+                CacheGroupContext grp = cctx.cache().cacheGroup(fetchFut.groupId());
+
+                assert grp != null;
+
+                GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    null,
+                    msg.discoCache(),
+                    grp.affinity(),
+                    fetchFut);
+
+                GridDhtPartitionFullMap partMap;
+
+                if (res != null) {
+                    partMap = res.partitionMap();
+
+                    assert partMap != null : res;
+
+                    ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
+
+                    grp.topology().updateTopologyVersion(topVer, topFut, msg.discoCache(), -1, false);
+
+                    grp.topology().update(topVer, partMap, null, null);
+                }
+                else {
+                    // TODO 5272: mark as 'no server nodes'
+                }
+            }
+            catch (IgniteCheckedException e) {
+                cctx.cache().completeClientCacheStartFuture(msg.requestId(), e);
+
+                return;
+            }
+        }
+
+        cctx.cache().initCacheProxies(topVer, null);
+
+        cctx.cache().completeClientCacheStartFuture(msg.requestId(), null);
     }
 
     /**
@@ -405,20 +501,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
-            Set<Integer> gprs = new HashSet<>();
+        Set<Integer> gprs = new HashSet<>();
 
-                for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
-                    Integer grpId = action.descriptor().groupId();
+        for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+            Integer grpId = action.descriptor().groupId();
 
-                    if (gprs.add(grpId)) {
-                        if (crd && lateAffAssign)
-                    initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else  {
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
+            if (gprs.add(grpId)) {
+                if (crd && lateAffAssign)
+                    initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());
+                else {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                        if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
+                    if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
                         assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
-                        initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
+                        initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
                     }
                 }
             }
@@ -464,7 +561,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         assert !grpHolder.client() : grpHolder;
 
                         grpHolder = CacheGroupHolder2.create(cctx,
-                            registeredGrps.get(grp.groupId()),
+                            caches.group(grp.groupId()),
                             fut,
                             grp.affinity());
 
@@ -547,7 +644,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void removeAllCacheInfo() {
         grpHolders.clear();
 
-        registeredGrps.clear();
+        caches.clear();
     }
 
     /**
@@ -637,7 +734,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                CacheGroupDescriptor desc = registeredGrps.get(aff.groupId());
+                CacheGroupDescriptor desc = caches.group(aff.groupId());
 
                 assert desc != null : aff.cacheOrGroupName();
 
@@ -767,7 +864,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
         assert lateAffAssign;
 
-        for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) {
+        for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
             if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
 
@@ -847,12 +944,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void initStartedCaches(boolean crd,
         final GridDhtPartitionsExchangeFuture fut,
         Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
-        for (DynamicCacheDescriptor desc : descs) {
-            CacheGroupDescriptor grpDesc = desc.groupDescriptor();
-
-            if (!registeredGrps.containsKey(grpDesc.groupId()))
-                registeredGrps.put(grpDesc.groupId(), grpDesc);
-        }
+        caches.initStartedCaches(descs);
 
         if (crd && lateAffAssign) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -872,7 +964,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(registeredGrps.get(aff.groupId()), aff, fut);
+                        initAffinity(caches.group(aff.groupId()), aff, fut);
                 }
             });
         }
@@ -897,13 +989,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                desc,
+                desc.groupId(),
                 fut.topologyVersion(),
                 fut.discoCache());
 
-            fetchFut.init();
+            fetchFut.init(false);
 
-            fetchAffinity(fut, aff, fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                aff, fetchFut);
         }
     }
 
@@ -994,7 +1089,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         StringBuilder names = new StringBuilder();
 
         for (Integer grpId : grpIds) {
-            String name = registeredGrps.get(grpId).cacheOrGroupName();
+            String name = caches.group(grpId).cacheOrGroupName();
 
             if (names.length() != 0)
                 names.append(", ");
@@ -1026,16 +1121,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 grp.affinity().initialize(fut.topologyVersion(), assignment);
             }
             else {
-                CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId());
+                CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
 
                 assert grpDesc != null : grp.cacheOrGroupName();
 
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    grpDesc,
+                    grpDesc.groupId(),
                     topVer,
                     fut.discoCache());
 
-                fetchFut.init();
+                fetchFut.init(false);
 
                 fetchFuts.add(fetchFut);
             }
@@ -1046,48 +1141,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             Integer grpId = fetchFut.groupId();
 
-            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                cctx.cache().cacheGroup(grpId).affinity(),
+                fetchFut);
         }
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param discoveryEvt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param affCache Affinity.
      * @param fetchFut Affinity fetch future.
      * @throws IgniteCheckedException If failed.
+     * @return Affinity assignment response.
      */
-    private void fetchAffinity(GridDhtPartitionsExchangeFuture fut,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer,
+        @Nullable DiscoveryEvent discoveryEvt,
+        DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
         GridDhtAssignmentFetchFuture fetchFut)
         throws IgniteCheckedException {
         assert affCache != null;
 
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache);
 
             affCache.initialize(topVer, aff);
         }
         else {
-            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache);
 
             if (idealAff != null)
                 affCache.idealAssignment(idealAff);
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                affCache.calculate(topVer, discoveryEvt, discoCache);
             }
 
-            List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> aff = res.affinityAssignment(discoCache);
 
             assert aff != null : res;
 
             affCache.initialize(topVer, aff);
         }
+
+        return res;
     }
 
     /**
@@ -1140,7 +1244,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (grp.isLocal())
                 continue;
 
-            initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
+            initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
         }
     }
 
@@ -1202,18 +1306,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        desc,
+                        desc.groupId(),
                         prev.topologyVersion(),
                         prev.discoCache());
 
-                    fetchFut.init();
+                    fetchFut.init(false);
 
                     final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
                     fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
                         @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                             throws IgniteCheckedException {
-                            fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
+                            fetchAffinity(prev.topologyVersion(),
+                                prev.discoveryEvent(),
+                                prev.discoCache(),
+                                aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
                             aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
@@ -1876,7 +1983,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (cacheWaitParts == null) {
                 waitGrps.put(grpId, cacheWaitParts = new HashMap<>());
 
-                deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId());
+                deploymentIds.put(grpId, caches.group(grpId).deploymentId());
             }
 
             cacheWaitParts.put(part, waitNode);
@@ -1895,4 +2002,92 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']';
         }
     }
+
+    /**
+     *
+     */
+    static class CachesInfo {
+        /** Registered caches (updated from exchange thread). */
+        private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>();
+
+        /** */
+        private final ConcurrentHashMap<String, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+        /**
+         * @param grps Registered groups.
+         * @param caches Registered caches.
+         */
+        void init(Map<Integer, CacheGroupDescriptor> grps, Map<String, DynamicCacheDescriptor> caches) {
+            for (CacheGroupDescriptor grp : grps.values())
+                registeredGrps.put(grp.groupId(), grp);
+
+            for (DynamicCacheDescriptor cache : caches.values())
+                registeredCaches.put(cache.cacheName(), cache);
+        }
+
+        /**
+         * @return All registered groups.
+         */
+        Collection<CacheGroupDescriptor> allGroups() {
+            return registeredGrps.values();
+        }
+
+        /**
+         * @param grpId Group ID.
+         * @return Group descriptor.
+         */
+        CacheGroupDescriptor group(int grpId) {
+            CacheGroupDescriptor desc = registeredGrps.get(grpId);
+
+            assert desc != null : grpId;
+
+            return desc;
+        }
+
+        /**
+          * @param descs Cache descriptor.
+         */
+        void initStartedCaches(Collection<DynamicCacheDescriptor> descs) {
+            for (DynamicCacheDescriptor desc : descs) {
+                CacheGroupDescriptor grpDesc = desc.groupDescriptor();
+
+                if (!registeredGrps.containsKey(grpDesc.groupId()))
+                    registeredGrps.put(grpDesc.groupId(), grpDesc);
+            }
+        }
+
+        /**
+         * @param exchActions Exchange actions.
+         */
+        void updateCachesInfo(ExchangeActions exchActions) {
+            for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
+                CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+
+                assert rmvd != null : stopDesc.cacheOrGroupName();
+            }
+
+            for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
+                CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+
+                assert old == null : old;
+            }
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @return Cache descriptor if cache found.
+         */
+        @Nullable DynamicCacheDescriptor cache(String cacheName) {
+            return registeredCaches.get(cacheName);
+        }
+
+        /**
+         *
+         */
+        void clear() {
+            registeredGrps.clear();
+
+            registeredCaches.clear();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index b85c41d..1ccc2d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -919,6 +919,9 @@ public class CacheGroupContext {
             res.idealAffinityAssignment(assignment.idealAssignment());
         }
 
+        if (req.sendPartitionMap())
+            res.partitionMap(top.partitionMap(true));
+
         try {
             ctx.io().send(nodeId, res, AFFINITY_POOL);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
new file mode 100644
index 0000000..d1a622d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue.
+ */
+public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage, CachePartitionExchangeWorkerTask {
+    /** */
+    private final UUID reqId;
+
+    /** */
+    private final Map<String, DynamicCacheChangeRequest> reqs;
+
+    /** */
+    private DiscoCache discoCache;
+
+    /**
+     * @param reqId Start request ID.
+     * @param reqs Caches start requests.
+     */
+    public ClientCacheChangeDiscoveryMessage(UUID reqId, Map<String, DynamicCacheChangeRequest> reqs) {
+        assert reqId != null;
+        assert !F.isEmpty(reqs);
+
+        this.reqId = reqId;
+        this.reqs = reqs;
+    }
+
+    DiscoCache discoCache() {
+        return discoCache;
+    }
+
+    void discoCache(DiscoCache discoCache) {
+        this.discoCache = discoCache;
+    }
+
+    /**
+     * @return Start request ID.
+     */
+    UUID requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Cache start requests.
+     */
+    Map<String, DynamicCacheChangeRequest> requests() {
+        return reqs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ae4c164..14f46ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -268,7 +268,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     else {
                         // Process event as custom discovery task if needed.
                         CachePartitionExchangeWorkerTask task =
-                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg, cache);
 
                         if (task != null)
                             exchWorker.addCustomTask(task);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 577bfb9..33d9e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.IgniteTransactionsEx;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
@@ -356,15 +357,24 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Create exchange worker task for custom discovery message.
      *
      * @param msg Custom discovery message.
+     * @param discoCache Discovery data cache.
      * @return Task or {@code null} if message doesn't require any special processing.
      */
-    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg,
+        DiscoCache discoCache) {
         if (msg instanceof SchemaAbstractDiscoveryMessage) {
             SchemaAbstractDiscoveryMessage msg0 = (SchemaAbstractDiscoveryMessage)msg;
 
             if (msg0.exchange())
                 return new SchemaExchangeWorkerTask(msg0);
         }
+        else if (msg instanceof ClientCacheChangeDiscoveryMessage) {
+            ClientCacheChangeDiscoveryMessage msg0 = (ClientCacheChangeDiscoveryMessage)msg;
+
+            msg0.discoCache(discoCache);
+
+            return msg0;
+        }
 
         return null;
     }
@@ -391,6 +401,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ctx.query().onNodeLeave(task0.node());
         }
+        else if (task instanceof ClientCacheChangeDiscoveryMessage) {
+            ClientCacheChangeDiscoveryMessage task0 = (ClientCacheChangeDiscoveryMessage)task;
+
+            sharedCtx.affinity().startClientCaches(task0);
+        }
         else
             U.warn(log, "Unsupported custom exchange task: " + task);
     }
@@ -2086,6 +2101,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         onExchangeDone(topVer, actions, err, true);
     }
 
+    void initCacheProxies(AffinityTopologyVersion startTopVer, Throwable err) {
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheContext<?, ?> cacheCtx = cache.context();
+
+            if (cacheCtx.startTopologyVersion().equals(startTopVer) && !jCacheProxies.containsKey(cacheCtx.name())) {
+                jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+
+                if (cacheCtx.preloader() != null)
+                    cacheCtx.preloader().onInitialExchangeComplete(err);
+            }
+        }
+    }
+
     /**
      * Callback invoked when first exchange future for dynamic cache is completed.
      *
@@ -2100,16 +2128,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Throwable err,
         boolean forceClose
     ) {
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            GridCacheContext<?, ?> cacheCtx = cache.context();
-
-            if (cacheCtx.startTopologyVersion().equals(topVer)) {
-                jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
-
-                if (cacheCtx.preloader() != null)
-                    cacheCtx.preloader().onInitialExchangeComplete(err);
-            }
-        }
+        initCacheProxies(topVer, err);
 
         if (exchActions != null && (err == null || forceClose)) {
             Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps = null;
@@ -2234,6 +2253,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param reqId Request ID.
+     * @param err Error if any.
+     */
+    void completeClientCacheStartFuture(UUID reqId, @Nullable Exception err) {
+        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId);
+
+        if (fut != null)
+            fut.onDone(false, err);
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -2550,8 +2580,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 failIfExists,
                 failIfNotStarted);
 
-            if (req != null)
+            if (req != null) {
+                if (req.clientStartOnly())
+                    return startClientCaches(F.asMap(req.cacheName(), req));
+
                 return F.first(initiateCacheChanges(F.asList(req), failIfExists));
+            }
             else
                 return new GridFinishedFuture<>();
         }
@@ -2561,6 +2595,27 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param reqs Requests.
+     * @return Start future.
+     */
+    private IgniteInternalFuture<Boolean> startClientCaches(Map<String, DynamicCacheChangeRequest> reqs) {
+        DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID());
+
+        IgniteInternalFuture old = pendingFuts.put(fut.id, fut);
+
+        assert old == null;
+
+        ctx.discovery().clientCacheStartEvent(fut.id, reqs);
+
+        IgniteCheckedException err = checkNodeState();
+
+        if (err != null)
+            fut.onDone(err);
+
+        return fut;
+    }
+
+    /**
      * Dynamically starts multiple caches.
      *
      * @param ccfgList Collection of cache configuration.
@@ -2833,7 +2888,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
 
         for (DynamicCacheChangeRequest req : reqs) {
-            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req);
+            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId());
 
             try {
                 if (req.stop() || req.close()) {
@@ -2890,14 +2945,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             try {
                 ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
 
-                if (ctx.isStopping()) {
-                    err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
-                        "node is stopping.");
-                }
-                else if (ctx.clientDisconnected()) {
-                    err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Failed to execute dynamic cache change request, client node disconnected.");
-                }
+                err = checkNodeState();
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -2913,6 +2961,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Non null exception if node is stopping or disconnected.
+     */
+    @Nullable private IgniteCheckedException checkNodeState() {
+        if (ctx.isStopping()) {
+            return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+                "node is stopping.");
+        }
+        else if (ctx.clientDisconnected()) {
+            return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to execute dynamic cache change request, client node disconnected.");
+        }
+
+        return null;
+    }
+
+    /**
      * @param type Event type.
      * @param node Event node.
      * @param topVer Topology version.
@@ -3225,13 +3289,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting public cache for name: " + cacheName);
 
-        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
-
         DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
+        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
+
         if (cache == null) {
             dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
 
@@ -3800,33 +3864,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
     private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
-        /** Cache name. */
-        private String cacheName;
-
-        /** Change request. */
-        @GridToStringInclude
-        private DynamicCacheChangeRequest req;
-
-        /**
-         * @param cacheName Cache name.
-         * @param req Cache start request.
-         */
-        private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) {
-            this.cacheName = cacheName;
-            this.req = req;
-        }
+        /** */
+        private UUID id;
 
         /**
-         * @return Request.
+         * @param id Future ID.
          */
-        public DynamicCacheChangeRequest request() {
-            return req;
+        private DynamicCacheStartFuture(UUID id) {
+            this.id = id;
         }
 
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
             // Make sure to remove future before completion.
-            pendingFuts.remove(req.requestId(), this);
+            pendingFuts.remove(id, this);
 
             return super.onDone(res, err);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
new file mode 100644
index 0000000..ddb64a9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class ClientCacheDhtTopologyFuture extends GridFinishedFuture<AffinityTopologyVersion> implements GridDhtTopologyFuture {
+    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) {
+        super(topVer);
+    }
+
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return result();
+    }
+
+    @Nullable @Override public Throwable validateCache(GridCacheContext cctx,
+        boolean recovery,
+        boolean read,
+        @Nullable Object key,
+        @Nullable Collection<?> keys) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index d9d642a..44c7b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -32,6 +32,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final int SND_PART_STATE_MASK = 0x01;
+
+    /** */
+    private byte flags;
+
+    /** */
     private long futId;
 
     /** Topology version being queried. */
@@ -48,16 +54,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
      * @param futId Future ID.
      * @param grpId Cache group ID.
      * @param topVer Topology version.
+     * @param sndPartMap {@code True} if need send in response cache partitions state.
      */
     public GridDhtAffinityAssignmentRequest(
         long futId,
         int grpId,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer,
+        boolean sndPartMap) {
         assert topVer != null;
 
         this.futId = futId;
         this.grpId = grpId;
         this.topVer = topVer;
+
+        if (sndPartMap)
+            flags |= SND_PART_STATE_MASK;
+    }
+
+    /**
+     * @return {@code True} if need send in response cache partitions state.
+     */
+    public boolean sendPartitionsState() {
+        return (flags & SND_PART_STATE_MASK) != 0;
     }
 
     /**
@@ -91,7 +109,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
@@ -110,12 +128,18 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -138,7 +162,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readLong("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -146,6 +170,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 4df3fc1..5b0de08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -21,19 +21,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Affinity assignment response.
@@ -62,6 +63,13 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     /** Affinity assignment bytes. */
     private byte[] idealAffAssignmentBytes;
 
+    /** */
+    @GridDirectTransient
+    private GridDhtPartitionFullMap partMap;
+
+    /** */
+    private byte[] partBytes;
+
     /**
      * Empty constructor.
      */
@@ -107,30 +115,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Affinity assignment.
      */
-    public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) {
+    public List<List<ClusterNode>> affinityAssignment(DiscoCache discoCache) {
         if (affAssignmentIds != null)
-            return nodes(disco, affAssignmentIds);
+            return nodes(discoCache, affAssignmentIds);
 
         return null;
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Ideal affinity assignment.
      */
-    public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) {
-        return nodes(disco, idealAffAssignment);
+    public List<List<ClusterNode>> idealAffinityAssignment(DiscoCache discoCache) {
+        return nodes(discoCache, idealAffAssignment);
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @param assignmentIds Assignment node IDs.
      * @return Assignment nodes.
      */
-    private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) {
+    private List<List<ClusterNode>> nodes(DiscoCache discoCache, List<List<UUID>> assignmentIds) {
         if (assignmentIds != null) {
             List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size());
 
@@ -139,7 +147,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 List<ClusterNode> nodes = new ArrayList<>(ids.size());
 
                 for (int j = 0; j < ids.size(); j++) {
-                    ClusterNode node = disco.node(topVer, ids.get(j));
+                    ClusterNode node = discoCache.node(ids.get(j));
 
                     assert node != null;
 
@@ -163,6 +171,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
+     * @param partMap Partition map.
+     */
+    public void partitionMap(GridDhtPartitionFullMap partMap) {
+        this.partMap = partMap;
+    }
+
+    /**
+     * @return Partition map.
+     */
+    @Nullable public GridDhtPartitionFullMap partitionMap() {
+        return partMap;
+    }
+
+    /**
      * @param assignments Assignment.
      * @return Assignment where cluster nodes are converted to their ids.
      */
@@ -193,7 +215,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /**
@@ -208,6 +230,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignment != null && idealAffAssignmentBytes == null)
             idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment);
+
+        if (partMap != null && partBytes == null)
+            partBytes = U.zip(U.marshal(ctx.marshaller(), partMap));
     }
 
     /** {@inheritDoc} */
@@ -222,6 +247,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignmentBytes != null && idealAffAssignment == null)
             idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr);
+
+        if (partBytes != null && partMap == null)
+            partMap = U.unmarshalZip(ctx.marshaller(), partBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
     }
 
     /** {@inheritDoc} */
@@ -263,6 +291,12 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partBytes", partBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -309,6 +343,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 6:
+                partBytes = reader.readByteArray("partBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 8746320..9595ba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -76,25 +76,28 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     /** */
     private final int grpId;
 
+    /** */
+    private boolean needPartState;
+
     /**
      * @param ctx Context.
-     * @param grpDesc Group descriptor.
+     * @param grpId Group ID.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        CacheGroupDescriptor grpDesc,
+        int grpId,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
         this.topVer = topVer;
-        this.grpId = grpDesc.groupId();
+        this.grpId = grpId;
         this.ctx = ctx;
 
         id = idGen.getAndIncrement();
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
+        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -127,8 +130,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
 
     /**
      * Initializes fetch future.
+     *
+     * @param needPartState {@code True} if also need fetch partitions state.
      */
-    public void init() {
+    public void init(boolean needPartState) {
+        this.needPartState = needPartState;
+
         ctx.affinity().addDhtAssignmentFetchFuture(this);
 
         requestFromNextNode();
@@ -195,7 +202,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
+                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer, needPartState),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4e0608d..c6795a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -54,15 +55,15 @@ public interface GridDhtPartitionTopology {
     /**
      * Updates topology version.
      *
-     * @param exchId Exchange ID.
      * @param exchFut Exchange future.
      * @param updateSeq Update sequence.
      * @param stopping Stopping flag.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        AffinityTopologyVersion topVer,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updateSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException;
@@ -233,12 +234,13 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchFut Exchange future.
+     * @param exchangeVer Exchange version.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap update(@Nullable GridDhtPartitionsExchangeFuture exchFut,
+    public GridDhtPartitionMap update(
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
         Set<Integer> partsToReload);

http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6cc143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 791bac0..bd99535 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -98,7 +98,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private final Map<Integer, Set<UUID>> part2node;
 
     /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
+    private AffinityTopologyVersion lastExchangeVer;
 
     /** */
     private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
@@ -162,7 +162,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node.clear();
 
-            lastExchangeId = null;
+            lastExchangeVer = null;
 
             updateSeq.set(1);
 
@@ -209,16 +209,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        AffinityTopologyVersion exchTopVer,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
-            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId +
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+                ", exchTopVer=" + exchTopVer +
                 ", fut=" + exchFut + ']';
 
             this.stopping = stopping;
@@ -229,9 +230,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-            topVer = exchId.topologyVersion();
+            topVer = exchTopVer;
 
-            discoCache = exchFut.discoCache();
+            discoCache = discoCache;
         }
         finally {
             lock.writeLock().unlock();
@@ -1016,15 +1017,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public GridDhtPartitionMap update(
-        @Nullable GridDhtPartitionsExchangeFuture exchFut,
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
         Set<Integer> partsToReload
     ) {
-        GridDhtPartitionExchangeId exchId = exchFut != null ? exchFut.exchangeId() : null;
-
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
 
         assert partMap != null;
 
@@ -1057,25 +1056,24 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            //if need skip
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
+            if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ']');
 
                 return null;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                    log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
                 return null;
             }
 
-            if (exchId != null)
-                lastExchangeId = exchId;
+            if (exchangeVer != null)
+                lastExchangeVer = exchangeVer;
 
             if (node2part != null) {
                 for (GridDhtPartitionMap part : node2part.values()) {
@@ -1088,8 +1086,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
-                                mapString(part) + ", newPart=" + mapString(newPart) + ']');
+                            log.debug("Overriding partition map in full update map [exch=" + exchangeVer +
+                                ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']');
 
                         partMap.put(part.nodeId(), part);
                     }
@@ -1286,16 +1284,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
+            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']');
 
                 return null;
             }
 
             if (exchId != null)
-                lastExchangeId = exchId;
+                lastExchangeVer = exchId.topologyVersion();
 
             if (node2part == null)
                 // Create invalid partition map.


Mime
View raw message