ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/4] ignite git commit: ignite-5272 Do not use synchronous custom discovery event for client cache start/close
Date Thu, 15 Jun 2017 06:23:49 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 37e495e..09eea27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -301,52 +302,151 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     @SuppressWarnings("unchecked")
     private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg, MessageHandlers msgHandlers) {
-        int msgIdx = cacheMsg.lookupIndex();
+        Lock lock = rw.readLock();
 
-        IgniteBiInClosure<UUID, GridCacheMessage> c = null;
+        lock.lock();
 
-        if (msgIdx >= 0) {
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
+        try {
+            int msgIdx = cacheMsg.lookupIndex();
 
-            IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId());
+            IgniteBiInClosure<UUID, GridCacheMessage> c = null;
 
-            if (cacheClsHandlers != null)
-                c = cacheClsHandlers[msgIdx];
-        }
+            if (msgIdx >= 0) {
+                Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
 
-        if (c == null)
-            c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
+                IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers0.get(cacheMsg.handlerId());
 
-        if (c == null) {
-            IgniteLogger log = cacheMsg.messageLogger(cctx);
+                if (cacheClsHandlers != null)
+                    c = cacheClsHandlers[msgIdx];
+            }
 
-            StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) [");
+            if (c == null)
+                c = msgHandlers.clsHandlers.get(new ListenerKey(cacheMsg.handlerId(), cacheMsg.getClass()));
 
-            appendMessageInfo(cacheMsg, nodeId, msg0);
+            if (c == null) {
+                if (processMissedHandler(nodeId, cacheMsg))
+                    return;
 
-            msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()).
-                append(", msgTopVer=").append(cacheMsg.topologyVersion()).
-                append(", desc=").append(descriptorForMessage(cacheMsg)).
-                append(']');
+                IgniteLogger log = cacheMsg.messageLogger(cctx);
 
-            msg0.append(U.nl()).append("Registered listeners:");
+                StringBuilder msg0 = new StringBuilder("Received message without registered handler (will ignore) [");
 
-            Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
+                appendMessageInfo(cacheMsg, nodeId, msg0);
 
-            for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
-                msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+                msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()).
+                    append(", msgTopVer=").append(cacheMsg.topologyVersion()).
+                    append(", desc=").append(descriptorForMessage(cacheMsg)).
+                    append(']');
 
-            if (cctx.kernalContext().isStopping()) {
-                if (log.isDebugEnabled())
-                    log.debug(msg0.toString());
+                msg0.append(U.nl()).append("Registered listeners:");
+
+                Map<Integer, IgniteBiInClosure[]> idxClsHandlers0 = msgHandlers.idxClsHandlers;
+
+                for (Map.Entry<Integer, IgniteBiInClosure[]> e : idxClsHandlers0.entrySet())
+                    msg0.append(U.nl()).append(e.getKey()).append("=").append(Arrays.toString(e.getValue()));
+
+                if (cctx.kernalContext().isStopping()) {
+                    if (log.isDebugEnabled())
+                        log.debug(msg0.toString());
+                }
+                else
+                    U.error(log, msg0.toString());
+
+                return;
             }
-            else
-                U.error(log, msg0.toString());
 
-            return;
+            onMessage0(nodeId, cacheMsg, c);
+        }
+        finally {
+            lock.unlock();
         }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param cacheMsg Message.
+     * @return {@code True} if message processed.
+     */
+    private boolean processMissedHandler(UUID nodeId, GridCacheMessage cacheMsg) {
+        // It is possible to receive reader update after client near cache was closed.
+        if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest) {
+            GridDhtAtomicAbstractUpdateRequest req = (GridDhtAtomicAbstractUpdateRequest)cacheMsg;
+
+            if (req.nearSize() > 0) {
+                List<KeyCacheObject> nearEvicted = new ArrayList<>(req.nearSize());
+
+                for (int i = 0; i < req.nearSize(); i++)
+                    nearEvicted.add(req.nearKey(i));
+
+                GridDhtAtomicUpdateResponse dhtRes = new GridDhtAtomicUpdateResponse(req.cacheId(),
+                    req.partition(),
+                    req.futureId(),
+                    false);
+
+                dhtRes.nearEvicted(nearEvicted);
+
+                sendMessageForMissedHandler(cacheMsg,
+                    nodeId,
+                    dhtRes,
+                    nodeId,
+                    GridIoPolicy.SYSTEM_POOL);
+
+                if (req.nearNodeId() != null) {
+                    GridDhtAtomicNearResponse nearRes = new GridDhtAtomicNearResponse(req.cacheId(),
+                        req.partition(),
+                        req.nearFutureId(),
+                        nodeId,
+                        req.flags());
+
+                    sendMessageForMissedHandler(cacheMsg,
+                        nodeId,
+                        nearRes,
+                        req.nearNodeId(),
+                        GridIoPolicy.SYSTEM_POOL);
+                }
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * @param origMsg Message without handler.
+     * @param origMsgNode Node sent {@code origMsg}.
+     * @param nodeId Target node ID.
+     * @param msg Response.
+     * @param plc Policy.
+     */
+    private void sendMessageForMissedHandler(
+        GridCacheMessage origMsg,
+        UUID origMsgNode,
+        GridCacheMessage msg,
+        UUID nodeId,
+        byte plc) {
+        IgniteLogger log = msg.messageLogger(cctx);
 
-        onMessage0(nodeId, cacheMsg, c);
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Received message without registered handler, " +
+                    "send response [locTopVer=" + cctx.exchange().readyAffinityVersion() +
+                    ", msgTopVer=" + origMsg.topologyVersion() +
+                    ", node=" + origMsgNode +
+                    ", msg=" + origMsg +
+                    ", resNode=" + nodeId +
+                    ", res=" + msg + ']');
+            }
+
+            send(nodeId, msg, plc);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send response, node left [nodeId=" + nodeId + ", msg=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send response [nodeId=" + nodeId + ", msg=" + msg + ", err=" + e + ']');
+        }
     }
 
     /** {@inheritDoc} */
@@ -359,17 +459,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr);
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings("BusyWait")
-    @Override protected void onKernalStop0(boolean cancel) {
-        cctx.gridIO().removeMessageListener(TOPIC_CACHE);
-
-        for (Object ordTopic : cacheHandlers.orderedHandlers.keySet())
-            cctx.gridIO().removeMessageListener(ordTopic);
-
-        for (Object ordTopic : grpHandlers.orderedHandlers.keySet())
-            cctx.gridIO().removeMessageListener(ordTopic);
-
+    /**
+     *
+     */
+    public void writeLock() {
         boolean interrupted = false;
 
         // Busy wait is intentional.
@@ -389,6 +482,27 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         if (interrupted)
             Thread.currentThread().interrupt();
+    }
+
+    /**
+     *
+     */
+    public void writeUnlock() {
+        rw.writeLock().unlock();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("BusyWait")
+    @Override protected void onKernalStop0(boolean cancel) {
+        cctx.gridIO().removeMessageListener(TOPIC_CACHE);
+
+        for (Object ordTopic : cacheHandlers.orderedHandlers.keySet())
+            cctx.gridIO().removeMessageListener(ordTopic);
+
+        for (Object ordTopic : grpHandlers.orderedHandlers.keySet())
+            cctx.gridIO().removeMessageListener(ordTopic);
+
+        writeLock();
 
         try {
             stopping = true;
@@ -406,10 +520,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private void onMessage0(final UUID nodeId, final GridCacheMessage cacheMsg,
         final IgniteBiInClosure<UUID, GridCacheMessage> c) {
-        Lock lock = rw.readLock();
-
-        lock.lock();
-
         try {
             if (stopping) {
                 if (log.isDebugEnabled())
@@ -438,8 +548,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         finally {
             if (depEnabled)
                 cctx.deploy().ignoreOwnership(false);
-
-            lock.unlock();
         }
     }
 
@@ -1527,9 +1635,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (log.isDebugEnabled())
                 log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
 
-            final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
+            Lock lock = rw.readLock();
 
-            onMessage0(nodeId, cacheMsg, c);
+            lock.lock();
+
+            try {
+                GridCacheMessage cacheMsg = (GridCacheMessage)msg;
+
+                onMessage0(nodeId, cacheMsg, c);
+            }
+            finally {
+                lock.unlock();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fdf8a2f..901667f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -337,6 +337,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param task Task to run in exchange worker thread.
+     */
+    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+        assert task != null;
+
+        exchWorker.addCustomTask(task);
+    }
+
+    /**
      * @return Reconnect partition exchange future.
      */
     public IgniteInternalFuture<?> reconnectExchangeFuture() {
@@ -1731,7 +1740,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 cctx.cache().processCustomExchangeTask(task);
             }
             catch (Exception e) {
-                U.warn(log, "Failed to process custom exchange task: " + task, e);
+                U.error(log, "Failed to process custom exchange task: " + task, e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ad83b14..96b45df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -184,7 +184,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private final Map<String, GridCacheAdapter> stoppedCaches = new ConcurrentHashMap<>();
 
     /** Map of proxies. */
-    private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies;
+    private final ConcurrentHashMap<String, IgniteCacheProxy<?, ?>> jCacheProxies;
 
     /** Caches stop sequence. */
     private final Deque<String> stopSeq;
@@ -363,6 +363,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             if (msg0.exchange())
                 return new SchemaExchangeWorkerTask(msg0);
         }
+        else if (msg instanceof ClientCacheChangeDummyDiscoveryMessage) {
+            ClientCacheChangeDummyDiscoveryMessage msg0 = (ClientCacheChangeDummyDiscoveryMessage)msg;
+
+            return msg0;
+        }
 
         return null;
     }
@@ -372,7 +377,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param task Task.
      */
-    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+    void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
         if (task instanceof SchemaExchangeWorkerTask) {
             SchemaAbstractDiscoveryMessage msg = ((SchemaExchangeWorkerTask)task).message();
 
@@ -389,6 +394,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ctx.query().onNodeLeave(task0.node());
         }
+        else if (task instanceof ClientCacheChangeDummyDiscoveryMessage) {
+            ClientCacheChangeDummyDiscoveryMessage task0 = (ClientCacheChangeDummyDiscoveryMessage)task;
+
+            sharedCtx.affinity().processClientCachesChanges(task0);
+        }
+        else if (task instanceof ClientCacheUpdateTimeout) {
+            ClientCacheUpdateTimeout task0 = (ClientCacheUpdateTimeout)task;
+
+            sharedCtx.affinity().sendClientCacheChangesMessage(task0);
+        }
         else
             U.warn(log, "Unsupported custom exchange task: " + task);
     }
@@ -1998,21 +2013,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Stop request.
+     * @param cacheName Cache name.
+     * @param stop {@code True} for stop cache, {@code false} for close cache.
      */
-    void blockGateway(DynamicCacheChangeRequest req) {
-        assert req.stop() || req.close();
-
-        if (req.stop() || (req.close() && req.initiatingNodeId().equals(ctx.localNodeId()))) {
-            // Break the proxy before exchange future is done.
-            IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(req.cacheName());
+    void blockGateway(String cacheName, boolean stop) {
+        // Break the proxy before exchange future is done.
+        IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
 
-            if (proxy != null) {
-                if (req.stop())
-                    proxy.gate().stopped();
-                else
-                    proxy.closeProxy();
-            }
+        if (proxy != null) {
+            if (stop)
+                proxy.gate().stopped();
+            else
+                proxy.closeProxy();
         }
     }
 
@@ -2030,22 +2042,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param req Stop request.
+     * @param cacheName Cache name.
+     * @param destroy Cache destroy flag.
      * @return Cache group for stopped cache.
      */
-    private CacheGroupContext prepareCacheStop(DynamicCacheChangeRequest req, boolean forceClose) {
-        assert req.stop() || req.close() || forceClose : req;
-
-        GridCacheAdapter<?, ?> cache = caches.remove(req.cacheName());
+    private CacheGroupContext prepareCacheStop(String cacheName, boolean destroy) {
+        GridCacheAdapter<?, ?> cache = caches.remove(cacheName);
 
         if (cache != null) {
             GridCacheContext<?, ?> ctx = cache.context();
 
             sharedCtx.removeCacheContext(ctx);
 
-            onKernalStop(cache, req.destroy());
+            onKernalStop(cache, destroy);
 
-            stopCache(cache, true, req.destroy());
+            stopCache(cache, true, destroy);
 
             return ctx.group();
         }
@@ -2054,24 +2065,91 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Closes cache even if it's not fully initialized (e.g. fail on cache init stage).
-     *
-     * @param topVer Completed topology version.
-     * @param act Exchange action.
-     * @param err Error.
+     * @param startTopVer Cache start version.
+     * @param err Cache start error if any.
      */
-    void forceCloseCache(
-        AffinityTopologyVersion topVer,
-        final ExchangeActions.ActionData act,
-        Throwable err
-    ) {
-        ExchangeActions actions = new ExchangeActions(){
-            @Override List<ActionData> closeRequests(UUID nodeId) {
-                return Collections.singletonList(act);
+    void initCacheProxies(AffinityTopologyVersion startTopVer, @Nullable Throwable err) {
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheContext<?, ?> cacheCtx = cache.context();
+
+            if (cacheCtx.startTopologyVersion().equals(startTopVer) && !jCacheProxies.containsKey(cacheCtx.name())) {
+                jCacheProxies.putIfAbsent(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+
+                if (cacheCtx.preloader() != null)
+                    cacheCtx.preloader().onInitialExchangeComplete(err);
+            }
+        }
+    }
+
+    /**
+     * @param cachesToClose Caches to close.
+     * @param retClientCaches {@code True} if return IDs of closed client caches.
+     * @return Closed client caches' IDs.
+     */
+    Set<Integer> closeCaches(Set<String> cachesToClose, boolean retClientCaches) {
+        Set<Integer> ids = null;
+
+        boolean locked = false;
+
+        try {
+            for (String cacheName : cachesToClose) {
+                blockGateway(cacheName, false);
+
+                GridCacheContext ctx = sharedCtx.cacheContext(CU.cacheId(cacheName));
+
+                if (ctx == null)
+                    continue;
+
+                if (retClientCaches && !ctx.affinityNode()) {
+                    if (ids == null)
+                        ids = U.newHashSet(cachesToClose.size());
+
+                    ids.add(ctx.cacheId());
+                }
+
+                if (!ctx.affinityNode() && !locked) {
+                    // Do not close client cache while requests processing is in progress.
+                    sharedCtx.io().writeLock();
+
+                    locked = true;
+                }
+
+                if (!ctx.affinityNode() && ctx.transactional())
+                    sharedCtx.tm().rollbackTransactionsForCache(ctx.cacheId());
+
+                closeCache(ctx, false);
             }
-        };
 
-        onExchangeDone(topVer, actions, err, true);
+            return ids;
+        }
+        finally {
+            if (locked)
+                sharedCtx.io().writeUnlock();
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param destroy Destroy flag.
+     */
+    private void closeCache(GridCacheContext cctx, boolean destroy) {
+        if (cctx.affinityNode()) {
+            GridCacheAdapter<?, ?> cache = caches.get(cctx.name());
+
+            assert cache != null : cctx.name();
+
+            jCacheProxies.put(cctx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+        }
+        else {
+            jCacheProxies.remove(cctx.name());
+
+            cctx.gate().onStopped();
+
+            CacheGroupContext grp = prepareCacheStop(cctx.name(), destroy);
+
+            if (grp != null && !grp.hasCaches())
+                stopCacheGroup(grp.groupId());
+        }
     }
 
     /**
@@ -2085,58 +2163,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public void onExchangeDone(
         AffinityTopologyVersion topVer,
         @Nullable ExchangeActions exchActions,
-        Throwable err,
-        boolean forceClose
+        Throwable err
     ) {
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            GridCacheContext<?, ?> cacheCtx = cache.context();
-
-            if (cacheCtx.startTopologyVersion().equals(topVer)) {
-                jCacheProxies.put(cacheCtx.name(), new IgniteCacheProxy(cache.context(), cache, null, false));
+        initCacheProxies(topVer, err);
 
-                if (cacheCtx.preloader() != null)
-                    cacheCtx.preloader().onInitialExchangeComplete(err);
-            }
-        }
-
-        if (exchActions != null && (err == null || forceClose)) {
+        if (exchActions != null && err == null) {
             for (ExchangeActions.ActionData action : exchActions.cacheStopRequests()) {
                 stopGateway(action.request());
 
-                prepareCacheStop(action.request(), forceClose);
+                prepareCacheStop(action.request().cacheName(), action.request().destroy());
             }
 
             for (CacheGroupDescriptor grpDesc : exchActions.cacheGroupsToStop())
                 stopCacheGroup(grpDesc.groupId());
-
-            for (ExchangeActions.ActionData req : exchActions.closeRequests(ctx.localNodeId())) {
-                String cacheName = req.request().cacheName();
-
-                IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(cacheName);
-
-                if (proxy != null) {
-                    if (proxy.context().affinityNode()) {
-                        GridCacheAdapter<?, ?> cache = caches.get(cacheName);
-
-                        assert cache != null : cacheName;
-
-                        jCacheProxies.put(cacheName, new IgniteCacheProxy(cache.context(), cache, null, false));
-                    }
-                    else {
-                        jCacheProxies.remove(cacheName);
-
-                        proxy.context().gate().onStopped();
-
-                        CacheGroupContext grp = prepareCacheStop(req.request(), forceClose);
-
-                        if (grp != null && !grp.hasCaches())
-                            stopCacheGroup(grp.groupId());
-                    }
-                }
-
-                if (forceClose)
-                    completeCacheStartFuture(req.request(), false, err);
-            }
         }
     }
 
@@ -2187,6 +2226,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param reqId Request ID.
+     * @param err Error if any.
+     */
+    void completeClientCacheChangeFuture(UUID reqId, @Nullable Exception err) {
+        DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(reqId);
+
+        if (fut != null)
+            fut.onDone(false, err);
+    }
+
+    /**
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
@@ -2486,8 +2536,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 failIfExists,
                 failIfNotStarted);
 
-            if (req != null)
-                return F.first(initiateCacheChanges(F.asList(req), failIfExists));
+            if (req != null) {
+                if (req.clientStartOnly())
+                    return startClientCacheChange(F.asMap(req.cacheName(), req), null);
+
+                return F.first(initiateCacheChanges(F.asList(req)));
+            }
             else
                 return new GridFinishedFuture<>();
         }
@@ -2497,6 +2551,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param startReqs Start requests.
+     * @param cachesToClose Cache tp close.
+     * @return Future for cache change operation.
+     */
+    private IgniteInternalFuture<Boolean> startClientCacheChange(
+        @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) {
+        assert startReqs != null ^ cachesToClose != null;
+
+        DynamicCacheStartFuture fut = new DynamicCacheStartFuture(UUID.randomUUID());
+
+        IgniteInternalFuture old = pendingFuts.put(fut.id, fut);
+
+        assert old == null : old;
+
+        ctx.discovery().clientCacheStartEvent(fut.id, startReqs, cachesToClose);
+
+        IgniteCheckedException err = checkNodeState();
+
+        if (err != null)
+            fut.onDone(err);
+
+        return fut;
+    }
+
+    /**
      * Dynamically starts multiple caches.
      *
      * @param ccfgList Collection of cache configuration.
@@ -2527,7 +2606,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (checkThreadTx)
             checkEmptyTransactions();
 
-        List<DynamicCacheChangeRequest> reqList = new ArrayList<>(ccfgList.size());
+        List<DynamicCacheChangeRequest> srvReqs = null;
+        Map<String, DynamicCacheChangeRequest> clientReqs = null;
 
         try {
             for (CacheConfiguration ccfg : ccfgList) {
@@ -2541,20 +2621,41 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     true
                 );
 
-                if (req != null)
-                    reqList.add(req);
+                if (req != null) {
+                    if (req.clientStartOnly()) {
+                        if (clientReqs == null)
+                            clientReqs = U.newLinkedHashMap(ccfgList.size());
+
+                        clientReqs.put(req.cacheName(), req);
+                    }
+                    else {
+                        if (srvReqs == null)
+                            srvReqs = new ArrayList<>(ccfgList.size());
+
+                        srvReqs.add(req);
+                    }
+                }
             }
         }
         catch (Exception e) {
             return new GridFinishedFuture<>(e);
         }
 
-        if (!reqList.isEmpty()) {
+        if (srvReqs != null || clientReqs != null) {
+            if (clientReqs != null && srvReqs == null)
+                return startClientCacheChange(clientReqs, null);
+
             GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
 
-            for (DynamicCacheStartFuture fut : initiateCacheChanges(reqList, failIfExists))
+            for (DynamicCacheStartFuture fut : initiateCacheChanges(srvReqs))
                 compoundFut.add((IgniteInternalFuture)fut);
 
+            if (clientReqs != null) {
+                IgniteInternalFuture<Boolean> clientStartFut = startClientCacheChange(clientReqs, null);
+
+                compoundFut.add((IgniteInternalFuture)clientStartFut);
+            }
+
             compoundFut.markInitialized();
 
             return compoundFut;
@@ -2578,7 +2679,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, sql, true);
 
-        return F.first(initiateCacheChanges(F.asList(req), false));
+        return F.first(initiateCacheChanges(F.asList(req)));
     }
 
     /**
@@ -2600,7 +2701,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture<?, ?> compoundFut = new GridCompoundFuture<>();
 
-        for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs, false))
+        for (DynamicCacheStartFuture fut : initiateCacheChanges(reqs))
             compoundFut.add((IgniteInternalFuture)fut);
 
         compoundFut.markInitialized();
@@ -2622,9 +2723,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         checkEmptyTransactions();
 
-        DynamicCacheChangeRequest t = DynamicCacheChangeRequest.closeRequest(ctx, cacheName);
+        if (proxy.context().isLocal())
+            return dynamicDestroyCache(cacheName, false, true);
 
-        return F.first(initiateCacheChanges(F.asList(t), false));
+        return startClientCacheChange(null, Collections.singleton(cacheName));
     }
 
     /**
@@ -2657,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCompoundFuture fut = new GridCompoundFuture();
 
-        for (DynamicCacheStartFuture f : initiateCacheChanges(reqs, false))
+        for (DynamicCacheStartFuture f : initiateCacheChanges(reqs))
             fut.add(f);
 
         fut.markInitialized();
@@ -2751,38 +2853,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param reqs Requests.
-     * @param failIfExists Fail if exists flag.
      * @return Collection of futures.
      */
     @SuppressWarnings("TypeMayBeWeakened")
     private Collection<DynamicCacheStartFuture> initiateCacheChanges(
-        Collection<DynamicCacheChangeRequest> reqs,
-        boolean failIfExists
+        Collection<DynamicCacheChangeRequest> reqs
     ) {
         Collection<DynamicCacheStartFuture> res = new ArrayList<>(reqs.size());
 
         Collection<DynamicCacheChangeRequest> sndReqs = new ArrayList<>(reqs.size());
 
         for (DynamicCacheChangeRequest req : reqs) {
-            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.cacheName(), req);
+            DynamicCacheStartFuture fut = new DynamicCacheStartFuture(req.requestId());
 
             try {
-                if (req.stop() || req.close()) {
+                if (req.stop()) {
                     DynamicCacheDescriptor desc = cacheDescriptor(req.cacheName());
 
                     if (desc == null)
                         // No-op.
                         fut.onDone(false);
-                    else {
-                        assert desc.cacheConfiguration() != null : desc;
-
-                        if (req.close() && desc.cacheConfiguration().getCacheMode() == LOCAL) {
-                            req.close(false);
-
-                            req.stop(true);
-                        }
-                    }
                 }
+
                 if (req.start() && req.startCacheConfiguration() != null) {
                     CacheConfiguration ccfg = req.startCacheConfiguration();
 
@@ -2821,14 +2913,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             try {
                 ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
 
-                if (ctx.isStopping()) {
-                    err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
-                        "node is stopping.");
-                }
-                else if (ctx.clientDisconnected()) {
-                    err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Failed to execute dynamic cache change request, client node disconnected.");
-                }
+                err = checkNodeState();
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -2844,6 +2929,22 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Non null exception if node is stopping or disconnected.
+     */
+    @Nullable private IgniteCheckedException checkNodeState() {
+        if (ctx.isStopping()) {
+            return new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+                "node is stopping.");
+        }
+        else if (ctx.clientDisconnected()) {
+            return new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to execute dynamic cache change request, client node disconnected.");
+        }
+
+        return null;
+    }
+
+    /**
      * @param type Event type.
      * @param node Event node.
      * @param topVer Topology version.
@@ -2859,9 +2960,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param msg Customer message.
      * @param topVer Current topology version.
+     * @param node Node sent message.
      * @return {@code True} if minor topology version should be increased.
      */
-    public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) {
+    public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer, ClusterNode node) {
         if (msg instanceof SchemaAbstractDiscoveryMessage) {
             ctx.query().onDiscovery((SchemaAbstractDiscoveryMessage)msg);
 
@@ -2878,6 +2980,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (msg instanceof DynamicCacheChangeBatch)
             return cachesInfo.onCacheChangeRequested((DynamicCacheChangeBatch)msg, topVer);
 
+        if (msg instanceof ClientCacheChangeDiscoveryMessage)
+            cachesInfo.onClientCacheChange((ClientCacheChangeDiscoveryMessage)msg, node);
+
         return false;
     }
 
@@ -3156,13 +3261,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (log.isDebugEnabled())
             log.debug("Getting public cache for name: " + cacheName);
 
-        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
-
         DynamicCacheDescriptor desc = cacheDescriptor(cacheName);
 
         if (desc != null && !desc.cacheType().userCache())
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
+        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(cacheName);
+
         if (cache == null) {
             dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
 
@@ -3731,33 +3836,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
     private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
-        /** Cache name. */
-        private String cacheName;
-
-        /** Change request. */
-        @GridToStringInclude
-        private DynamicCacheChangeRequest req;
-
-        /**
-         * @param cacheName Cache name.
-         * @param req Cache start request.
-         */
-        private DynamicCacheStartFuture(String cacheName, DynamicCacheChangeRequest req) {
-            this.cacheName = cacheName;
-            this.req = req;
-        }
+        /** */
+        private UUID id;
 
         /**
-         * @return Request.
+         * @param id Future ID.
          */
-        public DynamicCacheChangeRequest request() {
-            return req;
+        private DynamicCacheStartFuture(UUID id) {
+            this.id = id;
         }
 
         /** {@inheritDoc} */
         @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
             // Make sure to remove future before completion.
-            pendingFuts.remove(req.requestId(), this);
+            pendingFuts.remove(id, this);
 
             return super.onDone(res, err);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
new file mode 100644
index 0000000..3166159
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/ClientCacheDhtTopologyFuture.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Topology future created for client cache start.
+ */
+public class ClientCacheDhtTopologyFuture extends GridDhtTopologyFutureAdapter
+    implements GridDhtTopologyFuture {
+    /** */
+    final AffinityTopologyVersion topVer;
+
+    /**
+     * @param topVer Topology version.
+     */
+    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer) {
+        assert topVer != null;
+
+        this.topVer = topVer;
+
+        onDone(topVer);
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @param e Error.
+     */
+    public ClientCacheDhtTopologyFuture(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+        assert e != null;
+        assert topVer != null;
+
+        this.topVer = topVer;
+
+        onDone(e);
+    }
+
+    /**
+     * @param grp Cache group.
+     * @param topNodes Topology nodes.
+     */
+    public void validate(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+        grpValidRes = U.newHashMap(1);
+
+        grpValidRes.put(grp.groupId(), validateCacheGroup(grp,topNodes));
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "ClientCacheDhtTopologyFuture [topVer=" + topologyVersion() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 619630f..2d9e4f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -83,7 +83,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private Map<Integer, Set<UUID>> part2node = new HashMap<>();
 
     /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
+    private AffinityTopologyVersion lastExchangeVer;
 
     /** */
     private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
@@ -183,21 +183,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
-            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId + ']';
+            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+                ", exchVer=" + exchTopVer + ']';
 
             this.stopping = stopping;
 
-            topVer = exchId.topologyVersion();
-            discoCache = exchFut.discoCache();
+            topVer = exchTopVer;
+            this.discoCache = discoCache;
 
             updateSeq.setIfGreater(updSeq);
 
@@ -560,19 +562,20 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public GridDhtPartitionMap update(
+        @Nullable AffinityTopologyVersion exchVer,
         GridDhtPartitionFullMap partMap,
         Map<Integer, T2<Long, Long>> cntrMap) {
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchVer + ", parts=" + fullMapString() + ']');
 
         lock.writeLock().lock();
 
         try {
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
+            if (exchVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchVer) >= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                        lastExchangeVer + ", exchVer=" + exchVer + ']');
 
                 return null;
             }
@@ -580,15 +583,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                        lastExchangeVer + ", exchVer=" + exchVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
                 return null;
             }
 
             updateSeq.incrementAndGet();
 
-            if (exchId != null)
-                lastExchangeId = exchId;
+            if (exchVer != null)
+                lastExchangeVer = exchVer;
 
             if (node2part != null) {
                 for (GridDhtPartitionMap part : node2part.values()) {
@@ -598,7 +601,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                     // then we keep the newer value.
                     if (newPart != null && newPart.updateSequence() < part.updateSequence()) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
+                            log.debug("Overriding partition map in full update map [exchVer=" + exchVer + ", curPart=" +
                                 mapString(part) + ", newPart=" + mapString(newPart) + ']');
 
                         partMap.put(part.nodeId(), part);
@@ -694,16 +697,16 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
+            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
+                        lastExchangeVer + ", exchId=" + exchId + ']');
 
                 return null;
             }
 
             if (exchId != null)
-                lastExchangeId = exchId;
+                lastExchangeVer = exchId.topologyVersion();
 
             if (node2part == null)
                 // Create invalid partition map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index d9d642a..44c7b88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -32,6 +32,12 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final int SND_PART_STATE_MASK = 0x01;
+
+    /** */
+    private byte flags;
+
+    /** */
     private long futId;
 
     /** Topology version being queried. */
@@ -48,16 +54,28 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
      * @param futId Future ID.
      * @param grpId Cache group ID.
      * @param topVer Topology version.
+     * @param sndPartMap {@code True} if need send in response cache partitions state.
      */
     public GridDhtAffinityAssignmentRequest(
         long futId,
         int grpId,
-        AffinityTopologyVersion topVer) {
+        AffinityTopologyVersion topVer,
+        boolean sndPartMap) {
         assert topVer != null;
 
         this.futId = futId;
         this.grpId = grpId;
         this.topVer = topVer;
+
+        if (sndPartMap)
+            flags |= SND_PART_STATE_MASK;
+    }
+
+    /**
+     * @return {@code True} if need send in response cache partitions state.
+     */
+    public boolean sendPartitionsState() {
+        return (flags & SND_PART_STATE_MASK) != 0;
     }
 
     /**
@@ -91,7 +109,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 6;
     }
 
     /** {@inheritDoc} */
@@ -110,12 +128,18 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -138,7 +162,7 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
 
         switch (reader.state()) {
             case 3:
-                futId = reader.readLong("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -146,6 +170,14 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 4:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 4df3fc1..5b0de08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -21,19 +21,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Affinity assignment response.
@@ -62,6 +63,13 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     /** Affinity assignment bytes. */
     private byte[] idealAffAssignmentBytes;
 
+    /** */
+    @GridDirectTransient
+    private GridDhtPartitionFullMap partMap;
+
+    /** */
+    private byte[] partBytes;
+
     /**
      * Empty constructor.
      */
@@ -107,30 +115,30 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Affinity assignment.
      */
-    public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) {
+    public List<List<ClusterNode>> affinityAssignment(DiscoCache discoCache) {
         if (affAssignmentIds != null)
-            return nodes(disco, affAssignmentIds);
+            return nodes(discoCache, affAssignmentIds);
 
         return null;
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @return Ideal affinity assignment.
      */
-    public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) {
-        return nodes(disco, idealAffAssignment);
+    public List<List<ClusterNode>> idealAffinityAssignment(DiscoCache discoCache) {
+        return nodes(discoCache, idealAffAssignment);
     }
 
     /**
-     * @param disco Discovery manager.
+     * @param discoCache Discovery data cache.
      * @param assignmentIds Assignment node IDs.
      * @return Assignment nodes.
      */
-    private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) {
+    private List<List<ClusterNode>> nodes(DiscoCache discoCache, List<List<UUID>> assignmentIds) {
         if (assignmentIds != null) {
             List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size());
 
@@ -139,7 +147,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 List<ClusterNode> nodes = new ArrayList<>(ids.size());
 
                 for (int j = 0; j < ids.size(); j++) {
-                    ClusterNode node = disco.node(topVer, ids.get(j));
+                    ClusterNode node = discoCache.node(ids.get(j));
 
                     assert node != null;
 
@@ -163,6 +171,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     }
 
     /**
+     * @param partMap Partition map.
+     */
+    public void partitionMap(GridDhtPartitionFullMap partMap) {
+        this.partMap = partMap;
+    }
+
+    /**
+     * @return Partition map.
+     */
+    @Nullable public GridDhtPartitionFullMap partitionMap() {
+        return partMap;
+    }
+
+    /**
      * @param assignments Assignment.
      * @return Assignment where cluster nodes are converted to their ids.
      */
@@ -193,7 +215,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /**
@@ -208,6 +230,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignment != null && idealAffAssignmentBytes == null)
             idealAffAssignmentBytes = U.marshal(ctx, idealAffAssignment);
+
+        if (partMap != null && partBytes == null)
+            partBytes = U.zip(U.marshal(ctx.marshaller(), partMap));
     }
 
     /** {@inheritDoc} */
@@ -222,6 +247,9 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
 
         if (idealAffAssignmentBytes != null && idealAffAssignment == null)
             idealAffAssignment = U.unmarshal(ctx, idealAffAssignmentBytes, ldr);
+
+        if (partBytes != null && partMap == null)
+            partMap = U.unmarshalZip(ctx.marshaller(), partBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
     }
 
     /** {@inheritDoc} */
@@ -263,6 +291,12 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partBytes", partBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -309,6 +343,14 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
                 reader.incrementState();
 
             case 6:
+                partBytes = reader.readByteArray("partBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 8746320..dcc08d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -27,13 +27,12 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -76,25 +75,28 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     /** */
     private final int grpId;
 
+    /** */
+    private boolean needPartState;
+
     /**
      * @param ctx Context.
-     * @param grpDesc Group descriptor.
+     * @param grpId Group ID.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        CacheGroupDescriptor grpDesc,
+        int grpId,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
         this.topVer = topVer;
-        this.grpId = grpDesc.groupId();
+        this.grpId = grpId;
         this.ctx = ctx;
 
         id = idGen.getAndIncrement();
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
+        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpId);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -127,8 +129,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
 
     /**
      * Initializes fetch future.
+     *
+     * @param needPartState {@code True} if also need fetch partitions state.
      */
-    public void init() {
+    public void init(boolean needPartState) {
+        this.needPartState = needPartState;
+
         ctx.affinity().addDhtAssignmentFetchFuture(this);
 
         requestFromNextNode();
@@ -195,7 +201,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
+                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer, needPartState),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 05831b1..ed80f83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -407,14 +407,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
             return null;
         }
 
-        // If remote node has no near cache, don't add it.
-        if (!cctx.discovery().cacheNearNode(node, cacheName())) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring near reader because near cache is disabled: " + nodeId);
-
-            return null;
-        }
-
         // If remote node is (primary?) or back up, don't add it as a reader.
         if (cctx.affinity().partitionBelongs(node, partition(), topVer)) {
             if (log.isDebugEnabled())
@@ -653,7 +645,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         for (int i = 0; i < rdrs.length; i++) {
             ClusterNode node = cctx.discovery().getAlive(rdrs[i].nodeId());
 
-            if (node == null || !cctx.discovery().cacheNode(node, cacheName())) {
+            if (node == null) {
                 // Node has left and if new list has already been created, just skip.
                 // Otherwise, create new list and add alive nodes.
                 if (newRdrs == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d365a8e..acb822c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -54,15 +55,15 @@ public interface GridDhtPartitionTopology {
     /**
      * Updates topology version.
      *
-     * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @param discoCache Discovery data cache.
      * @param updateSeq Update sequence.
      * @param stopping Stopping flag.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updateSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException;
@@ -221,12 +222,13 @@ public interface GridDhtPartitionTopology {
     public void onRemoved(GridDhtCacheEntry e);
 
     /**
-     * @param exchId Exchange ID.
+     * @param exchangeVer Exchange version.
      * @param partMap Update partition map.
      * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+    public GridDhtPartitionMap update(
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap);
 
@@ -250,7 +252,7 @@ public interface GridDhtPartitionTopology {
      * This method should be called on topology coordinator after all partition messages are received.
      *
      * @param discoEvt Discovery event for which we detect lost partitions.
-     * @return {@code True} if partitons state got updated.
+     * @return {@code True} if partitions state got updated.
      */
     public boolean detectLostPartitions(DiscoveryEvent discoEvt);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 89554f3..ea731f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -105,7 +105,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private Map<Integer, Set<UUID>> part2node = new HashMap<>();
 
     /** */
-    private GridDhtPartitionExchangeId lastExchangeId;
+    private AffinityTopologyVersion lastExchangeVer;
 
     /** */
     private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
@@ -167,7 +167,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node = new HashMap<>();
 
-            lastExchangeId = null;
+            lastExchangeVer = null;
 
             updateSeq.set(1);
 
@@ -301,16 +301,18 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void updateTopologyVersion(
-        GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtTopologyFuture exchFut,
+        DiscoCache discoCache,
         long updSeq,
         boolean stopping
     ) throws IgniteInterruptedCheckedException {
         U.writeLock(lock);
 
         try {
-            assert exchId.topologyVersion().compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
-                ", exchId=" + exchId +
+            AffinityTopologyVersion exchTopVer = exchFut.topologyVersion();
+
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+                ", exchTopVer=" + exchTopVer +
                 ", fut=" + exchFut + ']';
 
             this.stopping = stopping;
@@ -321,9 +323,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
-            topVer = exchId.topologyVersion();
+            topVer = exchTopVer;
 
-            discoCache = exchFut.discoCache();
+            this.discoCache = discoCache;
         }
         finally {
             lock.writeLock().unlock();
@@ -1109,12 +1111,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Override public GridDhtPartitionMap update(
-        @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable AffinityTopologyVersion exchangeVer,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap
     ) {
         if (log.isDebugEnabled())
-            log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
+            log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
 
         assert partMap != null;
 
@@ -1147,27 +1149,26 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            //if need skip
-            if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
+            if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ']');
 
                 return null;
             }
 
             if (node2part != null && node2part.compareTo(partMap) >= 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
+                    log.debug("Stale partition map for full partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
 
                 return null;
             }
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
-            if (exchId != null)
-                lastExchangeId = exchId;
+            if (exchangeVer != null)
+                lastExchangeVer = exchangeVer;
 
             if (node2part != null) {
                 for (GridDhtPartitionMap part : node2part.values()) {
@@ -1180,8 +1181,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         (grp.localStartVersion().compareTo(newPart.topologyVersion()) > 0))
                         ) {
                         if (log.isDebugEnabled())
-                            log.debug("Overriding partition map in full update map [exchId=" + exchId + ", curPart=" +
-                                mapString(part) + ", newPart=" + mapString(newPart) + ']');
+                            log.debug("Overriding partition map in full update map [exch=" + exchangeVer +
+                                ", curPart=" + mapString(part) + ", newPart=" + mapString(newPart) + ']');
 
                         partMap.put(part.nodeId(), part);
                     }
@@ -1333,16 +1334,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
-            if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
+            if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
                 if (log.isDebugEnabled())
-                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
-                        lastExchangeId + ", exchId=" + exchId + ']');
+                    log.debug("Stale exchange id for single partition map update (will ignore) [lastExch=" +
+                        lastExchangeVer + ", exch=" + exchId.topologyVersion() + ']');
 
                 return null;
             }
 
             if (exchId != null)
-                lastExchangeId = exchId;
+                lastExchangeVer = exchId.topologyVersion();
 
             if (node2part == null)
                 // Create invalid partition map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
new file mode 100644
index 0000000..e70f383
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFutureAdapter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_ALL;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_ALL;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+
+/**
+ *
+ */
+public abstract class GridDhtTopologyFutureAdapter extends GridFutureAdapter<AffinityTopologyVersion>
+    implements GridDhtTopologyFuture {
+    /** Cache groups validation results. */
+    protected volatile Map<Integer, CacheValidation> grpValidRes;
+
+    /**
+     * @param grp Cache group.
+     * @param topNodes Topology nodes.
+     * @return Validation result.
+     */
+    protected final CacheValidation validateCacheGroup(CacheGroupContext grp, Collection<ClusterNode> topNodes) {
+        Collection<Integer> lostParts = grp.isLocal() ?
+            Collections.<Integer>emptyList() : grp.topology().lostPartitions();
+
+        boolean valid = true;
+
+        if (!grp.systemCache()) {
+            TopologyValidator validator = grp.topologyValidator();
+
+            if (validator != null)
+                valid = validator.validate(topNodes);
+        }
+
+        return new CacheValidation(valid, lostParts);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public final Throwable validateCache(
+        GridCacheContext cctx,
+        boolean recovery,
+        boolean read,
+        @Nullable Object key,
+        @Nullable Collection<?> keys
+    ) {
+        assert isDone() : this;
+
+        Throwable err = error();
+
+        if (err != null)
+            return err;
+
+        if (!cctx.shared().kernalContext().state().active())
+            return new CacheInvalidStateException(
+                "Failed to perform cache operation (cluster is not activated): " + cctx.name());
+
+        CacheGroupContext grp = cctx.group();
+
+        PartitionLossPolicy partLossPlc = grp.config().getPartitionLossPolicy();
+
+        if (grp.needsRecovery() && !recovery) {
+            if (!read && (partLossPlc == READ_ONLY_SAFE || partLossPlc == READ_ONLY_ALL))
+                return new IgniteCheckedException("Failed to write to cache (cache is moved to a read-only state): " +
+                    cctx.name());
+        }
+
+        if (grp.needsRecovery() || grp.topologyValidator() != null) {
+            CacheValidation validation = grpValidRes.get(grp.groupId());
+
+            if (validation == null)
+                return null;
+
+            if (!validation.valid && !read)
+                return new IgniteCheckedException("Failed to perform cache operation " +
+                    "(cache topology is not valid): " + cctx.name());
+
+            if (recovery || !grp.needsRecovery())
+                return null;
+
+            if (key != null) {
+                int p = cctx.affinity().partition(key);
+
+                CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, key, p,
+                    validation.lostParts, partLossPlc);
+
+                if (ex != null)
+                    return ex;
+            }
+
+            if (keys != null) {
+                for (Object k : keys) {
+                    int p = cctx.affinity().partition(k);
+
+                    CacheInvalidStateException ex = validatePartitionOperation(cctx.name(), read, k, p,
+                        validation.lostParts, partLossPlc);
+
+                    if (ex != null)
+                        return ex;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param read Read flag.
+     * @param key Key to check.
+     * @param part Partition this key belongs to.
+     * @param lostParts Collection of lost partitions.
+     * @param plc Partition loss policy.
+     * @return Invalid state exception if this operation is disallowed.
+     */
+    private CacheInvalidStateException validatePartitionOperation(
+        String cacheName,
+        boolean read,
+        Object key,
+        int part,
+        Collection<Integer> lostParts,
+        PartitionLossPolicy plc
+    ) {
+        if (lostParts.contains(part)) {
+            if (!read) {
+                assert plc == READ_WRITE_ALL || plc == READ_WRITE_SAFE;
+
+                if (plc == READ_WRITE_SAFE) {
+                    return new CacheInvalidStateException("Failed to execute cache operation " +
+                        "(all partition owners have left the grid, partition data has been lost) [" +
+                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+                }
+            }
+            else {
+                // Read.
+                if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE)
+                    return new CacheInvalidStateException("Failed to execute cache operation " +
+                        "(all partition owners have left the grid, partition data has been lost) [" +
+                        "cacheName=" + cacheName + ", part=" + part + ", key=" + key + ']');
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Cache validation result.
+     */
+    protected static class CacheValidation {
+        /** Topology validation result. */
+        private boolean valid;
+
+        /** Lost partitions on this topology version. */
+        private Collection<Integer> lostParts;
+
+        /**
+         * @param valid Valid flag.
+         * @param lostParts Lost partitions.
+         */
+        private CacheValidation(boolean valid, Collection<Integer> lostParts) {
+            this.valid = valid;
+            this.lostParts = lostParts;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index c7c4280..631fe10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -97,6 +99,10 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     /** Preload keys. */
     private BitSet preloadKeys;
 
+    /** */
+    @GridDirectTransient
+    private List<IgniteTxKey> nearWritesCacheMissed;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -163,6 +169,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     }
 
     /**
+     * @return Near cache writes for which cache was not found (possible if client near cache was closed).
+     */
+    @Nullable public List<IgniteTxKey> nearWritesCacheMissed() {
+        return nearWritesCacheMissed;
+    }
+
+    /**
      * @return Near transaction ID.
      */
     public GridCacheVersion nearXidVersion() {
@@ -319,13 +332,37 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
             while (keyIter.hasNext()) {
                 IgniteTxKey key = keyIter.next();
 
-                key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);
+                GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(key.cacheId());
+
+                if (cacheCtx != null) {
+                    key.finishUnmarshal(cacheCtx, ldr);
 
-                owned.put(key, valIter.next());
+                    owned.put(key, valIter.next());
+                }
             }
         }
 
-        unmarshalTx(nearWrites, true, ctx, ldr);
+        if (nearWrites != null) {
+            for (Iterator<IgniteTxEntry> it = nearWrites.iterator(); it.hasNext();) {
+                IgniteTxEntry e = it.next();
+
+                GridCacheContext<?, ?> cacheCtx = ctx.cacheContext(e.cacheId());
+
+                if (cacheCtx == null) {
+                    it.remove();
+
+                    if (nearWritesCacheMissed == null)
+                        nearWritesCacheMissed = new ArrayList<>();
+
+                    nearWritesCacheMissed.add(e.txKey());
+                }
+                else {
+                    e.context(cacheCtx);
+
+                    e.unmarshal(ctx, true, ldr);
+                }
+            }
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 5dcb98c..0c2bf81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -124,7 +124,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     /**
      * @return Evicted readers.
      */
-    Collection<IgniteTxKey> nearEvicted() {
+    public Collection<IgniteTxKey> nearEvicted() {
         return nearEvicted;
     }
 
@@ -194,7 +194,9 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
             for (IgniteTxKey key : nearEvicted) {
                 GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 
-                key.prepareMarshal(cctx);
+                // Can be null if client near cache was removed, in this case assume do not need prepareMarshal.
+                if (cctx != null)
+                    key.prepareMarshal(cctx);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 67e3ebc..52f007a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3240,9 +3240,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridDhtAtomicUpdateResponse dhtRes = null;
 
-        if (isNearEnabled(cacheCfg)) {
-            List<KeyCacheObject> nearEvicted =
-                ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+        if (req.nearSize() > 0) {
+            List<KeyCacheObject> nearEvicted;
+
+            if (isNearEnabled(ctx))
+                nearEvicted = ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+            else {
+                nearEvicted = new ArrayList<>(req.nearSize());
+
+                for (int i = 0; i < req.nearSize(); i++)
+                    nearEvicted.add(req.nearKey(i));
+            }
 
             if (nearEvicted != null) {
                 dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/4026ddcc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 7b2547a..70bf6f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -118,7 +118,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements G
     /**
      * @param nearEvicted Evicted near cache keys.
      */
-    void nearEvicted(List<KeyCacheObject> nearEvicted) {
+    public void nearEvicted(List<KeyCacheObject> nearEvicted) {
         this.nearEvicted = nearEvicted;
     }
 
@@ -133,10 +133,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements G
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(nearEvicted, cctx);
+        // Can be null if client near cache was removed, in this case assume do not need prepareMarshal.
+        if (cctx != null) {
+            prepareMarshalCacheObjects(nearEvicted, cctx);
 
-        if (errs != null)
-            errs.prepareMarshal(this, cctx);
+            if (errs != null)
+                errs.prepareMarshal(this, cctx);
+        }
     }
 
     /** {@inheritDoc} */


Mime
View raw message