ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject ignite git commit: IGNITE-2688: fixed InterruptedException processing in RingMessageWorker caused by segmentation
Date Fri, 15 Apr 2016 20:59:31 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2688 21bd626d7 -> 1d327819f


IGNITE-2688: fixed InterruptedException processing in RingMessageWorker caused by segmentation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d327819
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d327819
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d327819

Branch: refs/heads/ignite-2688
Commit: 1d327819f664c3cdc426dbe2fede50c77671ea13
Parents: 21bd626
Author: Denis Magda <dmagda@gridgain.com>
Authored: Fri Apr 15 20:32:53 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Fri Apr 15 20:32:53 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   5 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  18 ++-
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   7 +
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 156 +++++++++++++++++--
 4 files changed, 172 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d327819/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 31d614f..c5caec2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -778,6 +778,11 @@ class ClientImpl extends TcpDiscoveryImpl {
         return msgWorker;
     }
 
+    /** {@inheritDoc} */
+    @Override protected Boolean workerThreadStoppedAbnormally() {
+        return false;
+    }
+
     /**
      * FOR TEST PURPOSE ONLY!
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d327819/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 75430c6..8e01f04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1524,6 +1524,11 @@ class ServerImpl extends TcpDiscoveryImpl {
         return msgWorker;
     }
 
+    /** {@inheritDoc} */
+    @Override protected Boolean workerThreadStoppedAbnormally() {
+        return msgWorker.stoppedAbnormally;
+    }
+
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      * <p>
@@ -2132,6 +2137,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Force pending messages send. */
         private boolean forceSndPending;
 
+        /** FOR TESTS ONLY. Used to check whether the worker has been stopped abnormally. */
+        private volatile Boolean stoppedAbnormally;
+
         /** Socket. */
         private Socket sock;
 
@@ -2177,12 +2185,16 @@ class ServerImpl extends TcpDiscoveryImpl {
         @Override protected void body() throws InterruptedException {
             try {
                 super.body();
+
+                stoppedAbnormally = false;
             }
             catch (Throwable e) {
-                if (!spi.isNodeStopping0()) {
-                    final Ignite ignite = spi.ignite();
+                if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) {
+                        final Ignite ignite = spi.ignite();
 
                     if (ignite != null) {
+                        stoppedAbnormally = true;
+
                         U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +
                             "Stopping the node in order to prevent cluster wide instability.", e);
 
@@ -2202,6 +2214,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }, "node-stop-thread").start();
                     }
                 }
+                else
+                    stoppedAbnormally = false;
 
                 // Must be processed by IgniteSpiThread as well.
                 throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d327819/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index 1aef728..6dea491 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -255,6 +255,13 @@ abstract class TcpDiscoveryImpl {
     protected abstract IgniteSpiThread workerThread();
 
     /**
+     * <strong>FOR TEST ONLY!!!</strong>
+     *
+     * @return {@code true} if the worker Thread has been stopped abnormally.
+     */
+    protected abstract Boolean workerThreadStoppedAbnormally();
+
+    /**
      * @throws IgniteSpiException If failed.
      */
     @SuppressWarnings("BusyWait")

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d327819/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 7efaca0..123ee88 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -188,6 +188,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         }
         else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
             cfg.setFailureDetectionTimeout(30_000);
+        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode"))
+            cfg.setFailureDetectionTimeout(3_000);
+        else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode"))
+            cfg.setFailureDetectionTimeout(6_000);
 
         return cfg;
     }
@@ -1355,7 +1359,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
         try {
-            TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1();
+            TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1(
+                TestMessageWorkerFailureSpi1.EXCEPTION_MODE);
 
             nodeSpi.set(spi0);
 
@@ -1389,20 +1394,109 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
             assertTrue(disconnected.get());
 
-            try {
-                ignite0.cluster().localNode().id();
-            }
-            catch (IllegalStateException e) {
-                if (e.getMessage().contains("Grid is in invalid state to perform this operation"))
-                    return;
-            }
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Boolean stoppedAbnormally = spi0.stoppedAbnormally();
 
-            fail();
+                    return stoppedAbnormally != null && stoppedAbnormally;
+                }
+            }, 10_000);
+
+            Boolean stoppedAbnormally = spi0.stoppedAbnormally();
+
+            assertTrue(stoppedAbnormally != null && stoppedAbnormally);
         }
         finally {
             stopAllGrids();
         }
     }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testNoRingMessageWorkerAbnormalFailureOnSegmentation() throws Exception {
+        try {
+            TestMessageWorkerFailureSpi1 spi1 = new TestMessageWorkerFailureSpi1(
+                TestMessageWorkerFailureSpi1.SEGMENTATION_MODE);
+
+            nodeSpi.set(spi1);
+
+            Ignite ignite1 = startGrid("testNoRingMessageWorkerAbnormalFailureNormalNode");
+
+
+            TestMessageWorkerFailureSpi3 spi2 = new TestMessageWorkerFailureSpi3();
+
+            nodeSpi.set(spi2);
+
+            final Ignite ignite2 = startGrid("testNoRingMessageWorkerAbnormalFailureSegmentedNode");
+
+
+            final AtomicBoolean disconnected = new AtomicBoolean();
+
+            final AtomicBoolean segmented = new AtomicBoolean();
+
+            final CountDownLatch disLatch = new CountDownLatch(1);
+
+            final CountDownLatch segLatch = new CountDownLatch(1);
+
+            final UUID failedNodeId = ignite2.cluster().localNode().id();
+
+            ignite1.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EventType.EVT_NODE_FAILED &&
+                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
+                        disconnected.set(true);
+
+                    disLatch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+
+            ignite2.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EventType.EVT_NODE_SEGMENTED &&
+                        failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id()))
+                        segmented.set(true);
+
+                    segLatch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_SEGMENTED);
+
+
+            spi1.stop = true;
+
+            disLatch.await(15, TimeUnit.SECONDS);
+
+            assertTrue(disconnected.get());
+
+
+            spi1.stop = false;
+
+            segLatch.await(15, TimeUnit.SECONDS);
+
+            assertTrue(segmented.get());
+
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Boolean stoppedAbnormally = spi2.stoppedAbnormally();
+
+                    return stoppedAbnormally != null && !stoppedAbnormally;
+                }
+            }, 10_000);
+
+            Boolean stoppedAbnormally = spi2.stoppedAbnormally();
+
+            assertTrue(stoppedAbnormally != null && !stoppedAbnormally);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
     /**
      * @throws Exception If failed
      */
@@ -2069,16 +2163,43 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi {
+    private static class TestMessageWorkerFailureSpi1 extends TestMessageWorkerFailureSpi3 {
+        /** */
+        private static int EXCEPTION_MODE = 0;
+
+        /** */
+        private static int SEGMENTATION_MODE = 1;
+
+        /** */
+        private final int failureMode;
+
         /** */
         private volatile boolean stop;
 
+        /**
+         * @param failureMode Failure mode to use during the test.
+         */
+        public TestMessageWorkerFailureSpi1(int failureMode) {
+            this.failureMode = failureMode;
+        }
+
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
 
-            if (stop)
-                throw new RuntimeException("Failing ring message worker explicitly");
+            if (stop) {
+                if (failureMode == EXCEPTION_MODE)
+                    throw new RuntimeException("Failing ring message worker explicitly");
+                else {
+                    try {
+                        Thread.sleep(5_000);
+                    }
+                    catch (InterruptedException e) {
+                        // Ignore.
+                    }
+                }
+
+            }
 
             super.writeToSocket(sock, out, msg, timeout);
         }
@@ -2104,6 +2225,17 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
         }
     }
 
+    private static class TestMessageWorkerFailureSpi3 extends TcpDiscoverySpi {
+        /**
+         * Checks if the worker thread has been stopped abnormally.
+         *
+         * @return {@code true} if the worker thread has been stopped abnormally.
+         */
+        Boolean stoppedAbnormally() {
+            return impl.workerThreadStoppedAbnormally();
+        }
+    }
+
     /**
      * Starts new grid with given index. Method optimize is not invoked.
      *


Mime
View raw message