ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/32] ignite git commit: Forcible node drop makes cluster unstable in some cases. Disable forcible node drop by default.
Date Wed, 19 Jul 2017 09:27:32 GMT
Forcible node drop makes cluster unstable in some cases. Disable forcible node drop by default.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e46cf958
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e46cf958
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e46cf958

Branch: refs/heads/ignite-5578
Commit: e46cf9582bb05c22a45b868c2c78ea7ed4818d62
Parents: a71691a
Author: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Authored: Tue Jul 18 15:49:00 2017 +0300
Committer: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Committed: Tue Jul 18 15:49:00 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  6 ++
 .../service/GridServiceProcessor.java           |  4 +-
 .../spi/IgniteSpiOperationTimeoutHelper.java    |  8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 81 ++++++++++++++------
 .../IgniteClientReconnectAbstractTest.java      |  5 ++
 ...niteBinaryMetadataUpdateNodeRestartTest.java | 10 +++
 .../IgniteCacheNearRestartRollbackSelfTest.java | 15 ++++
 ...teSynchronizationModesMultithreadedTest.java |  5 ++
 .../org/apache/ignite/spi/GridTcpForwarder.java | 26 +++++++
 .../tcp/TcpCommunicationSpiDropNodesTest.java   | 15 ++++
 .../TcpCommunicationSpiFaultyClientTest.java    | 20 +++--
 11 files changed, 160 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 616ac3f..5da7bd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -684,6 +684,12 @@ public final class IgniteSystemProperties {
         "IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD";
 
     /**
+     * If this property is set to {@code true}, a node will forcibly fail a remote node when it fails to
+     * establish a communication connection to it. Forcible node failure is disabled by default.
+     */
+    public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 23a29f8..db632ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -1528,7 +1528,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements
Ignite
         private volatile AffinityTopologyVersion currTopVer = null;
 
         /** {@inheritDoc} */
-        @Override public void onEvent(DiscoveryEvent evt, final DiscoCache discoCache) {
+        @Override public void onEvent(final DiscoveryEvent evt, final DiscoCache discoCache)
{
             GridSpinBusyLock busyLock = GridServiceProcessor.this.busyLock;
 
             if (busyLock == null || !busyLock.enterBusy())
@@ -1589,7 +1589,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements
Ignite
                                             log.info("Service processor detected a topology
change during " +
                                                 "assignments calculation (will abort current
iteration and " +
                                                 "re-calculate on the newer version): " +
-                                                "[topVer=" + topVer + ", newTopVer=" + currTopVer
+ ']');
+                                                "[topVer=" + topVer + ", newTopVer=" + currTopVer0
+ ']');
 
                                         return;
                                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index c685ea9..b2432ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -98,7 +98,9 @@ public class IgniteSpiOperationTimeoutHelper {
         if (!failureDetectionTimeoutEnabled)
             return false;
 
-        return e instanceof IgniteSpiOperationTimeoutException || e instanceof SocketTimeoutException
||
-            X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketException.class);
+        if (X.hasCause(e, IgniteSpiOperationTimeoutException.class, SocketTimeoutException.class,
SocketException.class))
+            return true;
+
+        return (timeout - (U.currentTimeMillis() - lastOperStartTs) <= 0);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5aca2f9..af12d3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -341,6 +342,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
     /** */
     private ConnectionPolicy connPlc;
 
+    /** */
+    private boolean enableForcibleNodeKill = IgniteSystemProperties
+        .getBoolean(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
     /** Server listener. */
     private final GridNioServerListener<Message> srvLsnr =
         new GridNioServerListenerAdapter<Message>() {
@@ -2663,8 +2668,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                                     }
                                 }
                             }
-                            else
+                            else {
                                 U.sleep(200);
+
+                                if (getSpiContext().node(node.id()) == null)
+                                    throw new ClusterTopologyCheckedException("Failed to
send message " +
+                                        "(node left topology): " + node);
+                            }
                         }
 
                         fut.onDone(client0);
@@ -3088,6 +3098,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                     }
 
                     if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException
||
+                        X.hasCause(e, SocketException.class) ||
                         timeoutHelper.checkFailureTimeoutReached(e))) {
 
                         String msg = "Handshake timed out (failure detection timeout is reached)
" +
@@ -3179,8 +3190,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                         "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
 
                     // Reconnect for the second time, if connection is not established.
-                    if (!failureDetThrReached && connectAttempts < 2 &&
-                        (e instanceof ConnectException || X.hasCause(e, ConnectException.class)))
{
+                    if (!failureDetThrReached && connectAttempts < 5 &&
+                        (X.hasCause(e, ConnectException.class, HandshakeException.class,
SocketTimeoutException.class))) {
+                        U.sleep(200);
+
                         connectAttempts++;
 
                         continue;
@@ -3203,21 +3216,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                     "operating system firewall is disabled on local and remote hosts) " +
                     "[addrs=" + addrs + ']');
 
-            if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) ||
!CU.clientNode(getLocalNode())) &&
-                X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
-                    IgniteSpiOperationTimeoutException.class)) {
+            if (enableForcibleNodeKill) {
+                if (getSpiContext().node(node.id()) != null && (CU.clientNode(node)
|| !CU.clientNode(getLocalNode())) &&
+                    X.hasCause(errs, ConnectException.class, HandshakeException.class,
+                        SocketTimeoutException.class, HandshakeTimeoutException.class,
+                        IgniteSpiOperationTimeoutException.class)) {
 
-                U.error(log, "TcpCommunicationSpi failed to establish connection to node,
node will be dropped from " +
-                    "cluster [" +
-                    "rmtNode=" + node + "]", errs);
+                    U.error(log, "TcpCommunicationSpi failed to establish connection to node,
node will be dropped from " +
+                        "cluster [" +
+                        "rmtNode=" + node + "]", errs);
 
-                getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish
connection to node [" +
-                    "rmtNode=" + node +
-                    ", errs=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                    getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish
connection to node [" +
+                        "rmtNode=" + node +
+                        ", errs=" + errs +
+                        ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                }
             }
 
-            throw errs;
+            if (X.hasCause(errs, ConnectException.class, HandshakeException.class))
+                throw errs;
         }
 
         return client;
@@ -3269,7 +3286,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                         sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, directBuf,
ByteOrder.nativeOrder(), log);
 
                         if (!sslHnd.handshake())
-                            throw new IgniteCheckedException("SSL handshake is not completed.");
+                            throw new HandshakeException("SSL handshake is not completed.");
 
                         ByteBuffer handBuff = sslHnd.applicationBuffer();
 
@@ -3279,7 +3296,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node
ID (connection closed).");
+                                throw new HandshakeException("Failed to read remote node
ID (connection closed).");
 
                             buf.flip();
 
@@ -3295,7 +3312,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                             int read = ch.read(buf);
 
                             if (read == -1)
-                                throw new IgniteCheckedException("Failed to read remote node
ID (connection closed).");
+                                throw new HandshakeException("Failed to read remote node
ID (connection closed).");
 
                             i += read;
                         }
@@ -3304,7 +3321,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                     UUID rmtNodeId0 = U.bytesToUuid(buf.array(), Message.DIRECT_TYPE_SIZE);
 
                     if (!rmtNodeId.equals(rmtNodeId0))
-                        throw new IgniteCheckedException("Remote node ID is not as expected
[expected=" + rmtNodeId +
+                        throw new HandshakeException("Remote node ID is not as expected [expected="
+ rmtNodeId +
                             ", rcvd=" + rmtNodeId0 + ']');
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
@@ -3391,7 +3408,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote
node recovery handshake " +
+                                    throw new HandshakeException("Failed to read remote node
recovery handshake " +
                                         "(connection closed).");
 
                                 buf.flip();
@@ -3429,7 +3446,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
                                 int read = ch.read(buf);
 
                                 if (read == -1)
-                                    throw new IgniteCheckedException("Failed to read remote
node recovery handshake " +
+                                    throw new HandshakeException("Failed to read remote node
recovery handshake " +
                                         "(connection closed).");
 
                                 i += read;
@@ -3471,8 +3488,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
 
             // Ignoring whatever happened after timeout - reporting only timeout event.
             if (!cancelled)
-                throw new HandshakeTimeoutException("Failed to perform handshake due to timeout
(consider increasing " +
-                    "'connectionTimeout' configuration property).");
+                throw new HandshakeTimeoutException(
+                    new IgniteSpiOperationTimeoutException("Failed to perform handshake due
to timeout " +
+                        "(consider increasing 'connectionTimeout' configuration property)."));
         }
 
         return rcvCnt;
@@ -3662,18 +3680,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
     }
 
     /** Internal exception class for proper timeout handling. */
-    private static class HandshakeTimeoutException extends IgniteCheckedException {
+    private static class HandshakeException extends IgniteCheckedException {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
-         * @param msg Message.
+         * @param msg Error message.
          */
-        HandshakeTimeoutException(String msg) {
+        HandshakeException(String msg) {
             super(msg);
         }
     }
 
+    /** Internal exception class for proper timeout handling. */
+    private static class HandshakeTimeoutException extends IgniteCheckedException {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param cause Exception cause
+         */
+        HandshakeTimeoutException(IgniteSpiOperationTimeoutException cause) {
+            super(cause);
+        }
+    }
+
     /**
      * This worker takes responsibility to shut the server down when stopping,
      * No other thread shall stop passed server.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 10fa6ad..fa9cc35 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -152,6 +153,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
+
         int srvs = serverCount();
 
         if (srvs > 0)
@@ -172,6 +175,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
     @Override protected void afterTestsStopped() throws Exception {
         super.afterTestsStopped();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         stopAllGrids();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index ee1b48e..420fdd8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -26,6 +26,7 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -89,9 +90,18 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
index 79f15ad..1a9e13a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -31,6 +31,7 @@ import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -92,6 +93,20 @@ public class IgniteCacheNearRestartRollbackSelfTest extends GridCommonAbstractTe
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
     /**
      * @param igniteInstanceName Ignite instance name.
      * @return Cache configuration.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
index d23e870..8e57387 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -32,6 +32,7 @@ import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -106,6 +107,8 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends
Gri
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+
         startGrids(SRVS);
 
         clientMode = true;
@@ -121,6 +124,8 @@ public class IgniteTxCacheWriteSynchronizationModesMultithreadedTest extends
Gri
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+
         super.afterTestsStopped();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
index d08321e..68d97c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpForwarder.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -85,6 +86,10 @@ public final class GridTcpForwarder implements AutoCloseable {
                                 outputCon.getInputStream(), inputCon.getOutputStream()
                             );
 
+                            // Force the sibling thread to stop if either forwarding thread fails.
+                            forwardThread1.setUncaughtExceptionHandler(new ForwarderExceptionHandler(forwardThread2));
+                            forwardThread2.setUncaughtExceptionHandler(new ForwarderExceptionHandler(forwardThread1));
+
                             forwardThread1.start();
                             forwardThread2.start();
 
@@ -128,6 +133,25 @@ public final class GridTcpForwarder implements AutoCloseable {
     }
 
     /**
+     * Uncaught exception handler that interrupts the paired (sibling) forwarding thread.
+     */
+    private static class ForwarderExceptionHandler implements Thread.UncaughtExceptionHandler
{
+        /** Forwarding thread to interrupt when the thread owning this handler fails. */
+        private Thread siblingThread;
+
+        /** @param siblingThread Forwarding thread to interrupt when the owning thread fails. */
+        public ForwarderExceptionHandler(Thread siblingThread) {
+
+            this.siblingThread = siblingThread;
+        }
+
+        /** Interrupts the sibling. NOTE(review): won't unblock a plain Socket read; verify it stops. */
+        @Override public void uncaughtException(Thread t, Throwable e) {
+            siblingThread.interrupt();
+        }
+    }
+
+    /**
      * Thread reads data from input stream and write to output stream.
      */
     private class ForwardThread extends Thread {
@@ -166,6 +190,8 @@ public final class GridTcpForwarder implements AutoCloseable {
             }
             catch (IOException e) {
                 log.error("IOException while forwarding data [threadName=" + getName() +
"]", e);
+
+                throw new IgniteException(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index ddebd40..e215a34 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -82,6 +83,20 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest
{
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e46cf958/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index b4d6cbc..bead697 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -18,18 +18,16 @@
 package org.apache.ignite.spi.communication.tcp;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
@@ -44,8 +42,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -95,6 +91,20 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 


Mime
View raw message