Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B8007200BF6 for ; Mon, 26 Dec 2016 12:16:21 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id B6AF9160B2A; Mon, 26 Dec 2016 11:16:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 87657160B44 for ; Mon, 26 Dec 2016 12:16:18 +0100 (CET) Received: (qmail 23126 invoked by uid 500); 26 Dec 2016 11:16:17 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 22969 invoked by uid 99); 26 Dec 2016 11:16:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Dec 2016 11:16:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 71C39F0C65; Mon, 26 Dec 2016 11:16:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 26 Dec 2016 11:16:25 -0000 Message-Id: <3b55f79879d148f49490cae0082c7e87@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NI archived-at: Mon, 26 Dec 2016 11:16:21 -0000 
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1fe437c..0c90414 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -46,6 +46,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; import org.apache.ignite.Ignite; @@ -53,6 +54,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; @@ -103,6 +105,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; @@ -179,6 +182,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META *
  • Node local IP address (see {@link #setLocalAddress(String)})
  • *
  • Node local port number (see {@link #setLocalPort(int)})
  • *
  • Local port range (see {@link #setLocalPortRange(int)})
  • + *
  • Connections per node (see {@link #setConnectionsPerNode(int)})
  • *
  • Connection buffer flush frequency (see {@link #setConnectionBufferFlushFrequency(long)})
  • *
  • Connection buffer size (see {@link #setConnectionBufferSize(int)})
  • *
  • Idle connection timeout (see {@link #setIdleConnectionTimeout(long)})
  • @@ -238,6 +242,9 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META @IgniteSpiConsistencyChecked(optional = false) public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi, TcpCommunicationSpiMBean { + /** */ + private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = IgniteProductVersion.fromString("1.8.2"); + /** IPC error message. */ public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " + "(switching to TCP, may be slower)."; @@ -257,11 +264,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Node attribute that is mapped to node's external addresses (value is comm.tcp.ext-addrs). */ public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs"; + /** */ + public static final String ATTR_PAIRED_CONN = "comm.tcp.pairedConnection"; + /** Default port which node sets listener to (value is 47100). */ public static final int DFLT_PORT = 47100; /** Default port which node sets listener for shared memory connections (value is 48100). */ - public static final int DFLT_SHMEM_PORT = 48100; + public static final int DFLT_SHMEM_PORT = -1; /** Default idle connection timeout (value is 30000ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; @@ -283,12 +293,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Default count of selectors for TCP server equals to - * {@code "Math.min(4, Runtime.getRuntime().availableProcessors())"}. + * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}. */ - public static final int DFLT_SELECTORS_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); + public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2); - /** Node ID meta for session. */ - private static final int NODE_ID_META = GridNioSessionMetaKey.nextUniqueKey(); + /** Connection index meta for session. 
*/ + private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -303,11 +313,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final boolean DFLT_TCP_NODELAY = true; /** Default received messages threshold for sending ack. */ - public static final int DFLT_ACK_SND_THRESHOLD = 16; + public static final int DFLT_ACK_SND_THRESHOLD = 32; /** Default socket write timeout. */ public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; + /** Default connections per node. */ + public static final int DFLT_CONN_PER_NODE = 1; + /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { @Override public void run() { @@ -327,11 +340,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private ConnectGateway connectGate; + /** */ + private ConnectionPolicy connPlc; + /** Server listener. 
*/ private final GridNioServerListener srvLsnr = new GridNioServerListenerAdapter() { @Override public void onSessionWriteTimeout(GridNioSession ses) { - LT.warn(log, "Communication SPI Session write timed out (consider increasing " + + LT.warn(log,"Communication SPI session write timed out (consider increasing " + "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() + ", writeTimeout=" + sockWriteTimeout + ']'); @@ -347,46 +363,53 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Sending local node ID to newly accepted session: " + ses); - ses.send(nodeIdMessage()); + try { + ses.sendNoFuture(nodeIdMessage()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); + ConnectionKey connId = ses.meta(CONN_IDX_META); - if (id != null) { - GridCommunicationClient client = clients.get(id); + if (connId != null) { + UUID id = connId.nodeId(); - if (client instanceof GridTcpNioCommunicationClient && - ((GridTcpNioCommunicationClient) client).session() == ses) { - client.close(); + GridCommunicationClient[] nodeClients = clients.get(id); - clients.remove(id, client); + if (nodeClients != null) { + for (GridCommunicationClient client : nodeClients) { + if (client instanceof GridTcpNioCommunicationClient && + ((GridTcpNioCommunicationClient)client).session() == ses) { + client.close(); + + removeNodeClient(id, client); + } + } } if (!stopping) { - boolean reconnect = false; - - GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); - - if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { - if (!recoveryData.messagesFutures().isEmpty()) { - reconnect = true; + GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor(); + if (outDesc != null) { + if 
(outDesc.nodeAlive(getSpiContext().node(id))) { + if (!outDesc.messagesRequests().isEmpty()) { if (log.isDebugEnabled()) log.debug("Session was closed but there are unacknowledged messages, " + - "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']'); + "will try to reconnect [rmtNode=" + outDesc.node().id() + ']'); + + DisconnectedSessionInfo disconnectData = + new DisconnectedSessionInfo(outDesc, connId.connectionIndex()); + + commWorker.addProcessDisconnectRequest(disconnectData); } } else - recoveryData.onNodeLeft(); + outDesc.onNodeLeft(); } - - DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData, - reconnect); - - commWorker.addProcessDisconnectRequest(disconnectData); } CommunicationListener lsnr0 = lsnr; @@ -403,21 +426,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void onFirstMessage(GridNioSession ses, Message msg) { UUID sndId; - if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + ConnectionKey connKey; + + if (msg instanceof NodeIdMessage) { + sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + connKey = new ConnectionKey(sndId, 0, -1); + } else { assert msg instanceof HandshakeMessage : msg; + HandshakeMessage msg0 = (HandshakeMessage)msg; + sndId = ((HandshakeMessage)msg).nodeId(); + connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } if (log.isDebugEnabled()) log.debug("Remote node ID received: " + sndId); - final UUID old = ses.addMeta(NODE_ID_META, sndId); - - assert old == null; - final ClusterNode rmtNode = getSpiContext().node(sndId); if (rmtNode == null) { @@ -429,57 +455,65 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return; } + final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey); + + assert old == null; + ClusterNode locNode = getSpiContext().localNode(); if (ses.remoteAddress() == null) return; - GridCommunicationClient oldClient = clients.get(sndId); + assert 
msg instanceof HandshakeMessage : msg; - boolean hasShmemClient = false; + HandshakeMessage msg0 = (HandshakeMessage)msg; - if (oldClient != null) { - if (oldClient instanceof GridTcpNioCommunicationClient) { - if (log.isDebugEnabled()) - log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); + if (usePairedConnections(rmtNode)) { + final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); - ses.send(new RecoveryLastReceivedMessage(-1)); + ConnectClosureNew c = new ConnectClosureNew(ses, recoveryDesc, rmtNode); - return; - } + boolean reserve = recoveryDesc.tryReserve(msg0.connectCount(), c); + + if (reserve) + connectedNew(recoveryDesc, ses, true); else { - assert oldClient instanceof GridShmemCommunicationClient; + if (c.failed) { + ses.send(new RecoveryLastReceivedMessage(-1)); + + for (GridNioSession ses0 : nioSrvr.sessions()) { + ConnectionKey key0 = ses0.meta(CONN_IDX_META); - hasShmemClient = true; + if (ses0.accepted() && key0 != null && + key0.nodeId().equals(connKey.nodeId()) && + key0.connectionIndex() == connKey.connectionIndex() && + key0.connectCount() < connKey.connectCount()) + ses0.close(); + } + } } } + else { + assert connKey.connectionIndex() >= 0 : connKey; - GridFutureAdapter fut = new GridFutureAdapter<>(); - - GridFutureAdapter oldFut = clientFuts.putIfAbsent(sndId, fut); - - assert msg instanceof HandshakeMessage : msg; - - HandshakeMessage msg0 = (HandshakeMessage)msg; + GridCommunicationClient[] curClients = clients.get(sndId); - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + GridCommunicationClient oldClient = + curClients != null && connKey.connectionIndex() < curClients.length ? 
+ curClients[connKey.connectionIndex()] : + null; - if (oldFut == null) { - oldClient = clients.get(sndId); + boolean hasShmemClient = false; if (oldClient != null) { if (oldClient instanceof GridTcpNioCommunicationClient) { if (log.isDebugEnabled()) log.debug("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); ses.send(new RecoveryLastReceivedMessage(-1)); - fut.onDone(oldClient); - return; } else { @@ -489,51 +523,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } } - boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + GridFutureAdapter fut = new GridFutureAdapter<>(); - if (log.isDebugEnabled()) - log.debug("Received incoming connection from remote node " + + GridFutureAdapter oldFut = clientFuts.putIfAbsent(connKey, fut); + + final GridNioRecoveryDescriptor recoveryDesc = inRecoveryDescriptor(rmtNode, connKey); + + if (oldFut == null) { + curClients = clients.get(sndId); + + oldClient = curClients != null && connKey.connectionIndex() < curClients.length ? 
+ curClients[connKey.connectionIndex()] : null; + + if (oldClient != null) { + if (oldClient instanceof GridTcpNioCommunicationClient) { + assert oldClient.connectionIndex() == connKey.connectionIndex() : oldClient; + + if (log.isDebugEnabled()) + log.debug("Received incoming connection when already connected " + + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); + + ses.send(new RecoveryLastReceivedMessage(-1)); + + fut.onDone(oldClient); + + return; + } + else { + assert oldClient instanceof GridShmemCommunicationClient; + + hasShmemClient = true; + } + } + + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut)); + + if (log.isDebugEnabled()) + log.debug("Received incoming connection from remote node " + "[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']'); - if (reserved) { - try { - GridTcpNioCommunicationClient client = + if (reserved) { + try { + GridTcpNioCommunicationClient client = connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); - fut.onDone(client); - } - finally { - clientFuts.remove(rmtNode.id(), fut); + fut.onDone(client); + } + finally { + clientFuts.remove(connKey, fut); + } } } - } - else { - if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { - if (log.isDebugEnabled()) { - log.debug("Received incoming connection from remote node while " + + else { + if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) { + if (log.isDebugEnabled()) { + log.debug("Received incoming connection from remote node while " + "connecting to this node, rejecting [locNode=" + locNode.id() + ", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() + ", rmtNodeOrder=" + rmtNode.order() + ']'); - } + } - ses.send(new RecoveryLastReceivedMessage(-1)); - } - else { - // The code below causes a race condition between shmem and TCP (see IGNITE-1294) - boolean 
reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + ses.send(new RecoveryLastReceivedMessage(-1)); + } + else { + // The code below causes a race condition between shmem and TCP (see IGNITE-1294) + boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), + new ConnectClosure(ses, recoveryDesc, rmtNode, connKey, msg0, !hasShmemClient, fut)); - if (reserved) - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + if (reserved) + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient); + } } } } @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); - if (sndId == null) { + if (connKey == null) { assert ses.accepted() : ses; if (!connectGate.tryEnter()) { @@ -555,29 +624,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { rcvdMsgsCnt.increment(); - GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); + if (msg instanceof RecoveryLastReceivedMessage) { + GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor(); - if (recovery != null) { - if (msg instanceof RecoveryLastReceivedMessage) { + if (recovery != null) { RecoveryLastReceivedMessage msg0 = (RecoveryLastReceivedMessage)msg; - if (log.isDebugEnabled()) - log.debug("Received recovery acknowledgement [rmtNode=" + sndId + + if (log.isDebugEnabled()) { + log.debug("Received recovery acknowledgement [rmtNode=" + connKey.nodeId() + + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + msg0.received() + ']'); + } recovery.ackReceived(msg0.received()); return; } - else { + } + else { + GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor(); + + if (recovery != null) { long rcvCnt = recovery.onReceived(); if (rcvCnt % ackSndThreshold == 0) { - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement [rmtNode=" + sndId + + 
if (log.isDebugEnabled()) { + log.debug("Send recovery acknowledgement [rmtNode=" + connKey.nodeId() + + ", connIdx=" + connKey.connectionIndex() + ", rcvCnt=" + rcvCnt + ']'); + } - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); + ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); recovery.lastAcknowledged(rcvCnt); } @@ -603,7 +680,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else c = NOOP; - notifyListener(sndId, msg, c); + notifyListener(connKey.nodeId(), msg, c); } } @@ -611,7 +688,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param recovery Recovery descriptor. * @param ses Session. * @param node Node. - * @param rcvCnt Number of received messages.. + * @param rcvCnt Number of received messages. * @param sndRes If {@code true} sends response for recovery handshake. * @param createClient If {@code true} creates NIO communication client. * @return Client. @@ -623,32 +700,128 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter long rcvCnt, boolean sndRes, boolean createClient) { + ConnectionKey connKey = ses.meta(CONN_IDX_META); + + assert connKey != null && connKey.connectionIndex() >= 0 : connKey; + assert !usePairedConnections(node); + recovery.onHandshake(rcvCnt); - ses.recoveryDescriptor(recovery); + ses.inRecoveryDescriptor(recovery); + ses.outRecoveryDescriptor(recovery); nioSrvr.resend(ses); - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + try { + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } - recovery.connected(); + recovery.onConnected(); GridTcpNioCommunicationClient client = null; if (createClient) { - client = new GridTcpNioCommunicationClient(ses, log); - - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + client = new 
GridTcpNioCommunicationClient(connKey.connectionIndex(), ses, log); - assert oldClient == null : "Client already created [node=" + node + ", client=" + client + - ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; + addNodeClient(node, connKey.connectionIndex(), client); } return client; } /** + * @param recovery Recovery descriptor. + * @param ses Session. + * @param sndRes If {@code true} sends response for recovery handshake. + */ + private void connectedNew( + GridNioRecoveryDescriptor recovery, + GridNioSession ses, + boolean sndRes) { + try { + ses.inRecoveryDescriptor(recovery); + + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + + recovery.onConnected(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } + } + + /** + * + */ + class ConnectClosureNew implements IgniteInClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridNioSession ses; + + /** */ + private final GridNioRecoveryDescriptor recoveryDesc; + + /** */ + private final ClusterNode rmtNode; + + /** */ + private boolean failed; + + /** + * @param ses Incoming session. + * @param recoveryDesc Recovery descriptor. + * @param rmtNode Remote node. 
+ */ + ConnectClosureNew(GridNioSession ses, + GridNioRecoveryDescriptor recoveryDesc, + ClusterNode rmtNode) { + this.ses = ses; + this.recoveryDesc = recoveryDesc; + this.rmtNode = rmtNode; + } + + /** {@inheritDoc} */ + @Override public void apply(Boolean success) { + try { + failed = !success; + + if (success) { + IgniteInClosure> lsnr = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture msgFut) { + try { + msgFut.get(); + + connectedNew(recoveryDesc, ses, false); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + + recoveryDesc.release(); + } + } + }; + + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + else + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } + } + } + + /** * */ @SuppressWarnings("PackageVisibleInnerClass") @@ -674,10 +847,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final boolean createClient; + /** */ + private final ConnectionKey connKey; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. * @param rmtNode Remote node. + * @param connKey Connection key. * @param msg Handshake message. * @param createClient If {@code true} creates NIO communication client.. * @param fut Connect future. 
@@ -685,12 +862,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ConnectClosure(GridNioSession ses, GridNioRecoveryDescriptor recoveryDesc, ClusterNode rmtNode, + ConnectionKey connKey, HandshakeMessage msg, boolean createClient, GridFutureAdapter fut) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; + this.connKey = connKey; this.msg = msg; this.createClient = createClient; this.fut = fut; @@ -699,39 +878,44 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { if (success) { - IgniteInClosure> lsnr = new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture msgFut) { - try { - msgFut.get(); + try { + IgniteInClosure> lsnr = new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture msgFut) { + try { + msgFut.get(); - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); - fut.onDone(client); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + fut.onDone(client); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - recoveryDesc.release(); + recoveryDesc.release(); - fut.onDone(); - } - finally { - clientFuts.remove(rmtNode.id(), fut); + fut.onDone(); + } + finally { + clientFuts.remove(connKey, fut); + } } - } - }; + }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } 
else { try { fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connKey, fut); } } } @@ -794,6 +978,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Shared memory server. */ private IpcSharedMemoryServerEndpoint shmemSrv; + /** */ + private boolean usePairedConnections = true; + + /** */ + private int connectionsPerNode = DFLT_CONN_PER_NODE; + /** {@code TCP_NODELAY} option value for created sockets. */ private boolean tcpNoDelay = DFLT_TCP_NODELAY; @@ -816,7 +1006,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ - private final ConcurrentMap clients = GridConcurrentFactory.newMap(); + private final ConcurrentMap clients = GridConcurrentFactory.newMap(); /** SPI listener. */ private volatile CommunicationListener lsnr; @@ -830,6 +1020,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Count of selectors to use in TCP server. */ private int selectorsCnt = DFLT_SELECTORS_CNT; + /** + * Defines how many non-blocking {@code selector.selectNow()} should be made before + * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. + * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. + */ + private long selectorSpins = IgniteSystemProperties.getLong("IGNITE_SELECTOR_SPINS", 0L); + /** Address resolver. */ private AddressResolver addrRslvr; @@ -863,11 +1060,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter }; /** Client connect futures. 
*/ - private final ConcurrentMap> clientFuts = + private final ConcurrentMap> clientFuts = GridConcurrentFactory.newMap(); /** */ - private final ConcurrentMap recoveryDescs = GridConcurrentFactory.newMap(); + private final ConcurrentMap recoveryDescs = GridConcurrentFactory.newMap(); + + /** */ + private final ConcurrentMap outRecDescs = GridConcurrentFactory.newMap(); + + /** */ + private final ConcurrentMap inRecDescs = GridConcurrentFactory.newMap(); /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @@ -976,6 +1179,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return locPortRange; } + /** {@inheritDoc} */ + @Override public boolean isUsePairedConnections() { + return usePairedConnections; + } + + /** + * Set this to {@code true} if {@code TcpCommunicationSpi} should + * maintain connection for outgoing and incoming messages separately. + * In this case total number of connections between local and each remote node + * is {@link #getConnectionsPerNode()} * 2. + *

    + * Set this to {@code false} if each connection of {@link #getConnectionsPerNode()} + * should be used for outgoing and incoming messages. In this case total number + * of connections between local and each remote node is {@link #getConnectionsPerNode()}. + * In this case NIO selectors load + * balancing of {@link GridNioServer} will be disabled. + *

    + * Default is {@code true}. + * + * @param usePairedConnections {@code true} to use paired connections and {@code false} otherwise. + * @see #getConnectionsPerNode() + */ + public void setUsePairedConnections(boolean usePairedConnections) { + this.usePairedConnections = usePairedConnections; + } + + /** + * Sets number of connections to each remote node. if {@link #isUsePairedConnections()} + * is {@code true} then number of connections is doubled and half is used for incoming and + * half for outgoing messages. + * + * @param maxConnectionsPerNode Number of connections per node. + * @see #isUsePairedConnections() + */ + public void setConnectionsPerNode(int maxConnectionsPerNode) { + this.connectionsPerNode = maxConnectionsPerNode; + } + + /** {@inheritDoc} */ + @Override public int getConnectionsPerNode() { + return connectionsPerNode; + } + /** * Sets local port to accept shared memory connections. *

    @@ -1222,6 +1468,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return selectorsCnt; } + /** {@inheritDoc} */ + @Override public long getSelectorSpins() { + return selectorSpins; + } + + /** + * Defines how many non-blocking {@code selector.selectNow()} should be made before + * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}. + * Can be set to {@code Long.MAX_VALUE} so selector threads will never block. + * + * @param selectorSpins Selector thread busy-loop iterations. + */ + public void setSelectorSpins(long selectorSpins) { + this.selectorSpins = selectorSpins; + } + /** * Sets value for {@code TCP_NODELAY} socket option. Each * socket will be opened using provided value. @@ -1396,7 +1658,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log != null) { StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl()); - for (Map.Entry entry : recoveryDescs.entrySet()) { + for (Map.Entry entry : recoveryDescs.entrySet()) { GridNioRecoveryDescriptor desc = entry.getValue(); sb.append(" [key=").append(entry.getKey()) @@ -1409,14 +1671,48 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .append(']').append(U.nl()); } - sb.append("Communication SPI clients: ").append(U.nl()); + for (Map.Entry entry : outRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); + + sb.append(" [key=").append(entry.getKey()) + .append(", msgsSent=").append(desc.sent()) + .append(", msgsAckedByRmt=").append(desc.acked()) + .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) + .append(", descIdHash=").append(System.identityHashCode(desc)) + .append(']').append(U.nl()); + } + + for (Map.Entry entry : inRecDescs.entrySet()) { + GridNioRecoveryDescriptor desc = entry.getValue(); - for (Map.Entry entry : clients.entrySet()) { - sb.append(" 
[node=").append(entry.getKey()) - .append(", client=").append(entry.getValue()) + sb.append(" [key=").append(entry.getKey()) + .append(", msgsRcvd=").append(desc.received()) + .append(", lastAcked=").append(desc.lastAcknowledged()) + .append(", reserveCnt=").append(desc.reserveCount()) + .append(", connected=").append(desc.connected()) + .append(", reserved=").append(desc.reserved()) + .append(", handshakeIdx=").append(desc.handshakeIndex()) + .append(", descIdHash=").append(System.identityHashCode(desc)) .append(']').append(U.nl()); } + sb.append("Communication SPI clients: ").append(U.nl()); + + for (Map.Entry entry : clients.entrySet()) { + UUID nodeId = entry.getKey(); + GridCommunicationClient[] clients0 = entry.getValue(); + + for (GridCommunicationClient client : clients0) { + if (client != null) { + sb.append(" [node=").append(nodeId) + .append(", client=").append(client) + .append(']').append(U.nl()); + } + } + } + U.warn(log, sb.toString()); } @@ -1426,6 +1722,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.dumpStats(); } + /** */ + private final ThreadLocal threadConnIdx = new ThreadLocal<>(); + + /** */ + private final AtomicInteger connIdx = new AtomicInteger(); + /** {@inheritDoc} */ @Override public Map getNodeAttributes() throws IgniteSpiException { initFailureDetectionTimeout(); @@ -1439,6 +1741,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0"); assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || shmemPort == -1"); assertParameter(selectorsCnt > 0, "selectorsCnt > 0"); + assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0"); + assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 1024"); if (!failureDetectionTimeoutEnabled()) { assertParameter(reconCnt > 0, "reconnectCnt > 0"); @@ -1458,6 +1762,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "Specified 'unackedMsgsBufSize' is too low, it should be 
at least 'ackSndThreshold * 5'."); } + if (connectionsPerNode > 1) { + connPlc = new ConnectionPolicy() { + @Override public int connectionIndex() { + return (int)(U.safeAbs(Thread.currentThread().getId()) % connectionsPerNode); + } + }; + } + else { + connPlc = new ConnectionPolicy() { + @Override public int connectionIndex() { + return 0; + } + }; + } + try { locHost = U.resolveLocalHost(locAddr); } @@ -1495,6 +1814,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter res.put(createSpiAttributeName(ATTR_PORT), boundTcpPort); res.put(createSpiAttributeName(ATTR_SHMEM_PORT), boundTcpShmemPort >= 0 ? boundTcpShmemPort : null); res.put(createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); + res.put(createSpiAttributeName(ATTR_PAIRED_CONN), usePairedConnections); return res; } @@ -1524,6 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug(configInfo("sockRcvBuf", sockRcvBuf)); log.debug(configInfo("shmemPort", shmemPort)); log.debug(configInfo("msgQueueLimit", msgQueueLimit)); + log.debug(configInfo("connectionsPerNode", connectionsPerNode)); if (failureDetectionTimeoutEnabled()) { log.debug(configInfo("connTimeout", connTimeout)); @@ -1548,6 +1869,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", slowClientQueueLimit=" + slowClientQueueLimit + ']'); } + if (msgQueueLimit == 0) + U.quietAndWarn(log, "Message queue limit is set to 0 which may lead to " + + "potential OOMEs when running cache operations in FULL_ASYNC or PRIMARY_SYNC modes " + + "due to message queues growth on sender and reciever sides."); + registerMBean(gridName, this, TcpCommunicationSpiMBean.class); connectGate = new ConnectGateway(); @@ -1642,9 +1968,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey key = ses.meta(CONN_IDX_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return key != null ? 
formatter.reader(key.nodeId(), msgFactory) : null; } }; @@ -1657,9 +1983,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey key = ses.meta(CONN_IDX_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return key != null ? formatter.writer(key.nodeId()) : null; } }; @@ -1716,6 +2042,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .logger(log) .selectorCount(selectorsCnt) .gridName(gridName) + .serverName("tcp-comm") .tcpNoDelay(tcpNoDelay) .directBuffer(directBuf) .byteOrder(ByteOrder.nativeOrder()) @@ -1725,18 +2052,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .directMode(true) .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) + .selectorSpins(selectorSpins) .filters(filters) .writerFactory(writerFactory) .skipRecoveryPredicate(skipRecoveryPred) .messageQueueSizeListener(queueSizeMonitor) + .balancing(usePairedConnections) // Current balancing logic assumes separate in/out connections. .build(); boundTcpPort = port; // Ack Port the TCP server was bound to. if (log.isInfoEnabled()) - log.info("Successfully bound to TCP port [port=" + boundTcpPort + - ", locHost=" + locHost + ']'); + log.info("Successfully bound communication NIO server to TCP port " + + "[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt=" + selectorsCnt + + ", selectorSpins=" + srvr.selectorSpins() + ']'); srvr.idleTimeout(idleConnTimeout); @@ -1837,8 +2167,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter shmemWorkers.clear(); // Force closing on stop (safety). - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } + } // Clear resources. 
nioSrvr = null; @@ -1863,8 +2197,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.stopped(); // Force closing. - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } + } getSpiContext().deregisterPorts(); @@ -1875,8 +2213,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public void onClientDisconnected(IgniteFuture reconnectFut) { connectGate.disconnected(reconnectFut); - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : clients0) { + if (client != null) + client.forceClose(); + } + } IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to connect client node disconnected."); @@ -1885,6 +2227,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter clientFut.onDone(err); recoveryDescs.clear(); + inRecDescs.clear(); + outRecDescs.clear(); } /** {@inheritDoc} */ @@ -1898,16 +2242,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter void onNodeLeft(UUID nodeId) { assert nodeId != null; - GridCommunicationClient client = clients.get(nodeId); - - if (client != null) { - if (log.isDebugEnabled()) - log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + - ", client=" + client + ']'); + GridCommunicationClient[] clients0 = clients.remove(nodeId); - client.forceClose(); + if (clients0 != null) { + for (GridCommunicationClient client : clients0) { + if (client != null) { + if (log.isDebugEnabled()) + log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + + ", client=" + client + ']'); - clients.remove(nodeId, client); + client.forceClose(); + } + } } } @@ -1982,11 +2328,13 @@ public 
class TcpCommunicationSpi extends IgniteSpiAdapter else { GridCommunicationClient client = null; + int connIdx = useMultipleConnections(node) ? connPlc.connectionIndex() : 0; + try { boolean retry; do { - client = reserveClient(node); + client = reserveClient(node, connIdx); UUID nodeId = null; @@ -2000,7 +2348,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (!retry) sentMsgsCnt.increment(); else { - clients.remove(node.id(), client); + removeNodeClient(node.id(), client); ClusterNode node0 = getSpiContext().node(node.id()); @@ -2017,26 +2365,94 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw new IgniteSpiException("Failed to send message to remote node: " + node, e); } finally { - if (client != null && clients.remove(node.id(), client)) + if (client != null && removeNodeClient(node.id(), client)) client.forceClose(); } } } /** + * @param nodeId Node ID. + * @param rmvClient Client to remove. + * @return {@code True} if client was removed. + */ + private boolean removeNodeClient(UUID nodeId, GridCommunicationClient rmvClient) { + for (;;) { + GridCommunicationClient[] curClients = clients.get(nodeId); + + if (curClients == null || rmvClient.connectionIndex() >= curClients.length || curClients[rmvClient.connectionIndex()] != rmvClient) + return false; + + GridCommunicationClient[] newClients = Arrays.copyOf(curClients, curClients.length); + + newClients[rmvClient.connectionIndex()] = null; + + if (clients.replace(nodeId, curClients, newClients)) + return true; + } + } + + /** + * @param node Node. + * @param connIdx Connection index. + * @param addClient Client to add. 
+ */ + private void addNodeClient(ClusterNode node, int connIdx, GridCommunicationClient addClient) { + assert connectionsPerNode > 0 : connectionsPerNode; + assert connIdx == addClient.connectionIndex() : addClient; + + if (connIdx >= connectionsPerNode) { + assert !usePairedConnections(node); + + return; + } + + for (;;) { + GridCommunicationClient[] curClients = clients.get(node.id()); + + assert curClients == null || curClients[connIdx] == null : "Client already created [node=" + node.id() + + ", connIdx=" + connIdx + + ", client=" + addClient + + ", oldClient=" + curClients[connIdx] + ']'; + + GridCommunicationClient[] newClients; + + if (curClients == null) { + newClients = new GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1]; + newClients[connIdx] = addClient; + + if (clients.putIfAbsent(node.id(), newClients) == null) + break; + } + else { + newClients = Arrays.copyOf(curClients, curClients.length); + newClients[connIdx] = addClient; + + if (clients.replace(node.id(), curClients, newClients)) + break; + } + } + } + + /** * Returns existing or just created client to node. * * @param node Node to which client should be open. + * @param connIdx Connection index. * @return The existing or just created client. * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException { + private GridCommunicationClient reserveClient(ClusterNode node, int connIdx) throws IgniteCheckedException { assert node != null; + assert (connIdx >= 0 && connIdx < connectionsPerNode) || !usePairedConnections(node) : connIdx; UUID nodeId = node.id(); while (true) { - GridCommunicationClient client = clients.get(nodeId); + GridCommunicationClient[] curClients = clients.get(nodeId); + + GridCommunicationClient client = curClients != null && connIdx < curClients.length ? 
+ curClients[connIdx] : null; if (client == null) { if (stopping) @@ -2045,25 +2461,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter fut = new ConnectFuture(); - GridFutureAdapter oldFut = clientFuts.putIfAbsent(nodeId, fut); + ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); + + GridFutureAdapter oldFut = clientFuts.putIfAbsent(connKey, fut); if (oldFut == null) { try { - GridCommunicationClient client0 = clients.get(nodeId); + GridCommunicationClient[] curClients0 = clients.get(nodeId); + + GridCommunicationClient client0 = curClients0 != null && connIdx < curClients0.length ? + curClients0[connIdx] : null; if (client0 == null) { - client0 = createNioClient(node); + client0 = createNioClient(node, connIdx); if (client0 != null) { - GridCommunicationClient old = clients.put(nodeId, client0); - - assert old == null : "Client already created " + - "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + addNodeClient(node, connIdx, client0); if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); - if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { + if (tcpClient.session().closeTime() > 0 && removeNodeClient(nodeId, client0)) { if (log.isDebugEnabled()) log.debug("Session was closed after client creation, will retry " + "[node=" + node + ", client=" + client0 + ']'); @@ -2085,7 +2503,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw (Error)e; } finally { - clientFuts.remove(nodeId, fut); + clientFuts.remove(connKey, fut); } } else @@ -2097,27 +2515,31 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter continue; if (getSpiContext().node(nodeId) == null) { - if (clients.remove(nodeId, client)) + if (removeNodeClient(nodeId, client)) client.forceClose(); throw new IgniteSpiException("Destination node is not in topology: " + 
node.id()); } } + assert connIdx == client.connectionIndex() : client; + if (client.reserve()) return client; else // Client has just been closed by idle worker. Help it and try again. - clients.remove(nodeId, client); + removeNodeClient(nodeId, client); } } /** * @param node Node to create client for. + * @param connIdx Connection index. * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { + @Nullable private GridCommunicationClient createNioClient(ClusterNode node, int connIdx) + throws IgniteCheckedException { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2136,6 +2558,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { GridCommunicationClient client = createShmemClient( node, + connIdx, shmemPort); if (log.isDebugEnabled()) @@ -2158,7 +2581,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node); + GridCommunicationClient client = createTcpClient(node, connIdx); if (log.isDebugEnabled()) log.debug("TCP client created: " + client); @@ -2173,10 +2596,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * @param node Node. * @param port Port. + * @param connIdx Connection index. * @return Client. * @throws IgniteCheckedException If failed. 
*/ - @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, + @Nullable private GridCommunicationClient createShmemClient(ClusterNode node, + int connIdx, Integer port) throws IgniteCheckedException { int attempt = 1; @@ -2190,7 +2615,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient client; try { - client = new GridShmemCommunicationClient(metricsLsnr, + client = new GridShmemCommunicationClient( + connIdx, + metricsLsnr, port, timeoutHelper.nextTimeoutChunk(connTimeout), log, @@ -2211,7 +2638,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null); + safeHandshake(client, + null, + node.id(), + timeoutHelper.nextTimeoutChunk(connTimeout0), + null, + null); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2270,10 +2702,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { - UUID id = ses.meta(NODE_ID_META); + ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - ClusterNode node = getSpiContext().node(id); + ClusterNode node = getSpiContext().node(id.nodeId); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2283,11 +2715,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", clientNode=" + node + ", slowClientQueueLimit=" + slowClientQueueLimit + ']'; - U.quietAndWarn( - log, - msg); + U.quietAndWarn(log, msg); - getSpiContext().failNode(id, msg); + getSpiContext().failNode(id.nodeId(), msg); } } } @@ -2297,10 +2727,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * Establish TCP connection to remote node and returns client. * * @param node Remote node. 
+ * @param connIdx Connection index. * @return Client. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { Collection rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -2368,7 +2799,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "(node left topology): " + node); } - GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node); + ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); + + GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); if (!recoveryDesc.reserve()) { U.closeQuiet(ch); @@ -2395,11 +2828,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sslMeta.sslEngine(sslEngine); } + Integer handshakeConnIdx = useMultipleConnections(node) ? connIdx : null; + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), - sslMeta); + sslMeta, + handshakeConnIdx); if (rcvCnt == -1) return null; @@ -2410,7 +2846,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - meta.put(NODE_ID_META, node.id()); + meta.put(CONN_IDX_META, connKey); if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); @@ -2420,7 +2856,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioSession ses = nioSrvr.createSession(ch, meta).get(); - client = new GridTcpNioCommunicationClient(ses, log); + client = new GridTcpNioCommunicationClient(connIdx, ses, log); conn = true; } @@ -2564,6 +3000,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param rmtNodeId Remote node. * @param timeout Timeout for handshake. * @param sslMeta Session meta. 
+ * @param handshakeConnIdx Non null connection index if need send it in handshake. * @throws IgniteCheckedException If handshake failed or wasn't completed withing timeout. * @return Handshake response. */ @@ -2573,7 +3010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - GridSslMeta sslMeta + GridSslMeta sslMeta, + @Nullable Integer handshakeConnIdx ) throws IgniteCheckedException { HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2655,14 +3093,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(locNode.id(), - recovery.incrementConnectCount(), - recovery.received()); + HandshakeMessage msg; + + int msgSize = 33; + + if (handshakeConnIdx != null) { + msg = new HandshakeMessage2(locNode.id(), + recovery.incrementConnectCount(), + recovery.received(), + handshakeConnIdx); + + msgSize += 4; + } + else { + msg = new HandshakeMessage(locNode.id(), + recovery.incrementConnectCount(), + recovery.received()); + } if (log.isDebugEnabled()) log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); - buf = ByteBuffer.allocate(33); + buf = ByteBuffer.allocate(msgSize); buf.order(ByteOrder.nativeOrder()); @@ -2689,6 +3141,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); } + if (recovery != null) { if (log.isDebugEnabled()) log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']'); @@ -2818,26 +3271,81 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter U.join(commWorker, log); - for (GridCommunicationClient client : clients.values()) - client.forceClose(); + for (GridCommunicationClient[] clients0 : clients.values()) { + for (GridCommunicationClient client : 
clients0) { + if (client != null) + client.forceClose(); + } + } + } + + /** + * @param node Node. + * @param key Connection key. + * @return Recovery descriptor for outgoing connection. + */ + private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, ConnectionKey key) { + if (usePairedConnections(node)) + return recoveryDescriptor(outRecDescs, true, node, key); + else + return recoveryDescriptor(recoveryDescs, false, node, key); } /** * @param node Node. - * @return Recovery receive data for given node. + * @param key Connection key. + * @return Recovery descriptor for incoming connection. + */ + private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, ConnectionKey key) { + if (usePairedConnections(node)) + return recoveryDescriptor(inRecDescs, true, node, key); + else + return recoveryDescriptor(recoveryDescs, false, node, key); + } + + /** + * @param node Node. + * @return {@code True} if given node supports multiple connections per-node for communication. + */ + private boolean useMultipleConnections(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0; + } + + /** + * @param node Node. + * @return {@code True} if can use in/out connection pair for communication. */ - private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) { - ClientKey id = new ClientKey(node.id(), node.order()); + private boolean usePairedConnections(ClusterNode node) { + if (usePairedConnections) { + Boolean attr = node.attribute(createSpiAttributeName(ATTR_PAIRED_CONN)); + + return attr != null && attr; + } + + return false; + } - GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); + /** + * @param recoveryDescs Descriptors map. + * @param pairedConnections {@code True} if in/out connections pair is used for communication with node. + * @param node Node. + * @param key Connection key. + * @return Recovery receive data for given node. 
+ */ + private GridNioRecoveryDescriptor recoveryDescriptor( + ConcurrentMap recoveryDescs, + boolean pairedConnections, + ClusterNode node, + ConnectionKey key) { + GridNioRecoveryDescriptor recovery = recoveryDescs.get(key); if (recovery == null) { int maxSize = Math.max(msgQueueLimit, ackSndThreshold); - int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5); + int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 128); - GridNioRecoveryDescriptor old = - recoveryDescs.putIfAbsent(id, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); + GridNioRecoveryDescriptor old = recoveryDescs.putIfAbsent(key, + recovery = new GridNioRecoveryDescriptor(pairedConnections, queueLimit, node, log)); if (old != null) recovery = old; @@ -2879,54 +3387,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return S.toString(TcpCommunicationSpi.class, this); } - /** - * - */ - private static class ClientKey { - /** */ - private UUID nodeId; - - /** */ - private long order; - - /** - * @param nodeId Node ID. - * @param order Node order. - */ - private ClientKey(UUID nodeId, long order) { - this.nodeId = nodeId; - this.order = order; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (this == obj) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - ClientKey other = (ClientKey)obj; - - return order == other.order && nodeId.equals(other.nodeId); - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - - res = 31 * res + (int)(order ^ (order >>> 32)); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ClientKey.class, this); - } - } - /** Internal exception class for proper timeout handling. 
*/ private static class HandshakeTimeoutException extends IgniteCheckedException { /** */ @@ -3026,9 +3486,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return connKey != null ? formatter.writer(connKey.nodeId()) : null; } }; @@ -3042,9 +3502,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionKey connKey = ses.meta(CONN_IDX_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return connKey != null ? formatter.reader(connKey.nodeId(), msgFactory) : null; } }; @@ -3125,62 +3585,108 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processIdle() { cleanupRecovery(); - for (Map.Entry e : clients.entrySet()) { + for (Map.Entry e : clients.entrySet()) { UUID nodeId = e.getKey(); - GridCommunicationClient client = e.getValue(); + for (GridCommunicationClient client : e.getValue()) { + if (client == null) + continue; - ClusterNode node = getSpiContext().node(nodeId); + ClusterNode node = getSpiContext().node(nodeId); - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Forcing close of non-existent node connection: " + nodeId); + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Forcing close of non-existent node connection: " + nodeId); - client.forceClose(); + client.forceClose(); - clients.remove(nodeId, client); + removeNodeClient(nodeId, client); - continue; - } + continue; + } - GridNioRecoveryDescriptor recovery = null; + GridNioRecoveryDescriptor recovery = null; - if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + if (!usePairedConnections(node) && client instanceof GridTcpNioCommunicationClient) { + recovery = 
recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); - if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { - RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); - if (log.isDebugEnabled()) - log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + - ", rcvCnt=" + msg.received() + ']'); + if (log.isDebugEnabled()) + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + + ", rcvCnt=" + msg.received() + ']'); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + try { + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to send message: " + err, err); + } - continue; + continue; + } } - } - long idleTime = client.getIdleTime(); + long idleTime = client.getIdleTime(); + + if (idleTime >= idleConnTimeout) { + if (recovery == null && usePairedConnections(node)) + recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); + + if (recovery != null && + recovery.nodeAlive(getSpiContext().node(nodeId)) && + !recovery.messagesRequests().isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Node connection is idle, but there are unacknowledged messages, " + + "will wait: " + nodeId); + + continue; + } - if (idleTime >= idleConnTimeout) { - if (recovery != null && - recovery.nodeAlive(getSpiContext().node(nodeId)) && - !recovery.messagesFutures().isEmpty()) { if (log.isDebugEnabled()) - log.debug("Node connection is idle, but there are unacknowledged messages, " + - "will wait: " + nodeId); + log.debug("Closing idle node connection: " + nodeId); - continue; 
+ if (client.close() || client.closed()) + removeNodeClient(nodeId, client); } + } + } - if (log.isDebugEnabled()) - log.debug("Closing idle node connection: " + nodeId); + for (GridNioSession ses : nioSrvr.sessions()) { + GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor(); + + if (recovery != null && usePairedConnections(recovery.node())) { + assert ses.accepted() : ses; + + sendAckOnTimeout(recovery, ses); + } + } + } + + /** + * @param recovery Recovery descriptor. + * @param ses Session. + */ + private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, GridNioSession ses) { + if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { + RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); + + if (log.isDebugEnabled()) { + log.debug("Send recovery acknowledgement on timeout [rmtNode=" + recovery.node().id() + + ", rcvCnt=" + msg.received() + + ", lastAcked=" + recovery.lastAcknowledged() + ']'); + } + + try { + nioSrvr.sendSystem(ses, msg); - if (client.close() || client.closed()) - clients.remove(nodeId, client); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); } } } @@ -3189,15 +3695,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * */ private void cleanupRecovery() { - Set left = null; + cleanupRecovery(recoveryDescs); + cleanupRecovery(inRecDescs); + cleanupRecovery(outRecDescs); + } + + /** + * @param recoveryDescs Recovery descriptors to cleanup. 
+ */ + private void cleanupRecovery(ConcurrentMap recoveryDescs) { + Set left = null; - for (Map.Entry e : recoveryDescs.entrySet()) { + for (Map.Entry e : recoveryDescs.entrySet()) { if (left != null && left.contains(e.getKey())) continue; - GridNioRecoveryDescriptor recoverySnd = e.getValue(); + GridNioRecoveryDescriptor recoveryDesc = e.getValue(); - if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) { + if (!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) { if (left == null) left = new HashSet<>(); @@ -3208,11 +3723,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (left != null) { assert !left.isEmpty(); - for (ClientKey id : left) { - GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); + for (ConnectionKey id : left) { + GridNioRecoveryDescriptor recoveryDesc = recoveryDescs.get(id); - if (recoverySnd != null && recoverySnd.onNodeLeft()) - recoveryDescs.remove(id); + if (recoveryDesc != null && recoveryDesc.onNodeLeft()) + recoveryDescs.remove(id, recoveryDesc); } } } @@ -3221,45 +3736,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param sesInfo Disconnected session information. 
*/ private void processDisconnect(DisconnectedSessionInfo sesInfo) { - if (sesInfo.reconnect) { - GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; - - ClusterNode node = recoveryDesc.node(); + GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc; - if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) - return; + ClusterNode node = recoveryDesc.node(); - try { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); + if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) + return; - GridCommunicationClient client = reserveClient(node); + try { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - client.release(); - } - catch (IgniteCheckedException | IgniteException e) { - try { - if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, will retry " + - "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + GridCommunicationClient client = reserveClient(node, sesInfo.connIdx); - addProcessDisconnectRequest(sesInfo); - } - else { - if (log.isDebugEnabled()) - log.debug("Recovery reconnect failed, " + - "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + client.release(); + } + catch (IgniteCheckedException | IgniteException e) { + try { + if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) { + if (log.isDebugEnabled()) + log.debug("Recovery reconnect failed, will retry " + + "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); - onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]", - e); - } + addProcessDisconnectRequest(sesInfo); } - catch (IgniteClientDisconnectedException e0) { + else { if (log.isDebugEnabled()) - log.debug("Failed to ping node, client disconnected."); + 
log.debug("Recovery reconnect failed, " + + "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']'); + + onE