ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/34] ignite git commit: IGNITE-3727: added ability intercept "stop listener" message, check future status after invoke stopRemoteListen.
Date Tue, 14 Feb 2017 13:11:20 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3727-2 [created] 68f2d38e3


IGNITE-3727: added ability intercept "stop listener" message, check future status after invoke
stopRemoteListen.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e6605f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e6605f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e6605f1

Branch: refs/heads/ignite-3727-2
Commit: 4e6605f1691a43b2ba57da3d4eef98e6dd460a43
Parents: 2d5d5bc
Author: DmitriyGovorukhin <dgovorukhin@gridgain.com>
Authored: Tue Aug 30 18:38:13 2016 +0300
Committer: DmitriyGovorukhin <dgovorukhin@gridgain.com>
Committed: Tue Aug 30 18:38:13 2016 +0300

----------------------------------------------------------------------
 .../ignite/messaging/GridMessagingSelfTest.java | 101 +++++++++++++++++--
 1 file changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4e6605f1/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
index e796eb5..2e2afd4 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java
@@ -24,27 +24,26 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteMessaging;
+
+import org.apache.ignite.*;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.*;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -198,7 +197,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        TestTcpDiscoverySpi discoSpi = new TestTcpDiscoverySpi();
 
         discoSpi.setIpFinder(ipFinder);
 
@@ -944,7 +943,7 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
      * @throws Exception If error occurs.
      */
     public void testSendMessageWithExternalClassLoader() throws Exception {
-        URL[] urls = new URL[] { new URL(GridTestProperties.getProperty("p2p.uri.cls")) };
+        URL[] urls = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
 
         ClassLoader extLdr = new URLClassLoader(urls);
 
@@ -1028,6 +1027,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
     public void testAsync() throws Exception {
         final AtomicInteger msgCnt = new AtomicInteger();
 
+        TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite2.configuration().getDiscoverySpi();
+
         assertFalse(ignite2.message().isAsync());
 
         final IgniteMessaging msg = ignite2.message().withAsync();
@@ -1085,6 +1086,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
 
         assertEquals(1, msgCnt.get());
 
+        discoSpi.blockCustomEvent();
+
         msg.stopRemoteListen(id);
 
         IgniteFuture<?> stopFut = msg.future();
@@ -1099,8 +1102,14 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
             }
         }, IllegalStateException.class, null);
 
+        Assert.assertFalse(stopFut.isDone());
+
+        discoSpi.stopBlock();
+
         stopFut.get();
 
+        Assert.assertTrue(stopFut.isDone());
+
         message(ignite1.cluster().forRemotes()).send(topic, "msg2");
 
         U.sleep(1000);
@@ -1109,6 +1118,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements
Ser
     }
 
     /**
+     *
+     */
+    static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private boolean blockCustomEvt;
+
+        /** */
+        private final Object mux = new Object();
+
+        /** */
+        private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException
{
+            synchronized (mux) {
+                if (blockCustomEvt) {
+                    DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate");
+                    if (msg0 instanceof StopRoutineDiscoveryMessage) {
+                        log.info("Block custom message: " + msg0);
+                        blockedMsgs.add(msg);
+
+                        mux.notifyAll();
+                    }
+                    return;
+                }
+            }
+
+            super.sendCustomEvent(msg);
+        }
+
+        /**
+         *
+         */
+        public void blockCustomEvent() {
+            synchronized (mux) {
+                assert blockedMsgs.isEmpty() : blockedMsgs;
+
+                blockCustomEvt = true;
+            }
+        }
+
+        /**
+         * @throws InterruptedException If interrupted.
+         */
+        public void waitCustomEvent() throws InterruptedException {
+            synchronized (mux) {
+                while (blockedMsgs.isEmpty())
+                    mux.wait();
+            }
+        }
+
+        /**
+         *
+         */
+        public void stopBlock() {
+            List<DiscoverySpiCustomMessage> msgs;
+
+            synchronized (this) {
+                msgs = new ArrayList<>(blockedMsgs);
+
+                blockCustomEvt = false;
+
+                blockedMsgs.clear();
+            }
+
+            for (DiscoverySpiCustomMessage msg : msgs) {
+                log.info("Resend blocked message: " + msg);
+
+                super.sendCustomEvent(msg);
+            }
+        }
+    }
+
+    /**
      * Tests that message listener registers only for one oldest node.
      *
      * @throws Exception If an error occurred.


Mime
View raw message