Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D55B218B35 for ; Wed, 1 Jul 2015 15:30:50 +0000 (UTC) Received: (qmail 91361 invoked by uid 500); 1 Jul 2015 15:30:50 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 91330 invoked by uid 500); 1 Jul 2015 15:30:50 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 91321 invoked by uid 99); 1 Jul 2015 15:30:50 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jul 2015 15:30:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 46E20D176F for ; Wed, 1 Jul 2015 15:30:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.23 X-Spam-Level: * X-Spam-Status: No, score=1.23 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.571, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id iMm-wy5kORdQ for ; Wed, 1 Jul 2015 15:30:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 5BE5B24CE8 for ; Wed, 1 Jul 2015 15:30:41 +0000 (UTC) Received: (qmail 91021 invoked by uid 99); 1 Jul 2015 15:30:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jul 2015 15:30:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7275FE025B; Wed, 1 Jul 2015 15:30:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 01 Jul 2015 15:30:43 -0000 Message-Id: <5c044a3c5d0c4b6d8213ebcba87957e6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/50] incubator-ignite git commit: # ignite-883 issues with client connect/reconnect # ignite-883 issues with client connect/reconnect Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efa92c54 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efa92c54 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efa92c54 Branch: refs/heads/ignite-1053 Commit: efa92c54ab07d7f72b9c83aa3a09e03627d72e4a Parents: 3e8ddb4 Author: sboikov Authored: Fri Jun 26 11:06:41 2015 +0300 Committer: sboikov Committed: Fri Jun 26 11:06:41 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 6 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 151 ++++++----- .../ignite/spi/discovery/tcp/ServerImpl.java | 103 +++++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 265 ++++++++++++++++++- 5 files changed, 448 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index edd0ad7..af87685 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -826,7 +826,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana updated |= top.update(null, entry.getValue()) != null; } - if (updated) + if (!cctx.kernalContext().clientNode() && updated) refreshPartitions(); } else @@ -985,7 +985,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana // If not first preloading and no more topology events present, // then we periodically refresh partition map. - if (futQ.isEmpty() && preloadFinished) { + if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) { refreshPartitions(timeout); timeout = cctx.gridConfig().getNetworkTimeout(); @@ -1051,7 +1051,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana startEvtFired = true; - if (changed && futQ.isEmpty()) + if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty()) refreshPartitions(); } else { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 0c2c059..04276d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1123,7 +1123,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (joinLatch.getCount() > 0) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); break; } @@ -1282,17 +1282,21 @@ class ClientImpl extends TcpDiscoveryImpl { "[msg=" + msg + ", locNode=" + locNode + ']'); } else { - boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; + if (nodeAdded()) { + boolean topChanged = rmtNodes.putIfAbsent(newNodeId, node) == null; - if (topChanged) { - if (log.isDebugEnabled()) - log.debug("Added new node to topology: " + node); + if (topChanged) { + if (log.isDebugEnabled()) + log.debug("Added new node to topology: " + node); - Map data = msg.newNodeDiscoveryData(); + Map data = msg.newNodeDiscoveryData(); - if (data != null) - spi.onExchange(newNodeId, newNodeId, data, null); + if (data != null) + spi.onExchange(newNodeId, newNodeId, data, null); + } } + else if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); } } @@ -1332,54 +1336,58 @@ class ClientImpl extends TcpDiscoveryImpl { "[msg=" + msg + ", locNode=" + locNode + ']'); } else { - TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); + if (nodeAdded()) { + TcpDiscoveryNode node = rmtNodes.get(msg.nodeId()); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message since node is not found [msg=" + msg + ']'); - return; - } + return; + } - boolean evt = false; + boolean evt = false; - long topVer = msg.topologyVersion(); + long topVer = msg.topologyVersion(); - assert topVer > 0 : msg; + assert topVer > 0 : msg; - if (!node.visible()) { - node.order(topVer); - node.visible(true); + if (!node.visible()) { + node.order(topVer); + node.visible(true); - if (spi.locNodeVer.equals(node.version())) - node.version(spi.locNodeVer); + if (spi.locNodeVer.equals(node.version())) + node.version(spi.locNodeVer); - evt = true; - } - else { - if (log.isDebugEnabled()) - log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']'); + evt = true; + } + else { + if (log.isDebugEnabled()) + log.debug("Skip node join event, node already joined [msg=" + msg + ", node=" + node + ']'); - assert node.order() == topVer : node; - } + assert node.order() == topVer : node; + } - Collection top = updateTopologyHistory(topVer, msg); + Collection top = updateTopologyHistory(topVer, msg); - assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg + - ", node=" + node + ", top=" + top + ']'; + assert top != null && top.contains(node) : "Topology does not contain node [msg=" + msg + + ", node=" + node + ", top=" + top + ']'; - if (!pending && joinLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node add finished message (join process is not finished): " + msg); + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message (join process is not finished): " + msg); - return; - } + return; + } - if (evt) { - notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); + if (evt) { + notifyDiscovery(EVT_NODE_JOINED, topVer, node, top); - spi.stats.onNodeJoined(); + spi.stats.onNodeJoined(); + } } + else if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); } } @@ -1397,31 +1405,42 @@ class ClientImpl extends TcpDiscoveryImpl { if (spi.getSpiContext().isStopping()) return; - TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); + if (nodeAdded()) { + TcpDiscoveryNode node = rmtNodes.remove(msg.creatorNodeId()); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message since node is not found [msg=" + msg + ']'); - return; - } + return; + } - Collection top = updateTopologyHistory(msg.topologyVersion(), msg); + Collection top = updateTopologyHistory(msg.topologyVersion(), msg); - if (!pending && joinLatch.getCount() > 0) { - if (log.isDebugEnabled()) - log.debug("Discarding node left message (join process is not finished): " + msg); + if (!pending && joinLatch.getCount() > 0) { + if (log.isDebugEnabled()) + log.debug("Discarding node left message (join process is not finished): " + msg); - return; - } + return; + } - notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); + notifyDiscovery(EVT_NODE_LEFT, msg.topologyVersion(), node, top); - spi.stats.onNodeLeft(); + spi.stats.onNodeLeft(); + } + else if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); } } /** + * @return {@code True} if received node added message for local node. + */ + private boolean nodeAdded() { + return !topHist.isEmpty(); + } + + /** * @param msg Message. */ private void processNodeFailedMessage(TcpDiscoveryNodeFailedMessage msg) { @@ -1514,9 +1533,9 @@ class ClientImpl extends TcpDiscoveryImpl { return; if (getLocalNodeId().equals(msg.creatorNodeId())) { - assert msg.success() : msg; - if (reconnector != null) { + assert msg.success() : msg; + currSock = reconnector.sock; sockWriter.setSocket(currSock); @@ -1529,7 +1548,7 @@ class ClientImpl extends TcpDiscoveryImpl { try { for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { if (log.isDebugEnabled()) - log.debug("Process message on reconnect [msg=" + pendingMsg + ']'); + log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); processDiscoveryMessage(pendingMsg); } @@ -1538,8 +1557,22 @@ class ClientImpl extends TcpDiscoveryImpl { pending = false; } } - else if (log.isDebugEnabled()) - log.debug("Discarding reconnect message, reconnect is completed: " + msg); + else { + if (joinLatch.getCount() > 0) { + if (msg.success()) { + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + if (log.isDebugEnabled()) + log.debug("Process pending message on connect [msg=" + pendingMsg + ']'); + + processDiscoveryMessage(pendingMsg); + } + + assert joinLatch.getCount() == 0 : msg; + } + } + else if (log.isDebugEnabled()) + log.debug("Discarding reconnect message, reconnect is completed: " + msg); + } } else if (log.isDebugEnabled()) log.debug("Discarding reconnect message for another client: " + msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2458f85..fa3e564 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2452,7 +2452,40 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - if (log.isDebugEnabled()) + if (msg.client()) { + TcpDiscoveryClientReconnectMessage reconMsg = new TcpDiscoveryClientReconnectMessage(node.id(), + node.clientRouterNodeId(), + null); + + reconMsg.verify(getLocalNodeId()); + + Collection msgs = msgHist.messages(null, node); + + if (msgs != null) { + reconMsg.pendingMessages(msgs); + + reconMsg.success(true); + } + + if (log.isDebugEnabled()) + log.debug("Send reconnect message to already joined client " + + "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); + + if (getLocalNodeId().equals(node.clientRouterNodeId())) { + ClientMessageWorker wrk = clientMsgWorkers.get(node.id()); + + if (wrk != null) + wrk.addMessage(reconMsg); + else if (log.isDebugEnabled()) + log.debug("Failed to find client message worker " + + "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); + } + else { + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(reconMsg); + } + } + else if (log.isDebugEnabled()) log.debug("Ignoring join request message since node is already in topology: " + msg); return; @@ -4104,15 +4137,44 @@ class ServerImpl extends TcpDiscoveryImpl { } if (req.client()) { + ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId); + + while (true) { + ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0); + + if (old == null) + break; + + if (old.isInterrupted()) { + clientMsgWorkers.remove(nodeId, old); + + continue; + } + + old.join(500); + + old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0); + + if (old == null) + break; + + if (log.isDebugEnabled()) + log.debug("Already have client message worker, closing connection " + + "[locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + + ", workerSock=" + old.sock + + ", sock=" + sock + ']'); + + return; + } + if (log.isDebugEnabled()) log.debug("Created client message worker [locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ", sock=" + sock + ']'); - clientMsgWrk = new ClientMessageWorker(sock, nodeId); - - clientMsgWrk.start(); + assert clientMsgWrk0 == clientMsgWorkers.get(nodeId); - clientMsgWorkers.put(nodeId, clientMsgWrk); + clientMsgWrk = clientMsgWrk0; } if (log.isDebugEnabled()) @@ -4188,7 +4250,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg; if (!req.responded()) { - boolean ok = processJoinRequestMessage(req); + boolean ok = processJoinRequestMessage(req, clientMsgWrk); if (clientMsgWrk != null && ok) continue; @@ -4202,14 +4264,17 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); + + if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + clientMsgWrk.start(); msgWorker.addMessage(msg); continue; } else { - spi.writeToSocket(sock, RES_CONTINUE_JOIN); + spi.writeToSocket(msg, sock, RES_CONTINUE_JOIN); break; } @@ -4217,7 +4282,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryDuplicateIdMessage) { // Send receipt back. - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); boolean ignored = false; @@ -4246,7 +4311,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryAuthFailedMessage) { // Send receipt back. - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); boolean ignored = false; @@ -4275,7 +4340,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryCheckFailedMessage) { // Send receipt back. - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); boolean ignored = false; @@ -4304,7 +4369,7 @@ class ServerImpl extends TcpDiscoveryImpl { } else if (msg instanceof TcpDiscoveryLoopbackProblemMessage) { // Send receipt back. - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); boolean ignored = false; @@ -4346,7 +4411,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Send receipt back. if (clientMsgWrk == null) - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); } catch (IgniteCheckedException e) { if (log.isDebugEnabled()) @@ -4435,24 +4500,29 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Join request message. + * @param clientMsgWrk Client message worker to start. * @return Whether connection was successful. * @throws IOException If IO failed. */ @SuppressWarnings({"IfMayBeConditional"}) - private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg) throws IOException { + private boolean processJoinRequestMessage(TcpDiscoveryJoinRequestMessage msg, + @Nullable ClientMessageWorker clientMsgWrk) throws IOException { assert msg != null; assert !msg.responded(); TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { - spi.writeToSocket(sock, RES_OK); + spi.writeToSocket(msg, sock, RES_OK); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']'); msg.responded(true); + if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + clientMsgWrk.start(); + msgWorker.addMessage(msg); return true; @@ -4477,7 +4547,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Local node is stopping. Remote node should try next one. res = RES_CONTINUE_JOIN; - spi.writeToSocket(sock, res); + spi.writeToSocket(msg, sock, res); if (log.isDebugEnabled()) log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']'); @@ -4632,6 +4702,7 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @return Ping result. * @throws InterruptedException If interrupted. */ public boolean ping() throws InterruptedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 1d1916a..7663fe6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1227,12 +1227,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** * Writes response to the socket. * + * @param msg Received message. * @param sock Socket. * @param res Integer response. * @throws IOException If IO failed or write timed out. */ @SuppressWarnings("ThrowFromFinallyBlock") - protected void writeToSocket(Socket sock, int res) throws IOException { + protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException { assert sock != null; SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efa92c54/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 8147958..ec6a526 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -24,7 +24,9 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; @@ -106,11 +108,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private int maxMissedClientHbs = TcpDiscoverySpi.DFLT_MAX_MISSED_CLIENT_HEARTBEATS; + /** */ + private IgniteInClosure2X afterWrite; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - TcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); disco.setMaxMissedClientHeartbeats(maxMissedClientHbs); @@ -154,6 +159,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disco.setJoinTimeout(joinTimeout); disco.setNetworkTimeout(netTimeout); + disco.afterWrite(afterWrite); + cfg.setDiscoverySpi(disco); if (nodeId != null) @@ -1016,6 +1023,189 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testJoinError3() throws Exception { + startServerNodes(1); + + Ignite ignite = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + + srvSpi.failNodeAddFinishedMessage(); + + startClientNodes(1); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testJoinErrorMissedAddFinishedMessage1() throws Exception { + missedAddFinishedMessage(true); + } + + /** + * @throws Exception If failed. + */ + public void testJoinErrorMissedAddFinishedMessage2() throws Exception { + missedAddFinishedMessage(false); + } + + /** + * @param singleSrv If {@code true} starts one server node two otherwise. + * @throws Exception If failed. + */ + private void missedAddFinishedMessage(boolean singleSrv) throws Exception { + int srvs = singleSrv ? 1 : 2; + + startServerNodes(srvs); + + afterWrite = new CIX2() { + private boolean first = true; + + @Override public void applyx(TcpDiscoveryAbstractMessage msg, Socket sock) throws IgniteCheckedException { + if (first && (msg instanceof TcpDiscoveryJoinRequestMessage)) { + first = false; + + log.info("Close socket after message write [msg=" + msg + "]"); + + try { + sock.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + log.info("Delay after message write [msg=" + msg + "]"); + + U.sleep(5000); // Wait when server process join request. + } + } + }; + + Ignite srv = singleSrv ? G.ignite("server-0") : G.ignite("server-1"); + + TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode(); + + assertEquals(singleSrv ? 1 : 2, srvNode.order()); + + clientIpFinder = new TcpDiscoveryVmIpFinder(); + + clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort())); + + startClientNodes(1); + + TcpDiscoveryNode clientNode = (TcpDiscoveryNode)G.ignite("client-0").cluster().localNode(); + + assertEquals(srvNode.id(), clientNode.clientRouterNodeId()); + + checkNodes(srvs, 1); + } + + /** + * @throws Exception If failed. + */ + public void testClientMessageWorkerStartSingleServer() throws Exception { + clientMessageWorkerStart(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testClientMessageWorkerStartTwoServers1() throws Exception { + clientMessageWorkerStart(2, 1); + } + + /** + * @throws Exception If failed. + */ + public void testClientMessageWorkerStartTwoServers2() throws Exception { + clientMessageWorkerStart(2, 2); + } + + /** + * @param srvs Number of server nodes. + * @param connectTo What server connect to. + * @throws Exception If failed. + */ + private void clientMessageWorkerStart(int srvs, int connectTo) throws Exception { + startServerNodes(srvs); + + Ignite srv = G.ignite("server-" + (connectTo - 1)); + + final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode(); + + assertEquals((long)connectTo, srvNode.order()); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + + final String client0 = "client-" + clientIdx.getAndIncrement(); + + srvSpi.delayJoinAckFor = client0; + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + clientIpFinder = new TcpDiscoveryVmIpFinder(); + + clientIpFinder.setAddresses(Collections.singleton("localhost:" + srvNode.discoveryPort())); + + Ignite client = startGrid(client0); + + clientIpFinder = null; + + clientNodeIds.add(client.cluster().localNode().id()); + + TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + assertFalse(clientSpi.invalidResponse()); + + TcpDiscoveryNode clientNode = (TcpDiscoveryNode)client.cluster().localNode(); + + assertEquals(srvNode.id(), clientNode.clientRouterNodeId()); + + return null; + } + }); + + final String client1 = "client-" + clientIdx.getAndIncrement(); + + while (!fut.isDone()) { + startGrid(client1); + + stopGrid(client1); + } + + fut.get(); + + checkNodes(srvs, 1); + } + + /** + * @throws Exception If failed. + */ + public void testJoinMutlithreaded() throws Exception { + startServerNodes(1); + + final int CLIENTS = 30; + + clientsPerSrv = CLIENTS; + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + + return null; + } + }, CLIENTS, "start-client"); + + checkNodes(1, CLIENTS); + } + + /** * @param clientIdx Client index. * @param srvIdx Server index. * @throws Exception In case of error. @@ -1267,8 +1457,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { private AtomicInteger failNodeAdded = new AtomicInteger(); /** */ + private AtomicInteger failNodeAddFinished = new AtomicInteger(); + + /** */ private AtomicInteger failClientReconnect = new AtomicInteger(); + /** */ + private IgniteInClosure2X afterWrite; + + /** */ + private volatile boolean invalidRes; + + /** */ + private volatile String delayJoinAckFor; + /** * @param lock Lock. */ @@ -1287,6 +1489,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @param afterWrite After write callback. + */ + void afterWrite(IgniteInClosure2X afterWrite) { + this.afterWrite = afterWrite; + } + + /** + * @return {@code True} if received unexpected ack. + */ + boolean invalidResponse() { + return invalidRes; + } + + /** * */ void failNodeAddedMessage() { @@ -1296,6 +1512,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * */ + void failNodeAddFinishedMessage() { + failNodeAddFinished.set(1); + } + + /** + * + */ void failClientReconnectMessage() { failClientReconnect.set(1); } @@ -1322,6 +1545,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { if (msg instanceof TcpDiscoveryNodeAddedMessage) fail = failNodeAdded.getAndDecrement() > 0; + else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) + fail = failNodeAddFinished.getAndDecrement() > 0; else if (msg instanceof TcpDiscoveryClientReconnectMessage) fail = failClientReconnect.getAndDecrement() > 0; @@ -1332,6 +1557,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } super.writeToSocket(sock, msg, bout); + + if (afterWrite != null) + afterWrite.apply(msg, sock); } /** {@inheritDoc} */ @@ -1365,5 +1593,40 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { impl.workerThread().resume(); } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException { + if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) { + TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg; + + if (delayJoinAckFor.equals(msg0.node().attribute(IgniteNodeAttributes.ATTR_GRID_NAME))) { + log.info("Delay response [sock=" + sock + ", msg=" + msg0 + ", res=" + res + ']'); + + delayJoinAckFor = null; + + try { + Thread.sleep(2000); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + super.writeToSocket(msg, sock, res); + } + + /** {@inheritDoc} */ + @Override protected int readReceipt(Socket sock, long timeout) throws IOException { + int res = super.readReceipt(sock, timeout); + + if (res != TcpDiscoveryImpl.RES_OK) { + invalidRes = true; + + log.info("Received unexpected response: " + res); + } + + return res; + } } }