ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/50] incubator-ignite git commit: # ignite-883 issues with client connect/reconnect
Date Mon, 29 Jun 2015 19:47:11 GMT
# ignite-883 issues with client connect/reconnect


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efa92c54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efa92c54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efa92c54

Branch: refs/heads/ignite-788-dev-review
Commit: efa92c54ab07d7f72b9c83aa3a09e03627d72e4a
Parents: 3e8ddb4
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jun 26 11:06:41 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jun 26 11:06:41 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   6 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 151 ++++++-----
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 103 +++++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   3 +-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      | 265 ++++++++++++++++++-
 5 files changed, 448 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index edd0ad7..af87685 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -826,7 +826,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         updated |= top.update(null, entry.getValue()) != null;
                 }
 
-                if (updated)
+                if (!cctx.kernalContext().clientNode() && updated)
                     refreshPartitions();
             }
             else
@@ -985,7 +985,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     // If not first preloading and no more topology events present,
                     // then we periodically refresh partition map.
-                    if (futQ.isEmpty() && preloadFinished) {
+                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() &&
preloadFinished) {
                         refreshPartitions(timeout);
 
                         timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1051,7 +1051,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                             startEvtFired = true;
 
-                            if (changed && futQ.isEmpty())
+                            if (!cctx.kernalContext().clientNode() && changed &&
futQ.isEmpty())
                                 refreshPartitions();
                         }
                         else {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 0c2c059..04276d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1123,7 +1123,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (joinLatch.getCount() > 0) {
                             joinError(new IgniteSpiException("Join process timed out, did
not receive response for " +
                                 "join request (consider increasing 'joinTimeout' configuration
property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']'));
+                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
 
                             break;
                         }
@@ -1282,17 +1282,21 @@ class ClientImpl extends TcpDiscoveryImpl {
                         "[msg=" + msg + ", locNode=" + locNode + ']');
             }
             else {
-                boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
+                if (nodeAdded()) {
+                    boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null;
 
-                if (topChanged) {
-                    if (log.isDebugEnabled())
-                        log.debug("Added new node to topology: " + node);
+                    if (topChanged) {
+                        if (log.isDebugEnabled())
+                            log.debug("Added new node to topology: " + node);
 
-                    Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+                        Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
 
-                    if (data != null)
-                        spi.onExchange(newNodeId, newNodeId, data, null);
+                        if (data != null)
+                            spi.onExchange(newNodeId, newNodeId, data, null);
+                    }
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Ignore topology message, local node not added to topology:
" + msg);
             }
         }
 
@@ -1332,54 +1336,58 @@ class ClientImpl extends TcpDiscoveryImpl {
                         "[msg=" + msg + ", locNode=" + locNode + ']');
             }
             else {
-                TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
+                if (nodeAdded()) {
+                    TcpDiscoveryNode node = rmtNodes.get(msg.nodeId());
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node add finished message since node is not
found [msg=" + msg + ']');
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node add finished message since node is
not found [msg=" + msg + ']');
 
-                    return;
-                }
+                        return;
+                    }
 
-                boolean evt = false;
+                    boolean evt = false;
 
-                long topVer = msg.topologyVersion();
+                    long topVer = msg.topologyVersion();
 
-                assert topVer > 0 : msg;
+                    assert topVer > 0 : msg;
 
-                if (!node.visible()) {
-                    node.order(topVer);
-                    node.visible(true);
+                    if (!node.visible()) {
+                        node.order(topVer);
+                        node.visible(true);
 
-                    if (spi.locNodeVer.equals(node.version()))
-                        node.version(spi.locNodeVer);
+                        if (spi.locNodeVer.equals(node.version()))
+                            node.version(spi.locNodeVer);
 
-                    evt = true;
-                }
-                else {
-                    if (log.isDebugEnabled())
-                        log.debug("Skip node join event, node already joined [msg=" + msg
+ ", node=" + node + ']');
+                        evt = true;
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Skip node join event, node already joined [msg=" +
msg + ", node=" + node + ']');
 
-                    assert node.order() == topVer : node;
-                }
+                        assert node.order() == topVer : node;
+                    }
 
-                Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
+                    Collection<ClusterNode> top = updateTopologyHistory(topVer, msg);
 
-                assert top != null && top.contains(node) : "Topology does not contain
node [msg=" + msg +
-                    ", node=" + node + ", top=" + top + ']';
+                    assert top != null && top.contains(node) : "Topology does not
contain node [msg=" + msg +
+                        ", node=" + node + ", top=" + top + ']';
 
-                if (!pending && joinLatch.getCount() > 0) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node add finished message (join process is
not finished): " + msg);
+                    if (!pending && joinLatch.getCount() > 0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node add finished message (join process
is not finished): " + msg);
 
-                    return;
-                }
+                        return;
+                    }
 
-                if (evt) {
-                    notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
+                    if (evt) {
+                        notifyDiscovery(EVT_NODE_JOINED, topVer, node, top);
 
-                    spi.stats.onNodeJoined();
+                        spi.stats.onNodeJoined();
+                    }
                 }
+                else if (log.isDebugEnabled())
+                    log.debug("Ignore topology message, local node not added to topology:
" + msg);
             }
         }
 
@@ -1397,31 +1405,42 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (spi.getSpiContext().isStopping())
                     return;
 
-                TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
+                if (nodeAdded()) {
+                    TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId());
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node left message since node is not found [msg="
+ msg + ']');
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node left message since node is not found
[msg=" + msg + ']');
 
-                    return;
-                }
+                        return;
+                    }
 
-                Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(),
msg);
+                    Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(),
msg);
 
-                if (!pending && joinLatch.getCount() > 0) {
-                    if (log.isDebugEnabled())
-                        log.debug("Discarding node left message (join process is not finished):
" + msg);
+                    if (!pending && joinLatch.getCount() > 0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node left message (join process is not
finished): " + msg);
 
-                    return;
-                }
+                        return;
+                    }
 
-                notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
+                    notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top);
 
-                spi.stats.onNodeLeft();
+                    spi.stats.onNodeLeft();
+                }
+                else if (log.isDebugEnabled())
+                    log.debug("Ignore topology message, local node not added to topology:
" + msg);
             }
         }
 
         /**
+         * @return {@code True} if received node added message for local node.
+         */
+        private boolean nodeAdded() {
+            return !topHist.isEmpty();
+        }
+
+        /**
          * @param msg Message.
          */
         private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) {
@@ -1514,9 +1533,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
 
             if (getLocalNodeId().equals(msg.creatorNodeId())) {
-                assert msg.success() : msg;
-
                 if (reconnector != null) {
+                    assert msg.success() : msg;
+
                     currSock = reconnector.sock;
 
                     sockWriter.setSocket(currSock);
@@ -1529,7 +1548,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     try {
                         for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
{
                             if (log.isDebugEnabled())
-                                log.debug("Process message on reconnect [msg=" + pendingMsg
+ ']');
+                                log.debug("Process pending message on reconnect [msg=" +
pendingMsg + ']');
 
                             processDiscoveryMessage(pendingMsg);
                         }
@@ -1538,8 +1557,22 @@ class ClientImpl extends TcpDiscoveryImpl {
                         pending = false;
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Discarding reconnect message, reconnect is completed: " +
msg);
+                else {
+                    if (joinLatch.getCount() > 0) {
+                        if (msg.success()) {
+                            for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages())
{
+                                if (log.isDebugEnabled())
+                                    log.debug("Process pending message on connect [msg="
+ pendingMsg + ']');
+
+                                processDiscoveryMessage(pendingMsg);
+                            }
+
+                            assert joinLatch.getCount() == 0 : msg;
+                        }
+                    }
+                    else if (log.isDebugEnabled())
+                        log.debug("Discarding reconnect message, reconnect is completed:
" + msg);
+                }
             }
             else if (log.isDebugEnabled())
                 log.debug("Discarding reconnect message for another client: " + msg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2458f85..fa3e564 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2452,7 +2452,40 @@ class ServerImpl extends TcpDiscoveryImpl {
                         return;
                     }
 
-                    if (log.isDebugEnabled())
+                    if (msg.client()) {
+                        TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(),
+                            node.clientRouterNodeId(),
+                            null);
+
+                        reconMsg.verify(getLocalNodeId());
+
+                        Collection<TcpDiscoveryAbstractMessage> msgs = msgHist.messages(null,
node);
+
+                        if (msgs != null) {
+                            reconMsg.pendingMessages(msgs);
+
+                            reconMsg.success(true);
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Send reconnect message to already joined client "
+
+                                "[clientNode=" + existingNode + ", msg=" + reconMsg + ']');
+
+                        if (getLocalNodeId().equals(node.clientRouterNodeId())) {
+                            ClientMessageWorker wrk = clientMsgWorkers.get(node.id());
+
+                            if (wrk != null)
+                                wrk.addMessage(reconMsg);
+                            else if (log.isDebugEnabled())
+                                log.debug("Failed to find client message worker " +
+                                    "[clientNode=" + existingNode + ", msg=" + reconMsg +
']');
+                        }
+                        else {
+                            if (ring.hasRemoteNodes())
+                                sendMessageAcrossRing(reconMsg);
+                        }
+                    }
+                    else if (log.isDebugEnabled())
                         log.debug("Ignoring join request message since node is already in
topology: " + msg);
 
                     return;
@@ -4104,15 +4137,44 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
 
                     if (req.client()) {
+                        ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock,
nodeId);
+
+                        while (true) {
+                            ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId,
clientMsgWrk0);
+
+                            if (old == null)
+                                break;
+
+                            if (old.isInterrupted()) {
+                                clientMsgWorkers.remove(nodeId, old);
+
+                                continue;
+                            }
+
+                            old.join(500);
+
+                            old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0);
+
+                            if (old == null)
+                                break;
+
+                            if (log.isDebugEnabled())
+                                log.debug("Already have client message worker, closing connection
" +
+                                    "[locNodeId=" + locNodeId +
+                                    ", rmtNodeId=" + nodeId +
+                                    ", workerSock=" + old.sock +
+                                    ", sock=" + sock + ']');
+
+                            return;
+                        }
+
                         if (log.isDebugEnabled())
                             log.debug("Created client message worker [locNodeId=" + locNodeId
+
                                 ", rmtNodeId=" + nodeId + ", sock=" + sock + ']');
 
-                        clientMsgWrk = new ClientMessageWorker(sock, nodeId);
-
-                        clientMsgWrk.start();
+                        assert clientMsgWrk0 == clientMsgWorkers.get(nodeId);
 
-                        clientMsgWorkers.put(nodeId, clientMsgWrk);
+                        clientMsgWrk = clientMsgWrk0;
                     }
 
                     if (log.isDebugEnabled())
@@ -4188,7 +4250,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;
 
                             if (!req.responded()) {
-                                boolean ok = processJoinRequestMessage(req);
+                                boolean ok = processJoinRequestMessage(req, clientMsgWrk);
 
                                 if (clientMsgWrk != null && ok)
                                     continue;
@@ -4202,14 +4264,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 TcpDiscoverySpiState state = spiStateCopy();
 
                                 if (state == CONNECTED) {
-                                    spi.writeToSocket(sock, RES_OK);
+                                    spi.writeToSocket(msg, sock, RES_OK);
+
+                                    if (clientMsgWrk != null && clientMsgWrk.getState()
== State.NEW)
+                                        clientMsgWrk.start();
 
                                     msgWorker.addMessage(msg);
 
                                     continue;
                                 }
                                 else {
-                                    spi.writeToSocket(sock, RES_CONTINUE_JOIN);
+                                    spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN);
 
                                     break;
                                 }
@@ -4217,7 +4282,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryDuplicateIdMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK);
 
                             boolean ignored = false;
 
@@ -4246,7 +4311,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryAuthFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK);
 
                             boolean ignored = false;
 
@@ -4275,7 +4340,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryCheckFailedMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK);
 
                             boolean ignored = false;
 
@@ -4304,7 +4369,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                         else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) {
                             // Send receipt back.
-                            spi.writeToSocket(sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK);
 
                             boolean ignored = false;
 
@@ -4346,7 +4411,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         // Send receipt back.
                         if (clientMsgWrk == null)
-                            spi.writeToSocket(sock, RES_OK);
+                            spi.writeToSocket(msg, sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -4435,24 +4500,29 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /**
          * @param msg Join request message.
+         * @param clientMsgWrk Client message worker to start.
          * @return Whether connection was successful.
          * @throws IOException If IO failed.
          */
         @SuppressWarnings({"IfMayBeConditional"})
-        private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) throws
IOException {
+        private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg,
+            @Nullable ClientMessageWorker clientMsgWrk) throws IOException {
             assert msg != null;
             assert !msg.responded();
 
             TcpDiscoverySpiState state = spiStateCopy();
 
             if (state == CONNECTED) {
-                spi.writeToSocket(sock, RES_OK);
+                spi.writeToSocket(msg, sock, RES_OK);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res="
+ RES_OK + ']');
 
                 msg.responded(true);
 
+                if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW)
+                    clientMsgWrk.start();
+
                 msgWorker.addMessage(msg);
 
                 return true;
@@ -4477,7 +4547,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Local node is stopping. Remote node should try next one.
                     res = RES_CONTINUE_JOIN;
 
-                spi.writeToSocket(sock, res);
+                spi.writeToSocket(msg, sock, res);
 
                 if (log.isDebugEnabled())
                     log.debug("Responded to join request message [msg=" + msg + ", res="
+ res + ']');
@@ -4632,6 +4702,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @return Ping result.
          * @throws InterruptedException If interrupted.
          */
         public boolean ping() throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1d1916a..7663fe6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1227,12 +1227,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi,
T
     /**
      * Writes response to the socket.
      *
+     * @param msg Received message.
      * @param sock Socket.
      * @param res Integer response.
      * @throws IOException If IO failed or write timed out.
      */
     @SuppressWarnings("ThrowFromFinallyBlock")
-    protected void writeToSocket(Socket sock, int res) throws IOException {
+    protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws
IOException {
         assert sock != null;
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 8147958..ec6a526 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -24,7 +24,9 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
@@ -106,11 +108,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
     /** */
     private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS;
 
+    /** */
+    private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
 
         disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
 
@@ -154,6 +159,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         disco.setJoinTimeout(joinTimeout);
         disco.setNetworkTimeout(netTimeout);
 
+        disco.afterWrite(afterWrite);
+
         cfg.setDiscoverySpi(disco);
 
         if (nodeId != null)
@@ -1016,6 +1023,189 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinError3() throws Exception {
+        startServerNodes(1);
+
+        Ignite ignite = G.ignite("server-0");
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi());
+
+        srvSpi.failNodeAddFinishedMessage();
+
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinErrorMissedAddFinishedMessage1() throws Exception {
+        missedAddFinishedMessage(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinErrorMissedAddFinishedMessage2() throws Exception {
+        missedAddFinishedMessage(false);
+    }
+
+    /**
+     * @param singleSrv If {@code true} starts one server node two otherwise.
+     * @throws Exception If failed.
+     */
+    private void missedAddFinishedMessage(boolean singleSrv) throws Exception {
+        int srvs = singleSrv ? 1 : 2;
+
+        startServerNodes(srvs);
+
+        afterWrite = new CIX2<TcpDiscoveryAbstractMessage, Socket>() {
+            private boolean first = true;
+
+            @Override public void applyx(TcpDiscoveryAbstractMessage msg, Socket sock) throws
IgniteCheckedException {
+                if (first && (msg instanceof TcpDiscoveryJoinRequestMessage)) {
+                    first = false;
+
+                    log.info("Close socket after message write [msg=" + msg + "]");
+
+                    try {
+                        sock.close();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException(e);
+                    }
+
+                    log.info("Delay after message write [msg=" + msg + "]");
+
+                    U.sleep(5000); // Wait when server process join request.
+                }
+            }
+        };
+
+        Ignite srv = singleSrv ? G.ignite("server-0") : G.ignite("server-1");
+
+        TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+        assertEquals(singleSrv ? 1 : 2, srvNode.order());
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+        startClientNodes(1);
+
+        TcpDiscoveryNode clientNode = (TcpDiscoveryNode)G.ignite("client-0").cluster().localNode();
+
+        assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+        checkNodes(srvs, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientMessageWorkerStartSingleServer() throws Exception {
+        clientMessageWorkerStart(1, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientMessageWorkerStartTwoServers1() throws Exception {
+        clientMessageWorkerStart(2, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClientMessageWorkerStartTwoServers2() throws Exception {
+        clientMessageWorkerStart(2, 2);
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param connectTo What server connect to.
+     * @throws Exception If failed.
+     */
+    private void clientMessageWorkerStart(int srvs, int connectTo) throws Exception {
+        startServerNodes(srvs);
+
+        Ignite srv = G.ignite("server-" + (connectTo - 1));
+
+        final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+        assertEquals((long)connectTo, srvNode.order());
+
+        TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+
+        final String client0 = "client-" + clientIdx.getAndIncrement();
+
+        srvSpi.delayJoinAckFor = client0;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>()
{
+            @Override public Object call() throws Exception {
+                clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+                clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort()));
+
+                Ignite client = startGrid(client0);
+
+                clientIpFinder = null;
+
+                clientNodeIds.add(client.cluster().localNode().id());
+
+                TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+                assertFalse(clientSpi.invalidResponse());
+
+                TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode();
+
+                assertEquals(srvNode.id(), clientNode.clientRouterNodeId());
+
+                return null;
+            }
+        });
+
+        final String client1 = "client-" + clientIdx.getAndIncrement();
+
+        while (!fut.isDone()) {
+            startGrid(client1);
+
+            stopGrid(client1);
+        }
+
+        fut.get();
+
+        checkNodes(srvs, 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinMutlithreaded() throws Exception {
+        startServerNodes(1);
+
+        final int CLIENTS = 30;
+
+        clientsPerSrv = CLIENTS;
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+                clientNodeIds.add(g.cluster().localNode().id());
+
+                return null;
+            }
+        }, CLIENTS, "start-client");
+
+        checkNodes(1, CLIENTS);
+    }
+
+    /**
      * @param clientIdx Client index.
      * @param srvIdx Server index.
      * @throws Exception In case of error.
@@ -1267,8 +1457,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         private AtomicInteger failNodeAdded = new AtomicInteger();
 
         /** */
+        private AtomicInteger failNodeAddFinished = new AtomicInteger();
+
+        /** */
         private AtomicInteger failClientReconnect = new AtomicInteger();
 
+        /** */
+        private IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite;
+
+        /** */
+        private volatile boolean invalidRes;
+
+        /** */
+        private volatile String delayJoinAckFor;
+
         /**
          * @param lock Lock.
          */
@@ -1287,6 +1489,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         }
 
         /**
+         * @param afterWrite After write callback.
+         */
+        void afterWrite(IgniteInClosure2X<TcpDiscoveryAbstractMessage, Socket> afterWrite)
{
+            this.afterWrite = afterWrite;
+        }
+
+        /**
+         * @return {@code True} if received unexpected ack.
+         */
+        boolean invalidResponse() {
+            return invalidRes;
+        }
+
+        /**
          *
          */
         void failNodeAddedMessage() {
@@ -1296,6 +1512,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
         /**
          *
          */
+        void failNodeAddFinishedMessage() {
+            failNodeAddFinished.set(1);
+        }
+
+        /**
+         *
+         */
         void failClientReconnectMessage() {
             failClientReconnect.set(1);
         }
@@ -1322,6 +1545,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
 
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
                 fail = failNodeAdded.getAndDecrement() > 0;
+            else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)
+                fail = failNodeAddFinished.getAndDecrement() > 0;
             else if (msg instanceof TcpDiscoveryClientReconnectMessage)
                 fail = failClientReconnect.getAndDecrement() > 0;
 
@@ -1332,6 +1557,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
             }
 
             super.writeToSocket(sock, msg, bout);
+
+            if (afterWrite != null)
+                afterWrite.apply(msg, sock);
         }
 
         /** {@inheritDoc} */
@@ -1365,5 +1593,40 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest
{
 
             impl.workerThread().resume();
         }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock,
int res) throws IOException {
+            if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage)
{
+                TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+                if (delayJoinAckFor.equals(msg0.node().attribute(IgniteNodeAttributes.ATTR_GRID_NAME)))
{
+                    log.info("Delay response [sock=" + sock + ", msg=" + msg0 + ", res="
+ res + ']');
+
+                    delayJoinAckFor = null;
+
+                    try {
+                        Thread.sleep(2000);
+                    }
+                    catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
+            super.writeToSocket(msg, sock, res);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int readReceipt(Socket sock, long timeout) throws IOException
{
+            int res = super.readReceipt(sock, timeout);
+
+            if (res != TcpDiscoveryImpl.RES_OK) {
+                invalidRes = true;
+
+                log.info("Received unexpected response: " + res);
+            }
+
+            return res;
+        }
     }
 }



Mime
View raw message