Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D37211856F for ; Mon, 20 Jul 2015 08:30:49 +0000 (UTC) Received: (qmail 36582 invoked by uid 500); 20 Jul 2015 08:30:49 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 36543 invoked by uid 500); 20 Jul 2015 08:30:49 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 36529 invoked by uid 99); 20 Jul 2015 08:30:49 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2015 08:30:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 37C241A745F for ; Mon, 20 Jul 2015 08:30:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 8C4dD7wKj7cI for ; Mon, 20 Jul 2015 08:30:36 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 370EF34414 for ; Mon, 20 Jul 2015 08:30:19 +0000 (UTC) Received: (qmail 33991 invoked by uid 99); 20 Jul 2015 08:30:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2015 08:30:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2266E0427; Mon, 20 Jul 2015 08:30:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ivasilinets@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 20 Jul 2015 08:30:44 -0000 Message-Id: <14ed48197c174984a13653279c50958c@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4ca2995..2bce637 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -228,6 +228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ public static final byte HANDSHAKE_MSG_TYPE = -3; + /** */ + private ConnectGateway connectGate; + /** Server listener. */ private final GridNioServerListener srvLsnr = new GridNioServerListenerAdapter() { @@ -248,7 +251,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Sending local node ID to newly accepted session: " + ses); - ses.send(nodeIdMsg); + ses.send(nodeIdMessage()); } } @@ -289,136 +292,163 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } - @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); + /** + * @param ses Session. + * @param msg Message. + */ + private void onFirstMessage(GridNioSession ses, Message msg) { + UUID sndId; - if (sndId == null) { - assert ses.accepted(); + if (msg instanceof NodeIdMessage) + sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + else { + assert msg instanceof HandshakeMessage : msg; - if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); - else { - assert msg instanceof HandshakeMessage : msg; + sndId = ((HandshakeMessage)msg).nodeId(); + } - sndId = ((HandshakeMessage)msg).nodeId(); - } + if (log.isDebugEnabled()) + log.debug("Remote node ID received: " + sndId); - if (log.isDebugEnabled()) - log.debug("Remote node ID received: " + sndId); + final UUID old = ses.addMeta(NODE_ID_META, sndId); - final UUID old = ses.addMeta(NODE_ID_META, sndId); + assert old == null; - assert old == null; + final ClusterNode rmtNode = getSpiContext().node(sndId); - final ClusterNode rmtNode = getSpiContext().node(sndId); + if (rmtNode == null) { + if (log.isDebugEnabled()) + log.debug("Close incoming connection, unknown node: " + sndId); - if (rmtNode == null) { - ses.close(); + ses.close(); - return; - } + return; + } - ClusterNode locNode = getSpiContext().localNode(); + ClusterNode locNode = getSpiContext().localNode(); - if (ses.remoteAddress() == null) - return; + if (ses.remoteAddress() == null) + return; - GridCommunicationClient oldClient = clients.get(sndId); + GridCommunicationClient oldClient = clients.get(sndId); - boolean hasShmemClient = false; + boolean hasShmemClient = false; - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + "to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']'); - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(-1)); - return; - } - else { - assert oldClient instanceof GridShmemCommunicationClient; + return; + } + else { + assert oldClient instanceof GridShmemCommunicationClient; - hasShmemClient = true; - } + hasShmemClient = true; } + } - GridFutureAdapter fut = new GridFutureAdapter<>(); + GridFutureAdapter fut = new GridFutureAdapter<>(); - GridFutureAdapter oldFut = clientFuts.putIfAbsent(sndId, fut); + GridFutureAdapter oldFut = clientFuts.putIfAbsent(sndId, fut); - assert msg instanceof HandshakeMessage : msg; + assert msg instanceof HandshakeMessage : msg; - HandshakeMessage msg0 = (HandshakeMessage)msg; + HandshakeMessage msg0 = (HandshakeMessage)msg; - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); - if (oldFut == null) { - oldClient = clients.get(sndId); + if (oldFut == null) { + oldClient = clients.get(sndId); - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + "to this node, rejecting [locNode=" + locNode.id() + ", rmtNode=" + sndId + ']'); - ses.send(new RecoveryLastReceivedMessage(-1)); + ses.send(new RecoveryLastReceivedMessage(-1)); - return; - } - else { - assert oldClient instanceof GridShmemCommunicationClient; + return; + } + else { + assert oldClient instanceof GridShmemCommunicationClient; - hasShmemClient = true; - } + hasShmemClient = true; } + } - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); - if (log.isDebugEnabled()) - log.debug("Received incoming connection from remote node " + + if (log.isDebugEnabled()) + log.debug("Received incoming connection from remote node " + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); - if (reserved) { - try { - GridTcpNioCommunicationClient client = + if (reserved) { + try { + GridTcpNioCommunicationClient client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - fut.onDone(client); - } - finally { - clientFuts.remove(rmtNode.id(), fut); - } + fut.onDone(client); + } + finally { + clientFuts.remove(rmtNode.id(), fut); } } - else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { - if (log.isDebugEnabled()) { - log.debug("Received incoming connection from remote node while " + + } + else { + if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (log.isDebugEnabled()) { + log.debug("Received incoming connection from remote node while " + "connecting to this node, rejecting [locNode=" + locNode.id() + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + ", rmtNodeOrder=" + rmtNode.order() + ']'); - } - - ses.send(new RecoveryLastReceivedMessage(-1)); } - else { - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + + ses.send(new RecoveryLastReceivedMessage(-1)); + } + else { + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); - if (reserved) { - GridTcpNioCommunicationClient client = + if (reserved) { + GridTcpNioCommunicationClient client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - fut.onDone(client); - } + fut.onDone(client); } } } + } + + @Override public void onMessage(GridNioSession ses, Message msg) { + UUID sndId = ses.meta(NODE_ID_META); + + if (sndId == null) { + assert ses.accepted() : ses; + + if (!connectGate.tryEnter()) { + if (log.isDebugEnabled()) + log.debug("Close incoming connection, failed to enter gateway."); + + ses.close(); + + return; + } + + try { + onFirstMessage(ses, msg); + } + finally { + connectGate.leave(); + } + } else { rcvdMsgsCnt.increment(); @@ -700,9 +730,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Address resolver. */ private AddressResolver addrRslvr; - /** Local node ID message. */ - private NodeIdMessage nodeIdMsg; - /** Received messages count. */ private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); @@ -739,8 +766,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + assert evt instanceof DiscoveryEvent : evt; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; onNodeLeft(((DiscoveryEvent)evt).eventNode().id()); } @@ -1237,8 +1264,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { - nodeIdMsg = new NodeIdMessage(getLocalNodeId()); - assertParameter(locPort > 1023, "locPort > 1023"); assertParameter(locPort <= 0xffff, "locPort < 0xffff"); assertParameter(locPortRange >= 0, "locPortRange >= 0"); @@ -1346,6 +1371,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter registerMBean(gridName, this, TcpCommunicationSpiMBean.class); + connectGate = new ConnectGateway(); + if (shmemSrv != null) { shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv); @@ -1608,6 +1635,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Safety. ctxInitLatch.countDown(); + if (connectGate != null) + connectGate.stopped(); + // Force closing. for (GridCommunicationClient client : clients.values()) client.forceClose(); @@ -1617,6 +1647,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter getSpiContext().removeLocalEventListener(discoLsnr); } + /** {@inheritDoc} */ + @Override public void onClientDisconnected(IgniteFuture reconnectFut) { + connectGate.disconnected(reconnectFut); + + for (GridCommunicationClient client : clients.values()) + client.forceClose(); + + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Failed to connect client node disconnected."); + + for (GridFutureAdapter clientFut : clientFuts.values()) + clientFut.onDone(err); + + recoveryDescs.clear(); + } + + /** {@inheritDoc} */ + @Override public void onClientReconnected(boolean clusterRestarted) { + connectGate.reconnected(); + } + /** * @param nodeId Left node ID. */ @@ -1666,10 +1717,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - UUID locNodeId = getLocalNodeId(); - - if (node.id().equals(locNodeId)) - notifyListener(locNodeId, msg, NOOP); + if (node.id().equals(getLocalNode().id())) + notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; @@ -1834,7 +1883,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } - return createTcpClient(node); + connectGate.enter(); + + try { + return createTcpClient(node); + } + finally { + connectGate.leave(); + } } /** @@ -2208,7 +2264,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(getLocalNodeId(), + HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -2228,7 +2284,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ch.write(buf); } else - ch.write(ByteBuffer.wrap(nodeIdMsg.nodeIdBytesWithType)); + ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); if (recovery != null) { if (log.isDebugEnabled()) @@ -2355,6 +2411,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter getExceptionRegistry().onException(msg, e); } + /** + * @return Node ID message. + */ + private NodeIdMessage nodeIdMessage() { + return new NodeIdMessage(getLocalNode().id()); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); @@ -2692,10 +2755,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) { ClusterNode node = recoveryDesc.node(); - if (clients.containsKey(node.id()) || - !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || - !getSpiContext().pingNode(node.id())) + try { + if (clients.containsKey(node.id()) || + !recoveryDesc.nodeAlive(getSpiContext().node(node.id())) || + !getSpiContext().pingNode(node.id())) + return; + } + catch (IgniteClientDisconnectedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node, client disconnected."); + return; + } try { if (log.isDebugEnabled()) @@ -2860,15 +2931,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { + UUID id = getLocalNode().id(); + + NodeIdMessage msg = new NodeIdMessage(id); + out.write(U.IGNITE_HEADER); out.write(NODE_ID_MSG_TYPE); - out.write(nodeIdMsg.nodeIdBytes); + out.write(msg.nodeIdBytes); out.flush(); if (log.isDebugEnabled()) - log.debug("Sent local node ID [locNodeId=" + getLocalNodeId() + ", rmtNodeId=" - + rmtNodeId + ']'); + log.debug("Sent local node ID [locNodeId=" + id + ", rmtNodeId=" + rmtNodeId + ']'); } catch (IOException e) { throw new IgniteCheckedException("Failed to perform handshake.", e); @@ -3082,6 +3156,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param nodeId Node ID. */ private NodeIdMessage(UUID nodeId) { + assert nodeId != null; + nodeIdBytes = U.uuidToBytes(nodeId); nodeIdBytesWithType = new byte[nodeIdBytes.length + 1]; @@ -3131,4 +3207,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return S.toString(NodeIdMessage.class, this); } } + + /** + * + */ + private class ConnectGateway { + /** */ + private GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); + + /** */ + private IgniteException err; + + /** + * + */ + void enter() { + lock.readLock(); + + if (err != null) { + lock.readUnlock(); + + throw err; + } + } + + /** + * @return {@code True} if entered gateway. + */ + boolean tryEnter() { + lock.readLock(); + + boolean res = err == null; + + if (!res) + lock.readUnlock(); + + return res; + } + + /** + * + */ + void leave() { + lock.readUnlock(); + } + + /** + * @param reconnectFut Reconnect future. + */ + void disconnected(IgniteFuture reconnectFut) { + lock.writeLock(); + + err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected."); + + lock.writeUnlock(); + } + + /** + * + */ + void reconnected() { + lock.writeLock(); + + try { + if (err instanceof IgniteClientDisconnectedException) + err = null; + } + finally { + lock.writeUnlock(); + } + } + + /** + * + */ + void stopped() { + lock.readLock(); + + err = new IgniteException("Failed to connect, node stopped."); + + lock.readUnlock(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java index 46d6716..038ea59 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java @@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange { /** * Notifies discovery manager about data received from remote node. * - * @param joiningNodeId Remote node ID. + * @param joiningNodeId ID of new node that joins topology. + * @param nodeId ID of the node provided data. * @param data Collection of discovery data objects from different components. */ public void onExchange(UUID joiningNodeId, UUID nodeId, Map data); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 3f05f59..572ba2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -41,6 +42,7 @@ import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; +import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*; /** * @@ -71,7 +73,7 @@ class ClientImpl extends TcpDiscoveryImpl { private SocketReader sockReader; /** */ - private boolean segmented; + private volatile State state; /** Last message ID. */ private volatile IgniteUuid lastMsgId; @@ -94,6 +96,10 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ protected MessageWorker msgWorker; + /** */ + @GridToStringExclude + private int joinCnt; + /** * @param adapter Adapter. */ @@ -157,6 +163,9 @@ class ClientImpl extends TcpDiscoveryImpl { locNode = spi.locNode; + // Marshal credentials for backward compatibility and security. + marshalCredentials(locNode); + sockWriter = new SocketWriter(); sockWriter.start(); @@ -258,23 +267,36 @@ class ClientImpl extends TcpDiscoveryImpl { if (oldFut != null) fut = oldFut; else { - if (spi.getSpiContext().isStopping()) { + State state = this.state; + + if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) { if (pingFuts.remove(nodeId, fut)) fut.onDone(false); return false; } + else if (state == DISCONNECTED) { + if (pingFuts.remove(nodeId, fut)) + fut.onDone(new IgniteClientDisconnectedCheckedException(null, + "Failed to ping node, client node disconnected.")); + } + else { + final GridFutureAdapter finalFut = fut; - final GridFutureAdapter finalFut = fut; - - timer.schedule(new TimerTask() { - @Override public void run() { - if (pingFuts.remove(nodeId, finalFut)) - finalFut.onDone(false); - } - }, spi.netTimeout); + timer.schedule(new TimerTask() { + @Override public void run() { + if (pingFuts.remove(nodeId, finalFut)) { + if (ClientImpl.this.state == DISCONNECTED) + finalFut.onDone(new IgniteClientDisconnectedCheckedException(null, + "Failed to ping node, client node disconnected.")); + else + finalFut.onDone(false); + } + } + }, spi.netTimeout); - sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + } } } @@ -285,7 +307,7 @@ class ClientImpl extends TcpDiscoveryImpl { return false; } catch (IgniteCheckedException e) { - throw new IgniteSpiException(e); // Should newer occur. + throw new IgniteSpiException(e); } } @@ -325,8 +347,13 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { - if (segmented) - throw new IgniteException("Failed to send custom message: client is disconnected"); + State state = this.state; + + if (state == SEGMENTED) + throw new IgniteException("Failed to send custom message: client is segmented."); + + if (state == DISCONNECTED) + throw new IgniteClientDisconnectedException(null, "Failed to send custom message: client is disconnected."); try { sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, @@ -361,14 +388,11 @@ class ClientImpl extends TcpDiscoveryImpl { * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { + @Nullable private T2 joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { Collection addrs = null; long startTime = U.currentTimeMillis(); - // Marshal credentials for backward compatibility and security. - marshalCredentials(locNode); - while (true) { if (Thread.currentThread().isInterrupted()) throw new InterruptedException(); @@ -400,7 +424,7 @@ class ClientImpl extends TcpDiscoveryImpl { InetSocketAddress addr = it.next(); - T2 sockAndRes = sendJoinRequest(recon, addr); + T3 sockAndRes = sendJoinRequest(recon, addr); if (sockAndRes == null) { it.remove(); @@ -414,7 +438,7 @@ class ClientImpl extends TcpDiscoveryImpl { switch (sockAndRes.get2()) { case RES_OK: - return sock; + return new T2<>(sock, sockAndRes.get3()); case RES_CONTINUE_JOIN: case RES_WAIT: @@ -445,9 +469,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param recon {@code True} if reconnects. * @param addr Address. - * @return Socket and connect response. + * @return Socket, connect response and client acknowledge support flag. */ - @Nullable private T2 sendJoinRequest(boolean recon, InetSocketAddress addr) { + @Nullable private T3 sendJoinRequest(boolean recon, InetSocketAddress addr) { assert addr != null; if (log.isDebugEnabled()) @@ -493,9 +517,18 @@ class ClientImpl extends TcpDiscoveryImpl { tstamp = U.currentTimeMillis(); - TcpDiscoveryAbstractMessage msg = recon ? - new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId) : - new TcpDiscoveryJoinRequestMessage(locNode, spi.collectExchangeData(getLocalNodeId())); + TcpDiscoveryAbstractMessage msg; + + if (!recon) { + TcpDiscoveryNode node = locNode; + + if (locNode.order() > 0) + node = locNode.clientReconnectNode(); + + msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId())); + } + else + msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId); msg.client(true); @@ -507,7 +540,7 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T2<>(sock, spi.readReceipt(sock, ackTimeout0)); + return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -786,10 +819,16 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onMessageReceived(msg); - if (spi.ensured(msg) && joinLatch.getCount() == 0L) - lastMsgId = msg.id(); + boolean ack = msg instanceof TcpDiscoveryClientAckResponse; + + if (!ack) { + if (spi.ensured(msg) && joinLatch.getCount() == 0L) + lastMsgId = msg.id(); - msgWorker.addMessage(msg); + msgWorker.addMessage(msg); + } + else + sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); } } catch (IOException e) { @@ -823,8 +862,14 @@ class ClientImpl extends TcpDiscoveryImpl { private Socket sock; /** */ + private boolean clientAck; + + /** */ private final Queue queue = new ArrayDeque<>(); + /** */ + private TcpDiscoveryAbstractMessage unackedMsg; + /** * */ @@ -845,11 +890,16 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param sock Socket. + * @param clientAck {@code True} is server supports client message acknowlede. */ - private void setSocket(Socket sock) { + private void setSocket(Socket sock, boolean clientAck) { synchronized (mux) { this.sock = sock; + this.clientAck = clientAck; + + unackedMsg = null; + mux.notifyAll(); } } @@ -863,6 +913,21 @@ class ClientImpl extends TcpDiscoveryImpl { } } + /** + * @param res Acknowledge response. + */ + void ackReceived(TcpDiscoveryClientAckResponse res) { + synchronized (mux) { + if (unackedMsg != null) { + assert unackedMsg.id().equals(res.messageId()) : unackedMsg; + + unackedMsg = null; + } + + mux.notifyAll(); + } + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { TcpDiscoveryAbstractMessage msg = null; @@ -892,10 +957,43 @@ class ClientImpl extends TcpDiscoveryImpl { for (IgniteInClosure msgLsnr : spi.sendMsgLsnrs) msgLsnr.apply(msg); + boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse); + try { + if (ack) { + synchronized (mux) { + assert unackedMsg == null : unackedMsg; + + unackedMsg = msg; + } + } + spi.writeToSocket(sock, msg); msg = null; + + if (ack) { + long waitEnd = U.currentTimeMillis() + spi.ackTimeout; + + TcpDiscoveryAbstractMessage unacked; + + synchronized (mux) { + while (unackedMsg != null && U.currentTimeMillis() < waitEnd) + mux.wait(waitEnd); + + unacked = unackedMsg; + + unackedMsg = null; + } + + if (unacked != null) { + if (log.isDebugEnabled()) + log.debug("Failed to get acknowledge for message, will try to reconnect " + + "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']'); + + throw new IOException("Failed to get acknowledge for message: " + unacked); + } + } } catch (IOException e) { if (log.isDebugEnabled()) @@ -926,6 +1024,9 @@ class ClientImpl extends TcpDiscoveryImpl { private volatile Socket sock; /** */ + private boolean clientAck; + + /** */ private boolean join; /** @@ -948,8 +1049,6 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - assert !segmented; - boolean success = false; Exception err = null; @@ -958,11 +1057,14 @@ class ClientImpl extends TcpDiscoveryImpl { long startTime = U.currentTimeMillis(); + if (log.isDebugEnabled()) + log.debug("Started reconnect process [join=" + join + ", timeout=" + timeout + ']'); + try { while (true) { - sock = joinTopology(true, timeout); + T2 joinRes = joinTopology(true, timeout); - if (sock == null) { + if (joinRes == null) { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + @@ -970,11 +1072,14 @@ class ClientImpl extends TcpDiscoveryImpl { } else U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); return; } + sock = joinRes.get1(); + clientAck = joinRes.get2(); + if (isInterrupted()) throw new InterruptedException(); @@ -999,6 +1104,10 @@ class ClientImpl extends TcpDiscoveryImpl { TcpDiscoveryClientReconnectMessage res = (TcpDiscoveryClientReconnectMessage)msg; if (res.creatorNodeId().equals(getLocalNodeId())) { + if (log.isDebugEnabled()) + log.debug("Received reconnect response [success=" + res.success() + + ", msg=" + msg + ']'); + if (res.success()) { msgWorker.addMessage(res); @@ -1008,9 +1117,11 @@ class ClientImpl extends TcpDiscoveryImpl { } success = true; - } - return; + return; + } + else + return; } } else if (spi.ensured(msg)) { @@ -1081,6 +1192,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private Reconnector reconnector; + /** */ + private boolean nodeAdded; + /** * */ @@ -1091,45 +1205,37 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @SuppressWarnings("InfiniteLoopStatement") @Override protected void body() throws InterruptedException { + state = STARTING; + spi.stats.onJoinStarted(); try { - final Socket sock = joinTopology(false, spi.joinTimeout); - - if (sock == null) { - joinError(new IgniteSpiException("Join process timed out.")); - - return; - } - - currSock = sock; - - sockWriter.setSocket(sock); - - if (spi.joinTimeout > 0) { - timer.schedule(new TimerTask() { - @Override public void run() { - if (joinLatch.getCount() > 0) - queue.add(JOIN_TIMEOUT); - } - }, spi.joinTimeout); - } - - sockReader.setSocket(sock, locNode.clientRouterNodeId()); + tryJoin(); while (true) { Object msg = queue.take(); if (msg == JOIN_TIMEOUT) { - if (joinLatch.getCount() > 0) { + if (state == STARTING) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); break; } + else if (state == DISCONNECTED) { + if (log.isDebugEnabled()) + log.debug("Failed to reconnect, local node segmented " + + "[joinTimeout=" + spi.joinTimeout + ']'); + + state = SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } } else if (msg == SPI_STOP) { + state = STOPPED; + assert spi.getSpiContext().isStopping(); if (currSock != null) { @@ -1148,7 +1254,7 @@ class ClientImpl extends TcpDiscoveryImpl { boolean join = joinLatch.getCount() > 0; - if (spi.getSpiContext().isStopping() || segmented) { + if (spi.getSpiContext().isStopping() || state == SEGMENTED) { leaveLatch.countDown(); if (join) { @@ -1158,6 +1264,9 @@ class ClientImpl extends TcpDiscoveryImpl { } } else { + if (log.isDebugEnabled()) + log.debug("Connection closed, will try to restore connection."); + assert reconnector == null; final Reconnector reconnector = new Reconnector(join); @@ -1167,19 +1276,64 @@ class ClientImpl extends TcpDiscoveryImpl { } } else if (msg == SPI_RECONNECT_FAILED) { - if (!segmented) { - segmented = true; + reconnector.cancel(); + reconnector.join(); - reconnector.cancel(); - reconnector.join(); + reconnector = null; - notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + if (spi.isClientReconnectDisabled()) { + if (state != SEGMENTED && state != STOPPED) { + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, reconnect disabled, " + + "local node segmented [networkTimeout=" + spi.netTimeout + ']'); + } + + state = SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + } + else { + if (state == STARTING || state == CONNECTED) { + if (log.isDebugEnabled()) { + log.debug("Failed to restore closed connection, will try to reconnect " + + "[networkTimeout=" + spi.netTimeout + ", joinTimeout=" + spi.joinTimeout + ']'); + } + + state = DISCONNECTED; + + nodeAdded = false; + + IgniteClientDisconnectedCheckedException err = + new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " + + "client node disconnected."); + + for (Map.Entry> e : pingFuts.entrySet()) { + GridFutureAdapter fut = e.getValue(); + + if (pingFuts.remove(e.getKey(), fut)) + fut.onDone(err); + } + + notifyDiscovery(EVT_CLIENT_NODE_DISCONNECTED, topVer, locNode, allVisibleNodes()); + } + + UUID newId = UUID.randomUUID(); + + if (log.isInfoEnabled()) { + log.info("Client node disconnected from cluster, will try to reconnect with new id " + + "[newId=" + newId + ", prevId=" + locNode.id() + ", locNode=" + locNode + ']'); + } + + locNode.onClientDisconnected(newId); + + tryJoin(); } } else { TcpDiscoveryAbstractMessage discoMsg = (TcpDiscoveryAbstractMessage)msg; - if (joinLatch.getCount() > 0) { + if (joining()) { IgniteSpiException err = null; if (discoMsg instanceof TcpDiscoveryDuplicateIdMessage) @@ -1190,7 +1344,15 @@ class ClientImpl extends TcpDiscoveryImpl { err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); if (err != null) { - joinError(err); + if (state == DISCONNECTED) { + U.error(log, "Failed to reconnect, segment local node.", err); + + state = SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + else + joinError(err); break; } @@ -1215,6 +1377,48 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * @throws InterruptedException If interrupted. + */ + private void tryJoin() throws InterruptedException { + assert state == DISCONNECTED || state == STARTING : state; + + boolean join = state == STARTING; + + joinCnt++; + + T2 joinRes = joinTopology(false, spi.joinTimeout); + + if (joinRes == null) { + if (join) + joinError(new IgniteSpiException("Join process timed out.")); + else { + state = SEGMENTED; + + notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); + } + + return; + } + + currSock = joinRes.get1(); + + sockWriter.setSocket(joinRes.get1(), joinRes.get2()); + + if (spi.joinTimeout > 0) { + final int joinCnt0 = joinCnt; + + timer.schedule(new TimerTask() { + @Override public void run() { + if (joinCnt == joinCnt0 && joining()) + queue.add(JOIN_TIMEOUT); + } + }, spi.joinTimeout); + } + + sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId()); + } + + /** * @param msg Message. */ protected void processDiscoveryMessage(TcpDiscoveryAbstractMessage msg) { @@ -1246,6 +1450,22 @@ class ClientImpl extends TcpDiscoveryImpl { } /** + * @return {@code True} if client in process of join. + */ + private boolean joining() { + ClientImpl.State state = ClientImpl.this.state; + + return state == STARTING || state == DISCONNECTED; + } + + /** + * @return {@code True} if client disconnected. + */ + private boolean disconnected() { + return state == DISCONNECTED; + } + + /** * @param msg Message. */ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { @@ -1257,12 +1477,15 @@ class ClientImpl extends TcpDiscoveryImpl { UUID newNodeId = node.id(); if (getLocalNodeId().equals(newNodeId)) { - if (joinLatch.getCount() > 0) { + if (joining()) { Collection top = msg.topology(); if (top != null) { spi.gridStartTime = msg.gridStartTime(); + if (disconnected()) + rmtNodes.clear(); + for (TcpDiscoveryNode n : top) { if (n.order() > 0) n.visible(true); @@ -1272,6 +1495,8 @@ class ClientImpl extends TcpDiscoveryImpl { topHist.clear(); + nodeAdded = true; + if (msg.topologyHistory() != null) topHist.putAll(msg.topologyHistory()); } @@ -1309,7 +1534,7 @@ class ClientImpl extends TcpDiscoveryImpl { return; if (getLocalNodeId().equals(msg.nodeId())) { - if (joinLatch.getCount() > 0) { + if (joining()) { Map> dataMap = msg.clientDiscoData(); if (dataMap != null) { @@ -1324,13 +1549,22 @@ class ClientImpl extends TcpDiscoveryImpl { locNode.order(topVer); - notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, updateTopologyHistory(topVer, msg)); + Collection nodes = updateTopologyHistory(topVer, msg); + + notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes); + + boolean disconnected = disconnected(); + + state = CONNECTED; + + if (disconnected) + notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes); + else + spi.stats.onJoinFinished(); joinErr.set(null);; joinLatch.countDown(); - - spi.stats.onJoinFinished(); } else if (log.isDebugEnabled()) log.debug("Discarding node add finished message (this message has already been processed) " + @@ -1438,7 +1672,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @return {@code True} if received node added message for local node. */ private boolean nodeAdded() { - return !topHist.isEmpty(); + return nodeAdded; } /** @@ -1539,7 +1773,7 @@ class ClientImpl extends TcpDiscoveryImpl { currSock = reconnector.sock; - sockWriter.setSocket(currSock); + sockWriter.setSocket(currSock, reconnector.clientAck); sockReader.setSocket(currSock, locNode.clientRouterNodeId()); reconnector = null; @@ -1583,7 +1817,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.verified() && joinLatch.getCount() == 0) { + if (msg.verified() && state == CONNECTED) { DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { @@ -1719,4 +1953,24 @@ class ClientImpl extends TcpDiscoveryImpl { this.sock = sock; } } + + /** + * + */ + enum State { + /** */ + STARTING, + + /** */ + CONNECTED, + + /** */ + DISCONNECTED, + + /** */ + SEGMENTED, + + /** */ + STOPPED + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index d51293e..1a28e86 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1447,6 +1447,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Heartbeats sender has been started."); + UUID nodeId = getConfiguredNodeId(); + while (!isInterrupted()) { if (spiStateCopy() != CONNECTED) { if (log.isDebugEnabled()) @@ -1455,7 +1457,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getLocalNodeId()); + TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(nodeId); msg.verify(getLocalNodeId()); @@ -1593,39 +1595,47 @@ class ServerImpl extends TcpDiscoveryImpl { // Addresses registered in IP finder. Collection regAddrs = spi.registeredAddresses(); - // Remove all addresses that belong to alive nodes, leave dead-node addresses. - Collection rmvAddrs = F.view( - regAddrs, - F.notContains(currAddrs), - new P1() { - private final Map pingResMap = new HashMap<>(); + P1 p = new P1() { + private final Map pingResMap = new HashMap<>(); - @Override public boolean apply(InetSocketAddress addr) { - Boolean res = pingResMap.get(addr); + @Override public boolean apply(InetSocketAddress addr) { + Boolean res = pingResMap.get(addr); - if (res == null) { - try { - res = pingNode(addr, null).get1() != null; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to ping node [addr=" + addr + - ", err=" + e.getMessage() + ']'); - - res = false; - } - finally { - pingResMap.put(addr, res); - } + if (res == null) { + try { + res = pingNode(addr, null).get1() != null; } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to ping node [addr=" + addr + + ", err=" + e.getMessage() + ']'); - return !res; + res = false; + } + finally { + pingResMap.put(addr, res); + } } + + return !res; } - ); + }; + + ArrayList rmvAddrs = null; + + for (InetSocketAddress addr : regAddrs) { + boolean rmv = !F.contains(currAddrs, addr) && p.apply(addr); + + if (rmv) { + if (rmvAddrs == null) + rmvAddrs = new ArrayList<>(); + + rmvAddrs.add(addr); + } + } // Unregister dead-nodes addresses. - if (!rmvAddrs.isEmpty()) { + if (rmvAddrs != null) { spi.ipFinder.unregisterAddresses(rmvAddrs); if (log.isDebugEnabled()) @@ -4077,7 +4087,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - UUID locNodeId = getLocalNodeId(); + UUID locNodeId = getConfiguredNodeId(); ClientMessageWorker clientMsgWrk = null; @@ -4170,6 +4180,9 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); + if (req.client()) + res.clientAck(true); + spi.writeToSocket(sock, res); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, @@ -4313,7 +4326,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (state == CONNECTED) { spi.writeToSocket(msg, sock, RES_OK); - if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + if (clientMsgWrk.getState() == State.NEW) clientMsgWrk.start(); msgWorker.addMessage(msg); @@ -4457,7 +4470,14 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(msg); // Send receipt back. - if (clientMsgWrk == null) + if (clientMsgWrk != null) { + TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id()); + + ack.verify(locNodeId); + + clientMsgWrk.addMessage(ack); + } + else spi.writeToSocket(msg, sock, RES_OK); } catch (IgniteCheckedException e) { @@ -4567,8 +4587,11 @@ class ServerImpl extends TcpDiscoveryImpl { msg.responded(true); - if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) { + clientMsgWrk.clientVersion(U.productVersion(msg.node())); + clientMsgWrk.start(); + } msgWorker.addMessage(msg); @@ -4679,6 +4702,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private final AtomicReference> pingFut = new AtomicReference<>(); + /** */ + private IgniteProductVersion clientVer; + /** * @param sock Socket. * @param clientNodeId Node ID. @@ -4691,6 +4717,13 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param clientVer Client version. + */ + void clientVersion(IgniteProductVersion clientVer) { + this.clientVer = clientVer; + } + + /** * @return Current client metrics. */ ClusterMetrics metrics() { @@ -4709,17 +4742,40 @@ class ServerImpl extends TcpDiscoveryImpl { try { assert msg.verified() : msg; - if (log.isDebugEnabled()) - log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" - + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + if (msg instanceof TcpDiscoveryClientAckResponse) { + if (clientVer == null) { + ClusterNode node = spi.getNode(clientNodeId); - try { - prepareNodeAddedMessage(msg, clientNodeId, null, null); + if (node != null) + clientVer = IgniteUtils.productVersion(node); + else if (log.isDebugEnabled()) + log.debug("Skip sending message ack to client, fail to get client node " + + "[sock=" + sock + ", locNodeId=" + getLocalNodeId() + + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + } + + if (clientVer != null && + clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) { + if (log.isDebugEnabled()) + log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg); + writeToSocket(sock, msg); + } } - finally { - clearNodeAddedMessage(msg); + else { + try { + if (log.isDebugEnabled()) + log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + + prepareNodeAddedMessage(msg, clientNodeId, null, null); + + writeToSocket(sock, msg); + } + finally { + clearNodeAddedMessage(msg); + } } } catch (IgniteCheckedException | IOException e) { @@ -4829,7 +4885,7 @@ class ServerImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { if (log.isDebugEnabled()) - log.debug("Message worker started [locNodeId=" + getLocalNodeId() + ']'); + log.debug("Message worker started [locNodeId=" + getConfiguredNodeId() + ']'); while (!isInterrupted()) { TcpDiscoveryAbstractMessage msg = queue.poll(2000, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index ace917f..c271b7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -112,7 +112,14 @@ abstract class TcpDiscoveryImpl { * @return Local node ID. */ public UUID getLocalNodeId() { - return spi.getLocalNodeId(); + return spi.locNode.id(); + } + + /** + * @return Configured node ID (actual node ID can be different if client reconnects). + */ + public UUID getConfiguredNodeId() { + return spi.cfgNodeId; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 7663fe6..431d198 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -260,6 +260,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** Local node. */ protected TcpDiscoveryNode locNode; + /** */ + protected UUID cfgNodeId; + /** Local host. */ protected InetAddress locHost; @@ -327,6 +330,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T /** */ private boolean forceSrvMode; + /** */ + private boolean clientReconnectDisabled; + /** {@inheritDoc} */ @Override public String getSpiState() { return impl.getSpiState(); @@ -417,6 +423,29 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } /** + * If {@code true} client does not try to reconnect after + * server detected client node failure. + * + * @return Client reconnect disabled flag. + */ + public boolean isClientReconnectDisabled() { + return clientReconnectDisabled; + } + + /** + * Sets client reconnect disabled flag. + *

+ * If {@code true} client does not try to reconnect after + * server detected client node failure. + * + * @param clientReconnectDisabled Client reconnect disabled flag. + */ + @IgniteSpiConfiguration(optional = true) + public void setClientReconnectDisabled(boolean clientReconnectDisabled) { + this.clientReconnectDisabled = clientReconnectDisabled; + } + + /** * Inject resources * * @param ignite Ignite. @@ -844,7 +873,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T } locNode = new TcpDiscoveryNode( - getLocalNodeId(), + ignite.configuration().getNodeId(), addrs.get1(), addrs.get2(), srvPort, @@ -1615,6 +1644,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T mcastIpFinder.setLocalAddress(locAddr); } + cfgNodeId = ignite.configuration().getNodeId(); + impl.spiStart(gridName); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 22f56c3..032cf01 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -441,6 +441,25 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste this.clientRouterNodeId = clientRouterNodeId; } + /** + * @param newId New node ID. + */ + public void onClientDisconnected(UUID newId) { + id = newId; + } + + /** + * @return Copy of local node for client reconnect request. + */ + public TcpDiscoveryNode clientReconnectNode() { + TcpDiscoveryNode node = new TcpDiscoveryNode(id, addrs, hostNames, discPort, metricsProvider, ver); + + node.attrs = attrs; + node.clientRouterNodeId = clientRouterNodeId; + + return node; + } + /** {@inheritDoc} */ @Override public int compareTo(@Nullable TcpDiscoveryNode node) { if (node == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 21dbf4f..6f52152 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -40,6 +40,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** */ protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2; + /** */ + protected static final int CLIENT_ACK_FLAG_POS = 4; + /** Sender of the message (transient). */ private transient UUID sndNodeId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java new file mode 100644 index 0000000..ce3943a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * + */ +public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final IgniteProductVersion CLIENT_ACK_SINCE_VERSION = IgniteProductVersion.fromString("1.4.1"); + + /** */ + private final IgniteUuid msgId; + + /** + * @param creatorNodeId Creator node ID. + * @param msgId Message ID to ack. + */ + public TcpDiscoveryClientAckResponse(UUID creatorNodeId, IgniteUuid msgId) { + super(creatorNodeId); + + this.msgId = msgId; + } + + /** + * @return Acknowledged message ID. + */ + public IgniteUuid messageId() { + return msgId; + } + + /** {@inheritDoc} */ + @Override public boolean highPriority() { + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientAckResponse.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java index 5c2f798..ac4be50 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java @@ -61,6 +61,20 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage { this.order = order; } + /** + * @return {@code True} if server supports client message acknowledge. + */ + public boolean clientAck() { + return getFlag(CLIENT_ACK_FLAG_POS); + } + + /** + * @param clientAck {@code True} if server supports client message acknowledge. + */ + public void clientAck(boolean clientAck) { + setFlag(CLIENT_ACK_FLAG_POS, clientAck); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java index 7a88426..000782a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java @@ -257,7 +257,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, registerMBean(gridName, this, FileSwapSpaceSpiMBean.class); - String path = baseDir + File.separator + gridName + File.separator + getLocalNodeId(); + String path = baseDir + File.separator + gridName + File.separator + ignite.configuration().getNodeId(); try { dir = U.resolveWorkDirectory(path, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java index abc9109..bf499c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; @@ -104,8 +105,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { * Test kernal gateway that always return uninitialized user stack trace. */ private static final GridKernalGateway TEST_GATEWAY = new GridKernalGateway() { - @Override public void lightCheck() throws IllegalStateException {} - @Override public void readLock() throws IllegalStateException {} @Override public void readLockAnyway() {} @@ -122,10 +121,6 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { @Override public void writeUnlock() {} - @Override public void addStopListener(Runnable lsnr) {} - - @Override public void removeStopListener(Runnable lsnr) {} - @Override public String userStackTrace() { return null; } @@ -133,5 +128,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest { @Override public boolean tryWriteLock(long timeout) { return false; } + + @Override public GridFutureAdapter onDisconnected() { + return null; + } + + @Override public void onReconnected() { + // No-op. + } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java new file mode 100644 index 0000000..fbaea11 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.io.*; +import java.net.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final long RECONNECT_TIMEOUT = 10_000; + + /** */ + protected boolean clientMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + disco.setJoinTimeout(2 * 60_000); + disco.setSocketTimeout(1000); + disco.setNetworkTimeout(2000); + + cfg.setDiscoverySpi(disco); + + BlockTpcCommunicationSpi commSpi = new BlockTpcCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + if (clientMode) + cfg.setClientMode(true); + + return cfg; + } + + /** + * @param latch Latch. + * @throws Exception If failed. + */ + protected void waitReconnectEvent(CountDownLatch latch) throws Exception { + if (!latch.await(RECONNECT_TIMEOUT, MILLISECONDS)) { + log.error("Failed to wait for reconnect event, will dump threads, latch count: " + latch.getCount()); + + U.dumpThreads(log); + + fail("Failed to wait for disconnect/reconnect event."); + } + } + + /** + * @return Number of server nodes started before tests. + */ + protected abstract int serverCount(); + + /** + * @return Number of client nodes started before tests. + */ + protected int clientCount() { + return 0; + } + + /** + * @param ignite Node. + * @return Discovery SPI. + */ + protected TestTcpDiscoverySpi spi(Ignite ignite) { + return ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); + } + + /** + * @param ignite Node. + * @return Communication SPI. + */ + protected BlockTpcCommunicationSpi commSpi(Ignite ignite) { + return ((BlockTpcCommunicationSpi)ignite.configuration().getCommunicationSpi()); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + int srvs = serverCount(); + + if (srvs > 0) + startGrids(srvs); + + int clients = clientCount(); + + if (clients > 0) { + clientMode = true; + + startGridsMultiThreaded(srvs, clients); + + clientMode = false; + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @param client Client. + * @return Server node client connected to. + */ + protected Ignite clientRouter(Ignite client) { + TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode(); + + assertTrue(node.isClient()); + assertNotNull(node.clientRouterNodeId()); + + Ignite srv = G.ignite(node.clientRouterNodeId()); + + assertNotNull(srv); + + return srv; + } + + /** + * @param fut Future. + * @throws Exception If failed. + */ + protected void assertNotDone(IgniteInternalFuture fut) throws Exception { + assertNotNull(fut); + + if (fut.isDone()) + fail("Future completed with result: " + fut.get()); + } + + /** + * Reconnect client node. + * + * @param client Client. + * @param srv Server. + * @param disconnectedC Closure which will be run when client node disconnected. + * @throws Exception If failed. + */ + protected void reconnectClientNode(Ignite client, Ignite srv, @Nullable Runnable disconnectedC) + throws Exception { + final TestTcpDiscoverySpi clientSpi = spi(client); + final TestTcpDiscoverySpi srvSpi = spi(srv); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + IgnitePredicate p = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }; + + client.events().localListen(p, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + waitReconnectEvent(disconnectLatch); + + if (disconnectedC != null) + disconnectedC.run(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + waitReconnectEvent(reconnectLatch); + + client.events().stopLocalListen(p); + } + + /** + * @param e Client disconnected exception. + * @return Reconnect future. + */ + protected IgniteFuture check(CacheException e) { + log.info("Expected exception: " + e); + + if (!(e.getCause() instanceof IgniteClientDisconnectedException)) + log.error("Unexpected cause: " + e.getCause(), e); + + assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof IgniteClientDisconnectedException); + + IgniteClientDisconnectedException e0 = (IgniteClientDisconnectedException)e.getCause(); + + assertNotNull(e0.reconnectFuture()); + + return e0.reconnectFuture(); + } + + /** + * @param e Client disconnected exception. + */ + protected void checkAndWait(CacheException e) { + check(e).get(); + } + + /** + * @param e Client disconnected exception. + */ + protected void checkAndWait(IgniteClientDisconnectedException e) { + log.info("Expected exception: " + e); + + assertNotNull(e.reconnectFuture()); + + e.reconnectFuture().get(); + } + + /** + * + */ + protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + volatile CountDownLatch writeLatch; + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) + throws IOException, IgniteCheckedException { + if (msg instanceof TcpDiscoveryJoinRequestMessage) { + CountDownLatch writeLatch0 = writeLatch; + + if (writeLatch0 != null) { + log.info("Block join request send: " + msg); + + U.await(writeLatch0); + } + } + + super.writeToSocket(sock, msg); + } + } + + /** + * + */ + protected static class BlockTpcCommunicationSpi extends TcpCommunicationSpi { + /** */ + volatile Class msgCls; + + /** */ + AtomicBoolean collectStart = new AtomicBoolean(false); + + /** */ + ConcurrentHashMap classes = new ConcurrentHashMap<>(); + + /** */ + @LoggerResource + private IgniteLogger log; + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + Class msgCls0 = msgCls; + + if (collectStart.get() && msg instanceof GridIoMessage) + classes.put(((GridIoMessage)msg).message().getClass().getName(), node); + + if (msgCls0 != null && msg instanceof GridIoMessage + && ((GridIoMessage)msg).message().getClass().equals(msgCls)) { + log.info("Block message: " + msg); + + return; + } + + super.sendMessage(node, msg); + } + + /** + * @param clazz Class of messages which will be block. + */ + public void blockMessage(Class clazz) { + msgCls = clazz; + } + + /** + * Unlock all message. + */ + public void unblockMessage() { + msgCls = null; + } + + /** + * Start collect messages. + */ + public void start() { + collectStart.set(true); + } + + /** + * Print collected messages. + */ + public void print() { + for (String s : classes.keySet()) + log.error(s); + } + } +}