Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CA4B918C75 for ; Tue, 16 Jun 2015 11:09:51 +0000 (UTC) Received: (qmail 62069 invoked by uid 500); 16 Jun 2015 11:09:51 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 62037 invoked by uid 500); 16 Jun 2015 11:09:51 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 62028 invoked by uid 99); 16 Jun 2015 11:09:51 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 11:09:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 1821CCE2C3 for ; Tue, 16 Jun 2015 11:09:51 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.153 X-Spam-Level: * X-Spam-Status: No, score=1.153 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.648, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id TnBWBazQoZZF for ; Tue, 16 Jun 2015 11:09:36 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 1F9DB24CEA for ; Tue, 16 Jun 2015 11:09:33 +0000 (UTC) Received: (qmail 61584 invoked by uid 99); 16 Jun 2015 11:09:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 11:09:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 32EC9E36E7; Tue, 16 Jun 2015 11:09:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sergi@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 16 Jun 2015 11:09:37 -0000 Message-Id: In-Reply-To: <4607310c77954f3db29d586910408956@git.apache.org> References: <4607310c77954f3db29d586910408956@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/19] incubator-ignite git commit: # ignite-883 # ignite-883 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8870a177 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8870a177 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8870a177 Branch: refs/heads/ignite-484-1 Commit: 8870a177acaa16f6757615cab6017b21aa274c01 Parents: 16f3d32 Author: sboikov Authored: Fri Jun 12 10:05:22 2015 +0300 Committer: sboikov Committed: Fri Jun 12 17:20:14 2015 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 232 ++++++++++++------- .../ignite/spi/discovery/tcp/ServerImpl.java | 170 +++++++++++--- .../ipfinder/TcpDiscoveryIpFinderAdapter.java | 34 ++- .../TcpDiscoveryMulticastIpFinder.java | 19 +- .../messages/TcpDiscoveryAbstractMessage.java | 10 +- .../distributed/IgniteCacheManyClientsTest.java | 65 ++++-- .../tcp/TcpClientDiscoverySpiSelfTest.java | 42 +++- 8 files changed, 412 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index f19e25b..9e38788 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2028,7 +2028,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (X.hasCause(e, SocketTimeoutException.class)) LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " + - "configuration property) [addr=" + addr + ", connTimeout" + connTimeout + ']'); + "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']'); if (errs == null) errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 23e6f88..d8108e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -311,7 +311,7 @@ class ClientImpl extends TcpDiscoveryImpl { for (ClusterNode n : rmts) { rmtNodes.remove(n.id()); - Collection top = updateTopologyHistory(topVer + 1); + Collection top = updateTopologyHistory(topVer + 1, null); lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null); } @@ -348,13 +348,14 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param recon {@code True} if reconnects. + * @param timeout Timeout. * @return Opened socket or {@code null} if timeout. * @throws InterruptedException If interrupted. * @throws IgniteSpiException If failed. * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private Socket joinTopology(boolean recon) throws IgniteSpiException, InterruptedException { + @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { Collection addrs = null; long startTime = U.currentTimeMillis(); @@ -371,11 +372,11 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Resolved addresses from IP finder: " + addrs); } else { - U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder); - - if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) + if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) return null; + U.warn(log, "No addresses registered in the IP finder (will retry in 2000ms): " + spi.ipFinder); + Thread.sleep(2000); } } @@ -421,13 +422,13 @@ class ClientImpl extends TcpDiscoveryImpl { } if (addrs.isEmpty()) { - if (spi.joinTimeout > 0 && (U.currentTimeMillis() - startTime) > spi.joinTimeout) + if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) return null; - Thread.sleep(2000); - U.warn(log, "Failed to connect to any address from IP finder (will retry to join topology " + "in 2000ms): " + addrs0); + + Thread.sleep(2000); } } } @@ -541,16 +542,17 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param topVer New topology version. + * @param msg Discovery message. * @return Latest topology snapshot. */ - private NavigableSet updateTopologyHistory(long topVer) { + private NavigableSet updateTopologyHistory(long topVer, @Nullable TcpDiscoveryAbstractMessage msg) { this.topVer = topVer; NavigableSet allNodes = allVisibleNodes(); if (!topHist.containsKey(topVer)) { assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 : - "lastVer=" + topHist.lastKey() + ", newVer=" + topVer; + "lastVer=" + topHist.lastKey() + ", newVer=" + topVer + ", locNode=" + locNode + ", msg=" + msg; topHist.put(topVer, allNodes); @@ -619,6 +621,17 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * @param err Error. + */ + private void joinError(IgniteSpiException err) { + assert err != null; + + joinErr = err; + + joinLatch.countDown(); + } + + /** * Heartbeat sender. */ private class HeartbeatSender extends TimerTask { @@ -727,7 +740,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onMessageReceived(msg); - if (spi.ensured(msg)) + if (spi.ensured(msg) && joinLatch.getCount() == 0L) lastMsgId = msg.id(); msgWorker.addMessage(msg); @@ -866,11 +879,16 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private volatile Socket sock; + /** */ + private boolean join; + /** - * + * @param join {@code True} if reconnects during join. */ - protected Reconnector() { + protected Reconnector(boolean join) { super(spi.ignite().name(), "tcp-client-disco-msg-worker", log); + + this.join = join; } /** @@ -888,51 +906,94 @@ class ClientImpl extends TcpDiscoveryImpl { boolean success = false; + Exception err = null; + + long timeout = join ? spi.joinTimeout : spi.netTimeout; + + long startTime = U.currentTimeMillis(); + try { - sock = joinTopology(true); + while (true) { + sock = joinTopology(true, timeout); - if (sock == null) { - U.error(log, "Failed to reconnect to cluster: timeout."); + if (sock == null) { + if (join) { + joinError(new IgniteSpiException("Join process timed out, connection failed and " + + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + + "[networkTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + } + else + U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + + "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); - return; - } + return; + } - if (isInterrupted()) - throw new InterruptedException(); + if (isInterrupted()) + throw new InterruptedException(); - InputStream in = new BufferedInputStream(sock.getInputStream()); + int oldTimeout = 0; - sock.setKeepAlive(true); - sock.setTcpNoDelay(true); + try { + oldTimeout = sock.getSoTimeout(); - // Wait for - while (!isInterrupted()) { - TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); + sock.setSoTimeout((int)spi.netTimeout); - if (msg instanceof TcpDiscoveryClientReconnectMessage) { - TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; + InputStream in = new BufferedInputStream(sock.getInputStream()); - if (res.creatorNodeId().equals(getLocalNodeId())) { - if (res.success()) { - msgWorker.addMessage(res); + sock.setKeepAlive(true); + sock.setTcpNoDelay(true); - success = true; - } + // Wait for + while (!isInterrupted()) { + TcpDiscoveryAbstractMessage msg = spi.marsh.unmarshal(in, U.gridClassLoader()); - break; + if (msg instanceof TcpDiscoveryClientReconnectMessage) { + TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; + + if (res.creatorNodeId().equals(getLocalNodeId())) { + if (res.success()) { + msgWorker.addMessage(res); + + success = true; + } + + return; + } + } } } + catch (IOException | IgniteCheckedException e) { + U.closeQuiet(sock); + + if (log.isDebugEnabled()) + log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e); + if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) + throw e; + else + U.warn(log, "Failed to reconnect to cluster (will retry): " + e); + } + finally { + if (success) + sock.setSoTimeout(oldTimeout); + } } } catch (IOException | IgniteCheckedException e) { + err = e; + U.error(log, "Failed to reconnect", e); } finally { if (!success) { U.closeQuiet(sock); - msgWorker.addMessage(SPI_RECONNECT_FAILED); + if (join) + joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " + + "to reconnect.", err)); + else + msgWorker.addMessage(SPI_RECONNECT_FAILED); } } } @@ -967,7 +1028,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onJoinStarted(); try { - final Socket sock = joinTopology(false); + final Socket sock = joinTopology(false, spi.joinTimeout); if (sock == null) { joinErr = new IgniteSpiException("Join process timed out."); @@ -981,12 +1042,14 @@ class ClientImpl extends TcpDiscoveryImpl { sockWriter.setSocket(sock); - timer.schedule(new TimerTask() { - @Override public void run() { - if (joinLatch.getCount() > 0) - queue.add(JOIN_TIMEOUT); - } - }, spi.netTimeout); + if (spi.joinTimeout > 0) { + timer.schedule(new TimerTask() { + @Override public void run() { + if (joinLatch.getCount() > 0) + queue.add(JOIN_TIMEOUT); + } + }, spi.joinTimeout); + } sockReader.setSocket(sock, locNode.clientRouterNodeId()); @@ -996,8 +1059,8 @@ class ClientImpl extends TcpDiscoveryImpl { if (msg == JOIN_TIMEOUT) { if (joinLatch.getCount() > 0) { joinErr = new IgniteSpiException("Join process timed out, did not receive response for " + - "join request (consider increasing 'networkTimeout' configuration property) " + - "[networkTimeout=" + spi.netTimeout + ", sock=" + sock +']'); + "join request (consider increasing 'joinTimeout' configuration property) " + + "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock +']'); joinLatch.countDown(); @@ -1021,30 +1084,23 @@ class ClientImpl extends TcpDiscoveryImpl { if (((SocketClosedMessage)msg).sock == currSock) { currSock = null; - if (joinLatch.getCount() > 0) { - joinErr = new IgniteSpiException("Failed to connect to cluster: socket closed."); + boolean join = joinLatch.getCount() > 0; - joinLatch.countDown(); + if (spi.getSpiContext().isStopping() || segmented) { + leaveLatch.countDown(); - break; + if (join) { + joinError(new IgniteSpiException("Failed to connect to cluster: socket closed.")); + + break; + } } else { - if (spi.getSpiContext().isStopping() || segmented) - leaveLatch.countDown(); - else { - assert reconnector == null; - - final Reconnector reconnector = new Reconnector(); - this.reconnector = reconnector; - reconnector.start(); - - timer.schedule(new TimerTask() { - @Override public void run() { - if (reconnector.isAlive()) - reconnector.cancel(); - } - }, spi.netTimeout); - } + assert reconnector == null; + + final Reconnector reconnector = new Reconnector(join); + this.reconnector = reconnector; + reconnector.start(); } } } @@ -1208,7 +1264,7 @@ class ClientImpl extends TcpDiscoveryImpl { locNode.order(topVer); - notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer)); + notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg)); joinErr = null; @@ -1230,6 +1286,14 @@ class ClientImpl extends TcpDiscoveryImpl { return; } + if (!topHist.isEmpty() && msg.topologyVersion() <= topHist.lastKey()) { + if (log.isDebugEnabled()) + log.debug("Discarding node add finished message since topology already updated " + + "[msg=" + msg + ", lastHistKey=" + topHist.lastKey() + ", node=" + node + ']'); + + return; + } + long topVer = msg.topologyVersion(); node.order(topVer); @@ -1238,7 +1302,7 @@ class ClientImpl extends TcpDiscoveryImpl { if (spi.locNodeVer.equals(node.version())) node.version(spi.locNodeVer); - NavigableSet top = updateTopologyHistory(topVer); + NavigableSet top = updateTopologyHistory(topVer, msg); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1276,7 +1340,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - NavigableSet top = updateTopologyHistory(msg.topologyVersion()); + NavigableSet top = updateTopologyHistory(msg.topologyVersion(), msg); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1319,7 +1383,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - NavigableSet top = updateTopologyHistory(msg.topologyVersion()); + NavigableSet top = updateTopologyHistory(msg.topologyVersion(), msg); if (!pending && joinLatch.getCount() > 0) { if (log.isDebugEnabled()) @@ -1376,28 +1440,32 @@ class ClientImpl extends TcpDiscoveryImpl { return; if (getLocalNodeId().equals(msg.creatorNodeId())) { - assert msg.success(); + assert msg.success() : msg; - currSock = reconnector.sock; + if (reconnector != null) { + currSock = reconnector.sock; - sockWriter.setSocket(currSock); - sockReader.setSocket(currSock, locNode.clientRouterNodeId()); + sockWriter.setSocket(currSock); + sockReader.setSocket(currSock, locNode.clientRouterNodeId()); - reconnector = null; + reconnector = null; - pending = true; + pending = true; - try { - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { - if (log.isDebugEnabled()) - log.debug("Process message on reconnect [msg=" + pendingMsg + ']'); + try { + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + if (log.isDebugEnabled()) + log.debug("Process message on reconnect [msg=" + pendingMsg + ']'); - processDiscoveryMessage(pendingMsg); + processDiscoveryMessage(pendingMsg); + } + } + finally { + pending = false; } } - finally { - pending = false; - } + else if (log.isDebugEnabled()) + log.debug("Discarding reconnect message, reconnect is completed: " + msg); } else if (log.isDebugEnabled()) log.debug("Discarding reconnect message for another client: " + msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 65bea9f..9041557 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -41,7 +41,6 @@ import org.jsr166.*; import java.io.*; import java.net.*; -import java.text.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -1192,7 +1191,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (node.id().equals(destNodeId)) { Collection allNodes = ring.allNodes(); - Collection topToSend = new ArrayList<>(allNodes.size()); + Collection topToSnd = new ArrayList<>(allNodes.size()); for (TcpDiscoveryNode n0 : allNodes) { assert n0.internalOrder() != 0 : n0; @@ -1202,10 +1201,10 @@ class ServerImpl extends TcpDiscoveryImpl { // There will be separate messages for nodes with greater // internal order. if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) - topToSend.add(n0); + topToSnd.add(n0); } - nodeAddedMsg.topology(topToSend); + nodeAddedMsg.topology(topToSnd); nodeAddedMsg.messages(msgs, discardMsgId); Map> hist; @@ -1646,6 +1645,108 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Discovery messages history used for client reconnect. + */ + private class EnsuredMessageHistory { + /** */ + private static final int MAX = 1024; + + /** Pending messages. */ + private final ArrayDeque msgs = new ArrayDeque<>(MAX * 2); + + /** + * @param msg Adds message. + */ + void add(TcpDiscoveryAbstractMessage msg) { + assert spi.ensured(msg) : msg; + + msgs.addLast(msg); + + while (msgs.size() > MAX) + msgs.pollFirst(); + } + + /** + * Gets messages starting from provided ID (exclusive). If such + * message is not found, {@code null} is returned (this indicates + * a failure condition when it was already removed from queue). + * + * @param lastMsgId Last message ID received on client. {@code Null} if client did not finish connect procedure. + * @param node Client node. + * @return Collection of messages. + */ + @Nullable Collection messages(@Nullable IgniteUuid lastMsgId, + TcpDiscoveryNode node) + { + assert node != null && node.isClient() : node; + + if (lastMsgId == null) { + // Client connection failed before it received TcpDiscoveryNodeAddedMessage. + List res = null; + + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id())) + res = new ArrayList<>(msgs.size()); + } + + if (res != null) + res.add(prepare(msg, node.id())); + } + + if (log.isDebugEnabled()) { + if (res == null) + log.debug("Failed to find node added message [node=" + node + ']'); + else + log.debug("Found add added message [node=" + node + ", hist=" + res + ']'); + } + + return res; + } + else { + if (msgs.isEmpty()) + return Collections.emptyList(); + + Collection cp = new ArrayList<>(msgs.size()); + + boolean skip = true; + + for (TcpDiscoveryAbstractMessage msg : msgs) { + if (skip) { + if (msg.id().equals(lastMsgId)) + skip = false; + } + else + cp.add(prepare(msg, node.id())); + } + + cp = !skip ? cp : null; + + if (log.isDebugEnabled()) { + if (cp == null) + log.debug("Failed to find messages history [node=" + node + ", lastMsgId" + lastMsgId + ']'); + else + log.debug("Found messages history [node=" + node + ", hist=" + cp + ']'); + } + + return cp; + } + } + + /** + * @param msg Message. + * @param destNodeId Client node ID. + * @return Prepared message. + */ + private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) + prepareNodeAddedMessage(msg, destNodeId, null, null); + + return msg; + } + } + + /** * Pending messages container. */ private static class PendingMessages { @@ -1678,33 +1779,27 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Gets messages starting from provided ID (exclusive). If such - * message is not found, {@code null} is returned (this indicates - * a failure condition when it was already removed from queue). + * Resets pending messages. * - * @param lastMsgId Last message ID. - * @return Collection of messages. + * @param msgs Message. + * @param discardId Discarded message ID. */ - @Nullable Collection messages(IgniteUuid lastMsgId) { - assert lastMsgId != null; - - if (msgs.isEmpty()) - return Collections.emptyList(); + void reset(@Nullable Collection msgs, @Nullable IgniteUuid discardId) { + this.msgs.clear(); - Collection cp = new ArrayList<>(msgs.size()); + if (msgs != null) + this.msgs.addAll(msgs); - boolean skip = true; + this.discardId = discardId; + } - for (TcpDiscoveryAbstractMessage msg : msgs) { - if (skip) { - if (msg.id().equals(lastMsgId)) - skip = false; - } - else - cp.add(msg); - } + /** + * Clears pending messages. + */ + void clear() { + msgs.clear(); - return !skip ? cp : null; + discardId = null; } /** @@ -1728,6 +1823,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** Pending messages. */ private final PendingMessages pendingMsgs = new PendingMessages(); + /** Messages history used for client reconnect. */ + private final EnsuredMessageHistory msgHist = new EnsuredMessageHistory(); + /** Last message that updated topology. */ private TcpDiscoveryAbstractMessage lastMsg; @@ -1794,6 +1892,9 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); + if (spi.ensured(msg)) + msgHist.add(msg); + spi.stats.onMessageProcessingFinished(msg); } @@ -2130,6 +2231,8 @@ class ServerImpl extends TcpDiscoveryImpl { onException("Failed to send message to next node [next=" + next.id() + ", msg=" + msg + ']', e); + log.error("Will resend [msg=" + msg + ", e=" + e + ']'); + if (e instanceof SocketTimeoutException || X.hasCause(e, SocketTimeoutException.class)) { ackTimeout0 *= 2; @@ -2619,11 +2722,15 @@ class ServerImpl extends TcpDiscoveryImpl { node.aliveCheck(spi.maxMissedClientHbs); if (isLocalNodeCoordinator()) { - Collection pending = pendingMsgs.messages(msg.lastMessageId()); + Collection pending = msgHist.messages(msg.lastMessageId(), node); if (pending != null) { msg.pendingMessages(pending); msg.success(true); + + if (log.isDebugEnabled()) + log.debug("Accept client reconnect, restored pending messages " + + "[locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); } else { if (log.isDebugEnabled()) @@ -2836,7 +2943,8 @@ class ServerImpl extends TcpDiscoveryImpl { topHist.clear(); topHist.putAll(msg.topologyHistory()); - pendingMsgs.discard(msg.discardedMessageId()); + // Restore pending messages. + pendingMsgs.reset(msg.messages(), msg.discardedMessageId()); // Clear data to minimize message size. msg.messages(null, null); @@ -3094,6 +3202,10 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Removed node from topology: " + leftNode); + // Clear pending messages map. + if (!ring.hasRemoteNodes()) + pendingMsgs.clear(); + long topVer; if (locNodeCoord) { @@ -3257,6 +3369,10 @@ class ServerImpl extends TcpDiscoveryImpl { assert node != null; + // Clear pending messages map. + if (!ring.hasRemoteNodes()) + pendingMsgs.clear(); + long topVer; if (locNodeCoord) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java index 99a2cdc..4d62ff2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/TcpDiscoveryIpFinderAdapter.java @@ -17,9 +17,13 @@ package org.apache.ignite.spi.discovery.tcp.ipfinder; +import org.apache.ignite.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; import java.net.*; import java.util.*; @@ -35,6 +39,11 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde @GridToStringExclude private volatile IgniteSpiContext spiCtx; + /** Ignite instance . */ + @IgniteInstanceResource + @GridToStringExclude + protected Ignite ignite; + /** {@inheritDoc} */ @Override public void onSpiContextInitialized(IgniteSpiContext spiCtx) throws IgniteSpiException { this.spiCtx = spiCtx; @@ -47,7 +56,8 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde /** {@inheritDoc} */ @Override public void initializeLocalAddresses(Collection addrs) throws IgniteSpiException { - registerAddresses(addrs); + if (!discoveryClientMode()) + registerAddresses(addrs); } /** {@inheritDoc} */ @@ -77,6 +87,28 @@ public abstract class TcpDiscoveryIpFinderAdapter implements TcpDiscoveryIpFinde } /** + * @return {@code True} if TCP discovery works in client mode. + */ + protected boolean discoveryClientMode() { + boolean clientMode; + + Ignite ignite0 = ignite; + + if (ignite0 != null) { // Can be null if used in tests without starting Ignite. + DiscoverySpi discoSpi = ignite0.configuration().getDiscoverySpi(); + + if (!(discoSpi instanceof TcpDiscoverySpi)) + throw new IgniteSpiException("TcpDiscoveryIpFinder should be used with TcpDiscoverySpi: " + discoSpi); + + clientMode = ignite0.configuration().isClientMode() && !((TcpDiscoverySpi)discoSpi).isForceServerMode(); + } + else + clientMode = false; + + return clientMode; + } + + /** * @return SPI context. */ protected IgniteSpiContext spiContext() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java index a992620..8e5a1fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java @@ -85,11 +85,6 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { @LoggerResource private IgniteLogger log; - /** Ignite instance . */ - @IgniteInstanceResource - @GridToStringExclude - private Ignite ignite; - /** Multicast IP address as string. */ private String mcastGrp = DFLT_MCAST_GROUP; @@ -256,19 +251,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder { "(it is recommended in production to specify at least one address in " + "TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)"); - boolean clientMode; - - if (ignite != null) { // Can be null if used in tests without starting Ignite. - DiscoverySpi discoSpi = ignite.configuration().getDiscoverySpi(); - - if (!(discoSpi instanceof TcpDiscoverySpi)) - throw new IgniteSpiException("TcpDiscoveryMulticastIpFinder should be used with " + - "TcpDiscoverySpi: " + discoSpi); - - clientMode = ((TcpDiscoverySpi)discoSpi).isClientMode(); - } - else - clientMode = false; + boolean clientMode = discoveryClientMode(); InetAddress mcastAddr; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 145b518..21dbf4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -41,7 +41,7 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2; /** Sender of the message (transient). */ - private transient UUID senderNodeId; + private transient UUID sndNodeId; /** Message ID. */ private IgniteUuid id; @@ -99,16 +99,16 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { * @return Sender node ID. */ public UUID senderNodeId() { - return senderNodeId; + return sndNodeId; } /** * Sets sender node ID. * - * @param senderNodeId Sender node ID. + * @param sndNodeId Sender node ID. */ - public void senderNodeId(UUID senderNodeId) { - this.senderNodeId = senderNodeId; + public void senderNodeId(UUID sndNodeId) { + this.sndNodeId = sndNodeId; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index 77ddd40..4fb4387 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -63,18 +63,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(2 * 60_000); if (!clientDiscovery) ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); cfg.setClientMode(client); - if (client) { -// cfg.setPublicThreadPoolSize(1); -// cfg.setPeerClassLoadingThreadPoolSize(1); -// cfg.setIgfsThreadPoolSize(1); - } - CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); @@ -197,43 +192,62 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { try { IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { @Override public Object call() throws Exception { - try (Ignite ignite = startGrid(idx.getAndIncrement())) { - log.info("Started node: " + ignite.name()); + boolean counted = false; - assertTrue(ignite.configuration().isClientMode()); + try { + int nodeIdx = idx.getAndIncrement(); - IgniteCache cache = ignite.cache(null); + Thread.currentThread().setName("client-thread-node-" + nodeIdx); - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + try (Ignite ignite = startGrid(nodeIdx)) { + log.info("Started node: " + ignite.name()); - int iter = 0; + assertTrue(ignite.configuration().isClientMode()); - Integer key = rnd.nextInt(0, 1000); + IgniteCache cache = ignite.cache(null); - cache.put(key, iter++); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - assertNotNull(cache.get(key)); + int iter = 0; - latch.countDown(); - - while (!stop.get()) { - key = rnd.nextInt(0, 1000); + Integer key = rnd.nextInt(0, 1000); cache.put(key, iter++); assertNotNull(cache.get(key)); - Thread.sleep(1); + latch.countDown(); + + counted = true; + + while (!stop.get()) { + key = rnd.nextInt(0, 1000); + + cache.put(key, iter++); + + assertNotNull(cache.get(key)); + + Thread.sleep(1); + } + + log.info("Stopping node: " + ignite.name()); } - log.info("Stopping node: " + ignite.name()); + return null; } + catch (Throwable e) { + log.error("Unexpected error in client thread: " + e, e); - return null; + throw e; + } + finally { + if (!counted) + latch.countDown(); + } } }, THREADS, "client-thread"); - latch.await(); + assertTrue(latch.await(getTestTimeout(), TimeUnit.MILLISECONDS)); log.info("All clients started."); @@ -245,6 +259,11 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { fut.get(); } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } finally { stop.set(true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8870a177/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 44fe299..8147958 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -193,8 +193,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * - * @throws Exception + * @throws Exception If failed. */ public void testJoinTimeout() throws Exception { clientIpFinder = new TcpDiscoveryVmIpFinder(); @@ -544,8 +543,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testClientReconnectTopologyChange2() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-998"); - maxMissedClientHbs = 100; clientsPerSrv = 1; @@ -1001,6 +998,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testJoinError2() throws Exception { + startServerNodes(1); + + Ignite ignite = G.ignite("server-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + + srvSpi.failNodeAddedMessage(); + srvSpi.failClientReconnectMessage(); + + startClientNodes(1); + + checkNodes(1, 1); + } + + /** * @param clientIdx Client index. * @param srvIdx Server index. * @throws Exception In case of error. @@ -1251,6 +1266,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private AtomicInteger failNodeAdded = new AtomicInteger(); + /** */ + private AtomicInteger failClientReconnect = new AtomicInteger(); + /** * @param lock Lock. */ @@ -1276,6 +1294,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * + */ + void failClientReconnectMessage() { + failClientReconnect.set(1); + } + + /** * @param isPause Is lock. * @param locks Locks. */ @@ -1293,7 +1318,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { waitFor(writeLock); - if (msg instanceof TcpDiscoveryNodeAddedMessage && failNodeAdded.getAndDecrement() > 0) { + boolean fail = false; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) + fail = failNodeAdded.getAndDecrement() > 0; + else if (msg instanceof TcpDiscoveryClientReconnectMessage) + fail = failClientReconnect.getAndDecrement() > 0; + + if (fail) { log.info("Close socket on message write [msg=" + msg + "]"); sock.close();