ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [4/7] ignite git commit: ignite-1452 Cancel cache operations on node stop
Date Thu, 17 Sep 2015 18:54:53 GMT
ignite-1452 Cancel cache operations on node stop


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/585761f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/585761f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/585761f2

Branch: refs/heads/ignite-1171
Commit: 585761f28e8b70487eaf2198d6ea39f7232b088d
Parents: b8c0b30
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Sep 17 16:26:02 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Sep 17 16:26:02 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  7 ---
 .../processors/cache/GridCacheContext.java      |  6 +--
 .../cache/GridCacheEvictionManager.java         |  6 +--
 .../cache/GridCacheEvictionResponse.java        |  2 +-
 .../processors/cache/GridCacheIoManager.java    | 47 +++++++++++++-------
 .../processors/cache/GridCacheMessage.java      |  7 +++
 .../processors/cache/GridCacheMvccManager.java  | 34 +++++++++++---
 .../GridCachePartitionExchangeManager.java      | 41 +++++++++++++----
 .../processors/cache/GridCacheProcessor.java    | 28 ++++++++----
 .../GridDistributedLockResponse.java            |  6 +--
 .../GridDistributedTxPrepareResponse.java       |  6 +--
 .../distributed/dht/GridDhtTopologyFuture.java  |  6 ++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  2 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 16 ++++---
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  2 +
 .../atomic/GridNearAtomicUpdateResponse.java    | 11 ++---
 .../colocated/GridDhtColocatedLockFuture.java   | 44 ++++++++++++++----
 .../dht/preloader/GridDhtForceKeysFuture.java   |  2 +-
 .../dht/preloader/GridDhtForceKeysResponse.java |  6 +--
 .../GridDhtPartitionsExchangeFuture.java        | 19 ++++++--
 .../distributed/near/GridNearGetResponse.java   |  6 +--
 .../distributed/near/GridNearLockFuture.java    | 26 ++++++++---
 .../near/GridNearOptimisticTxPrepareFuture.java | 20 +++++++--
 .../near/GridNearTxFinishResponse.java          |  6 +--
 .../cache/query/GridCacheQueryResponse.java     |  6 +--
 .../continuous/CacheContinuousQueryHandler.java | 12 +++--
 .../transactions/IgniteTxLocalAdapter.java      |  4 +-
 .../ignite/internal/util/GridSpinBusyLock.java  | 10 +++++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 24 +++++++---
 .../loadtests/hashmap/GridCacheTestContext.java |  4 +-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |  2 -
 32 files changed, 292 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index daf7d23..82db059 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1806,8 +1806,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 notifyLifecycleBeansEx(LifecycleEventType.BEFORE_NODE_STOP);
             }
 
-            GridCacheProcessor cacheProcessor = ctx.cache();
-
             List<GridComponent> comps = ctx.components();
 
             ctx.marshallerContext().onKernalStop();
@@ -1856,11 +1854,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                     // Note that interrupted flag is cleared.
                     interrupted = true;
                 }
-                finally {
-                    // Cleanup even on successful acquire.
-                    if (cacheProcessor != null)
-                        cacheProcessor.cancelUserOperations();
-                }
             }
 
             if (interrupted)

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 86ba3e6..5385dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -283,12 +283,12 @@ public class GridCacheContext<K, V> implements Externalizable {
         GridCacheEvictionManager evictMgr,
         GridCacheQueryManager<K, V> qryMgr,
         CacheContinuousQueryManager contQryMgr,
-        GridCacheAffinityManager affMgr,
         CacheDataStructuresManager dataStructuresMgr,
         GridCacheTtlManager ttlMgr,
         GridCacheDrManager drMgr,
         CacheConflictResolutionManager<K, V> rslvrMgr,
-        CachePluginManager pluginMgr
+        CachePluginManager pluginMgr,
+        GridCacheAffinityManager affMgr
     ) {
         assert ctx != null;
         assert sharedCtx != null;
@@ -323,12 +323,12 @@ public class GridCacheContext<K, V> implements Externalizable {
         this.evictMgr = add(evictMgr);
         this.qryMgr = add(qryMgr);
         this.contQryMgr = add(contQryMgr);
-        this.affMgr = add(affMgr);
         this.dataStructuresMgr = add(dataStructuresMgr);
         this.ttlMgr = add(ttlMgr);
         this.drMgr = add(drMgr);
         this.rslvrMgr = add(rslvrMgr);
         this.pluginMgr = add(pluginMgr);
+        this.affMgr = add(affMgr);
 
         log = ctx.log(getClass());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 3e0e2f9..1c34c76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1943,7 +1943,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                     lock.readLock().unlock();
                 }
 
-                if (res.error())
+                if (res.evictError())
                     // Complete future, since there was a class loading error on at least one node.
                     complete(false);
                 else
@@ -1985,14 +1985,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
                 boolean err = F.forAny(resMap.values(), new P1<GridCacheEvictionResponse>() {
                     @Override public boolean apply(GridCacheEvictionResponse res) {
-                        return res.error();
+                        return res.evictError();
                     }
                 });
 
                 if (err) {
                     Collection<UUID> ids = F.view(resMap.keySet(), new P1<UUID>() {
                         @Override public boolean apply(UUID e) {
-                            return resMap.get(e).error();
+                            return resMap.get(e).evictError();
                         }
                     });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
index 4d40c8d..aa3911b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionResponse.java
@@ -116,7 +116,7 @@ public class GridCacheEvictionResponse extends GridCacheMessage {
     /**
      * @return {@code True} if request processing has finished with error.
      */
-    boolean error() {
+    boolean evictError() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b55c84d..421ec82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -182,8 +182,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass()));
 
         if (c == null) {
-            U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
-                ", nodeId=" + nodeId + ']');
+            if (cctx.kernalContext().isStopping()) {
+                if (log.isDebugEnabled())
+                    log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg +
+                        ", nodeId=" + nodeId + ']');
+            }
+            else {
+                U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg +
+                    ", nodeId=" + nodeId + ']');
+            }
 
             return;
         }
@@ -596,9 +603,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      *
      * @param msg Message to send.
      * @param destNodeId Destination node ID.
+     * @return {@code True} if should send message.
      * @throws IgniteCheckedException If failed.
      */
-    private void onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+    private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws IgniteCheckedException {
+        if (msg.error() != null && cctx.kernalContext().isStopping())
+            return false;
+
         if (msg.messageId() < 0)
             // Generate and set message ID.
             msg.messageId(idGen.incrementAndGet());
@@ -609,6 +620,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (depEnabled && msg instanceof GridCacheDeployable)
                 cctx.deploy().prepare((GridCacheDeployable)msg);
         }
+
+        return true;
     }
 
     /**
@@ -624,7 +637,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     public void send(ClusterNode node, GridCacheMessage msg, byte plc) throws IgniteCheckedException {
         assert !node.isLocal();
 
-        onSend(msg, node.id());
+        if (!onSend(msg, node.id()))
+            return;
 
         if (log.isDebugEnabled())
             log.debug("Sending cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
@@ -663,12 +677,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      * @param msg Message to send.
      * @param plc IO policy.
      * @param fallback Callback for failed nodes.
-     * @return {@code True} if nodes are empty or message was sent, {@code false} if
-     *      all nodes have left topology while sending this message.
      * @throws IgniteCheckedException If send failed.
      */
     @SuppressWarnings({"BusyWait", "unchecked"})
-    public boolean safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
+    public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
         @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
         assert nodes != null;
         assert msg != null;
@@ -677,10 +689,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (log.isDebugEnabled())
                 log.debug("Message will not be sent as collection of nodes is empty: " + msg);
 
-            return true;
+            return;
         }
 
-        onSend(msg, null);
+        if (!onSend(msg, null))
+            return;
 
         if (log.isDebugEnabled())
             log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
@@ -709,7 +722,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                         if (fallback != null && !fallback.apply(n))
                             // If fallback signalled to stop.
-                            return false;
+                            return;
 
                         added = true;
                     }
@@ -721,7 +734,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                             log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
                                 ", nodes=" + U.toShortString(nodes) + ']');
 
-                        return false;
+                        return;
                     }
                 }
 
@@ -737,7 +750,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                         if (fallback != null && !fallback.apply(n))
                             // If fallback signalled to stop.
-                            return false;
+                            return;
 
                         added = true;
                     }
@@ -757,7 +770,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                         log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
                             U.toShortString(nodes) + ']');
 
-                    return false;
+                    return;
                 }
 
                 if (log.isDebugEnabled())
@@ -768,8 +781,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
         if (log.isDebugEnabled())
             log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-
-        return true;
     }
 
     /**
@@ -800,7 +811,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
      */
     public void sendOrderedMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc,
         long timeout) throws IgniteCheckedException {
-        onSend(msg, node.id());
+        if (!onSend(msg, node.id()))
+            return;
 
         int cnt = 0;
 
@@ -854,7 +866,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
         assert node != null;
         assert msg != null;
 
-        onSend(msg, null);
+        if (!onSend(msg, null))
+            return;
 
         try {
             cctx.gridIO().send(node, TOPIC_CACHE, msg, plc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 4e737a0..55688e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -77,6 +77,13 @@ public abstract class GridCacheMessage implements Message {
     protected int cacheId;
 
     /**
+     * @return Error, if any.
+     */
+    @Nullable public Throwable error() {
+        return null;
+    }
+
+    /**
      * Gets next ID for indexed message ID.
      *
      * @return Message ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 555bbda..e2d0302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -120,6 +120,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"})
     private IgniteLogger exchLog;
 
+    /** */
+    private volatile boolean stopping;
+
     /** Lock callback. */
     @GridToStringExclude
     private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -325,8 +328,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     /**
      * Cancels all client futures.
      */
-    public void cancelClientFutures() {
-        cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+    public void onStop() {
+        stopping = true;
+
+        cancelClientFutures(stopError());
     }
 
     /** {@inheritDoc} */
@@ -362,6 +367,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+    }
+
+    /**
      * @param from From version.
      * @return To version.
      */
@@ -385,8 +397,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
 
         assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
 
-        if (cctx.kernalContext().clientDisconnected())
-            ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+        onFutureAdded(fut);
     }
 
     /**
@@ -507,17 +518,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 fut.onNodeLeft(n.id());
         }
 
-        if (cctx.kernalContext().clientDisconnected())
-            ((GridFutureAdapter)fut).onDone(disconnectedError(null));
-
         // Just in case if future was completed before it was added.
         if (fut.isDone())
             removeFuture(fut);
+        else
+            onFutureAdded(fut);
 
         return true;
     }
 
     /**
+     * @param fut Future.
+     */
+    private void onFutureAdded(IgniteInternalFuture<?> fut) {
+        if (stopping)
+            ((GridFutureAdapter)fut).onDone(stopError());
+        else if (cctx.kernalContext().clientDisconnected())
+            ((GridFutureAdapter)fut).onDone(disconnectedError(null));
+    }
+
+    /**
      * @param fut Future to remove.
      * @return {@code True} if removed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 20340d1..34c571c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -147,6 +147,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private ExchangeFutureSet exchFuts = new ExchangeFutureSet();
 
+    /** */
+    private volatile IgniteCheckedException stopErr;
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -381,7 +384,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class);
         cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class);
 
-        IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ?
+        stopErr = cctx.kernalContext().clientDisconnected() ?
             new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(),
                 "Client node disconnected: " + cctx.gridName()) :
             new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
@@ -391,11 +394,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         if (exchFuts0 != null) {
             for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
-                f.onDone(err);
+                f.onDone(stopErr);
         }
 
         for (AffinityReadyFuture f : readyFuts.values())
-            f.onDone(err);
+            f.onDone(stopErr);
+
+        for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
+            f.onDone(stopErr);
+
+        if (locExchFut != null)
+            locExchFut.onDone(stopErr);
 
         U.cancel(exchWorker);
 
@@ -519,6 +528,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             fut.onDone(topVer);
         }
+        else if (stopErr != null)
+            fut.onDone(stopErr);
 
         return fut;
     }
@@ -791,6 +802,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (discoEvt != null)
             fut.onEvent(exchId, discoEvt);
 
+        if (stopErr != null)
+            fut.onDone(stopErr);
+
         return fut;
     }
 
@@ -799,12 +813,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param err Error.
      */
     public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) {
-        if (err == null) {
-            AffinityTopologyVersion topVer = exchFut.topologyVersion();
+        AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
-            if (log.isDebugEnabled())
-                log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']');
+        if (log.isDebugEnabled())
+            log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
 
+        if (err == null) {
             while (true) {
                 AffinityTopologyVersion readyVer = readyTopVer.get();
 
@@ -825,8 +839,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
         }
-        else if (log.isDebugEnabled())
-            log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']');
+        else {
+            for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
+                if (entry.getKey().compareTo(topVer) <= 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completing created topology ready future with error " +
+                            "[ver=" + topVer + ", fut=" + entry.getValue() + ']');
+
+                    entry.getValue().onDone(err);
+                }
+            }
+        }
 
         ExchangeFutureSet exchFuts0 = exchFuts;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 4ae0baa..c92de7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -960,6 +960,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
 
+        cancelFutures();
+
         List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
 
         for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
@@ -1323,12 +1325,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             evictMgr,
             qryMgr,
             contQryMgr,
-            affMgr,
             dataStructuresMgr,
             ttlMgr,
             drMgr,
             rslvrMgr,
-            pluginMgr
+            pluginMgr,
+            affMgr
         );
 
         cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -1452,12 +1454,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 evictMgr,
                 qryMgr,
                 contQryMgr,
-                affMgr,
                 dataStructuresMgr,
                 ttlMgr,
                 drMgr,
                 rslvrMgr,
-                pluginMgr
+                pluginMgr,
+                affMgr
             );
 
             cacheCtx.cacheObjectContext(cacheObjCtx);
@@ -2325,9 +2327,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             try {
                 ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
 
-                if (ctx.clientDisconnected())
+                if (ctx.isStopping()) {
+                    err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+                        "node is stopping.");
+                }
+                else if (ctx.clientDisconnected()) {
                     err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
                         "Failed to execute dynamic cache change request, client node disconnected.");
+                }
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -3036,9 +3043,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         try {
             ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
 
-            if (ctx.clientDisconnected())
+            if (ctx.isStopping()) {
+                err = new IgniteCheckedException("Failed to execute dynamic cache change request, " +
+                    "node is stopping.");
+            }
+            else if (ctx.clientDisconnected()) {
                 err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
                     "Failed to execute dynamic cache change request, client node disconnected.");
+            }
         }
         catch (IgniteCheckedException e) {
             err = e;
@@ -3104,8 +3116,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Cancel all user operations.
      */
-    public void cancelUserOperations() {
-        sharedCtx.mvcc().cancelClientFutures();
+    private void cancelFutures() {
+        sharedCtx.mvcc().onStop();
 
         Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdb878d..8a95b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -137,10 +137,8 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
         return futId;
     }
 
-    /**
-     * @return Error.
-     */
-    public Throwable error() {
+    /** {@inheritDoc} */
+    @Override public Throwable error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 4264830..e798458 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -67,10 +67,8 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
         this.err = err;
     }
 
-    /**
-     * @return Error.
-     */
-    public Throwable error() {
+    /** {@inheritDoc} */
+    @Override public Throwable error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index c11a3d7..6ade26f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be
@@ -38,9 +39,10 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
     public AffinityTopologyVersion topologyVersion();
 
     /**
-     * Returns is cache topology valid.
+     * Returns error is cache topology is not valid.
+     *
      * @param cctx Cache context.
      * @return valid ot not.
      */
-    public boolean isCacheTopologyValid(GridCacheContext cctx);
+    @Nullable public Throwable validateCache(GridCacheContext cctx);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index b9514a9..1a869e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1217,7 +1217,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
         Throwable err = res.error();
 
         // Log error before sending reply.
-        if (err != null && !(err instanceof GridCacheLockTimeoutException))
+        if (err != null && !(err instanceof GridCacheLockTimeoutException) && !ctx.kernalContext().isStopping())
             U.error(log, "Failed to acquire lock for request: " + req, err);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 33651bc..04d36e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -97,16 +97,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /**
      * Sets update error.
-     * @param err
+     *
+     * @param err Error.
      */
     public void onError(IgniteCheckedException err){
         this.err = err;
     }
 
-    /**
-     * @return Gets update error.
-     */
-    public IgniteCheckedException error() {
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
         return err;
     }
 
@@ -154,8 +153,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
         nearEvicted.add(key);
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d93f68f..fb2c5ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -385,9 +385,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
 
             if (fut.isDone()) {
-                if (!fut.isCacheTopologyValid(cctx)) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                        cctx.name()));
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
 
                     return;
                 }
@@ -811,6 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             Exception err = null;
+            GridNearAtomicUpdateRequest singleReq0 = null;
             Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
 
             int size = keys.size();
@@ -837,13 +839,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     if (size == 1 && !fastMap) {
                         assert remapKeys == null || remapKeys.size() == 1;
 
-                        singleReq = mapSingleUpdate();
+                        singleReq0 = singleReq = mapSingleUpdate();
                     }
                     else {
                         pendingMappings = mapUpdate(topNodes);
 
                         if (pendingMappings.size() == 1)
-                            singleReq = F.firstValue(pendingMappings);
+                            singleReq0 = singleReq = F.firstValue(pendingMappings);
                         else {
                             if (syncMode == PRIMARY_SYNC) {
                                 mappings = U.newHashMap(pendingMappings.size());
@@ -874,8 +876,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            if (singleReq != null)
-                mapSingle(singleReq.nodeId(), singleReq);
+            if (singleReq0 != null)
+                mapSingle(singleReq0.nodeId(), singleReq0);
             else {
                 assert pendingMappings != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 5f5fbb5..ccb67d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -198,6 +198,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         boolean skipStore,
         boolean clientReq
     ) {
+        assert futVer != null;
+
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8bc145c..376f4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -116,6 +116,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param futVer Future version.
      */
     public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+        assert futVer != null;
+
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
@@ -149,16 +151,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /**
      * Sets update error.
-     * @param err
+     *
+     * @param err Error.
      */
     public void error(IgniteCheckedException err){
         this.err = err;
     }
 
-    /**
-     * @return Update error, if any.
-     */
-    public IgniteCheckedException error() {
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 596ec77..1a08265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -524,7 +525,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtColocatedLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+            @Override public String apply(IgniteInternalFuture<?> f) {
+                if (isMini(f)) {
+                    MiniFuture m = (MiniFuture)f;
+
+                    return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+                }
+                else
+                    return "[loc=true, done=" + f.isDone() + "]";
+            }
+        });
+
+        return S.toString(GridDhtColocatedLockFuture.class, this,
+            "innerFuts", futs,
+            "inTx", inTx(),
+            "super", super.toString());
     }
 
     /**
@@ -565,9 +581,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
                 if (fut.topologyVersion().equals(topVer)){
-                    if (!fut.isCacheTopologyValid(cctx)) {
-                        onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                            cctx.name()));
+                    Throwable err = fut.validateCache(cctx);
+
+                    if (err != null) {
+                        onDone(err);
 
                         return;
                     }
@@ -612,9 +629,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
             if (fut.isDone()) {
-                if (!fut.isCacheTopologyValid(cctx)) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                        cctx.name()));
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
 
                     return;
                 }
@@ -643,10 +661,15 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                         try {
+                            fut.get();
+
                             mapOnTopology(remap, c);
                         }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
                         finally {
                             cctx.shared().txContextReset();
                         }
@@ -1327,8 +1350,13 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                         affFut.listen(new CI1<IgniteInternalFuture<?>>() {
                             @Override public void apply(IgniteInternalFuture<?> fut) {
                                 try {
+                                    fut.get();
+
                                     remap();
                                 }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
                                 finally {
                                     cctx.shared().txContextReset();
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 36a2da1..eaed424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -283,7 +283,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                         // Fail the whole thing.
                         if (e instanceof ClusterTopologyCheckedException)
                             fut.onResult((ClusterTopologyCheckedException)e);
-                        else
+                        else if (!cctx.kernalContext().isStopping())
                             fut.onResult(e);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index d31f096..93e39ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -98,10 +98,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
         this.err = err;
     }
 
-    /**
-     * @return Error, if any.
-     */
-    public IgniteCheckedException error() {
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 865bbdc..a1b03c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1081,9 +1081,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isCacheTopologyValid(GridCacheContext cctx) {
-        return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ?
-            cacheValidRes.get(cctx.cacheId()) : true;
+    @Override public Throwable validateCache(GridCacheContext cctx) {
+        Throwable err = error();
+
+        if (err != null)
+            return err;
+
+        if (cctx.config().getTopologyValidator() != null) {
+            Boolean res = cacheValidRes.get(cctx.cacheId());
+
+            if (res != null && !res) {
+                return new IgniteCheckedException("Failed to perform cache operation " +
+                    "(cache topology is not valid): " + cctx.name());
+            }
+        }
+
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 3276377..d4493a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -163,10 +163,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
         return topVer != null ? topVer : super.topologyVersion();
     }
 
-    /**
-     * @return Error.
-     */
-    public IgniteCheckedException error() {
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index f3e5ca3..dcc8da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -703,9 +703,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
         if (topVer != null) {
             for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){
                 if (fut.topologyVersion().equals(topVer)){
-                    if (!fut.isCacheTopologyValid(cctx)) {
-                        onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                            cctx.name()));
+                    Throwable err = fut.validateCache(cctx);
+
+                    if (err != null) {
+                        onDone(err);
 
                         return;
                     }
@@ -749,9 +750,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
             if (fut.isDone()) {
-                if (!fut.isCacheTopologyValid(cctx)) {
-                    onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
-                        cctx.name()));
+                Throwable err = fut.validateCache(cctx);
+
+                if (err != null) {
+                    onDone(err);
 
                     return;
                 }
@@ -777,10 +779,15 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                         try {
+                            fut.get();
+
                             mapOnTopology(remap);
                         }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
                         finally {
                             cctx.shared().txContextReset();
                         }
@@ -1435,8 +1442,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
                         affFut.listen(new CI1<IgniteInternalFuture<?>>() {
                             @Override public void apply(IgniteInternalFuture<?> fut) {
                                 try {
+                                    fut.get();
+
                                     remap();
                                 }
+                                catch (IgniteCheckedException e) {
+                                    onDone(e);
+                                }
                                 finally {
                                     cctx.shared().txContextReset();
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2048fdf..25028c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -319,7 +319,9 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
 
                 assert ctx != null : cacheId;
 
-                if (!topFut.isCacheTopologyValid(ctx)) {
+                Throwable err = topFut.validateCache(ctx);
+
+                if (err != null) {
                     if (invalidCaches != null)
                         invalidCaches.append(", ");
                     else
@@ -343,12 +345,17 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
         }
         else {
             topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
                     cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
                         @Override public void run() {
                             try {
+                                fut.get();
+
                                 prepareOnTopology(remap, c);
                             }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
                             finally {
                                 cctx.txContextReset();
                             }
@@ -841,7 +848,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
                         if (affFut != null && !affFut.isDone()) {
                             affFut.listen(new CI1<IgniteInternalFuture<?>>() {
                                 @Override public void apply(IgniteInternalFuture<?> fut) {
-                                    remap();
+                                    try {
+                                        fut.get();
+
+                                        remap();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        onDone(e);
+                                    }
                                 }
                             });
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index cec7d73..c860baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -75,10 +75,8 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
         this.err = err;
     }
 
-    /**
-     * @return Error.
-     */
-    @Nullable public Throwable error() {
+    /** {@inheritDoc} */
+    @Nullable @Override public Throwable error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 3e4cdeb..78e2ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -193,10 +193,8 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
         return reqId;
     }
 
-    /**
-     * @return Error.
-     */
-    public Throwable error() {
+    /** {@inheritDoc} */
+    @Override public Throwable error() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index df6b4b7..c99e07f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** Whether to skip primary check for REPLICATED cache. */
     private transient boolean skipPrimaryCheck;
 
+    /** */
+    private transient int cacheId;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -145,6 +149,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
+
+        cacheId = CU.cacheId(cacheName);
     }
 
     /** {@inheritDoc} */
@@ -457,6 +463,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         sync = in.readBoolean();
         ignoreExpired = in.readBoolean();
         taskHash = in.readInt();
+
+        cacheId = CU.cacheId(cacheName);
     }
 
     /**
@@ -466,9 +474,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) {
         assert ctx != null;
 
-        GridCacheAdapter<K, V> cache = ctx.cache().internalCache(cacheName);
-
-        return cache == null ? null : cache.context();
+        return ctx.cache().<K, V>context().cacheContext(cacheId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 00b91dd..6ca1f72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1105,6 +1105,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /**
      * Commits transaction to transaction manager. Used for one-phase commit transactions only.
+     *
+     * @param commit If {@code true} commits transaction, otherwise rollbacks.
      */
     public void tmFinish(boolean commit) {
         assert onePhaseCommit();
@@ -1118,7 +1120,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
             state(commit ? COMMITTED : ROLLED_BACK);
 
-            boolean needsCompletedVersions = needsCompletedVersions();
+            boolean needsCompletedVersions = commit && needsCompletedVersions();
 
             assert !needsCompletedVersions || completedBase != null;
             assert !needsCompletedVersions || committedVers != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
index 2aae6ef..6bfd4fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridSpinBusyLock.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 
 /**
@@ -76,6 +77,15 @@ public class GridSpinBusyLock {
     }
 
     /**
+     * @param millis Timeout.
+     * @return {@code True} if lock was acquired.
+     * @throws InterruptedException If interrupted.
+     */
+    public boolean tryBlock(long millis) throws InterruptedException {
+        return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
      * Makes possible for activities entering busy state again.
      */
     public void unblock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 6b4d473..151167a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -184,20 +185,29 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
 
                     String val = "value-" + k;
 
-                    cache.invoke(key, new Processor(val));
+                    procs.put(key, new Processor(val));
                 }
 
-                cache.invokeAll(procs);
+                Map<String, EntryProcessorResult<Integer>> resMap = cache.invokeAll(procs);
+
+                for (String key : procs.keySet()) {
+                    EntryProcessorResult<Integer> res = resMap.get(key);
+
+                    assertNotNull(res);
+                    assertEquals(k + 1, (Object) res.get());
+                }
             }
             else {
+                IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+
                 for (int i = 0; i < NUM_SETS; i++) {
                     String key = "set-" + i;
 
                     String val = "value-" + k;
 
-                    IgniteCache<String, Set<String>> cache = ignite(0).cache(null);
+                    Integer valsCnt = cache.invoke(key, new Processor(val));
 
-                    cache.invoke(key, new Processor(val));
+                    assertEquals(k + 1, (Object)valsCnt);
                 }
             }
         }
@@ -275,7 +285,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
     }
 
     /** */
-    private static class Processor implements EntryProcessor<String, Set<String>, Void>, Serializable {
+    private static class Processor implements EntryProcessor<String, Set<String>, Integer>, Serializable {
         /** */
         private String val;
 
@@ -287,7 +297,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         }
 
         /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<String, Set<String>> e, Object... args) {
+        @Override public Integer process(MutableEntry<String, Set<String>> e, Object... args) {
             Set<String> vals = e.getValue();
 
             if (vals == null)
@@ -297,7 +307,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
 
             e.setValue(vals);
 
-            return null;
+            return vals.size();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 7aae48c..88605b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -79,12 +79,12 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
             new GridCacheEvictionManager(),
             new GridCacheLocalQueryManager<K, V>(),
             new CacheContinuousQueryManager(),
-            new GridCacheAffinityManager(),
             new CacheDataStructuresManager(),
             new GridCacheTtlManager(),
             new GridOsCacheDrManager(),
             new CacheOsConflictResolutionManager<K, V>(),
-            new CachePluginManager(ctx, new CacheConfiguration())
+            new CachePluginManager(ctx, new CacheConfiguration()),
+            new GridCacheAffinityManager()
         );
 
         store().initialize(null, new IdentityHashMap<CacheStore, ThreadLocal>());

http://git-wip-us.apache.org/repos/asf/ignite/blob/585761f2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 1276405..e00611b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -185,8 +185,6 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
      * @throws Exception If failed.
      */
     public void testRestarts() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1452");
-
         int duration = 90 * 1000;
         int qryThreadNum = 4;
         int restartThreadsNum = 2; // 4 + 2 = 6 nodes


Mime
View raw message