Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C1A94200B82 for ; Fri, 16 Sep 2016 11:33:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C033B160AC4; Fri, 16 Sep 2016 09:33:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2D732160A8C for ; Fri, 16 Sep 2016 11:33:37 +0200 (CEST) Received: (qmail 30145 invoked by uid 500); 16 Sep 2016 09:33:36 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 30135 invoked by uid 99); 16 Sep 2016 09:33:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Sep 2016 09:33:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED88CDFE61; Fri, 16 Sep 2016 09:33:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <59acccead2db4935a9e0bab0354b328b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-comm-opts2 Date: Fri, 16 Sep 2016 09:33:35 +0000 (UTC) archived-at: Fri, 16 Sep 2016 09:33:38 -0000 Repository: ignite Updated Branches: refs/heads/ignite-comm-opts-2conn [created] c9f62b815 ignite-comm-opts2 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9f62b81 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9f62b81 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9f62b81 Branch: refs/heads/ignite-comm-opts-2conn Commit: c9f62b815571ab6192736154ef7b90e8e46e95b3 Parents: c81e0d9 Author: sboikov Authored: Fri Sep 16 12:33:21 2016 +0300 Committer: sboikov Committed: Fri Sep 16 12:33:21 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 40 +-- .../managers/communication/GridIoMessage.java | 4 + .../ignite/internal/util/nio/GridNioServer.java | 3 +- .../communication/tcp/TcpCommunicationSpi.java | 244 +++++++++++++------ .../spi/GridTcpSpiForwardingSelfTest.java | 4 +- 5 files changed, 193 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 9e547ca..f869c5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -529,26 +529,26 @@ public class GridIoManager extends GridManagerAdapter { sb.append(" Connection info [") .append("rmtAddr=").append(ses.remoteAddress()) - .append(", locAddr=").append(ses.localAddress()); + .append(", locAddr=").append(ses.localAddress()) + .append(", in=").append(ses.accepted()); GridNioRecoveryDescriptor desc = ses.recoveryDescriptor(); http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 3292412..9110add 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -350,7 +351,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - UUID id = ses.meta(NODE_ID_META); + ConnectionId id = ses.meta(NODE_ID_META); if (id != null) { GridCommunicationClient client = clients.get(id); @@ -368,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recoveryData = ses.recoveryDescriptor(); if (recoveryData != null) { - if (recoveryData.nodeAlive(getSpiContext().node(id))) { + if (recoveryData.nodeAlive(getSpiContext().node(id.id))) { if (!recoveryData.messagesFutures().isEmpty()) { reconnect = true; @@ -381,7 +382,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter recoveryData.onNodeLeft(); } - DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(recoveryData, + DisconnectedSessionInfo disconnectData = new DisconnectedSessionInfo(id, + recoveryData, reconnect); commWorker.addProcessDisconnectRequest(disconnectData); @@ -390,7 +392,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter CommunicationListener lsnr0 = lsnr; if (lsnr0 != null) - lsnr0.onDisconnected(id); + lsnr0.onDisconnected(id.id); } } @@ -399,24 +401,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param msg Message. */ private void onFirstMessage(GridNioSession ses, Message msg) { - UUID sndId; + ConnectionId sndId; if (msg instanceof NodeIdMessage) - sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0); + sndId = new ConnectionId(-1, U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0)); else { assert msg instanceof HandshakeMessage : msg; - sndId = ((HandshakeMessage)msg).nodeId(); + sndId = new ConnectionId(((HandshakeMessage)msg).idx, ((HandshakeMessage)msg).nodeId()); } if (log.isDebugEnabled()) log.debug("Remote node ID received: " + sndId); - final UUID old = ses.addMeta(NODE_ID_META, sndId); + final ConnectionId old = ses.addMeta(NODE_ID_META, sndId); assert old == null; - final ClusterNode rmtNode = getSpiContext().node(sndId); + final ClusterNode rmtNode = getSpiContext().node(sndId.id); if (rmtNode == null) { if (log.isDebugEnabled()) @@ -462,7 +464,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter HandshakeMessage msg0 = (HandshakeMessage)msg; // - final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode); + final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode, sndId); // // if (oldFut == null) { // oldClient = clients.get(sndId); @@ -520,16 +522,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // else { // The code below causes a race condition between shmem and TCP (see IGNITE-1294) boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(), - new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut)); + new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut, sndId)); if (reserved) - connected(recoveryDesc, ses, rmtNode, msg0.received(), true, false); + connected(recoveryDesc, ses, rmtNode, msg0.received(), true, false, sndId); // } // } } @Override public void onMessage(GridNioSession ses, Message msg) { - UUID sndId = ses.meta(NODE_ID_META); + ConnectionId sndId = ses.meta(NODE_ID_META); if (sndId == null) { assert ses.accepted() : ses; @@ -601,7 +603,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // else c = NOOP; - notifyListener(sndId, msg, c); + notifyListener(sndId.id, msg, c); } } @@ -620,7 +622,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClusterNode node, long rcvCnt, boolean sndRes, - boolean createClient) { + boolean createClient, + ConnectionId connId) { + assert node.id().equals(connId.id); + recovery.onHandshake(rcvCnt); ses.recoveryDescriptor(recovery); @@ -637,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (createClient) { client = new GridTcpNioCommunicationClient(ses, log); - GridCommunicationClient oldClient = clients.putIfAbsent(node.id(), client); + GridCommunicationClient oldClient = clients.putIfAbsent(connId, client); assert oldClient == null : "Client already created [node=" + node + ", client=" + client + ", oldClient=" + oldClient + ", recoveryDesc=" + recovery + ']'; @@ -672,6 +677,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final boolean createClient; + private final ConnectionId connId; + /** * @param ses Incoming session. * @param recoveryDesc Recovery descriptor. @@ -685,13 +692,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClusterNode rmtNode, HandshakeMessage msg, boolean createClient, - GridFutureAdapter fut) { + GridFutureAdapter fut, + ConnectionId connId) { this.ses = ses; this.recoveryDesc = recoveryDesc; this.rmtNode = rmtNode; this.msg = msg; this.createClient = createClient; this.fut = fut; + this.connId = connId; } /** {@inheritDoc} */ @@ -703,7 +712,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter msgFut.get(); GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient, connId); fut.onDone(client); } @@ -717,7 +726,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connId, fut); } } }; @@ -729,7 +738,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter fut.onDone(); } finally { - clientFuts.remove(rmtNode.id(), fut); + clientFuts.remove(connId, fut); } } } @@ -814,7 +823,60 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private final Collection shmemWorkers = new ConcurrentLinkedDeque8<>(); /** Clients. */ - private final ConcurrentMap clients = GridConcurrentFactory.newMap(); + private final ConcurrentMap clients = GridConcurrentFactory.newMap(); + + /** */ + private int connectionsPerNode = 2; + + public int getConnectionsPerNode() { + return connectionsPerNode; + } + + public void setConnectionsPerNode(int connectionsPerNode) { + this.connectionsPerNode = connectionsPerNode; + } + + /** + * + */ + public static final class ConnectionId { + /** */ + private final int idx; + + /** */ + private final UUID id; + + /** + * @param idx Connection index. + * @param id Node ID. + */ + ConnectionId(int idx, UUID id) { + this.idx = idx; + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + ConnectionId clientId = (ConnectionId) o; + + return idx == clientId.idx && id.equals(clientId.id); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = idx; + res = 31 * res + id.hashCode(); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ConnectionId.class, this); + } + } /** SPI listener. */ private volatile CommunicationListener lsnr; @@ -861,7 +923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter }; /** Client connect futures. */ - private final ConcurrentMap> clientFuts = + private final ConcurrentMap> clientFuts = GridConcurrentFactory.newMap(); /** */ @@ -1409,7 +1471,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter sb.append("Communication SPI clients: ").append(U.nl()); - for (Map.Entry entry : clients.entrySet()) { + for (Map.Entry entry : clients.entrySet()) { sb.append(" [node=").append(entry.getKey()) .append(", client=").append(entry.getValue()) .append(']').append(U.nl()); @@ -1640,9 +1702,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null; } }; @@ -1655,9 +1717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null; } }; @@ -1894,16 +1956,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter void onNodeLeft(UUID nodeId) { assert nodeId != null; - GridCommunicationClient client = clients.get(nodeId); + for (Map.Entry client : clients.entrySet()) { + ConnectionId id = client.getKey(); - if (client != null) { - if (log.isDebugEnabled()) - log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + - ", client=" + client + ']'); + if (id.id.equals(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Forcing NIO client close since node has left [nodeId=" + nodeId + + ", client=" + client + ']'); - client.forceClose(); + client.getValue().forceClose(); - clients.remove(nodeId, client); + clients.remove(id, client.getValue()); + } } } @@ -1978,11 +2042,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else { GridCommunicationClient client = null; + ConnectionId id = new ConnectionId(((GridIoMessage)msg).connectionIndex() % connectionsPerNode, node.id()); + try { boolean retry; do { - client = reserveClient(node); + client = reserveClient(node, id); UUID nodeId = null; @@ -2013,7 +2079,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw new IgniteSpiException("Failed to send message to remote node: " + node, e); } finally { - if (client != null && clients.remove(node.id(), client)) + if (client != null && clients.remove(id, client)) client.forceClose(); } } @@ -2026,13 +2092,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return The existing or just created client. * @throws IgniteCheckedException Thrown if any exception occurs. */ - private GridCommunicationClient reserveClient(ClusterNode node) throws IgniteCheckedException { + private GridCommunicationClient reserveClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { assert node != null; UUID nodeId = node.id(); while (true) { - GridCommunicationClient client = clients.get(nodeId); + GridCommunicationClient client = clients.get(id); if (client == null) { if (stopping) @@ -2041,17 +2107,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. GridFutureAdapter fut = new ConnectFuture(); - GridFutureAdapter oldFut = clientFuts.putIfAbsent(nodeId, fut); + GridFutureAdapter oldFut = clientFuts.putIfAbsent(id, fut); if (oldFut == null) { try { - GridCommunicationClient client0 = clients.get(nodeId); + GridCommunicationClient client0 = clients.get(id); if (client0 == null) { - client0 = createNioClient(node); + client0 = createNioClient(node, id); if (client0 != null) { - GridCommunicationClient old = clients.put(nodeId, client0); + GridCommunicationClient old = clients.put(id, client0); assert old == null : "Client already created " + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; @@ -2059,7 +2125,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); - if (tcpClient.session().closeTime() > 0 && clients.remove(nodeId, client0)) { + if (tcpClient.session().closeTime() > 0 && clients.remove(id, client0)) { if (log.isDebugEnabled()) log.debug("Session was closed after client creation, will retry " + "[node=" + node + ", client=" + client0 + ']'); @@ -2081,7 +2147,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter throw (Error)e; } finally { - clientFuts.remove(nodeId, fut); + clientFuts.remove(id, fut); } } else @@ -2093,7 +2159,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter continue; if (getSpiContext().node(nodeId) == null) { - if (clients.remove(nodeId, client)) + if (clients.remove(id, client)) client.forceClose(); throw new IgniteSpiException("Destination node is not in topology: " + node.id()); @@ -2104,7 +2170,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return client; else // Client has just been closed by idle worker. Help it and try again. - clients.remove(nodeId, client); + clients.remove(id, client); } } @@ -2113,7 +2179,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridCommunicationClient createNioClient(ClusterNode node) throws IgniteCheckedException { + @Nullable protected GridCommunicationClient createNioClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { assert node != null; Integer shmemPort = node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT)); @@ -2154,7 +2220,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectGate.enter(); try { - GridCommunicationClient client = createTcpClient(node); + GridCommunicationClient client = createTcpClient(node, id); if (log.isDebugEnabled()) log.debug("TCP client created: " + client); @@ -2207,7 +2273,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null); + safeHandshake(client, null, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0), null, null); } catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) { client.forceClose(); @@ -2266,10 +2332,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) { if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) { - UUID id = ses.meta(NODE_ID_META); + ConnectionId id = ses.meta(NODE_ID_META); if (id != null) { - ClusterNode node = getSpiContext().node(id); + ClusterNode node = getSpiContext().node(id.id); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -2283,7 +2349,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log, msg); - getSpiContext().failNode(id, msg); + getSpiContext().failNode(id.id, msg); } } } @@ -2296,7 +2362,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { Collection rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -2364,7 +2430,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "(node left topology): " + node); } - GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node); + GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(node, id); if (!recoveryDesc.reserve()) { U.closeQuiet(ch); @@ -2386,7 +2452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), - timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine); + timeoutHelper.nextTimeoutChunk(connTimeout0), sslEngine, id); if (rcvCnt == -1) return null; @@ -2399,7 +2465,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter try { Map meta = new HashMap<>(); - meta.put(NODE_ID_META, node.id()); + meta.put(NODE_ID_META, id); if (isSslEnabled()) { assert sslEngine != null; @@ -2568,7 +2634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - @Nullable SSLEngine ssl + @Nullable SSLEngine ssl, + ConnectionId id ) throws IgniteCheckedException { HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2650,12 +2717,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (recovery != null) { HandshakeMessage msg = new HandshakeMessage(locNode.id(), recovery.incrementConnectCount(), - recovery.received()); + recovery.received(), + id.idx); if (log.isDebugEnabled()) log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']'); - buf = ByteBuffer.allocate(33); + buf = ByteBuffer.allocate(37); buf.order(ByteOrder.nativeOrder()); @@ -2810,8 +2878,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param node Node. * @return Recovery receive data for given node. */ - private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) { - ClientKey id = new ClientKey(node.id(), node.order()); + private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node, ConnectionId clientId) { + ClientKey id = new ClientKey(node.id(), node.order(), clientId.idx); GridNioRecoveryDescriptor recovery = recoveryDescs.get(id); @@ -2874,18 +2942,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private static class ClientKey { /** */ - private UUID nodeId; + private final UUID nodeId; + + /** */ + private final long order; /** */ - private long order; + private final int idx; /** * @param nodeId Node ID. * @param order Node order. */ - private ClientKey(UUID nodeId, long order) { + private ClientKey(UUID nodeId, long order, int idx) { this.nodeId = nodeId; this.order = order; + this.idx = idx; } /** {@inheritDoc} */ @@ -2898,7 +2970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ClientKey other = (ClientKey)obj; - return order == other.order && nodeId.equals(other.nodeId); + return idx == other.idx && order == other.order && nodeId.equals(other.nodeId); } @@ -2906,7 +2978,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter @Override public int hashCode() { int res = nodeId.hashCode(); - res = 31 * res + (int)(order ^ (order >>> 32)); + res = 31 * res + (int)(order ^ (order >>> 32)) + idx; return res; } @@ -3016,9 +3088,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; + return rmtNodeId != null ? formatter.writer(rmtNodeId.id) : null; } }; @@ -3032,9 +3104,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert formatter != null; - UUID rmtNodeId = ses.meta(NODE_ID_META); + ConnectionId rmtNodeId = ses.meta(NODE_ID_META); - return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; + return rmtNodeId != null ? formatter.reader(rmtNodeId.id, msgFactory) : null; } }; @@ -3115,8 +3187,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter private void processIdle() { cleanupRecovery(); - for (Map.Entry e : clients.entrySet()) { - UUID nodeId = e.getKey(); + for (Map.Entry e : clients.entrySet()) { + UUID nodeId = e.getKey().id; GridCommunicationClient client = e.getValue(); @@ -3128,7 +3200,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client.forceClose(); - clients.remove(nodeId, client); + clients.remove(e.getKey(), client); continue; } @@ -3136,7 +3208,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery = null; if (client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ClientKey(node.id(), node.order())); + recovery = recoveryDescs.get(new ClientKey(node.id(), node.order(), e.getKey().idx)); if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); @@ -3170,7 +3242,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Closing idle node connection: " + nodeId); if (client.close() || client.closed()) - clients.remove(nodeId, client); + clients.remove(e.getKey(), client); } } } @@ -3223,7 +3295,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']'); - GridCommunicationClient client = reserveClient(node); + GridCommunicationClient client = reserveClient(node, sesInfo.connId); client.release(); } @@ -3431,6 +3503,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private long connectCnt; + /** */ + private int idx; + /** * Default constructor required by {@link Message}. */ @@ -3443,13 +3518,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @param connectCnt Connect count. * @param rcvCnt Number of received messages. */ - public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { + public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt, int idx) { assert nodeId != null; assert rcvCnt >= 0 : rcvCnt; this.nodeId = nodeId; this.connectCnt = connectCnt; this.rcvCnt = rcvCnt; + this.idx = idx; } /** @@ -3480,7 +3556,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < 33) + if (buf.remaining() < 37) return false; buf.put(HANDSHAKE_MSG_TYPE); @@ -3495,6 +3571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf.putLong(connectCnt); + buf.putInt(idx); + return true; } @@ -3513,6 +3591,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter connectCnt = buf.getLong(); + idx = buf.getInt(); + return true; } @@ -3774,17 +3854,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter */ private static class DisconnectedSessionInfo { /** */ + private final ConnectionId connId; + + /** */ private final GridNioRecoveryDescriptor recoveryDesc; /** */ private final boolean reconnect; /** + * @param connId Node ID. * @param recoveryDesc Recovery descriptor. * @param reconnect Reconnect flag. */ - DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor recoveryDesc, + public DisconnectedSessionInfo(ConnectionId connId, + @Nullable GridNioRecoveryDescriptor recoveryDesc, boolean reconnect) { + this.connId = connId; this.recoveryDesc = recoveryDesc; this.reconnect = reconnect; } http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 652e47f..4c46be9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -111,14 +111,14 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException { + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, ConnectionId id) throws IgniteCheckedException { Map attrs = new HashMap<>(node.attributes()); attrs.remove(createSpiAttributeName(ATTR_PORT)); ((TcpDiscoveryNode)node).setAttributes(attrs); - return super.createTcpClient(node); + return super.createTcpClient(node, id); } };