ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [3/5] ignite git commit: ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socket reader instead of always processing it on coordinator.
Date Fri, 27 Oct 2017 12:15:04 GMT
ignite-5860 Try process TcpDiscoveryClientReconnectMessage from socket reader instead of always
processing it on coordinator.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56a63f80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56a63f80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56a63f80

Branch: refs/heads/ignite-3478
Commit: 56a63f80d1181e53a3e2a4c4f88e42226bbac86e
Parents: 717c549
Author: Denis Mekhanikov <dmekhanikov@gmail.com>
Authored: Fri Oct 27 14:12:36 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 27 14:13:40 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  52 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 311 ++++++++++---------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   4 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java |  20 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 275 +++++++++++++++-
 5 files changed, 467 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 5dbfe6e..139c110 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -26,6 +26,7 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -470,7 +471,8 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /**
-     * @param recon {@code True} if reconnects.
+     * @param prevAddr If reconnect is in progress, then previous address of the router the client was connected to
+     *      and {@code null} otherwise.
      * @param timeout Timeout.
      * @return Opened socket or {@code null} if timeout.
      * @throws InterruptedException If interrupted.
@@ -478,9 +480,9 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+    @Nullable private T2<SocketStream, Boolean> joinTopology(InetSocketAddress prevAddr, long timeout)
         throws IgniteSpiException, InterruptedException {
-        Collection<InetSocketAddress> addrs = null;
+        List<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
 
@@ -489,7 +491,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 throw new InterruptedException();
 
             while (addrs == null || addrs.isEmpty()) {
-                addrs = spi.resolvedAddresses();
+                addrs = new ArrayList<>(spi.resolvedAddresses());
 
                 if (!F.isEmpty(addrs)) {
                     if (log.isDebugEnabled())
@@ -509,22 +511,30 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
+            // Process failed node last.
+            if (prevAddr != null) {
+                int idx = addrs.indexOf(prevAddr);
 
-            Iterator<InetSocketAddress> it = addrs.iterator();
+                if (idx != -1)
+                    Collections.swap(addrs, idx, 0);
+            }
+
+            Collection<InetSocketAddress> addrs0 = new ArrayList<>(addrs);
 
             boolean wait = false;
 
-            while (it.hasNext()) {
+            for (int i = addrs.size() - 1; i >= 0; i--) {
                 if (Thread.currentThread().isInterrupted())
                     throw new InterruptedException();
 
-                InetSocketAddress addr = it.next();
+                InetSocketAddress addr = addrs.get(i);
+
+                boolean recon = prevAddr != null;
 
                 T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
-                    it.remove();
+                    addrs.remove(i);
 
                     continue;
                 }
@@ -852,8 +862,8 @@ class ClientImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteSpiThread workerThread() {
-        return msgWorker;
+    @Override protected Collection<IgniteSpiThread> threads() {
+        return Arrays.asList(sockWriter, msgWorker);
     }
 
     /**
@@ -1336,15 +1346,20 @@ class ClientImpl extends TcpDiscoveryImpl {
         private boolean clientAck;
 
         /** */
-        private boolean join;
+        private final boolean join;
+
+        /** */
+        private final InetSocketAddress prevAddr;
 
         /**
          * @param join {@code True} if reconnects during join.
+         * @param prevAddr Address of the node, that this client was previously connected to.
          */
-        protected Reconnector(boolean join) {
+        protected Reconnector(boolean join, InetSocketAddress prevAddr) {
             super(spi.ignite().name(), "tcp-client-disco-reconnector", log);
 
             this.join = join;
+            this.prevAddr = prevAddr;
         }
 
         /**
@@ -1374,7 +1389,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
+                    T2<SocketStream, Boolean> joinRes = joinTopology(prevAddr, timeout);
 
                     if (joinRes == null) {
                         if (join) {
@@ -1609,6 +1624,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                     }
                     else if (msg instanceof SocketClosedMessage) {
                         if (((SocketClosedMessage)msg).sock == currSock) {
+                            Socket sock = currSock.sock;
+
+                            InetSocketAddress prevAddr = new InetSocketAddress(sock.getInetAddress(), sock.getPort());
+
                             currSock = null;
 
                             boolean join = joinLatch.getCount() > 0;
@@ -1637,8 +1656,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                                     assert reconnector == null;
 
-                                    final Reconnector reconnector = new Reconnector(join);
-                                    this.reconnector = reconnector;
+                                    reconnector = new Reconnector(join, prevAddr);
                                     reconnector.start();
                                 }
                             }
@@ -1811,7 +1829,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             T2<SocketStream, Boolean> joinRes;
 
             try {
-                joinRes = joinTopology(false, spi.joinTimeout);
+                joinRes = joinTopology(null, spi.joinTimeout);
             }
             catch (IgniteSpiException e) {
                 joinError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index efe531a..1c3ec2e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -219,6 +219,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
     private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
 
+    /** Messages history used for client reconnect. */
+    private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
+
     /** If non-shared IP finder is used this flag shows whether IP finder contains local address. */
     private boolean ipFinderHasLocAddr;
 
@@ -1663,8 +1666,23 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /** {@inheritDoc} */
-    @Override protected IgniteSpiThread workerThread() {
-        return msgWorker;
+    @Override protected Collection<IgniteSpiThread> threads() {
+        Collection<IgniteSpiThread> threads;
+
+        synchronized (mux) {
+            threads = new ArrayList<>(readers.size() + clientMsgWorkers.size() + 4);
+            threads.addAll(readers);
+        }
+
+        threads.addAll(clientMsgWorkers.values());
+        threads.add(tcpSrvr);
+        threads.add(ipFinderCleaner);
+        threads.add(msgWorker);
+        threads.add(statsPrinter);
+
+        threads.removeAll(Collections.<IgniteSpiThread>singleton(null));
+
+        return threads;
     }
 
     /**
@@ -2122,7 +2140,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             else if (msg instanceof TcpDiscoveryNodeFailedMessage)
                 clearClientAddFinished(((TcpDiscoveryNodeFailedMessage)msg).failedNodeId());
 
-            msgs.add(msg);
+            synchronized (msgs) {
+                msgs.add(msg);
+            }
         }
 
         /**
@@ -2161,14 +2181,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 // Client connection failed before it received TcpDiscoveryNodeAddedMessage.
                 List<TcpDiscoveryAbstractMessage> res = null;
 
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
-                            res = new ArrayList<>(msgs.size());
-                    }
+                synchronized (msgs) {
+                    for (TcpDiscoveryAbstractMessage msg : msgs) {
+                        if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                            if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
+                                res = new ArrayList<>(msgs.size());
+                        }
 
-                    if (res != null)
-                        res.add(prepare(msg, node.id()));
+                        if (res != null)
+                            res.add(prepare(msg, node.id()));
+                    }
                 }
 
                 if (log.isDebugEnabled()) {
@@ -2181,20 +2203,26 @@ class ServerImpl extends TcpDiscoveryImpl {
                 return res;
             }
             else {
-                if (msgs.isEmpty())
-                    return Collections.emptyList();
+                Collection<TcpDiscoveryAbstractMessage> cp;
 
-                Collection<TcpDiscoveryAbstractMessage> cp = new ArrayList<>(msgs.size());
+                boolean skip;
 
-                boolean skip = true;
+                synchronized (msgs) {
+                    if (msgs.isEmpty())
+                        return Collections.emptyList();
 
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (skip) {
-                        if (msg.id().equals(lastMsgId))
-                            skip = false;
+                    cp = new ArrayList<>(msgs.size());
+
+                    skip = true;
+
+                    for (TcpDiscoveryAbstractMessage msg : msgs) {
+                        if (skip) {
+                            if (msg.id().equals(lastMsgId))
+                                skip = false;
+                        }
+                        else
+                            cp.add(prepare(msg, node.id()));
                     }
-                    else
-                        cp.add(prepare(msg, node.id()));
                 }
 
                 cp = !skip ? cp : null;
@@ -2483,9 +2511,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final PendingMessages pendingMsgs = new PendingMessages();
 
-        /** Messages history used for client reconnect. */
-        private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory();
-
         /** Last message that updated topology. */
         private TcpDiscoveryAbstractMessage lastMsg;
 
@@ -2659,8 +2684,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg instanceof TcpDiscoveryJoinRequestMessage)
                 processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
 
-            else if (msg instanceof TcpDiscoveryClientReconnectMessage)
-                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
+            else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
+                if (sendMessageToRemotes(msg))
+                    sendMessageAcrossRing(msg);
+            }
 
             else if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 processNodeAddedMessage((TcpDiscoveryNodeAddedMessage)msg);
@@ -2695,9 +2722,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (ensured && redirectToClients(msg))
-                msgHist.add(msg);
-
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
                 // Received a message from remote node.
                 onMessageExchanged();
@@ -2730,6 +2754,9 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
+                if (spi.ensured(msg))
+                    msgHist.add(msg);
+
                 byte[] msgBytes = null;
 
                 for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
@@ -3836,9 +3863,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
-
-                if (nodeAddedMsg.verified())
-                    msgHist.add(nodeAddedMsg);
             }
             else if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
@@ -3941,98 +3965,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Processes client reconnect message.
-         *
-         * @param msg Client reconnect message.
-         */
-        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
-            UUID nodeId = msg.creatorNodeId();
-
-            UUID locNodeId = getLocalNodeId();
-
-            boolean isLocNodeRouter = locNodeId.equals(msg.routerNodeId());
-
-            if (!msg.verified()) {
-                TcpDiscoveryNode node = ring.node(nodeId);
-
-                assert node == null || node.isClient();
-
-                if (node != null) {
-                    node.clientRouterNodeId(msg.routerNodeId());
-                    node.clientAliveTime(spi.clientFailureDetectionTimeout());
-                }
-
-                if (isLocalNodeCoordinator()) {
-                    msg.verify(locNodeId);
-
-                    if (node != null) {
-                        Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
-
-                        if (pending != null) {
-                            msg.pendingMessages(pending);
-                            msg.success(true);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Accept client reconnect, restored pending messages " +
-                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Failing reconnecting client node because failed to restore pending " +
-                                    "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
-
-                            TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
-                                node.id(), node.internalOrder());
-
-                            processNodeFailedMessage(nodeFailedMsg);
-
-                            if (nodeFailedMsg.verified())
-                                msgHist.add(nodeFailedMsg);
-                        }
-                    }
-                    else if (log.isDebugEnabled())
-                        log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
-
-                    if (isLocNodeRouter) {
-                        ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
-                        if (wrk != null)
-                            wrk.addMessage(msg);
-                        else if (log.isDebugEnabled())
-                            log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
-                                locNodeId + ", clientNodeId=" + nodeId + ']');
-                    }
-                    else {
-                        if (sendMessageToRemotes(msg))
-                            sendMessageAcrossRing(msg);
-                    }
-                }
-                else {
-                    if (sendMessageToRemotes(msg))
-                        sendMessageAcrossRing(msg);
-                }
-            }
-            else {
-                if (isLocalNodeCoordinator())
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
-
-                if (isLocNodeRouter) {
-                    ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
-
-                    if (wrk != null)
-                        wrk.addMessage(msg);
-                    else if (log.isDebugEnabled())
-                        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
-                            locNodeId + ", clientNodeId=" + nodeId + ']');
-                }
-                else {
-                    if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
-                        sendMessageAcrossRing(msg);
-                }
-            }
-        }
-
-        /**
          * Processes node added message.
          *
          * For coordinator node method marks the messages as verified for rest of nodes to apply the
@@ -4078,9 +4010,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
-                    if (addFinishMsg.verified())
-                        msgHist.add(addFinishMsg);
-
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
@@ -5145,9 +5074,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             locNodeId, clientNode.id(), clientNode.internalOrder());
 
                                         processNodeFailedMessage(nodeFailedMsg);
-
-                                        if (nodeFailedMsg.verified())
-                                            msgHist.add(nodeFailedMsg);
                                     }
                                 }
                             }
@@ -5342,9 +5268,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
                                 processCustomMessage(ackMsg);
-
-                                if (ackMsg.verified())
-                                    msgHist.add(ackMsg);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(log, "Failed to marshal discovery custom message.", e);
@@ -5446,12 +5369,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (joiningEmpty && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 
-                while ((msg = pollPendingCustomeMessage()) != null) {
+                while ((msg = pollPendingCustomeMessage()) != null)
                     processCustomMessage(msg);
-
-                    if (msg.verified())
-                        msgHist.add(msg);
-                }
             }
         }
 
@@ -6005,24 +5924,22 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                         }
                         else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
-                            if (clientMsgWrk != null) {
-                                TcpDiscoverySpiState state = spiStateCopy();
+                            TcpDiscoverySpiState state = spiStateCopy();
 
-                                if (state == CONNECTED) {
-                                    spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
+                            if (state == CONNECTED) {
+                                spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
 
-                                    if (clientMsgWrk.getState() == State.NEW)
-                                        clientMsgWrk.start();
+                                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                                    clientMsgWrk.start();
 
-                                    msgWorker.addMessage(msg);
+                                processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg);
 
-                                    continue;
-                                }
-                                else {
-                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
+                                continue;
+                            }
+                            else {
+                                spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN, sockTimeout);
 
-                                    break;
-                                }
+                                break;
                             }
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
@@ -6266,6 +6183,100 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * Processes client reconnect message.
+         *
+         * @param msg Client reconnect message.
+         */
+        private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage msg) {
+            UUID nodeId = msg.creatorNodeId();
+
+            UUID locNodeId = getLocalNodeId();
+
+            boolean isLocNodeRouter = msg.routerNodeId().equals(locNodeId);
+
+            TcpDiscoveryNode node = ring.node(nodeId);
+
+            assert node == null || node.isClient();
+
+            if (node != null) {
+                node.clientRouterNodeId(msg.routerNodeId());
+                node.clientAliveTime(spi.clientFailureDetectionTimeout());
+            }
+
+            if (!msg.verified()) {
+                if (isLocNodeRouter || isLocalNodeCoordinator()) {
+                    if (node != null) {
+                        Collection<TcpDiscoveryAbstractMessage> pending = msgHist.messages(msg.lastMessageId(), node);
+
+                        if (pending != null) {
+                            msg.verify(locNodeId);
+                            msg.pendingMessages(pending);
+                            msg.success(true);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Accept client reconnect, restored pending messages " +
+                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        }
+                        else if (!isLocalNodeCoordinator()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to restore pending messages for reconnecting client. " +
+                                    "Forwarding reconnection message to coordinator " +
+                                    "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+                        }
+                        else {
+                            msg.verify(locNodeId);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Failing reconnecting client node because failed to restore pending " +
+                                    "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']');
+
+                            TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId,
+                                node.id(), node.internalOrder());
+
+                            msgWorker.addMessage(nodeFailedMsg);
+                        }
+                    }
+                    else {
+                        msg.verify(locNodeId);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Reconnecting client node is already failed [nodeId=" + nodeId + ']');
+                    }
+
+                    if (msg.verified() && isLocNodeRouter) {
+                        ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+                        if (wrk != null)
+                            wrk.addMessage(msg);
+                        else if (log.isDebugEnabled())
+                            log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                                locNodeId + ", clientNodeId=" + nodeId + ']');
+                    }
+                    else
+                        msgWorker.addMessage(msg);
+                }
+                else
+                    msgWorker.addMessage(msg);
+            }
+            else {
+                if (isLocalNodeCoordinator())
+                    msgWorker.addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
+
+                if (isLocNodeRouter) {
+                    ClientMessageWorker wrk = clientMsgWorkers.get(nodeId);
+
+                    if (wrk != null)
+                        wrk.addMessage(msg);
+                    else if (log.isDebugEnabled())
+                        log.debug("Failed to reconnect client node (disconnected during the process) [locNodeId=" +
+                            locNodeId + ", clientNodeId=" + nodeId + ']');
+                }
+                else if (ring.hasRemoteNodes() && !isLocalNodeCoordinator())
+                    msgWorker.addMessage(msg);
+            }
+        }
+
+        /**
          * Processes client metrics update message.
          *
          * @param msg Client metrics update message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index b31e2e4..f3cf48d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -299,9 +299,9 @@ abstract class TcpDiscoveryImpl {
     /**
      * <strong>FOR TEST ONLY!!!</strong>
      *
-     * @return Worker thread.
+     * @return Worker threads.
      */
-    protected abstract IgniteSpiThread workerThread();
+    protected abstract Collection<IgniteSpiThread> threads();
 
     /**
      * @throws IgniteSpiException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 689ac72..f1c826a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -56,15 +56,9 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     /** */
     private final static long FAILURE_THRESHOLD = 10_000;
 
-    /** */
-    private final static long CLIENT_FAILURE_THRESHOLD = 30_000;
-
     /** Failure detection timeout for nodes configuration. */
     private static long failureThreshold = FAILURE_THRESHOLD;
 
-    /** Client failure detection timeout for nodes configuration. */
-    private static long clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
-
     /** */
     private static boolean useTestSpi;
 
@@ -75,7 +69,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
 
     /** {@inheritDoc} */
     @Override protected long clientFailureDetectionTimeout() {
-        return clientFailureThreshold;
+        return clientFailureDetectionTimeout;
     }
 
     /** {@inheritDoc} */
@@ -153,7 +147,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
      */
     public void testFailureTimeoutServerClient() throws Exception {
         failureThreshold = 3000;
-        clientFailureThreshold = 2000;
+        clientFailureDetectionTimeout = 2000;
 
         try {
             startServerNodes(1);
@@ -190,13 +184,12 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
             long detectTime = failureDetectTime[0] - failureTime;
 
             assertTrue("Client node failure detected too fast: " + detectTime + "ms",
-                detectTime > clientFailureThreshold - 200);
+                detectTime > clientFailureDetectionTimeout - 200);
             assertTrue("Client node failure detected too slow:  " + detectTime + "ms",
-                detectTime < clientFailureThreshold + 5000);
+                detectTime < clientFailureDetectionTimeout + 5000);
         }
         finally {
             failureThreshold = FAILURE_THRESHOLD;
-            clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
         }
     }
 
@@ -207,7 +200,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
      */
     public void testFailureTimeout3Server() throws Exception {
         failureThreshold = 1000;
-        clientFailureThreshold = 10000;
+        clientFailureDetectionTimeout = 10000;
         useTestSpi = true;
 
         try {
@@ -254,11 +247,10 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
             assertTrue("Server node failure detected too fast: " + detectTime + "ms",
                 detectTime > failureThreshold - 100);
             assertTrue("Server node failure detected too slow:  " + detectTime + "ms",
-                detectTime < clientFailureThreshold);
+                detectTime < clientFailureDetectionTimeout);
         }
         finally {
             failureThreshold = FAILURE_THRESHOLD;
-            clientFailureThreshold = CLIENT_FAILURE_THRESHOLD;
             useTestSpi = false;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/56a63f80/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 329783e..ee88b0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -62,8 +62,8 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
+import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage;
@@ -73,6 +73,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
+
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
@@ -87,7 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
  */
 public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
     protected static final AtomicInteger srvIdx = new AtomicInteger();
@@ -123,6 +124,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private static CountDownLatch clientFailedLatch;
 
     /** */
+    private static CountDownLatch clientReconnectedLatch;
+
+    /** */
     private static CountDownLatch msgLatch;
 
     /** */
@@ -138,10 +142,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
 
     /** */
+    protected Integer reconnectCnt;
+
+    /** */
     private boolean longSockTimeouts;
 
     /** */
-    private long clientFailureDetectionTimeout = 1000;
+    protected long clientFailureDetectionTimeout = 1000;
 
     /** */
     private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
@@ -207,6 +214,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        if (reconnectCnt != null)
+            disco.setReconnectCount(reconnectCnt);
+
         disco.setClientReconnectDisabled(reconnectDisabled);
 
         if (disco instanceof TestTcpDiscoverySpi)
@@ -253,6 +263,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         clientIpFinder = null;
         joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
         netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+        clientFailureDetectionTimeout = 1000;
         longSockTimeouts = false;
 
         assert G.allGrids().isEmpty();
@@ -558,6 +569,221 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Client should reconnect to available server without EVT_CLIENT_NODE_RECONNECTED event.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnRouterSuspend() throws Exception {
+        reconnectAfterSuspend(false);
+    }
+
+    /**
+     * Client should receive all topology updates after reconnect.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectOnRouterSuspendTopologyChange() throws Exception {
+        reconnectAfterSuspend(true);
+    }
+
+    /**
+     * @param changeTop If {@code true} topology is changed after client disconnects
+     * @throws Exception if failed.
+     */
+    private void reconnectAfterSuspend(boolean changeTop) throws Exception {
+        reconnectCnt = 2;
+
+        startServerNodes(2);
+
+        Ignite srv0 = grid("server-0");
+        TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+        TcpDiscoveryNode srv1Node = (TcpDiscoveryNode)grid("server-1").cluster().localNode();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+        UUID clientNodeId = clientNode.id();
+
+        checkNodes(2, 1);
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:" + srv1Node.discoveryPort()));
+
+        srvFailedLatch = new CountDownLatch(1);
+
+        attachListeners(2, 1);
+
+        log.info("Pausing router");
+
+        TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv0.configuration().getDiscoverySpi();
+
+        int joinedNodesNum = 3;
+        final CountDownLatch srvJoinedLatch = new CountDownLatch(joinedNodesNum);
+
+        if (changeTop) {
+            client.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event e) {
+                    srvJoinedLatch.countDown();
+
+                    return true;
+                }
+            }, EVT_NODE_JOINED);
+        }
+
+        srvSpi.pauseAll(true);
+
+        if (changeTop)
+            startServerNodes(joinedNodesNum);
+
+        try {
+            await(srvFailedLatch, 60_000);
+
+            if (changeTop)
+                await(srvJoinedLatch, 5000);
+
+            assertEquals("connected", clientSpi.getSpiState());
+            assertEquals(clientNodeId, clientNode.id());
+            assertEquals(srv1Node.id(), clientNode.clientRouterNodeId());
+        }
+        finally {
+            srvSpi.resumeAll();
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testClientReconnectHistoryMissingOnRouter() throws Exception {
+        clientFailureDetectionTimeout = 60000;
+        netTimeout = 60000;
+
+        startServerNodes(2);
+
+        Ignite srv0 = grid("server-0");
+        TcpDiscoveryNode srv0Node = (TcpDiscoveryNode)srv0.cluster().localNode();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv0Node.discoveryPort()));
+
+        startClientNodes(1);
+
+        attachListeners(0, 1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+        UUID clientNodeId = clientNode.id();
+
+        checkNodes(2, 1);
+
+        clientSpi.pauseAll(true);
+
+        stopGrid(srv0.name());
+
+        startServerNodes(1);
+
+        Ignite srv2 = grid("server-2");
+        TcpDiscoveryNode srv2Node = (TcpDiscoveryNode)srv2.cluster().localNode();
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srv2Node.discoveryPort()));
+
+        clientSpi.resumeAll();
+
+        awaitPartitionMapExchange();
+
+        assertEquals("connected", clientSpi.getSpiState());
+        assertEquals(clientNodeId, clientNode.id());
+        assertEquals(srv2Node.id(), clientNode.clientRouterNodeId());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectAfterPause() throws Exception {
+        startServerNodes(2);
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+
+        clientReconnectedLatch = new CountDownLatch(1);
+
+        attachListeners(0, 1);
+
+        clientSpi.pauseAll(false);
+
+        try {
+            clientSpi.brakeConnection();
+
+            Thread.sleep(clientFailureDetectionTimeout() * 2);
+        }
+        finally {
+            clientSpi.resumeAll();
+        }
+
+        await(clientReconnectedLatch);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testReconnectAfterMassiveTopologyChange() throws Exception {
+        clientIpFinder = IP_FINDER;
+
+        clientFailureDetectionTimeout = 60000;
+        netTimeout = 60000;
+
+        int initSrvsNum = 5;
+        int killNum = 3;
+        int iterations = 10;
+
+        startServerNodes(initSrvsNum);
+        startClientNodes(1);
+
+        Ignite client = grid("client-0");
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+        TestTcpDiscoverySpi clientSpi = (TestTcpDiscoverySpi)client.configuration().getDiscoverySpi();
+        final UUID clientNodeId = clientNode.id();
+
+        final CountDownLatch srvJoinedLatch = new CountDownLatch(iterations * killNum);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                srvJoinedLatch.countDown();
+
+                return true;
+            }
+        }, EVT_NODE_JOINED);
+
+        int minAliveSrvId = 0;
+
+        for (int i = 0; i < iterations; i++) {
+            for (int j = 0; j < killNum; j++) {
+                stopGrid(minAliveSrvId);
+
+                minAliveSrvId++;
+            }
+
+            startServerNodes(killNum);
+
+            awaitPartitionMapExchange();
+        }
+
+        await(srvJoinedLatch);
+        assertEquals("connected", clientSpi.getSpiState());
+        assertEquals(clientNodeId, clientNode.id());
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testClientReconnectOnNetworkProblem() throws Exception {
@@ -1410,17 +1636,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         srvSpi.failNode(client.cluster().localNode().id(), null);
 
-        if (changeTop) {
-            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
+        assertTrue(failLatch.await(5000, MILLISECONDS));
 
-            srvNodeIds.add(g.cluster().localNode().id());
+        if (changeTop) {
+            startServerNodes(1);
 
             clientSpi.resumeAll();
         }
 
-        assertTrue(disconnectLatch.await(5000, MILLISECONDS));
         assertTrue(reconnectLatch.await(5000, MILLISECONDS));
-        assertTrue(failLatch.await(5000, MILLISECONDS));
         assertTrue(joinLatch.await(5000, MILLISECONDS));
 
         long topVer = changeTop ? 5L : 4L;
@@ -2026,6 +2251,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 }, EVT_NODE_FAILED);
             }
         }
+
+        if (clientReconnectedLatch != null) {
+            for (int i = 0; i < clientCnt; i++) {
+                G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event evt) {
+                        info("Reconnected event fired on client: " + evt);
+
+                        clientReconnectedLatch.countDown();
+
+                        return true;
+                    }
+                }, EVT_CLIENT_NODE_RECONNECTED);
+            }
+        }
     }
 
     /**
@@ -2095,7 +2334,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws InterruptedException If interrupted.
      */
     protected void await(CountDownLatch latch) throws InterruptedException {
-        assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+        await(latch, awaitTime());
+    }
+
+    /**
+     * @param latch Latch.
+     * @param timeout Timeout.
+     * @throws InterruptedException If interrupted.
+     */
+    protected void await(CountDownLatch latch, long timeout) throws InterruptedException {
+        assertTrue("Latch count: " + latch.getCount(), latch.await(timeout, MILLISECONDS));
     }
 
     /**
@@ -2324,8 +2572,10 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         public void pauseAll(boolean suspend) {
             pauseResumeOperation(true, openSockLock, writeLock);
 
-            if (suspend)
-                impl.workerThread().suspend();
+            if (suspend) {
+                for (Thread t : impl.threads())
+                    t.suspend();
+            }
         }
 
         /**
@@ -2334,7 +2584,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         public void resumeAll() {
             pauseResumeOperation(false, openSockLock, writeLock);
 
-            impl.workerThread().resume();
+            for (IgniteSpiThread t : impl.threads())
+                t.resume();
         }
 
         /** {@inheritDoc} */


Mime
View raw message