ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/3] ignite git commit: IGNITE-2688: fixed InterruptException processing in RingMessageWorker caused by segmentation
Date Thu, 21 Apr 2016 09:51:39 GMT
IGNITE-2688: fixed InterruptException processing in RingMessageWorker caused by segmentation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1eb931b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1eb931b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1eb931b

Branch: refs/heads/ignite-2523-1
Commit: e1eb931b0c578f9d3816bae015e21f91af199a9e
Parents: 57d85a4
Author: Denis Magda <dmagda@gridgain.com>
Authored: Wed Apr 20 17:52:51 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Apr 20 17:52:51 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 148 +++++++++++++++++--
 2 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e1eb931b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 572f540..3283d99 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2179,8 +2179,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 super.body();
             }
             catch (Throwable e) {
-                if (!spi.isNodeStopping0()) {
-                    final Ignite ignite = spi.ignite();
+                if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) {
+                        final Ignite ignite = spi.ignite();
 
                     if (ignite != null) {
                         U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1eb931b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 7efaca0..45cd276 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedM
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
+import org.apache.ignite.testframework.GridStringLogger;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.eclipse.jetty.util.ConcurrentHashSet;
@@ -108,6 +109,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /** */
     private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
 
+    private GridStringLogger strLogger;
+
     /**
      * @throws Exception If fails.
      */
@@ -188,6 +191,16 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         }
         else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
             cfg.setFailureDetectionTimeout(30_000);
+        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) {
+            cfg.setFailureDetectionTimeout(3_000);
+        }
+        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) {
+            cfg.setFailureDetectionTimeout(6_000);
+
+            cfg.setGridLogger(strLogger = new GridStringLogger());
+        }
+        else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode"))
+            cfg.setGridLogger(strLogger = new GridStringLogger());
 
         return cfg;
     }
@@ -1355,11 +1368,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
         try {
-            TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1();
+            final TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1(
+                TestMessageWorkerFailureSpi1.EXCEPTION_MODE);
 
             nodeSpi.set(spi0);
 
-            final Ignite ignite0 = startGrid(0);
+            final Ignite ignite0 = startGrid("testNodeShutdownOnRingMessageWorkerFailureFailedNode");
 
             nodeSpi.set(new TcpDiscoverySpi());
 
@@ -1374,10 +1388,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             ignite1.events().localListen(new IgnitePredicate<Event>() {
                 @Override public boolean apply(Event evt) {
                     if (evt.type() == EventType.EVT_NODE_FAILED &&
-                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
+                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) {
                         disconnected.set(true);
 
-                    latch.countDown();
+                        latch.countDown();
+                    }
 
                     return false;
                 }
@@ -1389,20 +1404,98 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
             assertTrue(disconnected.get());
 
-            try {
-                ignite0.cluster().localNode().id();
-            }
-            catch (IllegalStateException e) {
-                if (e.getMessage().contains("Grid is in invalid state to perform this operation"))
-                    return;
-            }
+            String result = strLogger.toString();
 
-            fail();
+            assert result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result;
         }
         finally {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testNoRingMessageWorkerAbnormalFailureOnSegmentation() throws Exception {
+        try {
+            TestMessageWorkerFailureSpi1 spi1 = new TestMessageWorkerFailureSpi1(
+                TestMessageWorkerFailureSpi1.SEGMENTATION_MODE);
+
+            nodeSpi.set(spi1);
+
+            Ignite ignite1 = startGrid("testNoRingMessageWorkerAbnormalFailureNormalNode");
+
+
+            nodeSpi.set(new TcpDiscoverySpi());
+
+            final Ignite ignite2 = startGrid("testNoRingMessageWorkerAbnormalFailureSegmentedNode");
+
+
+            final AtomicBoolean disconnected = new AtomicBoolean();
+
+            final AtomicBoolean segmented = new AtomicBoolean();
+
+            final CountDownLatch disLatch = new CountDownLatch(1);
+
+            final CountDownLatch segLatch = new CountDownLatch(1);
+
+            final UUID failedNodeId = ignite2.cluster().localNode().id();
+
+            ignite1.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EventType.EVT_NODE_FAILED &&
+                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
+                        disconnected.set(true);
+
+                    disLatch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+
+            ignite2.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (!failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
+                        return true;
+
+                    if (evt.type() == EventType.EVT_NODE_SEGMENTED) {
+                        segmented.set(true);
+
+                        segLatch.countDown();
+                    }
+
+                    return true;
+                }
+            }, EventType.EVT_NODE_SEGMENTED);
+
+
+            spi1.stop = true;
+
+            disLatch.await(15, TimeUnit.SECONDS);
+
+            assertTrue(disconnected.get());
+
+
+            spi1.stop = false;
+
+            segLatch.await(15, TimeUnit.SECONDS);
+
+            assertTrue(segmented.get());
+
+
+            Thread.sleep(10_000);
+
+
+            String result = strLogger.toString();
+
+            assert result.contains("Local node SEGMENTED") &&
+                !result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result;
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
     /**
      * @throws Exception If failed
      */
@@ -2071,14 +2164,41 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi {
         /** */
+        private static int EXCEPTION_MODE = 0;
+
+        /** */
+        private static int SEGMENTATION_MODE = 1;
+
+        /** */
+        private final int failureMode;
+
+        /** */
         private volatile boolean stop;
 
+        /**
+         * @param failureMode Failure mode to use during the test.
+         */
+        public TestMessageWorkerFailureSpi1(int failureMode) {
+            this.failureMode = failureMode;
+        }
+
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
 
-            if (stop)
-                throw new RuntimeException("Failing ring message worker explicitly");
+            if (stop) {
+                if (failureMode == EXCEPTION_MODE)
+                    throw new RuntimeException("Failing ring message worker explicitly");
+                else {
+                    try {
+                        Thread.sleep(5_000);
+                    }
+                    catch (InterruptedException e) {
+                        // Ignore.
+                    }
+                }
+
+            }
 
             super.writeToSocket(sock, out, msg, timeout);
         }


Mime
View raw message