ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1758 Discovery fixes.
Date Tue, 10 Nov 2015 13:57:43 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1758-debug e7723fe16 -> bf9cf422b


ignite-1758 Discovery fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf9cf422
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf9cf422
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf9cf422

Branch: refs/heads/ignite-1758-debug
Commit: bf9cf422b0384af44f94afdbd97cf5cda84e1f5c
Parents: e7723fe
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Nov 10 16:17:07 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Nov 10 16:34:39 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  94 +++++-
 .../tcp/internal/TcpDiscoveryNodesRing.java     |   2 +-
 .../messages/TcpDiscoveryAbstractMessage.java   |  31 ++
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 156 ++++++----
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 293 ++++++++++++++++++-
 .../testframework/junits/GridAbstractTest.java  |  15 +-
 6 files changed, 502 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..0233435 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -191,10 +191,10 @@ class ServerImpl extends TcpDiscoveryImpl {
     private StatisticsPrinter statsPrinter;
 
     /** Failed nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
+    private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>();
 
     /** Leaving nodes (but still in topology). */
-    private Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
+    private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>();
 
     /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
     private boolean ipFinderHasLocAddr;
@@ -1080,9 +1080,17 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 openSock = true;
 
+                TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
+
+                if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                    synchronized (failedNodes) {
+                        for (TcpDiscoveryNode node : failedNodes)
+                            req.addFailedNode(node);
+                    }
+                }
+
                 // Handshake.
-                spi.writeToSocket(sock, new TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
-                    spi.getSocketTimeout()));
+                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, null, timeoutHelper.nextTimeoutChunk(
                     ackTimeout0));
@@ -1754,6 +1762,25 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Adds failed nodes specified in the received message to the local failed nodes list.
+     *
+     * @param msg Message.
+     */
+    private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
+        if (msg.failedNodes() != null) {
+            for (UUID nodeId : msg.failedNodes()) {
+                TcpDiscoveryNode failedNode = ring.node(nodeId);
+
+                if (failedNode != null) {
+                    synchronized (mux) {
+                        failedNodes.add(failedNode);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
      * Discovery messages history used for client reconnect.
      */
     private class EnsuredMessageHistory {
@@ -2135,6 +2162,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             spi.stats.onMessageProcessingStarted(msg);
 
+            processMessageFailedNodes(msg);
+
             if (msg instanceof TcpDiscoveryJoinRequestMessage)
                 processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
 
@@ -2200,6 +2229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             checkHeartbeatsReceiving();
 
             checkPendingCustomMessages();
+
+            checkFailedNodesList();
         }
 
         /**
@@ -2540,6 +2571,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (timeoutHelper == null)
                                     timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
 
+                                if (!failedNodes.isEmpty()) {
+                                    for (TcpDiscoveryNode node : failedNodes)
+                                        msg.addFailedNode(node);
+                                }
+
                                 writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp);
@@ -2679,11 +2715,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         if (log.isDebugEnabled())
                             log.debug("Pending message has been sent to local node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+                                ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']');
 
                         if (debugMode)
                             debugLog("Pending message has been sent to local node [msg=" + msg.id() +
-                                ", pendingMsgId=" + pendingMsg + ", next=" + next.id() + ']');
+                                ", pendingMsgId=" + pendingMsg + ", next=" + (next != null ? next.id() : null) + ']');
                     }
                 }
 
@@ -3447,6 +3483,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                             spi.gridStartTime = msg.gridStartTime();
 
                             for (TcpDiscoveryNode n : top) {
+                                assert n.internalOrder() < node.internalOrder() :
+                                    "Invalid node [topNode=" + n + ", added=" + node + ']';
+
                                 // Make all preceding nodes and local node visible.
                                 n.visible(true);
                             }
@@ -3500,6 +3539,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
                         spi.onExchange(node.id(), entry.getKey(), entry.getValue(), U.gridClassLoader());
                 }
+
+                processMessageFailedNodes(msg);
             }
 
             if (sendMessageToRemotes(msg))
@@ -4053,7 +4094,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         onException("Failed to respond to status check message (connection refused) " +
                                             "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e);
                                     }
-                                    else {
+                                    else if (!spi.isNodeStopping0()){
                                         if (pingNode(msg.creatorNode()))
                                             // Node exists and accepts incoming connections.
                                             U.error(log, "Failed to respond to status check message [recipient=" +
@@ -4441,6 +4482,34 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node
+         * is still in the ring.
+         */
+        private void checkFailedNodesList() {
+            List<TcpDiscoveryNodeFailedMessage> msgs = null;
+
+            synchronized (mux) {
+                for (Iterator<TcpDiscoveryNode> it = failedNodes.iterator(); it.hasNext();) {
+                    TcpDiscoveryNode node = it.next();
+
+                    if (ring.node(node.id()) != null) {
+                        if (msgs == null)
+                            msgs = new ArrayList<>(failedNodes.size());
+
+                        msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder()));
+                    }
+                    else
+                        it.remove();
+                }
+            }
+
+            if (msgs != null) {
+                for (TcpDiscoveryNodeFailedMessage msg : msgs)
+                    addMessage(msg);
+            }
+        }
+
+        /**
          * Checks and flushes custom event messages if no nodes are attempting to join the grid.
          */
         private void checkPendingCustomMessages() {
@@ -4640,10 +4709,10 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     synchronized (mux) {
                         readers.add(reader);
-
-                        reader.start();
                     }
 
+                    reader.start();
+
                     spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp);
                 }
             }
@@ -4792,6 +4861,13 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Handshake.
                     TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg;
 
+                    if (req.failedNodes() != null && req.failedNodes().contains(getLocalNodeId())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Ignore handshake request, local node is in failed list: " + req);
+
+                        return;
+                    }
+
                     UUID nodeId = req.creatorNodeId();
 
                     this.nodeId = nodeId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 7ca092c..b234f40 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -451,7 +451,7 @@ public class TcpDiscoveryNodesRing {
      * topology contains less than two nodes.
      */
     @Nullable public TcpDiscoveryNode nextNode(@Nullable Collection<TcpDiscoveryNode> excluded) {
-        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode);
+        assert excluded == null || excluded.isEmpty() || !excluded.contains(locNode) : excluded;
 
         rwLock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 875d18e..a2e48fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -19,10 +19,16 @@ package org.apache.ignite.spi.discovery.tcp.messages;
 
 import java.io.Externalizable;
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Base class to implement discovery messages.
@@ -62,6 +68,10 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     /** Pending message index. */
     private short pendingIdx;
 
+    /** */
+    @GridToStringInclude
+    private Set<UUID> failedNodes;
+
     /**
      * Default no-arg constructor for {@link Externalizable} interface.
      */
@@ -236,6 +246,27 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
         return false;
     }
 
+    /**
+     * Adds node ID to the failed nodes list.
+     *
+     * @param node Node.
+     */
+    public void addFailedNode(TcpDiscoveryNode node) {
+        assert node != null;
+
+        if (failedNodes == null)
+            failedNodes = new HashSet<>();
+
+        failedNodes.add(node.id());
+    }
+
+    /**
+     * @return Failed nodes IDs.
+     */
+    @Nullable public Collection<UUID> failedNodes() {
+        return failedNodes;
+    }
+
     /** {@inheritDoc} */
     @Override public final boolean equals(Object obj) {
         if (this == obj)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 55474dc..5a01121 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -212,6 +213,22 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
     public void testMultiThreadedClientsServersRestart() throws Throwable {
         fail("https://issues.apache.org/jira/browse/IGNITE-1123");
 
+        multiThreadedClientsServersRestart(GRID_CNT, CLIENT_GRID_CNT);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMultiThreadedServersRestart() throws Throwable {
+        multiThreadedClientsServersRestart(GRID_CNT * 2, 0);
+    }
+
+    /**
+     * @param srvs Number of servers.
+     * @param clients Number of clients.
+     * @throws Exception If any error occurs.
+     */
+    private void multiThreadedClientsServersRestart(int srvs, int clients) throws Throwable {
         final AtomicBoolean done = new AtomicBoolean();
 
         try {
@@ -219,91 +236,95 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
             info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
 
-            startGridsMultiThreaded(GRID_CNT);
+            startGridsMultiThreaded(srvs);
 
-            clientFlagGlobal = true;
-
-            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            IgniteInternalFuture<?> clientFut = null;
 
             final AtomicReference<Throwable> error = new AtomicReference<>();
 
-            final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
+            if (clients > 0) {
+                clientFlagGlobal = true;
 
-            for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
-                clientStopIdxs.add(i);
+                startGridsMultiThreaded(srvs, clients);
 
-            final AtomicInteger clientStartIdx = new AtomicInteger(9000);
+                final BlockingQueue<Integer> clientStopIdxs = new LinkedBlockingQueue<>();
 
-            IgniteInternalFuture<?> fut1 = multithreadedAsync(
-                new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            clientFlagPerThread.set(true);
+                for (int i = srvs; i < srvs + clients; i++)
+                    clientStopIdxs.add(i);
 
-                            while (!done.get() && error.get() == null) {
-                                Integer stopIdx = clientStopIdxs.take();
-
-                                log.info("Stop client: " + stopIdx);
+                final AtomicInteger clientStartIdx = new AtomicInteger(9000);
 
-                                stopGrid(stopIdx);
+                clientFut = multithreadedAsync(
+                    new Callable<Object>() {
+                        @Override public Object call() throws Exception {
+                            try {
+                                clientFlagPerThread.set(true);
 
                                 while (!done.get() && error.get() == null) {
-                                    // Generate unique name to simplify debugging.
-                                    int startIdx = clientStartIdx.getAndIncrement();
+                                    Integer stopIdx = clientStopIdxs.take();
 
-                                    log.info("Start client: " + startIdx);
+                                    log.info("Stop client: " + stopIdx);
 
-                                    UUID id = UUID.randomUUID();
+                                    stopGrid(stopIdx);
 
-                                    nodeId.set(id);
+                                    while (!done.get() && error.get() == null) {
+                                        // Generate unique name to simplify debugging.
+                                        int startIdx = clientStartIdx.getAndIncrement();
 
-                                    try {
-                                        Ignite ignite = startGrid(startIdx);
+                                        log.info("Start client: " + startIdx);
 
-                                        assertTrue(ignite.configuration().isClientMode());
+                                        UUID id = UUID.randomUUID();
 
-                                        clientStopIdxs.add(startIdx);
+                                        nodeId.set(id);
 
-                                        break;
-                                    }
-                                    catch (Exception e) {
-                                        if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
-                                            X.hasCause(e, IgniteClientDisconnectedException.class))
-                                            log.info("Client disconnected: " + e);
-                                        else if (X.hasCause(e, ClusterTopologyCheckedException.class))
-                                            log.info("Client failed to start: " + e);
-                                        else {
-                                            if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
-                                                log.info("Client failed: " + e);
-                                            else
-                                                throw e;
+                                        try {
+                                            Ignite ignite = startGrid(startIdx);
+
+                                            assertTrue(ignite.configuration().isClientMode());
+
+                                            clientStopIdxs.add(startIdx);
+
+                                            break;
+                                        }
+                                        catch (Exception e) {
+                                            if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class) ||
+                                                X.hasCause(e, IgniteClientDisconnectedException.class))
+                                                log.info("Client disconnected: " + e);
+                                            else if (X.hasCause(e, ClusterTopologyCheckedException.class))
+                                                log.info("Client failed to start: " + e);
+                                            else {
+                                                if (failedNodes.contains(id) && X.hasCause(e, IgniteSpiException.class))
+                                                    log.info("Client failed: " + e);
+                                                else
+                                                    throw e;
+                                            }
                                         }
                                     }
                                 }
                             }
-                        }
-                        catch (Throwable e) {
-                            log.error("Unexpected error: " + e, e);
+                            catch (Throwable e) {
+                                log.error("Unexpected error: " + e, e);
 
-                            error.compareAndSet(null, e);
+                                error.compareAndSet(null, e);
+
+                                return null;
+                            }
 
                             return null;
                         }
-
-                        return null;
-                    }
-                },
-                CLIENT_GRID_CNT,
-                "client-restart");
+                    },
+                    clients,
+                    "client-restart");
+            }
 
             final BlockingQueue<Integer> srvStopIdxs = new LinkedBlockingQueue<>();
 
-            for (int i = 0; i < GRID_CNT; i++)
+            for (int i = 0; i < srvs; i++)
                 srvStopIdxs.add(i);
 
-            final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + CLIENT_GRID_CNT);
+            final AtomicInteger srvStartIdx = new AtomicInteger(srvs + clients);
 
-            IgniteInternalFuture<?> fut2 = multithreadedAsync(
+            IgniteInternalFuture<?> srvFut = multithreadedAsync(
                 new Callable<Object>() {
                     @Override public Object call() throws Exception {
                         try {
@@ -312,6 +333,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
                             while (!done.get() && error.get() == null) {
                                 int stopIdx = srvStopIdxs.take();
 
+                                Thread.currentThread().setName("stop-server-" + getTestGridName(stopIdx));
+
                                 log.info("Stop server: " + stopIdx);
 
                                 stopGrid(stopIdx);
@@ -319,13 +342,20 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
                                 // Generate unique name to simplify debugging.
                                 int startIdx = srvStartIdx.getAndIncrement();
 
+                                Thread.currentThread().setName("start-server-" + getTestGridName(startIdx));
+
                                 log.info("Start server: " + startIdx);
 
-                                Ignite ignite = startGrid(startIdx);
+                                try {
+                                    Ignite ignite = startGrid(startIdx);
 
-                                assertFalse(ignite.configuration().isClientMode());
+                                    assertFalse(ignite.configuration().isClientMode());
 
-                                srvStopIdxs.add(startIdx);
+                                    srvStopIdxs.add(startIdx);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    log.info("Failed to start: " + e);
+                                }
                             }
                         }
                         catch (Throwable e) {
@@ -339,7 +369,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
                         return null;
                     }
                 },
-                GRID_CNT - 1,
+                srvs - 1,
                 "server-restart");
 
             final long timeToExec = getTestTimeout() - 60_000;
@@ -356,8 +386,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
                     done.set(true);
 
-                    fut1.cancel();
-                    fut2.cancel();
+                    if (clientFut != null)
+                        clientFut.cancel();
+
+                    srvFut.cancel();
 
                     throw err;
                 }
@@ -367,8 +399,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
 
             done.set(true);
 
-            fut1.get();
-            fut2.get();
+            if (clientFut != null)
+                clientFut.get();
+
+            srvFut.get();
         }
         finally {
             done.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 51d8a2d..0e51c8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -21,12 +21,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -38,6 +40,8 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -48,6 +52,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
@@ -64,6 +69,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -94,7 +100,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     private UUID nodeId;
 
     /** */
-    private TcpDiscoverySpi nodeSpi;
+    private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
 
     /**
      * @throws Exception If fails.
@@ -108,11 +114,14 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = nodeSpi;
+        TcpDiscoverySpi spi = nodeSpi.get();
 
-        if (spi == null)
+        if (spi == null) {
             spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
                 new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+        }
+        else
+            nodeSpi.set(null);
 
         discoMap.put(gridName, spi);
 
@@ -1202,11 +1211,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     private void customEventRace1(final boolean cacheStartFrom1, boolean stopCrd) throws Exception {
         TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         final Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         final Ignite ignite1 = startGrid(1);
 
@@ -1221,7 +1230,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             @Override public Void call() throws Exception {
                 log.info("Start 2");
 
-                nodeSpi = new TestCustomEventRaceSpi();
+                nodeSpi.set(new TestCustomEventRaceSpi());
 
                 Ignite ignite2 = startGrid(2);
 
@@ -1271,7 +1280,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         assertEquals(1, cache.get(1));
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         Ignite ignite = startGrid(3);
 
@@ -1314,15 +1323,15 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     private void customEventCoordinatorFailure(boolean twoNodes) throws Exception {
         TestCustomEventCoordinatorFailureSpi spi0 = new TestCustomEventCoordinatorFailureSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite1 = startGrid(1);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite2 = twoNodes ? null : startGrid(2);
 
@@ -1366,7 +1375,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
 
         log.info("Try start one more node.");
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite = startGrid(twoNodes ? 2 : 3);
 
@@ -1381,6 +1390,268 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Coordinator is added in failed list during node start.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes1() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            final Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            startGrid(1);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite2 = startGrid(2);
+
+            assertEquals(2, ignite2.cluster().nodes().size());
+
+            waitNodeStop(ignite0.name());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list, concurrent nodes start.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes2() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            startGrid(1);
+
+            final AtomicInteger nodeIdx = new AtomicInteger(1);
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int idx = nodeIdx.incrementAndGet();
+
+                    nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+                    startGrid(idx);
+
+                    return null;
+                }
+            }, 3, "start-node");
+
+            Ignite ignite2 = ignite(2);
+
+            waitForRemoteNodes(ignite2, 3);
+
+            waitNodeStop(ignite0.name());
+
+            tryCreateCache(4);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list during node start, test with two nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes3() throws Exception {
+        try {
+            nodeSpi.set(new TestFailedNodesSpi(-1));
+
+            Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(2));
+
+            Ignite ignite1 = startGrid(1);
+
+            assertEquals(1, ignite1.cluster().nodes().size());
+
+            waitNodeStop(ignite0.name());
+
+            ignite1.getOrCreateCache(new CacheConfiguration<>()).put(1, 1);
+
+            startGrid(2);
+
+            assertEquals(2, ignite1.cluster().nodes().size());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Coordinator is added in failed list during node start, but node detected failure dies before
+     * sending {@link TcpDiscoveryNodeFailedMessage}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes4() throws Exception {
+        try {
+            final int FAIL_ORDER = 3;
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            final Ignite ignite0 = startGrid(0);
+
+            nodeSpi.set(new TestFailedNodesSpi(FAIL_ORDER));
+
+            Ignite ignite1 = startGrid(1);
+
+            TestFailedNodesSpi spi = new TestFailedNodesSpi(FAIL_ORDER);
+
+            spi.stopBeforeSndFail = true;
+
+            nodeSpi.set(spi);
+
+            Ignite ignite2 = startGrid(2);
+
+            waitNodeStop(ignite2.name());
+
+            log.info("Try start new node.");
+
+            Ignite ignite3 = startGrid(3);
+
+            waitNodeStop(ignite0.name());
+
+            assertEquals(2, ignite1.cluster().nodes().size());
+            assertEquals(2, ignite3.cluster().nodes().size());
+
+            tryCreateCache(2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param nodeName Node name.
+     * @throws Exception If failed.
+     */
+    private void waitNodeStop(final String nodeName) throws Exception {
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    Ignition.ignite(nodeName);
+
+                    return false;
+                }
+                catch (IgniteIllegalStateException e) {
+                    return true;
+                }
+            }
+        }, 10_000);
+
+        if (!wait)
+            U.dumpThreads(log);
+
+        assertTrue("Failed to wait for node stop.", wait);
+    }
+
+    /**
+     * @param expNodes Expected nodes number.
+     */
+    private void tryCreateCache(int expNodes) {
+        List<Ignite> allNodes = G.allGrids();
+
+        assertEquals(expNodes, allNodes.size());
+
+        int cntr = 0;
+
+        for (Ignite ignite : allNodes) {
+            CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+            ccfg.setName("cache-" + cntr++);
+
+            log.info("Try create cache [node=" + ignite.name() + ", cache=" + ccfg.getName() + ']');
+
+            ignite.getOrCreateCache(ccfg).put(1, 1);
+        }
+    }
+
+    /**
+     * Simulate scenario when node detects node failure trying to send message, but node still alive.
+     */
+    private static class TestFailedNodesSpi extends TcpDiscoverySpi {
+        /** */
+        private AtomicBoolean failMsg = new AtomicBoolean();
+
+        /** */
+        private int failOrder;
+
+        /** */
+        private boolean stopBeforeSndFail;
+
+        /** */
+        private boolean stop;
+
+        /**
+         * @param failOrder Spi fails connection if local node order equals to this order.
+         */
+        TestFailedNodesSpi(int failOrder) {
+            this.failOrder = failOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (stop)
+                return;
+
+            if (locNode.internalOrder() == failOrder &&
+                (msg instanceof TcpDiscoveryNodeAddedMessage) &&
+                failMsg.compareAndSet(false, true)) {
+                log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');
+
+                sock.close();
+
+                throw new SocketTimeoutException();
+            }
+
+            if (stopBeforeSndFail &&
+                locNode.internalOrder() == failOrder &&
+                (msg instanceof TcpDiscoveryNodeFailedMessage)) {
+                stop = true;
+
+                log.info("Skip messages send and stop node [locNode=" + locNode + ", msg=" + msg + ']');
+
+                sock.close();
+
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        ignite.close();
+
+                        return null;
+                    }
+                }, "stop-node");
+
+                return;
+            }
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      *
      */
     private static class TestCustomEventCoordinatorFailureSpi extends TcpDiscoverySpi {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bf9cf422/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 41d4b4a..e5d0a6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1117,17 +1117,18 @@ public abstract class GridAbstractTest extends TestCase {
 
         cfg.setNodeId(null);
 
-        if (gridName != null && gridName.matches(".*\\d")) {
+        if (gridName != null && gridName.startsWith(getTestGridName()) && gridName.matches(".*\\d")) {
             String idStr = UUID.randomUUID().toString();
 
-            char[] chars = idStr.toCharArray();
+            String idxStr = String.valueOf(getTestGridIndex(gridName));
+
+            while (idxStr.length() < 5)
+                idxStr = '0' + idxStr;
 
-            chars[0] = gridName.charAt(gridName.length() - 1);
-            chars[1] = '0';
+            char[] chars = idStr.toCharArray();
 
-            chars[chars.length - 3] = '0';
-            chars[chars.length - 2] = '0';
-            chars[chars.length - 1] = gridName.charAt(gridName.length() - 1);
+            for (int i = 0; i < idxStr.length(); i++)
+                chars[chars.length - idxStr.length() + i] = idxStr.charAt(i);
 
             cfg.setNodeId(UUID.fromString(new String(chars)));
         }


Mime
View raw message