ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1171
Date Fri, 18 Sep 2015 10:09:32 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1171 98083b715 -> 04fe2c24a


ignite-1171


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04fe2c24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04fe2c24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04fe2c24

Branch: refs/heads/ignite-1171
Commit: 04fe2c24a3f98dcadea9215afc51c8a53c0c83b2
Parents: 98083b7
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Sep 18 13:09:24 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Sep 18 13:09:24 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |   6 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 142 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04fe2c24/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 18db05a..619e53c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3340,9 +3340,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
-                joiningNodes.remove(nodeId);
+            joiningNodes.remove(nodeId);
 
+            if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get EVT_NODE_JOINED

http://git-wip-us.apache.org/repos/asf/ignite/blob/04fe2c24/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 86825aa..4f3c9a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -265,9 +265,13 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
             IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                 @Override public Object call() throws Exception {
+                    int idx = startIdx.getAndIncrement();
+
+                    Thread.currentThread().setName("start-thread-" + idx);
+
                     barrier.await();
 
-                    Ignite ignite = startGrid(startIdx.getAndIncrement());
+                    Ignite ignite = startGrid(idx);
 
                     assertFalse(ignite.configuration().isClientMode());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/04fe2c24/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 981f649..9bd30b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -35,8 +35,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
@@ -45,6 +47,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
+import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
@@ -57,6 +60,8 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
@@ -87,6 +92,9 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /** */
     private UUID nodeId;
 
+    /** */
+    private TcpDiscoverySpi nodeSpi;
+
     /**
      * @throws Exception If fails.
      */
@@ -99,8 +107,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
-            new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+        TcpDiscoverySpi spi = nodeSpi;
+
+        if (spi == null)
+            spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+                new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
 
         discoMap.put(gridName, spi);
 
@@ -1164,6 +1175,133 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed
+     */
+    public void testCustomEventRace1_1() throws Exception {
+        customEventRace1(true);
+    }
+
+    /**
+     * @throws Exception If failed
+     */
+    public void testCustomEventRace1_2() throws Exception {
+        customEventRace1(false);
+    }
+
+    /**
+     * @param cacheStartFrom1 If {@code true} starts cache from node1.
+     * @throws Exception If failed
+     */
+    private void customEventRace1(final boolean cacheStartFrom1) throws Exception {
+        TestFailoverTcpDiscoverySpi spi0 = new TestFailoverTcpDiscoverySpi();
+
+        nodeSpi = spi0;
+
+        final Ignite ignite0 = startGrid(0);
+
+        nodeSpi = new TestFailoverTcpDiscoverySpi();
+
+        final Ignite ignite1 = startGrid(1);
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        spi0.nodeAdded1 = latch1;
+        spi0.nodeAdded2 = latch2;
+        spi0.debug = true;
+
+        IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start 2");
+
+                nodeSpi = new TestFailoverTcpDiscoverySpi();
+
+                Ignite ignite2 = startGrid(2);
+
+                return null;
+            }
+        });
+
+        latch1.await();
+
+        final String CACHE_NAME = "cache";
+
+        IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                CacheConfiguration ccfg = new CacheConfiguration();
+
+                ccfg.setName(CACHE_NAME);
+
+                Ignite ignite = cacheStartFrom1 ? ignite1 : ignite0;
+
+                ignite.createCache(ccfg);
+
+                return null;
+            }
+        });
+
+        U.sleep(500);
+
+        latch2.countDown();
+
+        fut1.get();
+        fut2.get();
+
+        IgniteCache<Object, Object> cache = grid(2).cache(CACHE_NAME);
+
+        assertNotNull(cache);
+
+        cache.put(1, 1);
+
+        assertEquals(1, cache.get(1));
+
+        startGrid(3);
+    }
+
+    /**
+     *
+     */
+    private static class TestFailoverTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** */
+        private volatile CountDownLatch nodeAdded1;
+
+        /** */
+        private volatile CountDownLatch nodeAdded2;
+
+        /** */
+        private boolean debug;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                if (nodeAdded1 != null) {
+                    nodeAdded1.countDown();
+
+                    if (debug)
+                        log.info("--- Wait node added: " + msg);
+
+                    U.await(nodeAdded2);
+
+                    nodeAdded1 = null;
+                    nodeAdded2 = null;
+                }
+
+                if (debug)
+                    log.info("--- Send node added: " + msg);
+            }
+
+            if (debug && msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                log.info("--- Send node finished: " + msg);
+
+            if (debug && msg instanceof TcpDiscoveryCustomEventMessage)
+                log.info("--- Send custom event: " + msg);
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      * Starts new grid with given index. Method optimize is not invoked.
      *
      * @param idx Index of the grid to start.


Mime
View raw message