ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/4] ignite git commit: ignite-5272 Do not use synchronous custom discovery event for client cache start/close
Date Thu, 15 Jun 2017 06:23:50 GMT
ignite-5272 Do not use synchronous custom discovery event for client cache start/close


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4026ddcc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4026ddcc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4026ddcc

Branch: refs/heads/master
Commit: 4026ddcc2161ca4bf5ad3dc04cabd284c608738a
Parents: 3fd9e04
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jun 15 09:23:36 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jun 15 09:23:36 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../internal/managers/discovery/DiscoCache.java |  17 -
 .../discovery/GridDiscoveryManager.java         |  88 ++-
 .../affinity/GridAffinityProcessor.java         |  24 +-
 .../cache/CacheAffinitySharedManager.java       | 612 +++++++++++++++----
 .../processors/cache/CacheGroupContext.java     |   3 +
 .../ClientCacheChangeDiscoveryMessage.java      | 176 ++++++
 .../ClientCacheChangeDummyDiscoveryMessage.java | 104 ++++
 .../cache/ClientCacheUpdateTimeout.java         |  44 ++
 .../processors/cache/ClusterCachesInfo.java     |  94 ++-
 .../cache/DynamicCacheChangeRequest.java        |  30 -
 .../processors/cache/ExchangeActions.java       |  87 +--
 .../processors/cache/GridCacheContext.java      |   9 +-
 .../processors/cache/GridCacheIoManager.java    | 213 +++++--
 .../GridCachePartitionExchangeManager.java      |  11 +-
 .../processors/cache/GridCacheProcessor.java    | 364 ++++++-----
 .../dht/ClientCacheDhtTopologyFuture.java       |  78 +++
 .../dht/GridClientPartitionTopology.java        |  41 +-
 .../dht/GridDhtAffinityAssignmentRequest.java   |  40 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |  66 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |  22 +-
 .../distributed/dht/GridDhtCacheEntry.java      |  10 +-
 .../dht/GridDhtPartitionTopology.java           |  14 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  49 +-
 .../dht/GridDhtTopologyFutureAdapter.java       | 195 ++++++
 .../dht/GridDhtTxPrepareRequest.java            |  43 +-
 .../dht/GridDhtTxPrepareResponse.java           |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  14 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  11 +-
 .../GridDhtPartitionsExchangeFuture.java        | 201 ++----
 .../cache/transactions/IgniteTxEntry.java       |   7 +
 .../cache/transactions/IgniteTxHandler.java     |  13 +-
 .../cache/transactions/IgniteTxManager.java     |  27 +
 .../GridProjectionForCachesSelfTest.java        |  24 +-
 .../IgniteClientReconnectCacheTest.java         |  29 +-
 .../cache/CacheStopAndDestroySelfTest.java      |  38 +-
 .../cache/GridCacheClearSelfTest.java           |  15 +-
 ...IgniteClientCacheInitializationFailTest.java |   6 +-
 .../IgniteClientCacheStartFailoverTest.java     | 585 ++++++++++++++++++
 .../IgniteDynamicClientCacheStartSelfTest.java  | 248 +++++++-
 .../cache/IgniteNearClientCacheCloseTest.java   | 262 ++++++++
 ...gniteTopologyValidatorAbstractCacheTest.java | 178 ++++--
 ...ologyValidatorAbstractTxCacheGroupsTest.java |  12 +-
 ...iteTopologyValidatorAbstractTxCacheTest.java |  28 +-
 .../CacheLateAffinityAssignmentTest.java        |  16 +-
 ...teCacheClientNodePartitionsExchangeTest.java |  23 +-
 .../testframework/junits/GridAbstractTest.java  |   2 +
 .../testsuites/IgniteCacheTestSuite2.java       |   4 +
 48 files changed, 3270 insertions(+), 925 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 539f288..d9c112c 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import javax.net.ssl.HostnameVerifier;
+import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
@@ -623,6 +624,17 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_SECURITY_COMPATIBILITY_MODE = "IGNITE_SECURITY_COMPATIBILITY_MODE";
 
     /**
+     * When client cache is started or closed special discovery message is sent to notify cluster (for example this is
+     * needed for {@link ClusterGroup#forCacheNodes(String)} API. This timeout specifies how long to wait
+     * after client cache start/close before sending this message. If during this timeout another client
+     * cache changed, these events are combined into single message.
+     * <p>
+     * Default is 10 seconds.
+     */
+    public static final String IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT =
+        "IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 22c2d07..2b3c4fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -74,10 +74,6 @@ public class DiscoCache {
     /** Node map. */
     private final Map<UUID, ClusterNode> nodeMap;
 
-    /** Caches where at least one node has near cache enabled. */
-    @GridToStringInclude
-    private final Set<Integer> nearEnabledCaches;
-
     /** Alive nodes. */
     private final Set<UUID> alives = new GridConcurrentHashSet<>();
 
@@ -93,7 +89,6 @@ public class DiscoCache {
      * @param allCacheNodes Cache nodes by cache name.
      * @param cacheGrpAffNodes Affinity nodes by cache group ID.
      * @param nodeMap Node map.
-     * @param nearEnabledCaches Caches where at least one node has near cache enabled.
      * @param alives Alive nodes.
      */
     DiscoCache(ClusterNode loc,
@@ -107,7 +102,6 @@ public class DiscoCache {
         Map<Integer, List<ClusterNode>> allCacheNodes,
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
-        Set<Integer> nearEnabledCaches,
         Set<UUID> alives) {
         this.loc = loc;
         this.rmtNodes = rmtNodes;
@@ -120,7 +114,6 @@ public class DiscoCache {
         this.allCacheNodes = allCacheNodes;
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
-        this.nearEnabledCaches = nearEnabledCaches;
         this.alives.addAll(alives);
     }
 
@@ -243,16 +236,6 @@ public class DiscoCache {
     }
 
     /**
-     * Checks if cache with given ID has at least one node with near cache enabled.
-     *
-     * @param cacheId Cache ID.
-     * @return {@code True} if cache with given name has at least one node with near cache enabled.
-     */
-    public boolean hasNearCache(int cacheId) {
-        return nearEnabledCaches.contains(cacheId);
-    }
-
-    /**
      * @param id Node ID.
      * @return Node.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index c91ff74..0f0d1c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -71,8 +71,11 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
+import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -357,11 +360,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Called from discovery thread. Adds dynamic cache filter.
      *
+     * @param cacheId Cache ID.
      * @param grpId Cache group ID.
      * @param cacheName Cache name.
      * @param nearEnabled Near enabled flag.
      */
     public void setCacheFilter(
+        int cacheId,
         int grpId,
         String cacheName,
         boolean nearEnabled
@@ -374,7 +379,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             if (grp.cacheMode == CacheMode.REPLICATED)
                 nearEnabled = false;
 
-            registeredCaches.put(cacheName, new CachePredicate(grp, nearEnabled));
+            registeredCaches.put(cacheName, new CachePredicate(cacheId, grp, nearEnabled));
         }
     }
 
@@ -586,8 +591,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     assert customMsg != null;
 
                     boolean incMinorTopVer = ctx.cache().onCustomEvent(
-                        customMsg, new AffinityTopologyVersion(topVer, minorTopVer)
-                    );
+                        customMsg,
+                        new AffinityTopologyVersion(topVer, minorTopVer),
+                        node);
 
                     if (incMinorTopVer) {
                         minorTopVer++;
@@ -1860,17 +1866,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * Checks if cache with given ID has at least one node with near cache enabled.
-     *
-     * @param cacheId Cache ID.
-     * @param topVer Topology version.
-     * @return {@code True} if cache with given name has at least one node with near cache enabled.
-     */
-    public boolean hasNearCache(int cacheId, AffinityTopologyVersion topVer) {
-        return resolveDiscoCache(cacheId, topVer).hasNearCache(cacheId);
-    }
-
-    /**
      * Gets discovery cache for given topology version.
      *
      * @param grpId Cache group ID (participates in exception message).
@@ -1994,6 +1989,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param reqId Start request ID.
+     * @param startReqs Cache start requests.
+     * @param cachesToClose Cache to close.
+     */
+    public void clientCacheStartEvent(UUID reqId,
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
+        @Nullable Set<String> cachesToClose) {
+        discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+            AffinityTopologyVersion.NONE,
+            localNode(),
+            null,
+            Collections.<ClusterNode>emptyList(),
+            new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose));
+    }
+
+    /**
      * Gets first grid node start time, see {@link DiscoverySpi#getGridStartTime()}.
      *
      * @return Start time of the first grid node.
@@ -2109,8 +2120,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
         Set<ClusterNode> srvNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
 
-        Set<Integer> nearEnabledCaches = new HashSet<>();
-
         for (ClusterNode node : allNodes) {
             assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
             assert !node.isDaemon();
@@ -2143,9 +2152,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         rmtNodesWithCaches.add(node);
 
                     addToMap(allCacheNodes, cacheName, node);
-
-                    if (filter.nearNode(node))
-                        nearEnabledCaches.add(CU.cacheId(cacheName));
                 }
             }
         }
@@ -2162,7 +2168,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableMap(allCacheNodes),
             Collections.unmodifiableMap(cacheGrpAffNodes),
             Collections.unmodifiableMap(nodeMap),
-            Collections.unmodifiableSet(nearEnabledCaches),
             alives);
     }
 
@@ -2808,23 +2813,32 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      * Cache predicate.
      */
-    private static class CachePredicate {
+    private class CachePredicate {
+        /** */
+        private final int cacheId;
+
         /** Cache filter. */
         private final CacheGroupAffinity aff;
 
         /** If near cache is enabled on data nodes. */
         private final boolean nearEnabled;
 
-        /** Collection of client near nodes. */
+        /**
+         * Collection of client nodes.
+         *
+         * Note: if client cache started/closed this map is updated asynchronously.
+         */
         private final ConcurrentHashMap<UUID, Boolean> clientNodes;
 
         /**
+         * @param cacheId Cache ID.
          * @param aff Cache group affinity.
          * @param nearEnabled Near enabled flag.
          */
-        private CachePredicate(CacheGroupAffinity aff, boolean nearEnabled) {
+        private CachePredicate(int cacheId, CacheGroupAffinity aff, boolean nearEnabled) {
             assert aff != null;
 
+            this.cacheId = cacheId;
             this.aff = aff;
             this.nearEnabled = nearEnabled;
 
@@ -2836,7 +2850,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @param nearEnabled Near enabled flag.
          * @return {@code True} if new node ID was added.
          */
-        public boolean addClientNode(UUID nodeId, boolean nearEnabled) {
+        boolean addClientNode(UUID nodeId, boolean nearEnabled) {
             assert nodeId != null;
 
             Boolean old = clientNodes.putIfAbsent(nodeId, nearEnabled);
@@ -2868,19 +2882,20 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
          * @param node Node to check.
          * @return {@code True} if cache is accessible on the given node.
          */
-        public boolean cacheNode(ClusterNode node) {
-            return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) || clientNodes.containsKey(node.id()));
+        boolean cacheNode(ClusterNode node) {
+            return !node.isDaemon() && (CU.affinityNode(node, aff.cacheFilter) ||
+                cacheClientNode(node) != null);
         }
 
         /**
          * @param node Node to check.
          * @return {@code True} if near cache is present on the given nodes.
          */
-        public boolean nearNode(ClusterNode node) {
+        boolean nearNode(ClusterNode node) {
             if (CU.affinityNode(node, aff.cacheFilter))
                 return nearEnabled;
 
-            Boolean near = clientNodes.get(node.id());
+            Boolean near = cacheClientNode(node);
 
             return near != null && near;
         }
@@ -2893,9 +2908,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             if (node.isDaemon())
                 return false;
 
-            Boolean near = clientNodes.get(node.id());
+            Boolean near = cacheClientNode(node);
 
             return near != null && !near;
         }
+
+        /**
+         * @param node Node.
+         * @return {@code Null} if client cache does not exist, otherwise cache near enabled flag.
+         */
+        private Boolean cacheClientNode(ClusterNode node) {
+            // On local node check actual cache state since clientNodes map is updated asynchronously.
+            if (ctx.localNodeId().equals(node.id())) {
+                GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+                return cctx != null ? CU.isNearEnabled(cctx) : null;
+            }
+
+            return clientNodes.get(node.id());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 87c424a..6b43fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -364,18 +365,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         if (fut != null)
             return fut.get();
 
-        ClusterNode loc = ctx.discovery().localNode();
-
-        // Check local node.
-        Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
-
-        if (cacheNodes.contains(loc)) {
-            GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
-            // Cache is being stopped.
-            if (cache == null)
-                return null;
+        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
 
+        if (cache != null) {
             GridCacheContext<Object, Object> cctx = cache.context();
 
             cctx.awaitStarted();
@@ -412,6 +404,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             }
         }
 
+        Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
+
         if (F.isEmpty(cacheNodes))
             return null;
 
@@ -443,7 +437,13 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
 
             CacheMode mode = ctx.cache().cacheMode(cacheName);
 
-            assert mode != null;
+            if (mode == null) {
+                if (ctx.clientDisconnected())
+                    throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                            "Failed to get affinity mapping, client disconnected.");
+
+                throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
+            }
 
             // Map all keys to a single node, if the cache mode is LOCAL.
             if (mode == LOCAL) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index b6fe90b..0f62db2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,7 +29,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -36,13 +39,18 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.ClientCacheDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -68,6 +76,10 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
  */
 @SuppressWarnings("ForLoopReplaceableByForEach")
 public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
+    /** */
+    private final long clientCacheMsgTimeout =
+        IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000);
+
     /** Late affinity assignment flag. */
     private boolean lateAffAssign;
 
@@ -81,7 +93,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private AffinityTopologyVersion lastAffVer;
 
     /** Registered caches (updated from exchange thread). */
-    private final Map<Integer, CacheGroupDescriptor> registeredGrps = new HashMap<>();
+    private final CachesInfo caches = new CachesInfo();
 
     /** */
     private WaitRebalanceInfo waitInfo;
@@ -93,6 +105,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private final ConcurrentMap<Long, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
         new ConcurrentHashMap8<>();
 
+    /** */
+    private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -126,14 +141,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
-            registeredGrps.clear();
+            caches.clear();
 
             affCalcVer = null;
 
             lastAffVer = null;
 
-            for (CacheGroupDescriptor desc : cctx.cache().cacheGroupDescriptors().values())
-                registeredGrps.put(desc.groupId(), desc);
+            caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors());
         }
 
         if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) {
@@ -319,20 +333,312 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param exchActions Cache change requests to execute on exchange.
+     * @param reqId Request ID.
+     * @param startReqs Client cache start request.
+     * @return Descriptors for caches to start.
      */
-    private void updateCachesInfo(ExchangeActions exchActions) {
-        for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
-            CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+    @Nullable private List<DynamicCacheDescriptor> clientCachesToStart(UUID reqId,
+        Map<String, DynamicCacheChangeRequest> startReqs) {
+        List<DynamicCacheDescriptor> startDescs = new ArrayList<>(startReqs.size());
+
+        for (DynamicCacheChangeRequest startReq : startReqs.values()) {
+            DynamicCacheDescriptor desc = caches.cache(CU.cacheId(startReq.cacheName()));
+
+            if (desc == null) {
+                CacheException err = new CacheException("Failed to start client cache " +
+                    "(a cache with the given name is not started): " + startReq.cacheName());
+
+                cctx.cache().completeClientCacheChangeFuture(reqId, err);
 
-            assert rmvd != null : stopDesc.cacheOrGroupName();
+                return null;
+            }
+
+            if (cctx.cacheContext(desc.cacheId()) != null)
+                continue;
+
+            startDescs.add(desc);
         }
 
-        for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
-            CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+        return startDescs;
+    }
 
-            assert old == null : old;
+    /**
+     * @param msg Change request.
+     * @param crd Coordinator flag.
+     * @param topVer Current topology version.
+     * @param discoCache Discovery data cache.
+     * @return Map of started caches (cache ID to near enabled flag).
+     */
+    @Nullable private Map<Integer, Boolean> processClientCacheStartRequests(
+        ClientCacheChangeDummyDiscoveryMessage msg,
+        boolean crd,
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache) {
+        Map<String, DynamicCacheChangeRequest> startReqs = msg.startRequests();
+
+        if (startReqs == null)
+            return null;
+
+        List<DynamicCacheDescriptor> startDescs = clientCachesToStart(msg.requestId(), msg.startRequests());
+
+        if (startDescs == null || startDescs.isEmpty()) {
+            cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
+
+            return null;
+        }
+
+        Map<Integer, GridDhtAssignmentFetchFuture> fetchFuts = U.newHashMap(startDescs.size());
+
+        Set<String> startedCaches = U.newHashSet(startDescs.size());
+
+        Map<Integer, Boolean> startedInfos = U.newHashMap(startDescs.size());
+
+        for (DynamicCacheDescriptor desc : startDescs) {
+            try {
+                startedCaches.add(desc.cacheName());
+
+                DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName());
+
+                cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer);
+
+                startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null);
+
+                CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                assert grp != null : desc.groupId();
+                assert !grp.affinityNode() || grp.isLocal() : grp.cacheOrGroupName();
+
+                if (!grp.isLocal() && grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
+                    assert grp.localStartVersion().equals(topVer) : grp.localStartVersion();
+
+                    if (crd) {
+                        CacheGroupHolder grpHolder = grpHolders.get(grp.groupId());
+
+                        assert grpHolder != null && grpHolder.affinity().idealAssignment() != null;
+
+                        if (grpHolder.client()) {
+                            ClientCacheDhtTopologyFuture topFut = new ClientCacheDhtTopologyFuture(topVer);
+
+                            grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+
+                            GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
+
+                            if (clientTop != null) {
+                                grp.topology().update(topVer,
+                                    clientTop.partitionMap(true),
+                                    clientTop.updateCounters(false));
+                            }
+
+                            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
+
+                            grpHolders.put(grp.groupId(), grpHolder);
+
+                            assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion());
+                        }
+                    }
+                    else if (!fetchFuts.containsKey(grp.groupId())) {
+                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
+                            grp.groupId(),
+                            topVer,
+                            discoCache);
+
+                        fetchFut.init(true);
+
+                        fetchFuts.put(grp.groupId(), fetchFut);
+                    }
+                }
+            }
+            catch (IgniteCheckedException e) {
+                cctx.cache().closeCaches(startedCaches, false);
+
+                cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+                return null;
+            }
         }
+
+        for (GridDhtAssignmentFetchFuture fetchFut : fetchFuts.values()) {
+            try {
+                CacheGroupContext grp = cctx.cache().cacheGroup(fetchFut.groupId());
+
+                assert grp != null;
+
+                GridDhtAffinityAssignmentResponse res = fetchAffinity(topVer,
+                    null,
+                    discoCache,
+                    grp.affinity(),
+                    fetchFut);
+
+                GridDhtPartitionFullMap partMap;
+                ClientCacheDhtTopologyFuture topFut;
+
+                if (res != null) {
+                    partMap = res.partitionMap();
+
+                    assert partMap != null : res;
+
+                    topFut = new ClientCacheDhtTopologyFuture(topVer);
+                }
+                else {
+                    partMap = new GridDhtPartitionFullMap(cctx.localNodeId(), cctx.localNode().order(), 1);
+
+                    topFut = new ClientCacheDhtTopologyFuture(topVer,
+                        new ClusterTopologyServerNotFoundException("All server nodes left grid."));
+                }
+
+                grp.topology().updateTopologyVersion(topFut, discoCache, -1, false);
+
+                grp.topology().update(topVer, partMap, null);
+
+                topFut.validate(grp, discoCache.allNodes());
+            }
+            catch (IgniteCheckedException e) {
+                cctx.cache().closeCaches(startedCaches, false);
+
+                cctx.cache().completeClientCacheChangeFuture(msg.requestId(), e);
+
+                return null;
+            }
+        }
+
+        cctx.cache().initCacheProxies(topVer, null);
+
+        cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
+
+        return startedInfos;
+    }
+
+    /**
+     * @param msg Change request.
+     * @param topVer Current topology version.
+     * @param crd Coordinator flag.
+     * @return Closed caches IDs.
+     */
+    private Set<Integer> processCacheCloseRequests(
+        ClientCacheChangeDummyDiscoveryMessage msg,
+        boolean crd,
+        AffinityTopologyVersion topVer) {
+        Set<String> cachesToClose = msg.cachesToClose();
+
+        if (cachesToClose == null)
+            return null;
+
+        Set<Integer> closed = cctx.cache().closeCaches(cachesToClose, true);
+
+        if (crd) {
+            for (CacheGroupHolder hld : grpHolders.values()) {
+                if (!hld.client() && cctx.cache().cacheGroup(hld.groupId()) == null) {
+                    int grpId = hld.groupId();
+
+                    // All client cache groups were stopped, need create 'client' CacheGroupHolder.
+                    CacheGroupHolder grpHolder = grpHolders.remove(grpId);
+
+                    assert grpHolder != null && !grpHolder.client() : grpHolder;
+
+                    try {
+                        grpHolder = CacheGroupHolder2.create(cctx,
+                            caches.group(grpId),
+                            topVer,
+                            grpHolder.affinity());
+
+                        grpHolders.put(grpId, grpHolder);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to initialize cache: " + e, e);
+                    }
+                }
+            }
+        }
+
+        cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null);
+
+        return closed;
+    }
+
+    /**
+     * Process client cache start/close requests, called from exchange thread.
+     *
+     * @param msg Change request.
+     */
+    void processClientCachesChanges(ClientCacheChangeDummyDiscoveryMessage msg) {
+        AffinityTopologyVersion topVer = cctx.exchange().readyAffinityVersion();
+
+        DiscoCache discoCache = cctx.discovery().discoCache(topVer);
+
+        boolean crd = cctx.localNode().equals(discoCache.oldestAliveServerNode());
+
+        Map<Integer, Boolean> startedCaches = processClientCacheStartRequests(msg, crd, topVer, discoCache);
+
+        Set<Integer> closedCaches = processCacheCloseRequests(msg, crd, topVer);
+
+        if (startedCaches != null || closedCaches != null)
+            scheduleClientChangeMessage(startedCaches, closedCaches);
+    }
+
+    /**
+     * Sends discovery message about started/closed client caches, called from exchange thread.
+     *
+     * @param timeoutObj Timeout object initiated send.
+     */
+    void sendClientCacheChangesMessage(ClientCacheUpdateTimeout timeoutObj) {
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
+
+        // Timeout object was changed if one more client cache changed during timeout,
+        // another timeoutObj was scheduled.
+        if (msg != null && msg.updateTimeoutObject() == timeoutObj) {
+            assert !msg.empty() : msg;
+
+            clientCacheChanges.remove();
+
+            msg.checkCachesExist(caches.registeredCaches.keySet());
+
+            try {
+                if (!msg.empty())
+                    cctx.discovery().sendCustomEvent(msg);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send discovery event: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
+     */
+    private void scheduleClientChangeMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
+
+        if (msg == null) {
+            msg = new ClientCacheChangeDiscoveryMessage(startedCaches, closedCaches);
+
+            clientCacheChanges.set(msg);
+        }
+        else {
+            msg.merge(startedCaches, closedCaches);
+
+            if (msg.empty()) {
+                cctx.time().removeTimeoutObject(msg.updateTimeoutObject());
+
+                clientCacheChanges.remove();
+
+                return;
+            }
+        }
+
+        if (msg.updateTimeoutObject() != null)
+            cctx.time().removeTimeoutObject(msg.updateTimeoutObject());
+
+        long timeout = clientCacheMsgTimeout;
+
+        if (timeout <= 0)
+            timeout = 10_000;
+
+        ClientCacheUpdateTimeout timeoutObj = new ClientCacheUpdateTimeout(cctx, timeout);
+
+        msg.updateTimeoutObject(timeoutObj);
+
+        cctx.time().addTimeoutObject(timeoutObj);
     }
 
     /**
@@ -342,16 +648,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param crd Coordinator flag.
      * @param exchActions Cache change requests.
      * @throws IgniteCheckedException If failed.
-     * @return {@code True} if client-only exchange is needed.
      */
-    public boolean onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
+    public void onCacheChangeRequest(final GridDhtPartitionsExchangeFuture fut,
         boolean crd,
         final ExchangeActions exchActions)
         throws IgniteCheckedException
     {
         assert exchActions != null && !exchActions.empty() : exchActions;
 
-        updateCachesInfo(exchActions);
+        caches.updateCachesInfo(exchActions);
 
         // Affinity did not change for existing caches.
         forAllCacheGroups(crd && lateAffAssign, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@@ -363,7 +668,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         });
 
-        for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
+        for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
             DynamicCacheDescriptor cacheDesc = action.descriptor();
 
             DynamicCacheChangeRequest req = action.request();
@@ -396,81 +701,34 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 U.error(log, "Failed to initialize cache. Will try to rollback cache start routine. " +
                     "[cacheName=" + req.cacheName() + ']', e);
 
-                cctx.cache().forceCloseCache(fut.topologyVersion(), action, e);
-            }
-        }
-
-            Set<Integer> gprs = new HashSet<>();
-
-                for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) {
-                    Integer grpId = action.descriptor().groupId();
+                cctx.cache().closeCaches(Collections.singleton(req.cacheName()), false);
 
-                    if (gprs.add(grpId)) {
-                        if (crd && lateAffAssign)
-                    initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());else  {
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-                        if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
-                        assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
-
-                        initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
-                    }
-                }
+                cctx.cache().completeCacheStartFuture(req, false, e);
             }
         }
 
-        List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId());
-
-        for (ExchangeActions.ActionData req : closeReqs) {
-            cctx.cache().blockGateway(req.request());
-
-            if (crd) {
-                CacheGroupContext grp = cctx.cache().cacheGroup(req.descriptor().groupId());
-
-                assert grp != null;
-
-                if (grp.affinityNode())
-                    continue;
-
-                boolean grpClosed = false;
-
-                if (grp.sharedGroup()) {
-                    boolean cacheRemaining = false;
-
-                    for (GridCacheContext ctx : cctx.cacheContexts()) {
-                        if (ctx.group() == grp && !cacheClosed(ctx.cacheId(), closeReqs)) {
-                            cacheRemaining = true;
-
-                            break;
-                        }
-                    }
-
-                    if (!cacheRemaining)
-                        grpClosed = true;
-                }
-                else
-                    grpClosed = true;
+        Set<Integer> gprs = new HashSet<>();
 
-                // All client cache groups were stopped, need create 'client' CacheGroupHolder.
-                if (grpClosed) {
-                    CacheGroupHolder grpHolder = grpHolders.remove(grp.groupId());
+        for (ExchangeActions.ActionData action : exchActions.cacheStartRequests()) {
+            Integer grpId = action.descriptor().groupId();
 
-                    if (grpHolder != null) {
-                        assert !grpHolder.client() : grpHolder;
+            if (gprs.add(grpId)) {
+                if (crd && lateAffAssign)
+                    initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor());
+                else  {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                        grpHolder = CacheGroupHolder2.create(cctx,
-                            registeredGrps.get(grp.groupId()),
-                            fut,
-                            grp.affinity());
+                    if (grp != null && !grp.isLocal() && grp.localStartVersion().equals(fut.topologyVersion())) {
+                        assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion();
 
-                        grpHolders.put(grp.groupId(), grpHolder);
+                        initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
                     }
                 }
             }
         }
 
         for (ExchangeActions.ActionData action : exchActions.cacheStopRequests())
-            cctx.cache().blockGateway(action.request());
+            cctx.cache().blockGateway(action.request().cacheName(), true);
 
         Set<Integer> stoppedGrps = null;
 
@@ -519,21 +777,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
-        return exchActions.clientOnlyExchange();
-    }
+        ClientCacheChangeDiscoveryMessage msg = clientCacheChanges.get();
 
-    /**
-     * @param cacheId Cache ID.
-     * @param closeReqs Close requests.
-     * @return {@code True} if requests contain request for given cache ID.
-     */
-    private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) {
-        for (ExchangeActions.ActionData req : closeReqs) {
-            if (req.descriptor().cacheId() == cacheId)
-                return true;
-        }
+        if (msg != null) {
+            msg.checkCachesExist(caches.registeredCaches.keySet());
 
-        return false;
+            if (msg.empty())
+                clientCacheChanges.remove();
+        }
     }
 
     /**
@@ -542,7 +793,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void removeAllCacheInfo(){
         grpHolders.clear();
 
-        registeredGrps.clear();
+        caches.clear();
     }
 
     /**
@@ -633,7 +884,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 assert affTopVer.topologyVersion() > 0 : affTopVer;
 
-                CacheGroupDescriptor desc = registeredGrps.get(aff.groupId());
+                CacheGroupDescriptor desc = caches.group(aff.groupId());
 
                 assert desc != null : aff.cacheOrGroupName();
 
@@ -763,7 +1014,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void forAllRegisteredCacheGroups(IgniteInClosureX<CacheGroupDescriptor> c) throws IgniteCheckedException {
         assert lateAffAssign;
 
-        for (CacheGroupDescriptor cacheDesc : registeredGrps.values()) {
+        for (CacheGroupDescriptor cacheDesc : caches.allGroups()) {
             if (cacheDesc.config().getCacheMode() == LOCAL)
                 continue;
 
@@ -811,7 +1062,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (grpHolder == null) {
             grpHolder = grp != null ?
                 new CacheGroupHolder1(grp, null) :
-                CacheGroupHolder2.create(cctx, grpDesc, fut, null);
+                CacheGroupHolder2.create(cctx, grpDesc, fut.topologyVersion(), null);
 
             CacheGroupHolder old = grpHolders.put(grpId, grpHolder);
 
@@ -843,12 +1094,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     public void initStartedCaches(boolean crd,
         final GridDhtPartitionsExchangeFuture fut,
         Collection<DynamicCacheDescriptor> descs) throws IgniteCheckedException {
-        for (DynamicCacheDescriptor desc : descs) {
-            CacheGroupDescriptor grpDesc = desc.groupDescriptor();
-
-            if (!registeredGrps.containsKey(grpDesc.groupId()))
-                registeredGrps.put(grpDesc.groupId(), grpDesc);
-        }
+        caches.initStartedCaches(descs);
 
         if (crd && lateAffAssign) {
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
@@ -868,7 +1114,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
                 @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                     if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
-                        initAffinity(registeredGrps.get(aff.groupId()), aff, fut);
+                        initAffinity(caches.group(aff.groupId()), aff, fut);
                 }
             });
         }
@@ -893,13 +1139,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
         else {
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                desc,
+                desc.groupId(),
                 fut.topologyVersion(),
                 fut.discoCache());
 
-            fetchFut.init();
+            fetchFut.init(false);
 
-            fetchAffinity(fut, aff, fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                aff, fetchFut);
         }
     }
 
@@ -990,7 +1239,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         StringBuilder names = new StringBuilder();
 
         for (Integer grpId : grpIds) {
-            String name = registeredGrps.get(grpId).cacheOrGroupName();
+            String name = caches.group(grpId).cacheOrGroupName();
 
             if (names.length() != 0)
                 names.append(", ");
@@ -1022,16 +1271,16 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 grp.affinity().initialize(fut.topologyVersion(), assignment);
             }
             else {
-                CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId());
+                CacheGroupDescriptor grpDesc = caches.group(grp.groupId());
 
                 assert grpDesc != null : grp.cacheOrGroupName();
 
                 GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                    grpDesc,
+                    grpDesc.groupId(),
                     topVer,
                     fut.discoCache());
 
-                fetchFut.init();
+                fetchFut.init(false);
 
                 fetchFuts.add(fetchFut);
             }
@@ -1042,48 +1291,57 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             Integer grpId = fetchFut.groupId();
 
-            fetchAffinity(fut, cctx.cache().cacheGroup(grpId).affinity(), fetchFut);
+            fetchAffinity(fut.topologyVersion(),
+                fut.discoveryEvent(),
+                fut.discoCache(),
+                cctx.cache().cacheGroup(grpId).affinity(),
+                fetchFut);
         }
     }
 
     /**
-     * @param fut Exchange future.
+     * @param topVer Topology version.
+     * @param discoveryEvt Discovery event.
+     * @param discoCache Discovery data cache.
      * @param affCache Affinity.
      * @param fetchFut Affinity fetch future.
      * @throws IgniteCheckedException If failed.
+     * @return Affinity assignment response.
      */
-    private void fetchAffinity(GridDhtPartitionsExchangeFuture fut,
+    private GridDhtAffinityAssignmentResponse fetchAffinity(AffinityTopologyVersion topVer,
+        @Nullable DiscoveryEvent discoveryEvt,
+        DiscoCache discoCache,
         GridAffinityAssignmentCache affCache,
         GridDhtAssignmentFetchFuture fetchFut)
         throws IgniteCheckedException {
         assert affCache != null;
 
-        AffinityTopologyVersion topVer = fut.topologyVersion();
-
         GridDhtAffinityAssignmentResponse res = fetchFut.get();
 
         if (res == null) {
-            List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+            List<List<ClusterNode>> aff = affCache.calculate(topVer, discoveryEvt, discoCache);
 
             affCache.initialize(topVer, aff);
         }
         else {
-            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> idealAff = res.idealAffinityAssignment(discoCache);
 
             if (idealAff != null)
                 affCache.idealAssignment(idealAff);
             else {
                 assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
 
-                affCache.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
+                affCache.calculate(topVer, discoveryEvt, discoCache);
             }
 
-            List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
+            List<List<ClusterNode>> aff = res.affinityAssignment(discoCache);
 
             assert aff != null : res;
 
             affCache.initialize(topVer, aff);
         }
+
+        return res;
     }
 
     /**
@@ -1136,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (grp.isLocal())
                 continue;
 
-            initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut);
+            initAffinity(caches.group(grp.groupId()), grp.affinity(), fut);
         }
     }
 
@@ -1176,7 +1434,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         }
                     );
 
-                    grpHolder = CacheGroupHolder2.create(cctx, desc, fut, null);
+                    grpHolder = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null);
 
                     final GridAffinityAssignmentCache aff = grpHolder.affinity();
 
@@ -1198,18 +1456,21 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     assert prev.topologyVersion().compareTo(fut.topologyVersion()) < 0 : prev;
 
                     GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                        desc,
+                        desc.groupId(),
                         prev.topologyVersion(),
                         prev.discoCache());
 
-                    fetchFut.init();
+                    fetchFut.init(false);
 
                     final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
                     fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
                         @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
                             throws IgniteCheckedException {
-                            fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
+                            fetchAffinity(prev.topologyVersion(),
+                                prev.discoveryEvent(),
+                                prev.discoCache(),
+                                aff, (GridDhtAssignmentFetchFuture)fetchFut);
 
                             aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
@@ -1268,7 +1529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             );
 
-            cacheGrp = CacheGroupHolder2.create(cctx, desc, fut, null);
+            cacheGrp = CacheGroupHolder2.create(cctx, desc, fut.topologyVersion(), null);
         }
         else
             cacheGrp = new CacheGroupHolder1(grp, null);
@@ -1749,7 +2010,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         /**
          * @param cctx Context.
          * @param grpDesc Cache group descriptor.
-         * @param fut Exchange future.
+         * @param topVer Current exchange version.
          * @param initAff Current affinity.
          * @return Cache holder.
          * @throws IgniteCheckedException If failed.
@@ -1757,7 +2018,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         static CacheGroupHolder2 create(
             GridCacheSharedContext cctx,
             CacheGroupDescriptor grpDesc,
-            GridDhtPartitionsExchangeFuture fut,
+            AffinityTopologyVersion topVer,
             @Nullable GridAffinityAssignmentCache initAff) throws IgniteCheckedException {
             assert grpDesc != null;
             assert !cctx.kernalContext().clientNode();
@@ -1768,7 +2029,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             assert ccfg.getCacheMode() != LOCAL : ccfg.getName();
 
             assert !cctx.discovery().cacheGroupAffinityNodes(grpDesc.groupId(),
-                fut.topologyVersion()).contains(cctx.localNode()) : grpDesc.cacheOrGroupName();
+                topVer).contains(cctx.localNode()) : grpDesc.cacheOrGroupName();
 
             AffinityFunction affFunc = cctx.cache().clone(ccfg.getAffinity());
 
@@ -1872,7 +2133,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (cacheWaitParts == null) {
                 waitGrps.put(grpId, cacheWaitParts = new HashMap<>());
 
-                deploymentIds.put(grpId, registeredGrps.get(grpId).deploymentId());
+                deploymentIds.put(grpId, caches.group(grpId).deploymentId());
             }
 
             cacheWaitParts.put(part, waitNode);
@@ -1891,4 +2152,101 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 ", grps=" + (waitGrps != null ? waitGrps.keySet() : null) + ']';
         }
     }
+
+    /**
+     *
+     */
+    static class CachesInfo {
+        /** Registered cache groups (updated from exchange thread). */
+        private final ConcurrentHashMap<Integer, CacheGroupDescriptor> registeredGrps = new ConcurrentHashMap<>();
+
+        /** Registered caches (updated from exchange thread). */
+        private final ConcurrentHashMap<Integer, DynamicCacheDescriptor> registeredCaches = new ConcurrentHashMap<>();
+
+        /**
+         * @param grps Registered groups.
+         * @param caches Registered caches.
+         */
+        void init(Map<Integer, CacheGroupDescriptor> grps, Map<String, DynamicCacheDescriptor> caches) {
+            for (CacheGroupDescriptor grpDesc : grps.values())
+                registeredGrps.put(grpDesc.groupId(), grpDesc);
+
+            for (DynamicCacheDescriptor cacheDesc : caches.values())
+                registeredCaches.put(cacheDesc.cacheId(), cacheDesc);
+        }
+
+        /**
+         * @return All registered groups.
+         */
+        Collection<CacheGroupDescriptor> allGroups() {
+            return registeredGrps.values();
+        }
+
+        /**
+         * @param grpId Group ID.
+         * @return Group descriptor.
+         */
+        CacheGroupDescriptor group(int grpId) {
+            CacheGroupDescriptor desc = registeredGrps.get(grpId);
+
+            assert desc != null : grpId;
+
+            return desc;
+        }
+
+        /**
+          * @param descs Cache descriptor.
+         */
+        void initStartedCaches(Collection<DynamicCacheDescriptor> descs) {
+            for (DynamicCacheDescriptor desc : descs) {
+                CacheGroupDescriptor grpDesc = desc.groupDescriptor();
+
+                if (!registeredGrps.containsKey(grpDesc.groupId()))
+                    registeredGrps.put(grpDesc.groupId(), grpDesc);
+
+                if (!registeredCaches.containsKey(desc.cacheName()))
+                    registeredCaches.put(desc.cacheId(), desc);
+            }
+        }
+
+        /**
+         * @param exchActions Exchange actions.
+         */
+        void updateCachesInfo(ExchangeActions exchActions) {
+            for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) {
+                CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId());
+
+                assert rmvd != null : stopDesc.cacheOrGroupName();
+            }
+
+            for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) {
+                CacheGroupDescriptor old = registeredGrps.put(startDesc.groupId(), startDesc);
+
+                assert old == null : old;
+            }
+
+            for (ExchangeActions.ActionData req : exchActions.cacheStopRequests())
+                registeredCaches.remove(req.descriptor().cacheId());
+
+            for (ExchangeActions.ActionData req : exchActions.cacheStartRequests())
+                registeredCaches.put(req.descriptor().cacheId(), req.descriptor());
+        }
+
+        /**
+         * @param cacheId Cache ID.
+         * @return Cache descriptor if cache found.
+         */
+        @Nullable DynamicCacheDescriptor cache(Integer cacheId) {
+            return registeredCaches.get(cacheId);
+        }
+
+        /**
+         *
+         */
+        void clear() {
+            registeredGrps.clear();
+
+            registeredCaches.clear();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 4844a55..196df57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -898,6 +898,9 @@ public class CacheGroupContext {
             res.idealAffinityAssignment(assignment.idealAssignment());
         }
 
+        if (req.sendPartitionsState())
+            res.partitionMap(top.partitionMap(true));
+
         try {
             ctx.io().send(nodeId, res, AFFINITY_POOL);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
new file mode 100644
index 0000000..3d120f7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDiscoveryMessage.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Sent from cache client node to asynchronously notify about started.closed client caches.
+ */
+public class ClientCacheChangeDiscoveryMessage implements DiscoveryCustomMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final IgniteUuid id = IgniteUuid.randomUuid();;
+
+    /** */
+    @GridToStringInclude
+    private Map<Integer, Boolean> startedCaches;
+
+    /** */
+    @GridToStringInclude
+    private Set<Integer> closedCaches;
+
+    /** Update timeout object, used to batch multiple starts/close into single discovery message. */
+    private transient ClientCacheUpdateTimeout updateTimeoutObj;
+
+    /**
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
+     */
+    public ClientCacheChangeDiscoveryMessage(Map<Integer, Boolean> startedCaches, Set<Integer> closedCaches) {
+        this.startedCaches = startedCaches;
+        this.closedCaches = closedCaches;
+    }
+
+    /**
+     * @param startedCaches Started caches.
+     * @param closedCaches Closed caches.
+     */
+    public void merge(@Nullable Map<Integer, Boolean> startedCaches, @Nullable Set<Integer> closedCaches) {
+        Map<Integer, Boolean> startedCaches0 = this.startedCaches;
+        Set<Integer> closedCaches0 = this.closedCaches;
+
+        if (startedCaches != null) {
+            if (startedCaches0 == null)
+                this.startedCaches = startedCaches0 = new HashMap<>();
+
+            for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) {
+                if (closedCaches0 != null && closedCaches0.remove(e.getKey()))
+                    continue;
+
+                Boolean old = startedCaches0.put(e.getKey(), e.getValue());
+
+                assert old == null : e.getKey();
+            }
+        }
+
+        if (closedCaches != null) {
+            if (closedCaches0 == null)
+                this.closedCaches = closedCaches0 = new HashSet<>();
+
+            for (Integer cacheId : closedCaches) {
+                if (startedCaches0 != null && startedCaches0.remove(cacheId) != null)
+                    continue;
+
+                boolean add = closedCaches0.add(cacheId);
+
+                assert add : cacheId;
+            }
+        }
+    }
+
+    /**
+     * @return {@code True} if there are no info about started/closed caches.
+     */
+    public boolean empty() {
+        return F.isEmpty(startedCaches) && F.isEmpty(closedCaches);
+    }
+
+    /**
+     * @param caches Started caches' IDs.
+     */
+    void checkCachesExist(Set<Integer> caches) {
+        if (closedCaches != null) {
+            for (Iterator<Integer> it = closedCaches.iterator(); it.hasNext();) {
+                Integer cacheId = it.next();
+
+                if (!caches.contains(cacheId))
+                    it.remove();
+            }
+        }
+
+        if (startedCaches != null) {
+            for (Iterator<Integer> it = startedCaches.keySet().iterator(); it.hasNext();) {
+                Integer cacheId = it.next();
+
+                if (!caches.contains(cacheId))
+                    it.remove();
+            }
+        }
+    }
+
+    /**
+     * @return Update timeout object.
+     */
+    public ClientCacheUpdateTimeout updateTimeoutObject() {
+        return updateTimeoutObj;
+    }
+
+    /**
+     * @param updateTimeoutObj Update timeout object.
+     */
+    public void updateTimeoutObject(ClientCacheUpdateTimeout updateTimeoutObj) {
+        this.updateTimeoutObj = updateTimeoutObj;
+    }
+
+    /**
+     * @return Started caches map (cache ID to near enabled flag).
+     */
+    @Nullable public Map<Integer, Boolean> startedCaches() {
+        return startedCaches;
+    }
+
+    /**
+     * @return Closed caches.
+     */
+    @Nullable public Set<Integer> closedCaches() {
+        return closedCaches;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClientCacheChangeDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
new file mode 100644
index 0000000..68bca27
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue.
+ */
+public class ClientCacheChangeDummyDiscoveryMessage implements DiscoveryCustomMessage,
+    CachePartitionExchangeWorkerTask {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final UUID reqId;
+
+    /** */
+    private final Map<String, DynamicCacheChangeRequest> startReqs;
+
+    /** */
+    @GridToStringInclude
+    private final Set<String> cachesToClose;
+
+    /**
+     * @param reqId Start request ID.
+     * @param startReqs Caches start requests.
+     * @param cachesToClose Cache to close.
+     */
+    public ClientCacheChangeDummyDiscoveryMessage(UUID reqId,
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
+        @Nullable Set<String> cachesToClose) {
+        assert reqId != null;
+        assert startReqs != null ^ cachesToClose != null;
+
+        this.reqId = reqId;
+        this.startReqs = startReqs;
+        this.cachesToClose = cachesToClose;
+    }
+
+    /**
+     * @return Start request ID.
+     */
+    UUID requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Cache start requests.
+     */
+    @Nullable Map<String, DynamicCacheChangeRequest> startRequests() {
+        return startReqs;
+    }
+
+    /**
+     * @return Client caches to close.
+     */
+    Set<String> cachesToClose() {
+        return cachesToClose;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this,
+            "startCaches", (startReqs != null ? startReqs.keySet() : ""));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
new file mode 100644
index 0000000..aab3a3e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheUpdateTimeout.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+
+/**
+ *
+ */
+class ClientCacheUpdateTimeout extends GridTimeoutObjectAdapter implements CachePartitionExchangeWorkerTask {
+    /** */
+    private final GridCacheSharedContext cctx;
+
+    /**
+     * @param cctx Context.
+     * @param timeout Timeout.
+     */
+    ClientCacheUpdateTimeout(GridCacheSharedContext cctx, long timeout) {
+        super(timeout);
+
+        this.cctx = cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        if (!cctx.kernalContext().isStopping())
+            cctx.exchange().addCustomTask(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index d5718f8..7398074 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -280,6 +280,40 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param msg Message.
+     * @param node Node sent message.
+     */
+    void onClientCacheChange(ClientCacheChangeDiscoveryMessage msg, ClusterNode node) {
+        Map<Integer, Boolean> startedCaches = msg.startedCaches();
+
+        if (startedCaches != null) {
+            for (Map.Entry<Integer, Boolean> e : startedCaches.entrySet()) {
+                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                    if (e.getKey().equals(desc.cacheId())) {
+                        ctx.discovery().addClientNode(desc.cacheName(), node.id(), e.getValue());
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        Set<Integer> closedCaches = msg.closedCaches();
+
+        if (closedCaches != null) {
+            for (Integer cacheId : closedCaches) {
+                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+                    if (cacheId.equals(desc.cacheId())) {
+                        ctx.discovery().onClientCacheClose(desc.cacheName(), node.id());
+
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * @param batch Cache change request.
      * @param topVer Topology version.
      * @return {@code True} if minor topology version should be increased.
@@ -325,10 +359,14 @@ class ClusterCachesInfo {
                 continue;
             }
 
+            assert !req.clientStartOnly() : req;
+
             DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName());
 
             boolean needExchange = false;
 
+            boolean clientCacheStart = false;
+
             AffinityTopologyVersion waitTopVer = null;
 
             if (req.start()) {
@@ -388,6 +426,7 @@ class ClusterCachesInfo {
                         assert old == null;
 
                         ctx.discovery().setCacheFilter(
+                            startDesc.cacheId(),
                             grpDesc.groupId(),
                             ccfg.getName(),
                             ccfg.getNearConfiguration() != null);
@@ -406,40 +445,35 @@ class ClusterCachesInfo {
                 else {
                     assert req.initiatingNodeId() != null : req;
 
-                    // Cache already exists, exchange is needed only if client cache should be created.
-                    ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
-
-                    boolean clientReq = node != null &&
-                        !ctx.discovery().cacheAffinityNode(node, req.cacheName());
-
-                    if (req.clientStartOnly()) {
-                        needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
-                            req.initiatingNodeId(),
-                            req.nearCacheConfiguration() != null);
+                    if (req.failIfExists()) {
+                        ctx.cache().completeCacheStartFuture(req, false,
+                            new CacheExistsException("Failed to start cache " +
+                                "(a cache with the same name is already started): " + req.cacheName()));
                     }
                     else {
-                        if (req.failIfExists()) {
-                            ctx.cache().completeCacheStartFuture(req, false,
-                                new CacheExistsException("Failed to start cache " +
-                                    "(a cache with the same name is already started): " + req.cacheName()));
-                        }
-                        else {
-                            needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
+                        // Cache already exists, it is possible client cache is needed.
+                        ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+
+                        boolean clientReq = node != null &&
+                            !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+                        if (clientReq) {
+                            ctx.discovery().addClientNode(req.cacheName(),
                                 req.initiatingNodeId(),
                                 req.nearCacheConfiguration() != null);
-                        }
-                    }
 
-                    if (needExchange) {
-                        req.clientStartOnly(true);
+                            if (node.id().equals(req.initiatingNodeId())) {
+                                desc.clientCacheStartVersion(topVer);
 
-                        desc.clientCacheStartVersion(topVer.nextMinorVersion());
+                                clientCacheStart = true;
 
-                        exchangeActions.addClientCacheToStart(req, desc);
+                                ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(req.cacheName(), req), null);
+                            }
+                        }
                     }
                 }
 
-                if (!needExchange && desc != null) {
+                if (!needExchange && !clientCacheStart && desc != null) {
                     if (desc.clientCacheStartVersion() != null)
                         waitTopVer = desc.clientCacheStartVersion();
                     else {
@@ -508,19 +542,11 @@ class ClusterCachesInfo {
                     }
                 }
             }
-            else if (req.close()) {
-                if (desc != null) {
-                    needExchange = ctx.discovery().onClientCacheClose(req.cacheName(), req.initiatingNodeId());
-
-                    if (needExchange)
-                        exchangeActions.addCacheToClose(req, desc);
-                }
-            }
             else
                 assert false : req;
 
             if (!needExchange) {
-                if (req.initiatingNodeId().equals(ctx.localNodeId()))
+                if (!clientCacheStart && req.initiatingNodeId().equals(ctx.localNodeId()))
                     reqsToComplete.add(new T2<>(req, waitTopVer));
             }
             else
@@ -849,6 +875,7 @@ class ClusterCachesInfo {
             registeredCaches.put(cacheData.cacheConfiguration().getName(), desc);
 
             ctx.discovery().setCacheFilter(
+                desc.cacheId(),
                 grpDesc.groupId(),
                 cfg.getName(),
                 cfg.getNearConfiguration() != null);
@@ -1082,6 +1109,7 @@ class ClusterCachesInfo {
                     joinData.cacheDeploymentId());
 
                 ctx.discovery().setCacheFilter(
+                    cacheId,
                     grpDesc.groupId(),
                     cfg.getName(),
                     cfg.getNearConfiguration() != null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 5434061..c6da43f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -67,9 +67,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Destroy. */
     private boolean destroy;
 
-    /** Close flag. */
-    private boolean close;
-
     /** Whether cache was created through SQL. */
     private boolean sql;
 
@@ -155,19 +152,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     /**
      * @param ctx Context.
      * @param cacheName Cache name.
-     * @return Request to close client cache.
-     */
-    static DynamicCacheChangeRequest closeRequest(GridKernalContext ctx, String cacheName) {
-        DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
-
-        req.close(true);
-
-        return req;
-    }
-
-    /**
-     * @param ctx Context.
-     * @param cacheName Cache name.
      * @param sql {@code true} if the cache must be stopped only if it was created by SQL command {@code CREATE TABLE}.
      * @param destroy Destroy flag.
      * @return Cache stop request.
@@ -372,20 +356,6 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
-     * @return Close flag.
-     */
-    public boolean close() {
-        return close;
-    }
-
-    /**
-     * @param close New close flag.
-     */
-    public void close(boolean close) {
-        this.close = close;
-    }
-
-    /**
      * @return SQL flag.
      */
     public boolean sql() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index d577b30..31546be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -43,15 +43,9 @@ public class ExchangeActions {
     private Map<String, ActionData> cachesToStart;
 
     /** */
-    private Map<String, ActionData> clientCachesToStart;
-
-    /** */
     private Map<String, ActionData> cachesToStop;
 
     /** */
-    private Map<String, ActionData> cachesToClose;
-
-    /** */
     private Map<String, ActionData> cachesToResetLostParts;
 
     /** */
@@ -60,7 +54,7 @@ public class ExchangeActions {
     /**
      * @return {@code True} if server nodes should not participate in exchange.
      */
-    boolean clientOnlyExchange() {
+    public boolean clientOnlyExchange() {
         return F.isEmpty(cachesToStart) &&
             F.isEmpty(cachesToStop) &&
             F.isEmpty(cacheGrpsToStart) &&
@@ -69,27 +63,6 @@ public class ExchangeActions {
     }
 
     /**
-     * @param nodeId Local node ID.
-     * @return Close cache requests.
-     */
-    List<ActionData> closeRequests(UUID nodeId) {
-        List<ActionData> res = null;
-
-        if (cachesToClose != null) {
-            for (ActionData req : cachesToClose.values()) {
-                if (nodeId.equals(req.req.initiatingNodeId())) {
-                    if (res == null)
-                        res = new ArrayList<>(cachesToClose.size());
-
-                    res.add(req);
-                }
-            }
-        }
-
-        return res != null ? res : Collections.<ActionData>emptyList();
-    }
-
-    /**
      * @return New caches start requests.
      */
     Collection<ActionData> cacheStartRequests() {
@@ -97,25 +70,6 @@ public class ExchangeActions {
     }
 
     /**
-     * @return Start cache requests.
-     */
-    Collection<ActionData> newAndClientCachesStartRequests() {
-        if (cachesToStart != null || clientCachesToStart != null) {
-            List<ActionData> res = new ArrayList<>();
-
-            if (cachesToStart != null)
-                res.addAll(cachesToStart.values());
-
-            if (clientCachesToStart != null)
-                res.addAll(clientCachesToStart.values());
-
-            return res;
-        }
-
-        return Collections.emptyList();
-    }
-
-    /**
      * @return Stop cache requests.
      */
     Collection<ActionData> cacheStopRequests() {
@@ -128,8 +82,6 @@ public class ExchangeActions {
     public void completeRequestFutures(GridCacheSharedContext ctx) {
         completeRequestFutures(cachesToStart, ctx);
         completeRequestFutures(cachesToStop, ctx);
-        completeRequestFutures(cachesToClose, ctx);
-        completeRequestFutures(clientCachesToStart, ctx);
         completeRequestFutures(cachesToResetLostParts, ctx);
     }
 
@@ -194,21 +146,6 @@ public class ExchangeActions {
     }
 
     /**
-     * @param nodeId Local node ID.
-     * @return {@code True} if client cache was started.
-     */
-    public boolean clientCacheStarted(UUID nodeId) {
-        if (clientCachesToStart != null) {
-            for (ActionData cache : clientCachesToStart.values()) {
-                if (nodeId.equals(cache.req.initiatingNodeId()))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
      * @param state New cluster state.
      */
     void newClusterState(ClusterState state) {
@@ -260,16 +197,6 @@ public class ExchangeActions {
      * @param req Request.
      * @param desc Cache descriptor.
      */
-    void addClientCacheToStart(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
-        assert req.start() : req;
-
-        clientCachesToStart = add(clientCachesToStart, req, desc);
-    }
-
-    /**
-     * @param req Request.
-     * @param desc Cache descriptor.
-     */
     void addCacheToStop(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
         assert req.stop() : req;
 
@@ -280,16 +207,6 @@ public class ExchangeActions {
      * @param req Request.
      * @param desc Cache descriptor.
      */
-    void addCacheToClose(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
-        assert req.close() : req;
-
-        cachesToClose = add(cachesToClose, req, desc);
-    }
-
-    /**
-     * @param req Request.
-     * @param desc Cache descriptor.
-     */
     void addCacheToResetLostPartitions(DynamicCacheChangeRequest req, DynamicCacheDescriptor desc) {
         assert req.resetLostPartitions() : req;
 
@@ -369,9 +286,7 @@ public class ExchangeActions {
      */
     public boolean empty() {
         return F.isEmpty(cachesToStart) &&
-            F.isEmpty(clientCachesToStart) &&
             F.isEmpty(cachesToStop) &&
-            F.isEmpty(cachesToClose) &&
             F.isEmpty(cacheGrpsToStart) &&
             F.isEmpty(cacheGrpsToStop) &&
             F.isEmpty(cachesToResetLostParts);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 839ddbd..d753f99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -829,14 +829,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Partition topology.
      */
     public GridDhtPartitionTopology topology() {
-        GridCacheAdapter<K, V> cache = this.cache;
-
-        if (cache == null)
-            throw new IllegalStateException("Cache stopped: " + cacheName);
-
-        assert cache.isNear() || cache.isDht() || cache.isColocated() || cache.isDhtAtomic() : cache;
-
-        return topology(cache);
+        return grp.topology();
     }
 
     /**


Mime
View raw message