ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5566 TcpCommunicationSpi: optimized stop procedure - do not close connections in onContextDestroyed0 (it is called before discovery stop so remote nodes still consider node as alive and try to reconnect) - when started stopping
Date Fri, 23 Jun 2017 11:04:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 63debcdb1 -> 039c02a67


ignite-5566 TcpCommunicationSpi: optimized stop procedure
- do not close connections in onContextDestroyed0 (it is called before discovery stops, so remote nodes still consider the node alive and would try to reconnect)
- once the node has started stopping, reply to each incoming connection attempt with a special message so that remote nodes do not retry reconnecting


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/039c02a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/039c02a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/039c02a6

Branch: refs/heads/master
Commit: 039c02a67eb880af7550c79c6c5a92e1a589efa4
Parents: 63debcd
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jun 23 14:03:52 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jun 23 14:03:52 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridJobSiblingImpl.java     |  34 ++--
 .../managers/communication/GridIoManager.java   | 116 ++++++------
 .../deployment/GridDeploymentCommunication.java |   6 +
 .../eventstorage/GridEventStorageManager.java   |   5 +
 .../processors/cache/GridCacheIoManager.java    | 135 +++----------
 .../cache/binary/BinaryMetadataTransport.java   |   5 +
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |  14 +-
 .../query/GridCacheDistributedQueryFuture.java  |  19 +-
 .../query/GridCacheDistributedQueryManager.java |  18 +-
 .../cache/transactions/IgniteTxManager.java     |   8 +
 .../continuous/GridContinuousProcessor.java     |   2 +-
 .../datastreamer/DataStreamerImpl.java          |   5 +
 .../internal/processors/igfs/IgfsContext.java   |  15 --
 .../processors/job/GridJobProcessor.java        |   3 +-
 .../internal/processors/job/GridJobWorker.java  |   2 +-
 .../GridMarshallerMappingProcessor.java         |   5 +
 .../processors/task/GridTaskProcessor.java      |   3 +-
 .../processors/task/GridTaskWorker.java         |   6 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  55 ++++--
 .../managers/IgniteDiagnosticMessagesTest.java  |  17 +-
 .../communication/GridIoManagerSelfTest.java    |   2 +-
 .../IgniteRejectConnectOnNodeStopTest.java      | 188 +++++++++++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   3 +
 .../query/h2/twostep/GridMapQueryExecutor.java  |   2 +-
 25 files changed, 412 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
index 79ac416..9c7f548 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingImpl.java
@@ -24,8 +24,10 @@ import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobSibling;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -145,31 +147,18 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
     @Override public void cancel() {
         GridTaskSessionImpl ses = ctx.session().getSession(sesId);
 
-        if (ses == null) {
-            Collection<ClusterNode> nodes = ctx.discovery().remoteNodes();
+        Collection<ClusterNode> nodes = ses == null ? ctx.discovery().remoteNodes() : ctx.discovery().nodes(ses.getTopology());
 
-            if (!nodes.isEmpty()) {
+        for (ClusterNode node : nodes) {
+            if (!ctx.localNodeId().equals(node.id())) {
                 try {
-                    ctx.io().sendToGridTopic(nodes, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
+                    ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(sesId, jobId), SYSTEM_POOL);
                 }
-                catch (IgniteCheckedException e) {
-                    throw U.convertException(e);
-                }
-            }
-
-            // Cancel local jobs directly.
-            ctx.job().cancelJob(sesId, jobId, false);
+                catch (ClusterTopologyCheckedException e) {
+                    IgniteLogger log = ctx.log(GridJobSiblingImpl.class);
 
-            return;
-        }
-
-        for (ClusterNode node : ctx.discovery().nodes(ses.getTopology())) {
-            if (ctx.localNodeId().equals(node.id()))
-                // Cancel local jobs directly.
-                ctx.job().cancelJob(ses.getId(), jobId, false);
-            else {
-                try {
-                    ctx.io().sendToGridTopic(node, TOPIC_JOB_CANCEL, new GridJobCancelRequest(ses.getId(), jobId), SYSTEM_POOL);
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send cancel request, node left [nodeId=" + node.id() + ", ses=" + ses + ']');
                 }
                 catch (IgniteCheckedException e) {
                     // Avoid stack trace for left nodes.
@@ -179,6 +168,9 @@ public class GridJobSiblingImpl implements ComputeJobSibling, Externalizable {
                 }
             }
         }
+
+        // Cancel local jobs directly.
+        ctx.job().cancelJob(sesId, jobId, false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 81692da..a1ddaf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -54,9 +54,11 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -1567,6 +1569,21 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param sndErr Send error.
+     * @param ping {@code True} if try ping node.
+     * @return {@code True} if node left.
+     * @throws IgniteClientDisconnectedCheckedException If ping failed.
+     */
+    public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping)
+        throws IgniteClientDisconnectedCheckedException
+    {
+        return sndErr instanceof ClusterTopologyCheckedException ||
+            ctx.discovery().node(nodeId) == null ||
+            (ping && !ctx.discovery().pingNode(nodeId));
+    }
+
+    /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
      * @param topicOrd GridTopic enumeration ordinal.
@@ -1628,6 +1645,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     getSpi().sendMessage(node, ioMsg);
             }
             catch (IgniteSpiException e) {
+                if (e.getCause() instanceof ClusterTopologyCheckedException)
+                    throw (ClusterTopologyCheckedException)e.getCause();
+
                 throw new IgniteCheckedException("Failed to send message (node may have left the grid or " +
                     "TCP connection cannot be established due to firewall issues) " +
                     "[node=" + node + ", topic=" + topic +
@@ -1767,7 +1787,22 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(nodes, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout);
+        IgniteCheckedException err = null;
+
+        for (ClusterNode node : nodes) {
+            try {
+                send(node, topic, topic.ordinal(), msg, plc, true, timeout, skipOnTimeout, null, false);
+            }
+            catch (IgniteCheckedException e) {
+                if (err == null)
+                    err = e;
+                else
+                    err.addSuppressed(e);
+            }
+        }
+
+        if (err != null)
+            throw err;
     }
 
     /**
@@ -1783,7 +1818,25 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc
     ) throws IgniteCheckedException {
-        send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+        assert F.find(nodes, null, F.localNode(locNodeId)) == null :
+            "Internal Ignite code should never call the method with local node in a node list.";
+
+        IgniteCheckedException err = null;
+
+        for (ClusterNode node : nodes) {
+            try {
+                send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+            }
+            catch (IgniteCheckedException e) {
+                if (err == null)
+                    err = e;
+                else
+                    err.addSuppressed(e);
+            }
+        }
+
+        if (err != null)
+            throw err;
     }
 
     /**
@@ -1815,17 +1868,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      *
      * @param nodes Destination nodes.
      * @param msg Message to send.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg) throws IgniteCheckedException {
-        sendUserMessage(nodes, msg, null, false, 0, false);
-    }
-
-    /**
-     * Sends a peer deployable user message.
-     *
-     * @param nodes Destination nodes.
-     * @param msg Message to send.
      * @param topic Message topic to use.
      * @param ordered Is message ordered?
      * @param timeout Message timeout in milliseconds for ordered messages.
@@ -1959,54 +2001,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param nodes Destination nodes.
-     * @param topic Topic to send the message to.
-     * @param topicOrd Topic ordinal value.
-     * @param msg Message to send.
-     * @param plc Type of processing.
-     * @param ordered Ordered flag.
-     * @param timeout Message timeout.
-     * @param skipOnTimeout Whether message can be skipped in timeout.
-     * @throws IgniteCheckedException Thrown in case of any errors.
-     */
-    private void send(
-        Collection<? extends ClusterNode> nodes,
-        Object topic,
-        int topicOrd,
-        Message msg,
-        byte plc,
-        boolean ordered,
-        long timeout,
-        boolean skipOnTimeout
-    ) throws IgniteCheckedException {
-        assert nodes != null;
-        assert topic != null;
-        assert msg != null;
-
-        if (!ordered)
-            assert F.find(nodes, null, F.localNode(locNodeId)) == null :
-                "Internal Ignite code should never call the method with local node in a node list.";
-
-        try {
-            // Small optimization, as communication SPIs may have lighter implementation for sending
-            // messages to one node vs. many.
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null, false);
-            }
-            else if (log.isDebugEnabled())
-                log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
-                    msg + ", policy=" + plc + ']');
-        }
-        catch (IgniteSpiException e) {
-            throw new IgniteCheckedException("Failed to send message (nodes may have left the grid or " +
-                "TCP connection cannot be established due to firewall issues) " +
-                "[nodes=" + nodes + ", topic=" + topic +
-                ", msg=" + msg + ", policy=" + plc + ']', e);
-        }
-    }
-
-    /**
      * @param topic Listener's topic.
      * @param lsnr Listener to add.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
index ffbde37..23d186a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentCommunication.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -299,6 +300,11 @@ class GridDeploymentCommunication {
                 if (log.isDebugEnabled())
                     log.debug("Sent peer class loading response [node=" + node.id() + ", res=" + res + ']');
             }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send peer class loading response to node " +
+                        "(node does not exist): " + nodeId);
+            }
             catch (IgniteCheckedException e) {
                 if (ctx.discovery().pingNodeNoError(nodeId))
                     U.error(log, "Failed to send peer class loading response to node: " + nodeId, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 6d74bd0..bd43e43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -1276,6 +1277,10 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
 
                     ctx.io().sendToCustomTopic(node, req.responseTopic(), res, PUBLIC_POOL);
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send event query response, node failed [node=" + nodeId + ']');
+                }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to send event query response to node [node=" + nodeId + ", res=" +
                         res + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 09eea27..a920bd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -1079,6 +1080,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param sndErr Send error.
+     * @return {@code True} if node left.
+     * @param ping {@code True} if try ping node.
+     * @throws IgniteClientDisconnectedCheckedException If ping failed.
+     */
+    public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping)
+        throws IgniteClientDisconnectedCheckedException {
+        return cctx.gridIO().checkNodeLeft(nodeId, sndErr, ping);
+    }
+
+    /**
      * Sends communication message.
      *
      * @param node Node to send the message to.
@@ -1107,6 +1120,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 return;
             }
+            catch (ClusterTopologyCheckedException e) {
+                throw e;
+            }
             catch (IgniteCheckedException e) {
                 if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id()))
                     throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);
@@ -1125,119 +1141,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * Sends message and automatically accounts for lefts nodes.
-     *
-     * @param nodes Nodes to send to.
-     * @param msg Message to send.
-     * @param plc IO policy.
-     * @param fallback Callback for failed nodes.
-     * @throws IgniteCheckedException If send failed.
-     */
-    @SuppressWarnings({"BusyWait", "unchecked"})
-    public void safeSend(Collection<? extends ClusterNode> nodes, GridCacheMessage msg, byte plc,
-        @Nullable IgnitePredicate<ClusterNode> fallback) throws IgniteCheckedException {
-        assert nodes != null;
-        assert msg != null;
-
-        if (nodes.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Message will not be sent as collection of nodes is empty: " + msg);
-
-            return;
-        }
-
-        if (!onSend(msg, null))
-            return;
-
-        if (log.isDebugEnabled())
-            log.debug("Sending cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-
-        final Collection<UUID> leftIds = new GridLeanSet<>();
-
-        int cnt = 0;
-
-        while (cnt < retryCnt) {
-            try {
-                Collection<? extends ClusterNode> nodesView = F.view(nodes, new P1<ClusterNode>() {
-                    @Override public boolean apply(ClusterNode e) {
-                        return !leftIds.contains(e.id());
-                    }
-                });
-
-                cctx.gridIO().sendToGridTopic(nodesView, TOPIC_CACHE, msg, plc);
-
-                boolean added = false;
-
-                // Even if there is no exception, we still check here, as node could have
-                // ignored the message during stopping.
-                for (ClusterNode n : nodes) {
-                    if (!leftIds.contains(n.id()) && !cctx.discovery().alive(n.id())) {
-                        leftIds.add(n.id());
-
-                        if (fallback != null && !fallback.apply(n))
-                            // If fallback signalled to stop.
-                            return;
-
-                        added = true;
-                    }
-                }
-
-                if (added) {
-                    if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) {
-                        if (log.isDebugEnabled())
-                            log.debug("Message will not be sent because all nodes left topology [msg=" + msg +
-                                ", nodes=" + U.toShortString(nodes) + ']');
-
-                        return;
-                    }
-                }
-
-                break;
-            }
-            catch (IgniteCheckedException e) {
-                boolean added = false;
-
-                for (ClusterNode n : nodes) {
-                    if (!leftIds.contains(n.id()) &&
-                        (!cctx.discovery().alive(n.id()) || !cctx.discovery().pingNode(n.id()))) {
-                        leftIds.add(n.id());
-
-                        if (fallback != null && !fallback.apply(n))
-                            // If fallback signalled to stop.
-                            return;
-
-                        added = true;
-                    }
-                }
-
-                if (!added) {
-                    cnt++;
-
-                    if (cnt == retryCnt)
-                        throw e;
-
-                    U.sleep(retryDelay);
-                }
-
-                if (!F.exist(F.nodeIds(nodes), F0.not(F.contains(leftIds)))) {
-                    if (log.isDebugEnabled())
-                        log.debug("Message will not be sent because all nodes left topology [msg=" + msg + ", nodes=" +
-                            U.toShortString(nodes) + ']');
-
-                    return;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Message send will be retried [msg=" + msg + ", nodes=" + U.toShortString(nodes) +
-                        ", leftIds=" + leftIds + ']');
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Sent cache message [msg=" + msg + ", nodes=" + U.toShortString(nodes) + ']');
-    }
-
-    /**
      * Sends communication message.
      *
      * @param nodeId ID of node to send the message to.
@@ -1282,6 +1185,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                 return;
             }
+            catch (ClusterTopologyCheckedException e) {
+                throw e;
+            }
             catch (IgniteCheckedException e) {
                 if (cctx.discovery().node(node.id()) == null)
                     throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
@@ -1327,6 +1233,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             if (log.isDebugEnabled())
                 log.debug("Sent cache message [msg=" + msg + ", node=" + U.toShortString(node) + ']');
         }
+        catch (ClusterTopologyCheckedException e) {
+            throw e;
+        }
         catch (IgniteCheckedException e) {
             if (!cctx.discovery().alive(node.id()))
                 throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index e4df075..00c760f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
@@ -589,6 +590,10 @@ final class BinaryMetadataTransport {
             try {
                 ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ, resp, SYSTEM_POOL);
             }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send metadata response, node failed: " + nodeId);
+            }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send up-to-date metadata response.", e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 52f007a..71cdaad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1870,7 +1870,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (dhtFut != null) {
                             if (req.writeSynchronizationMode() == PRIMARY_SYNC
                                 // To avoid deadlock disable back-pressure for sender data node.
-                                && !ctx.discovery().cacheAffinityNode(ctx.discovery().node(nodeId), ctx.name())
+                                && !ctx.discovery().cacheAffinityNode(node, ctx.name())
                                 && !dhtFut.isDone()) {
                                 final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index a7122b3..c8138f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1114,7 +1114,19 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                 ", exchId=" + exchId + ", msg=" + m + ']');
 
-        cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
+        for (ClusterNode node : nodes) {
+            try {
+                cctx.io().send(node, m, SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                if (cctx.io().checkNodeLeft(node.id(), e, false)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send partitions, node failed: " + node);
+                }
+                else
+                    U.error(log, "Failed to send partitions [node=" + node + ']', e);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index da16db7..241a1e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -115,14 +115,19 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
             });
 
             if (!nodes.isEmpty()) {
-                cctx.io().safeSend(nodes, req, cctx.ioPolicy(),
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode node) {
-                            onNodeLeft(node.id());
-
-                            return !isDone();
+                for (ClusterNode node : nodes) {
+                    try {
+                        cctx.io().send(node, req, cctx.ioPolicy());
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (cctx.io().checkNodeLeft(node.id(), e, false)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to send cancel request, node failed: " + node);
                         }
-                    });
+                        else
+                            U.error(log, "Failed to send cancel request [node=" + node + ']', e);
+                    }
+                }
             }
         }
         catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index b112e1d..63228a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -800,13 +800,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
         // For example, a remote reducer has a state, we should not serialize and then send
         // the reducer changed by the local node.
         if (!F.isEmpty(rmtNodes)) {
-            cctx.io().safeSend(rmtNodes, req, GridIoPolicy.QUERY_POOL, new P1<ClusterNode>() {
-                @Override public boolean apply(ClusterNode node) {
-                    fut.onNodeLeft(node.id());
+            for (ClusterNode node : rmtNodes) {
+                try {
+                    cctx.io().send(node, req, GridIoPolicy.QUERY_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    if (cctx.io().checkNodeLeft(node.id(), e, true)) {
+                        fut.onNodeLeft(node.id());
 
-                    return !fut.isDone();
+                        if (fut.isDone())
+                            return;
+                    }
+                    else
+                        throw e;
                 }
-            });
+            }
         }
 
         if (locNode != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 0877305..a9aa13d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2475,6 +2475,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
                         cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send response, node failed: " + nodeId);
+                    }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e);
                     }
@@ -2512,6 +2516,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     try {
                         cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL);
                     }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send response, node failed: " + nodeId);
+                    }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId +
                             ", res=" + res + ']', e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index a72dcd6..f641399 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1405,7 +1405,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                     break;
                 }
-                catch (IgniteInterruptedCheckedException e) {
+                catch (ClusterTopologyCheckedException | IgniteInterruptedCheckedException e) {
                     throw e;
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a991385..40988d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1709,6 +1709,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     if (log.isDebugEnabled())
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
+
+                    fut0.onDone(e);
+                }
                 catch (IgniteCheckedException e) {
                     GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 0b2558a..0e049c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -177,21 +177,6 @@ public class IgfsContext {
     }
 
     /**
-     * @param node Node.
-     * @param topic Topic.
-     * @param msg Message.
-     * @param plc Policy.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void send(ClusterNode node, Object topic, IgfsCommunicationMessage msg, byte plc)
-        throws IgniteCheckedException {
-        if (!kernalContext().localNodeId().equals(node.id()))
-            msg.prepareMarshal(kernalContext().config().getMarshaller());
-
-        kernalContext().io().sendToCustomTopic(node, topic, msg, plc);
-    }
-
-    /**
      * Checks if given node is a IGFS node.
      *
      * @param node Node to check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index e0bc4d2..408396a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.SkipDaemon;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.collision.GridCollisionJobContextAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -1396,7 +1397,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
         }
         catch (IgniteCheckedException e) {
             // The only option here is to log, as we must assume that resending will fail too.
-            if (isDeadNode(node.id()))
+            if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(node.id()))
                 // Avoid stack trace for left nodes.
                 U.error(log, "Failed to reply to sender node because it left grid [nodeId=" + node.id() +
                     ", jobId=" + req.getJobId() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index c9129c1..56e3794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -929,7 +929,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
                         }
                         catch (IgniteCheckedException e) {
                             // Log and invoke the master-leave callback.
-                            if (isDeadNode(taskNode.id())) {
+                            if ((e instanceof ClusterTopologyCheckedException) || isDeadNode(taskNode.id())) {
                                 onMasterNodeLeft();
 
                                 // Avoid stack trace for left nodes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 9b65aa5..8de6c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
@@ -184,6 +185,10 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
                         new MissingMappingResponseMessage(platformId, typeId, resolvedClsName),
                         SYSTEM_POOL);
             }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send missing mapping response, node failed: " + nodeId);
+            }
             catch (IgniteCheckedException e) {
                 U.error(log, "Failed to send missing mapping response.", e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index e73d292..6ae97dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -994,7 +995,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
                             false);
                     }
                     catch (IgniteCheckedException e) {
-                        node = ctx.discovery().node(nodeId);
+                        node = e instanceof  ClusterTopologyCheckedException ? null : ctx.discovery().node(nodeId);
 
                         if (node != null) {
                             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 5a1bfdb..ec95001 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1284,6 +1284,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                             new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true),
                             PUBLIC_POOL);
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send cancel request, node failed: " + nodeId);
+                }
                 catch (IgniteCheckedException e) {
                     try {
                         if (!isDeadNode(nodeId))
@@ -1411,7 +1415,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             IgniteException fakeErr = null;
 
             try {
-                boolean deadNode = isDeadNode(res.getNode().id());
+                boolean deadNode = e instanceof ClusterTopologyCheckedException || isDeadNode(res.getNode().id());
 
                 // Avoid stack trace if node has left grid.
                 if (deadNode) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5d74a80..addb840 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -138,6 +138,8 @@ import org.jsr166.LongAdder8;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING;
 
 /**
  * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -487,7 +489,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         connectedNew(recoveryDesc, ses, true);
                     else {
                         if (c.failed) {
-                            ses.send(new RecoveryLastReceivedMessage(-1));
+                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
                             for (GridNioSession ses0 : nioSrvr.sessions()) {
                                 ConnectionKey key0 = ses0.meta(CONN_IDX_META);
@@ -520,7 +522,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                     "to this node, rejecting [locNode=" + locNode.id() +
                                     ", rmtNode=" + sndId + ']');
 
-                            ses.send(new RecoveryLastReceivedMessage(-1));
+                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
                             return;
                         }
@@ -552,7 +554,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                         "to this node, rejecting [locNode=" + locNode.id() +
                                         ", rmtNode=" + sndId + ']');
 
-                                ses.send(new RecoveryLastReceivedMessage(-1));
+                                ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
 
                                 fut.onDone(oldClient);
 
@@ -594,7 +596,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                                     ", rmtNodeOrder=" + rmtNode.order() + ']');
                             }
 
-                            ses.send(new RecoveryLastReceivedMessage(-1));
+                            ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                         }
                         else {
                             // The code below causes a race condition between shmem and TCP (see IGNITE-1294)
@@ -608,7 +610,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 }
             }
 
-            @Override public void onMessage(GridNioSession ses, Message msg) {
+            @Override public void onMessage(final GridNioSession ses, Message msg) {
                 ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
@@ -618,7 +620,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         if (log.isDebugEnabled())
                             log.debug("Close incoming connection, failed to enter gateway.");
 
-                        ses.close();
+                        ses.send(new RecoveryLastReceivedMessage(NODE_STOPPING)).listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                ses.close();
+                            }
+                        });
 
                         return;
                     }
@@ -822,7 +828,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
                         }
                         else
-                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1));
+                            nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(ALREADY_CONNECTED));
                     }
                     catch (IgniteCheckedException e) {
                         U.error(log, "Failed to send message: " + e, e);
@@ -2377,14 +2383,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         if (connectGate != null)
             connectGate.stopped();
 
-        // Force closing.
-        for (GridCommunicationClient[] clients0 : clients.values()) {
-            for (GridCommunicationClient client : clients0) {
-                if (client != null)
-                    client.forceClose();
-            }
-        }
-
         getSpiContext().deregisterPorts();
 
         getSpiContext().removeLocalEventListener(discoLsnr);
@@ -3022,7 +3020,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         return null;
                     }
 
-                    long rcvCnt = -1;
+                    Long rcvCnt = null;
 
                     Map<Integer, Object> meta = new HashMap<>();
 
@@ -3050,11 +3048,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             sslMeta,
                             handshakeConnIdx);
 
-                        if (rcvCnt == -1)
+                        if (rcvCnt == ALREADY_CONNECTED) {
+                            recoveryDesc.release();
+
                             return null;
+                        }
+                        else if (rcvCnt == NODE_STOPPING) {
+                            recoveryDesc.release();
+
+                            throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
+                        }
                     }
                     finally {
-                        if (recoveryDesc != null && rcvCnt == -1)
+                        if (recoveryDesc != null && rcvCnt == null)
                             recoveryDesc.release();
                     }
 
@@ -3144,6 +3150,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                         // Continue loop.
                     }
                 }
+                catch (ClusterTopologyCheckedException e) {
+                    throw e;
+                }
                 catch (Exception e) {
                     if (client != null) {
                         client.forceClose();
@@ -4045,6 +4054,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                 client.release();
             }
+            catch (ClusterTopologyCheckedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
+            }
             catch (IgniteCheckedException | IgniteException e) {
                 try {
                     if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
@@ -4439,6 +4452,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** */
+        static final long ALREADY_CONNECTED = -1;
+
+        /** */
+        static final long NODE_STOPPING = -2;
+
         /** Message body size in bytes. */
         private static final int MESSAGE_SIZE = 8;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
index 1d1b519..572f356 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -155,7 +155,6 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
             final Ignite node1 = ignite(1);
 
             UUID id0 = node0.cluster().localNode().id();
-            UUID id1 = node1.cluster().localNode().id();
 
             TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearSingleGetResponse.class, node1.name());
 
@@ -233,9 +232,9 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
 
             IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                 @Override public Void call() throws Exception {
-                    try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+                    IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
 
+                    try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                         Integer key = keys.get(idx.getAndIncrement() % keys.size());
 
                         cache.putIfAbsent(key, String.valueOf(key));
@@ -332,9 +331,9 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
 
             fut.add(GridTestUtils.runAsync(new Callable<Void>() {
                 @Override public Void call() throws Exception {
-                    try (Transaction tx = node0.transactions().txStart()) {
-                        IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME);
+                    IgniteCache<Object, Object> cache = node0.cache(DEFAULT_CACHE_NAME);
 
+                    try (Transaction tx = node0.transactions().txStart()) {
                         key.set(primaryKey(cache));
 
                         cache.putIfAbsent(key.get(), "dummy val");
@@ -351,11 +350,11 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
 
             fut.add(GridTestUtils.runAsync(new Callable<Void>() {
                 @Override public Void call() throws Exception {
+                    IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+
                     try (Transaction tx = node1.transactions().txStart()) {
                         l1.await();
 
-                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
-
                         cache.replace(key.get(), "dummy val2");
 
                         tx.commit();
@@ -431,9 +430,9 @@ public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
 
             IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                 @Override public Void call() throws Exception {
-                    try (Transaction tx = node1.transactions().txStart()) {
-                        IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
+                    IgniteCache<Object, Object> cache = node1.cache(DEFAULT_CACHE_NAME);
 
+                    try (Transaction tx = node1.transactions().txStart()) {
                         Integer key = keys.get(idx.getAndIncrement());
 
                         cache.getAndPut(key, "new-" + key);

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index 3f6318f..9f447df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -107,7 +107,7 @@ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
 
         try {
-            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg);
+            ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, null, false, 0, false);
         }
         catch (IgniteCheckedException ignored) {
             // No-op. We are using mocks so real sending is impossible.

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
new file mode 100644
index 0000000..d34de12
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+/**
+ * Sanity test to check that node starts to reject connections when stop procedure started.
+ */
+public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static CountDownLatch stopLatch = new CountDownLatch(1);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TestDiscoverySpi());
+
+        TcpDiscoverySpi discoSpi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        discoSpi.setReconnectCount(2);
+        discoSpi.setAckTimeout(30_000);
+        discoSpi.setSocketTimeout(30_000);
+        discoSpi.setIpFinder(IP_FINDER);
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        commSpi.setConnectTimeout(600000);
+        commSpi.setMaxConnectTimeout(600000 * 10);
+        commSpi.setReconnectCount(100);
+        commSpi.setSocketWriteTimeout(600000);
+        commSpi.setAckSendThreshold(100);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNodeStop() throws Exception {
+        Ignite srv = startGrid(0);
+
+        client = true;
+
+        final Ignite c = startGrid(1);
+
+        ClusterGroup grp = srv.cluster().forClients();
+
+        IgniteCompute srvCompute = srv.compute(grp);
+
+        srvCompute.call(new DummyClosure());
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                IgniteCache cache = c.cache(DEFAULT_CACHE_NAME);
+
+                for (int i = 0; i < 100_000; i++) {
+                    try {
+                        cache.put(1, 1);
+                    }
+                    catch (Exception ignore) {
+                        break;
+                    }
+                }
+            }
+        }, "cache-put");
+
+        U.sleep(100);
+
+        final CountDownLatch stopStartLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                stopStartLatch.countDown();
+
+                c.close();
+            }
+        });
+
+        boolean err = false;
+
+        try{
+            stopStartLatch.await();
+
+            IgniteCacheMessageRecoveryAbstractTest.closeSessions(srv);
+
+            long stopTime = U.currentTimeMillis() + 10_000;
+
+            while (U.currentTimeMillis() < stopTime) {
+                try {
+                    srvCompute.call(new DummyClosure());
+                }
+                catch (ClusterTopologyException e) {
+                    err = true;
+
+                    assertFalse(fut2.isDone());
+
+                    break;
+                }
+            }
+        }
+        finally {
+            stopLatch.countDown();
+        }
+
+        fut.get();
+        fut2.get();
+
+        assertTrue("Failed to get excpected error", err);
+    }
+
+
+    /**
+     *
+     */
+    public static class DummyClosure implements IgniteCallable<Object> {
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return 1;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestDiscoverySpi extends TcpDiscoverySpi {
+        @Override public void spiStop() throws IgniteSpiException {
+            // Called communication SPI onContextDestroyed, but do not allow discovery to stop.
+            if (ignite.configuration().isClientMode()) {
+                try {
+                    stopLatch.await(1, MINUTES);
+                }
+                catch (InterruptedException ignore) {
+                    // No-op.
+                }
+            }
+
+            super.spiStop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 140b1a5..de509ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteRejectConnectOnNodeStopTest;
 import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest;
 import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest;
 import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest;
@@ -177,6 +178,8 @@ public class IgniteBasicTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteDiagnosticMessagesTest.class);
 
+        suite.addTestSuite(IgniteRejectConnectOnNodeStopTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/039c02a6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index aa97197..6b7ba75 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -791,7 +791,7 @@ public class GridMapQueryExecutor {
                 ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
         }
         catch (IgniteCheckedException e) {
-            log.error("Failed to send message.", e);
+            U.error(log, "Failed to send message.", e);
 
             throw new IgniteException(e);
         }


Mime
View raw message