ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [18/31] ignite git commit: ignite-4499 Drop node from topology in case when connection creation is impossible
Date Mon, 13 Feb 2017 10:32:15 GMT
ignite-4499 Drop node from topology in case when connection creation is impossible


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9aaf035
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9aaf035
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9aaf035

Branch: refs/heads/ignite-4436-2
Commit: f9aaf0353cea54afefea4caac74b1583eb17969b
Parents: ecf4b8b
Author: agura <agura>
Authored: Wed Jan 18 18:04:45 2017 +0300
Committer: agura <agura>
Committed: Wed Jan 18 18:04:45 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../communication/tcp/TcpCommunicationSpi.java  |  16 +
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  88 ++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  61 ++--
 .../messages/TcpDiscoveryAbstractMessage.java   |  21 ++
 .../tcp/TcpCommunicationSpiDropNodesTest.java   | 322 +++++++++++++++++++
 .../TcpCommunicationSpiFaultyClientTest.java    | 270 ++++++++++++++++
 .../ignite/testframework/GridTestNode.java      |   1 +
 .../testframework/junits/GridAbstractTest.java  |   2 +
 .../IgniteSpiCommunicationSelfTestSuite.java    |   5 +
 .../cache/IgniteCacheAbstractQuerySelfTest.java |   8 +-
 11 files changed, 758 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0da0f49..d77b2fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -65,6 +65,9 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_NO_DISCO_ORDER = "IGNITE_NO_DISCO_ORDER";
 
+    /** Defines reconnect delay in milliseconds for client node that was failed forcible. */
+    public static final String IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY = "IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY";
+
     /**
      * If this system property is set to {@code false} - no checks for new versions will
      * be performed by Ignite. By default, Ignite periodically checks for the new

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1fe437c..94b7efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -2550,6 +2551,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     "operating system firewall is disabled on local and remote hosts) " +
                     "[addrs=" + addrs + ']');
 
+            if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
+                X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
+                    IgniteSpiOperationTimeoutException.class)) {
+                LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+                    "cluster [" +
+                    "rmtNode=" + node +
+                    ", err=" + errs +
+                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+
+                getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
+                    "rmtNode=" + node +
+                    ", errs=" + errs +
+                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+            }
+
             throw errs;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 9a1261c..35f0908 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
@@ -100,6 +101,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -166,6 +168,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** */
     protected MessageWorker msgWorker;
 
+    /** Force fail message for local node. */
+    private TcpDiscoveryNodeFailedMessage forceFailMsg;
+
     /** */
     @GridToStringExclude
     private int joinCnt;
@@ -450,6 +455,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             msg.warning(warning);
 
+            msg.force(true);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -1396,6 +1403,14 @@ class ClientImpl extends TcpDiscoveryImpl {
                         else
                             leaveLatch.countDown();
                     }
+                    else if (msg instanceof TcpDiscoveryNodeFailedMessage &&
+                        ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(locNode.id())) {
+                        TcpDiscoveryNodeFailedMessage msg0 = (TcpDiscoveryNodeFailedMessage)msg;
+
+                        assert msg0.force() : msg0;
+
+                        forceFailMsg = msg0;
+                    }
                     else if (msg instanceof SocketClosedMessage) {
                         if (((SocketClosedMessage)msg).sock == currSock) {
                             currSock = null;
@@ -1412,25 +1427,45 @@ class ClientImpl extends TcpDiscoveryImpl {
                                 }
                             }
                             else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Connection closed, will try to restore connection.");
+                                if (forceFailMsg != null) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Connection closed, local node received force fail message, " +
+                                            "will not try to restore connection");
+                                    }
+
+                                    queue.addFirst(SPI_RECONNECT_FAILED);
+                                }
+                                else {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Connection closed, will try to restore
connection.");
 
-                                assert reconnector == null;
+                                    assert reconnector == null;
 
-                                final Reconnector reconnector = new Reconnector(join);
-                                this.reconnector = reconnector;
-                                reconnector.start();
+                                    final Reconnector reconnector = new Reconnector(join);
+                                    this.reconnector = reconnector;
+                                    reconnector.start();
+                                }
                             }
                         }
                     }
                     else if (msg == SPI_RECONNECT_FAILED) {
-                        reconnector.cancel();
-                        reconnector.join();
+                        if (reconnector != null) {
+                            reconnector.cancel();
+                            reconnector.join();
 
-                        reconnector = null;
+                            reconnector = null;
+                        }
+                        else
+                            assert forceFailMsg != null;
 
                         if (spi.isClientReconnectDisabled()) {
                             if (state != SEGMENTED && state != STOPPED) {
+                                if (forceFailMsg != null) {
+                                    U.quietAndWarn(log, "Local node was dropped from cluster due to network problems " +
+                                        "[nodeInitiatedFail=" + forceFailMsg.creatorNodeId() +
+                                        ", msg=" + forceFailMsg.warning() + ']');
+                                }
+
                                 if (log.isDebugEnabled()) {
                                     log.debug("Failed to restore closed connection, reconnect
disabled, " +
                                         "local node segmented [networkTimeout=" + spi.netTimeout
+ ']');
@@ -1445,7 +1480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                             if (state == STARTING || state == CONNECTED) {
                                 if (log.isDebugEnabled()) {
                                     log.debug("Failed to restore closed connection, will
try to reconnect " +
-                                        "[networkTimeout=" + spi.netTimeout + ", joinTimeout="
+ spi.joinTimeout + ']');
+                                        "[networkTimeout=" + spi.netTimeout +
+                                        ", joinTimeout=" + spi.joinTimeout +
+                                        ", failMsg=" + forceFailMsg + ']');
                                 }
 
                                 state = DISCONNECTED;
@@ -1468,7 +1505,36 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                             UUID newId = UUID.randomUUID();
 
-                            if (log.isInfoEnabled()) {
+                            if (forceFailMsg != null) {
+                                long delay = IgniteSystemProperties.getLong(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY,
+                                    10_000);
+
+                                if (delay > 0) {
+                                    U.quietAndWarn(log, "Local node was dropped from cluster
due to network problems, " +
+                                        "will try to reconnect with new id after " + delay
+ "ms (reconnect delay " +
+                                        "can be changed using IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY
system " +
+                                        "property) [" +
+                                        "newId=" + newId +
+                                        ", prevId=" + locNode.id() +
+                                        ", locNode=" + locNode +
+                                        ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId()
+
+                                        ", msg=" + forceFailMsg.warning() + ']');
+
+                                    Thread.sleep(delay);
+                                }
+                                else {
+                                    U.quietAndWarn(log, "Local node was dropped from cluster
due to network problems, " +
+                                        "will try to reconnect with new id [" +
+                                        "newId=" + newId +
+                                        ", prevId=" + locNode.id() +
+                                        ", locNode=" + locNode +
+                                        ", nodeInitiatedFail=" + forceFailMsg.creatorNodeId()
+
+                                        ", msg=" + forceFailMsg.warning() + ']');
+                                }
+
+                                forceFailMsg = null;
+                            }
+                            else if (log.isInfoEnabled()) {
                                 log.info("Client node disconnected from cluster, will try
to reconnect with new id " +
                                     "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode="
+ locNode + ']');
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 40da281..f33566c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -775,6 +775,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             msg.warning(warning);
 
+            msg.force(true);
+
             msgWorker.addMessage(msg);
         }
     }
@@ -4610,8 +4612,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                 else {
                     boolean contains;
 
+                    UUID creatorId = msg.creatorNodeId();
+
+                    assert creatorId != null : msg;
+
                     synchronized (mux) {
-                        contains = failedNodes.containsKey(sndNode);
+                        contains = failedNodes.containsKey(sndNode) || ring.node(creatorId) == null;
                     }
 
                     if (contains) {
@@ -4623,25 +4629,29 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            UUID nodeId = msg.failedNodeId();
+            UUID failedNodeId = msg.failedNodeId();
             long order = msg.order();
 
-            TcpDiscoveryNode node = ring.node(nodeId);
+            TcpDiscoveryNode failedNode = ring.node(failedNodeId);
 
-            if (node != null && node.internalOrder() != order) {
+            if (failedNode != null && failedNode.internalOrder() != order) {
                 if (log.isDebugEnabled())
                     log.debug("Ignoring node failed message since node internal order does
not match " +
-                        "[msg=" + msg + ", node=" + node + ']');
+                        "[msg=" + msg + ", node=" + failedNode + ']');
 
                 return;
             }
 
-            if (node != null) {
-                assert !node.isLocal() || !msg.verified() : msg;
+            if (failedNode != null) {
+                assert !failedNode.isLocal() || !msg.verified() : msg;
 
-                synchronized (mux) {
-                    if (!failedNodes.containsKey(node))
-                        failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId()
: getLocalNodeId());
+                boolean skipUpdateFailedNodes = msg.force() && !msg.verified();
+
+                if (!skipUpdateFailedNodes) {
+                    synchronized (mux) {
+                        if (!failedNodes.containsKey(failedNode))
+                            failedNodes.put(failedNode, msg.senderNodeId() != null ? msg.senderNodeId()
: getLocalNodeId());
+                    }
                 }
             }
             else {
@@ -4668,11 +4678,11 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified()) {
-                node = ring.removeNode(nodeId);
+                failedNode = ring.removeNode(failedNodeId);
 
-                interruptPing(node);
+                interruptPing(failedNode);
 
-                assert node != null;
+                assert failedNode != null;
 
                 long topVer;
 
@@ -4698,16 +4708,18 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 synchronized (mux) {
-                    failedNodes.remove(node);
+                    failedNodes.remove(failedNode);
 
-                    leavingNodes.remove(node);
+                    leavingNodes.remove(failedNode);
 
-                    failedNodesMsgSent.remove(node.id());
+                    failedNodesMsgSent.remove(failedNode.id());
 
-                    ClientMessageWorker worker = clientMsgWorkers.remove(node.id());
+                    if (!msg.force()) { // ClientMessageWorker will stop after sending force fail message.
+                        ClientMessageWorker worker = clientMsgWorkers.remove(failedNode.id());
 
-                    if (worker != null)
-                        worker.interrupt();
+                        if (worker != null)
+                            worker.interrupt();
+                    }
                 }
 
                 if (msg.warning() != null && !msg.creatorNodeId().equals(getLocalNodeId()))
{
@@ -4719,10 +4731,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 synchronized (mux) {
-                    joiningNodes.remove(node.id());
+                    joiningNodes.remove(failedNode.id());
                 }
 
-                notifyDiscovery(EVT_NODE_FAILED, topVer, node);
+                notifyDiscovery(EVT_NODE_FAILED, topVer, failedNode);
 
                 spi.stats.onNodeFailed();
             }
@@ -6317,7 +6329,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
 
-                success = true;
+                boolean clientFailed = msg instanceof TcpDiscoveryNodeFailedMessage &&
+                    ((TcpDiscoveryNodeFailedMessage)msg).failedNodeId().equals(clientNodeId);
+
+                assert !clientFailed || msg.force() : msg;
+
+                success = !clientFailed;
             }
             catch (IgniteCheckedException | IOException e) {
                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 783a113..e982b2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -48,6 +48,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable
{
     /** */
     protected static final int CLIENT_ACK_FLAG_POS = 4;
 
+    /** */
+    protected static final int FORCE_FAIL_FLAG_POS = 8;
+
     /** Sender of the message (transient). */
     private transient UUID sndNodeId;
 
@@ -205,6 +208,24 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable
{
     }
 
     /**
+     * Get force fail node flag.
+     *
+     * @return Force fail node flag.
+     */
+    public boolean force() {
+        return getFlag(FORCE_FAIL_FLAG_POS);
+    }
+
+    /**
+     * Sets force fail node flag.
+     *
+     * @param force Force fail node flag.
+     */
+    public void force(boolean force) {
+        setFlag(FORCE_FAIL_FLAG_POS, force);
+    }
+
+    /**
      * @return Pending message index.
      */
     public short pendingIndex() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
new file mode 100644
index 0000000..d29231e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+/**
+ *
+ */
+public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Nodes count. */
+    private static final int NODES_CNT = 4;
+
+    /** Block. */
+    private static volatile boolean block;
+
+    /** Predicate. */
+    private static IgniteBiPredicate<ClusterNode, ClusterNode> pred;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClockSyncFrequency(300000);
+        cfg.setFailureDetectionTimeout(1000);
+
+        TestCommunicationSpi spi = new TestCommunicationSpi();
+
+        spi.setIdleConnectionTimeout(100);
+        spi.setSharedMemoryPort(-1);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setCommunicationSpi(spi);
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        block = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOneNode() throws Exception {
+        pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+            @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+                return block && rmtNode.order() == 3;
+            }
+        };
+
+        startGrids(NODES_CNT);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        grid(0).events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event event) {
+                latch.countDown();
+
+                return true;
+            }
+        }, EVT_NODE_FAILED);
+
+        U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+        block = true;
+
+        grid(0).compute().broadcast(new IgniteRunnable() {
+            @Override public void run() {
+                // No-op.
+            }
+        });
+
+        assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return grid(3).cluster().topologyVersion() == NODES_CNT + 1;
+            }
+        }, 5000));
+
+        for (int i = 0; i < 10; i++) {
+            U.sleep(1000);
+
+            assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+            int liveNodesCnt = 0;
+
+            for (int j = 0; j < NODES_CNT; j++) {
+                IgniteEx ignite;
+
+                try {
+                    ignite = grid(j);
+
+                    log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+                    ClusterNode locNode = ignite.localNode();
+
+                    if (locNode.order() != 3) {
+                        assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+                        for (ClusterNode node : ignite.cluster().nodes())
+                            assertTrue(node.order() != 3);
+
+                        liveNodesCnt++;
+                    }
+                }
+                catch (Exception e) {
+                    log.info("Checking topology for grid(" + j + "): no grid in topology.");
+                }
+            }
+
+            assertEquals(NODES_CNT - 1, liveNodesCnt);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoNodesEachOther() throws Exception {
+        pred = new IgniteBiPredicate<ClusterNode, ClusterNode>() {
+            @Override public boolean apply(ClusterNode locNode, ClusterNode rmtNode) {
+                return block && (locNode.order() == 2 || locNode.order() == 4) &&
+                    (rmtNode.order() == 2 || rmtNode.order() == 4);
+            }
+        };
+
+        startGrids(NODES_CNT);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        grid(0).events().localListen(new IgnitePredicate<Event>() {
+            @Override
+            public boolean apply(Event event) {
+                latch.countDown();
+
+                return true;
+            }
+        }, EVT_NODE_FAILED);
+
+        U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+        block = true;
+
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+
+        IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(new Callable<Void>()
{
+            @Override public Void call() throws Exception {
+                barrier.await();
+
+                grid(1).compute().withNoFailover().broadcast(new IgniteRunnable() {
+                    @Override public void run() {
+                        // No-op.
+                    }
+                });
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Void> fut2 = GridTestUtils.runAsync(new Callable<Void>()
{
+            @Override public Void call() throws Exception {
+                barrier.await();
+
+                grid(3).compute().withNoFailover().broadcast(new IgniteRunnable() {
+                    @Override public void run() {
+                        // No-op.
+                    }
+                });
+
+                return null;
+            }
+        });
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return grid(2).cluster().nodes().size() == NODES_CNT - 1;
+            }
+        }, 5000);
+
+        try {
+            fut1.get();
+        }
+        catch (IgniteCheckedException e) {
+            // No-op.
+        }
+
+        try {
+            fut2.get();
+        }
+        catch (IgniteCheckedException e) {
+            // No-op.
+        }
+
+        long failedNodeOrder = 1 + 2 + 3 + 4;
+
+        for (ClusterNode node : grid(0).cluster().nodes())
+            failedNodeOrder -= node.order();
+
+        for (int i = 0; i < 10; i++) {
+            U.sleep(1000);
+
+            assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+
+            int liveNodesCnt = 0;
+
+            for (int j = 0; j < NODES_CNT; j++) {
+                IgniteEx ignite;
+
+                try {
+                    ignite = grid(j);
+
+                    log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
+
+                    ClusterNode locNode = ignite.localNode();
+
+                    if (locNode.order() != failedNodeOrder) {
+                        assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
+
+                        for (ClusterNode node : ignite.cluster().nodes())
+                            assertTrue(node.order() != failedNodeOrder);
+
+                        liveNodesCnt++;
+                    }
+                }
+                catch (Exception e) {
+                    log.info("Checking topology for grid(" + j + "): no grid in topology.");
+                }
+            }
+
+            assertEquals(NODES_CNT - 1, liveNodesCnt);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws
IgniteCheckedException {
+            if (pred.apply(getLocalNode(), node)) {
+                Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+                attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+                attrs.put(createAttributeName(ATTR_PORT), 47200);
+                attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+                attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+                ((TcpDiscoveryNode)node).setAttributes(attrs);
+            }
+
+            return super.createTcpClient(node);
+        }
+
+        /**
+         * @param name Name.
+         */
+        private String createAttributeName(String name) {
+            return getClass().getSimpleName() + '.' + name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
new file mode 100644
index 0000000..6e99487
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.nio.GridCommunicationClient;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
+/**
+ * Tests that faulty client will be failed if connection can't be established.
+ */
+public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Predicate. */
+    private static final IgnitePredicate<ClusterNode> PRED = new IgnitePredicate<ClusterNode>() {
+        @Override public boolean apply(ClusterNode node) {
+            return block && node.order() == 3;
+        }
+    };
+
+    /** Client mode. */
+    private static boolean clientMode;
+
+    /** Block. */
+    private static volatile boolean block;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClockSyncFrequency(300000);
+        cfg.setFailureDetectionTimeout(1000);
+        cfg.setClientMode(clientMode);
+
+        TestCommunicationSpi spi = new TestCommunicationSpi();
+
+        spi.setIdleConnectionTimeout(100);
+        spi.setSharedMemoryPort(-1);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi) cfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+        discoSpi.setClientReconnectDisabled(true);
+
+        cfg.setCommunicationSpi(spi);
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        block = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoServerOnHost() throws Exception {
+        testFailClient(null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNotAcceptedConnection() throws Exception {
+        testFailClient(new FakeServer());
+    }
+
+    /**
+     * @param srv Server.
+     * @throws Exception If failed.
+     */
+    private void testFailClient(FakeServer srv) throws Exception {
+        IgniteInternalFuture<Long> fut = null;
+
+        try {
+            if (srv != null)
+                fut = GridTestUtils.runMultiThreadedAsync(srv, 1, "fake-server");
+
+            clientMode = false;
+
+            startGrids(2);
+
+            clientMode = true;
+
+            startGrid(2);
+            startGrid(3);
+
+            U.sleep(1000); // Wait for write timeout and closing idle connections.
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            grid(0).events().localListen(new IgnitePredicate<Event>() {
+                @Override
+                public boolean apply(Event event) {
+                    latch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_FAILED);
+
+            block = true;
+
+            try {
+                grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() {
+                    @Override public void run() {
+                        // No-op.
+                    }
+                });
+            }
+            catch (IgniteException e) {
+                // No-op.
+            }
+
+            assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return grid(0).cluster().forClients().nodes().size() == 1;
+                }
+            }, 5000));
+
+            for (int i = 0; i < 5; i++) {
+                U.sleep(1000);
+
+                log.info("Check topology (" + (i + 1) + "): " + grid(0).cluster().nodes());
+
+                assertEquals(1, grid(0).cluster().forClients().nodes().size());
+            }
+        }
+        finally {
+            if (srv != null) {
+                srv.stop();
+
+                assert fut != null;
+
+                fut.get();
+            }
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Server that emulates connection troubles.
+     */
+    private static class FakeServer implements Runnable {
+        /** Server. */
+        private final ServerSocket srv;
+
+        /** Stop. */
+        private volatile boolean stop;
+
+        /**
+         * Default constructor.
+         */
+        FakeServer() throws IOException {
+            this.srv = new ServerSocket(47200, 50, InetAddress.getByName("127.0.0.1"));
+        }
+
+        /**
+         *
+         */
+        public void stop() {
+            stop = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                while (!stop) {
+                    try {
+                        U.sleep(10);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        // No-op.
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(srv);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+            if (PRED.apply(node)) {
+                Map<String, Object> attrs = new HashMap<>(node.attributes());
+
+                attrs.put(createAttributeName(ATTR_ADDRS), Collections.singleton("127.0.0.1"));
+                attrs.put(createAttributeName(ATTR_PORT), 47200);
+                attrs.put(createAttributeName(ATTR_EXT_ADDRS), Collections.emptyList());
+                attrs.put(createAttributeName(ATTR_HOST_NAMES), Collections.emptyList());
+
+                ((TcpDiscoveryNode)node).setAttributes(attrs);
+            }
+
+            return super.createTcpClient(node);
+        }
+
+        /**
+         * @param name Name.
+         */
+        private String createAttributeName(String name) {
+            return getClass().getSimpleName() + '.' + name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index e0e8eba..6365443 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -82,6 +82,7 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
     private void initAttributes() {
         attrs.put(IgniteNodeAttributes.ATTR_BUILD_VER, "10");
         attrs.put(IgniteNodeAttributes.ATTR_GRID_NAME, "null");
+        attrs.put(IgniteNodeAttributes.ATTR_CLIENT_MODE, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 9f507e6..ffaec4f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -102,6 +102,7 @@ import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.FileSystemXmlApplicationContext;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -173,6 +174,7 @@ public abstract class GridAbstractTest extends TestCase {
     static {
         System.setProperty(IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "10000");
         System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
+        System.setProperty(IGNITE_DISCO_FAILED_CLIENT_RECONNECT_DELAY, "1");
 
         if (BINARY_MARSHALLER)
             GridTestProperties.setProperty(GridTestProperties.MARSH_CLASS_NAME, BinaryMarshaller.class.getName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index c557fbb..ddc2551 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -35,6 +35,8 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpFailure
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpNoDelayOffSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTest;
 import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
 
 /**
  * Test suite for all communication SPIs.
@@ -72,6 +74,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
 
+        suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class));
+        suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9aaf035/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 7e0d20b..9f56877 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -633,7 +633,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
 
         assertEquals(2, qry.getAll().size());
 
-        Throwable throwable = GridTestUtils.assertThrowsInherited(log, new GridPlainCallable<Void>() {
+        GridTestUtils.assertThrows(log, new GridPlainCallable<Void>() {
             @Override public Void call() throws Exception {
                 QueryCursor<Cache.Entry<Integer, Type1>> qry =
                     cache.query(new SqlQuery<Integer, Type1>(Type1.class, "FROM Type1"));
@@ -642,11 +642,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
 
                 return null;
             }
-        }, RuntimeException.class, null);
-
-        assertNotNull(throwable);
-
-        assertTrue(throwable instanceof IgniteException || throwable instanceof CacheException);
+        }, CacheException.class, null);
     }
 
     /**


Mime
View raw message